Package com.hazelcast.jet.core.processor
Class DiagnosticProcessors
- java.lang.Object
-
- com.hazelcast.jet.core.processor.DiagnosticProcessors
-
public final class DiagnosticProcessors extends java.lang.Object
Static utility class with factories of sinks and wrappers that log the data flowing through the DAG. These processors are useful while diagnosing the execution of Jet jobs. For other kinds of processors, refer to the package-level documentation.
- Since:
- Jet 3.0
-
-
Field Summary
Modifier and Type | Field | Description
static FunctionEx<java.lang.Object,java.lang.String>
PEEK_DEFAULT_TO_STRING
A function that uses `Object.toString()` for non-arrays, `Arrays.toString()` for arrays of primitive types and `Arrays.deepToString()` for `Object[]`.
-
Method Summary
Modifier and Type | Method | Description

static <T> SupplierEx<Processor>
peekInputP(FunctionEx<T,? extends java.lang.CharSequence> toStringFn, PredicateEx<T> shouldLogFn, SupplierEx<Processor> wrapped)
Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.

static <T> ProcessorMetaSupplier
peekInputP(FunctionEx<T,? extends java.lang.CharSequence> toStringFn, PredicateEx<T> shouldLogFn, ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates.

static <T> ProcessorSupplier
peekInputP(FunctionEx<T,? extends java.lang.CharSequence> toStringFn, PredicateEx<T> shouldLogFn, ProcessorSupplier wrapped)
Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.

static SupplierEx<Processor>
peekInputP(SupplierEx<Processor> wrapped)
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorMetaSupplier
peekInputP(ProcessorMetaSupplier wrapped)
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorSupplier
peekInputP(ProcessorSupplier wrapped)
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static <T> SupplierEx<Processor>
peekOutputP(FunctionEx<? super T,? extends java.lang.CharSequence> toStringFn, PredicateEx<? super T> shouldLogFn, SupplierEx<Processor> wrapped)
Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.

static <T> ProcessorMetaSupplier
peekOutputP(FunctionEx<? super T,? extends java.lang.CharSequence> toStringFn, PredicateEx<? super T> shouldLogFn, ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates.

static <T> ProcessorSupplier
peekOutputP(FunctionEx<? super T,? extends java.lang.CharSequence> toStringFn, PredicateEx<? super T> shouldLogFn, ProcessorSupplier wrapped)
Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.

static SupplierEx<Processor>
peekOutputP(SupplierEx<Processor> wrapped)
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorMetaSupplier
peekOutputP(ProcessorMetaSupplier wrapped)
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorSupplier
peekOutputP(ProcessorSupplier wrapped)
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static <K,V> SupplierEx<Processor>
peekSnapshotP(FunctionEx<? super java.util.Map.Entry<K,V>,? extends java.lang.CharSequence> toStringFn, PredicateEx<? super java.util.Map.Entry<K,V>> shouldLogFn, SupplierEx<Processor> wrapped)
Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.

static <K,V> ProcessorMetaSupplier
peekSnapshotP(FunctionEx<? super java.util.Map.Entry<K,V>,? extends java.lang.CharSequence> toStringFn, PredicateEx<? super java.util.Map.Entry<K,V>> shouldLogFn, ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates.

static <K,V> ProcessorSupplier
peekSnapshotP(FunctionEx<? super java.util.Map.Entry<K,V>,? extends java.lang.CharSequence> toStringFn, PredicateEx<? super java.util.Map.Entry<K,V>> shouldLogFn, ProcessorSupplier wrapped)
Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.

static SupplierEx<Processor>
peekSnapshotP(SupplierEx<Processor> wrapped)
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorMetaSupplier
peekSnapshotP(ProcessorMetaSupplier wrapped)
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorSupplier
peekSnapshotP(ProcessorSupplier wrapped)
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.

static ProcessorMetaSupplier
writeLoggerP()

static <T> ProcessorMetaSupplier
writeLoggerP(FunctionEx<T,? extends java.lang.CharSequence> toStringFn)
Returns a meta-supplier of processors for a sink vertex that logs all the data items it receives.
-
-
-
Field Detail
-
PEEK_DEFAULT_TO_STRING
public static final FunctionEx<java.lang.Object,java.lang.String> PEEK_DEFAULT_TO_STRING
A function that uses `Object.toString()` for non-arrays, `Arrays.toString()` for arrays of primitive types and `Arrays.deepToString()` for `Object[]`. Used for the `peek()` function in the DAG and Pipeline API.
- Since:
- 5.1
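The formatting rule described for `PEEK_DEFAULT_TO_STRING` can be illustrated with plain JDK calls. The following is only a sketch of the described behavior, not Jet's implementation: the class `PeekToStringSketch` and the method `describe` are hypothetical names, and the handling of `null` through `String.valueOf` is this sketch's own choice.

```java
import java.util.Arrays;

public class PeekToStringSketch {
    // Hypothetical helper mirroring the rule described above:
    // deepToString for Object[], toString for primitive arrays, plain toString otherwise.
    public static String describe(Object o) {
        if (o instanceof Object[]) return Arrays.deepToString((Object[]) o);
        if (o instanceof int[]) return Arrays.toString((int[]) o);
        if (o instanceof long[]) return Arrays.toString((long[]) o);
        if (o instanceof double[]) return Arrays.toString((double[]) o);
        if (o instanceof float[]) return Arrays.toString((float[]) o);
        if (o instanceof byte[]) return Arrays.toString((byte[]) o);
        if (o instanceof short[]) return Arrays.toString((short[]) o);
        if (o instanceof char[]) return Arrays.toString((char[]) o);
        if (o instanceof boolean[]) return Arrays.toString((boolean[]) o);
        // Assumption of this sketch: null is rendered as "null" instead of throwing.
        return String.valueOf(o);
    }

    public static void main(String[] args) {
        System.out.println(describe(new int[] {1, 2, 3}));                // [1, 2, 3]
        System.out.println(describe(new Object[] {"a", new int[] {4, 5}})); // [a, [4, 5]]
        System.out.println(describe("plain"));                            // plain
    }
}
```

Without such a function, an array item would be logged with its identity-based default `toString()` text, which is rarely useful while diagnosing a job.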
-
-
Method Detail
-
writeLoggerP
@Nonnull public static <T> ProcessorMetaSupplier writeLoggerP(@Nonnull FunctionEx<T,? extends java.lang.CharSequence> toStringFn)
Returns a meta-supplier of processors for a sink vertex that logs all the data items it receives. The log category is `com.hazelcast.jet.impl.connector.WriteLoggerP.<vertexName>#<processorIndex>` and the level is INFO. `Watermark` items are always logged, but at FINE level; they are not passed to `toStringFn`.
The vertex logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development, when running Jet on a local machine.
- Type Parameters:
`T` - stream item type
- Parameters:
`toStringFn` - a function that returns a string representation of a stream item
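Because watermarks are logged at FINE while data items are logged at INFO, the watermark lines only show up when the category is allowed to emit FINE records. The sketch below uses `java.util.logging` and assumes that the member routes its logs through that framework; the class and method names (`WriteLoggerLevelSketch`, `categoryFor`, `enableWatermarkLogging`) are hypothetical, and other logging backends need their own configuration.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class WriteLoggerLevelSketch {
    // Category prefix quoted from the method description.
    static final String CATEGORY_PREFIX = "com.hazelcast.jet.impl.connector.WriteLoggerP";

    // Kept in a static field because java.util.logging holds loggers only weakly.
    static final Logger PREFIX_LOGGER = Logger.getLogger(CATEGORY_PREFIX);

    // Builds the per-processor category: <prefix>.<vertexName>#<processorIndex>
    public static String categoryFor(String vertexName, int processorIndex) {
        return CATEGORY_PREFIX + "." + vertexName + "#" + processorIndex;
    }

    // Lowers the threshold for the whole category so that FINE records pass the logger.
    public static void enableWatermarkLogging() {
        PREFIX_LOGGER.setLevel(Level.FINE);
    }

    public static void main(String[] args) {
        enableWatermarkLogging();
        // "sink" and 0 are made-up values for the vertex name and processor index.
        Logger sinkLogger = Logger.getLogger(categoryFor("sink", 0));
        System.out.println(sinkLogger.getName());
        System.out.println(sinkLogger.isLoggable(Level.FINE));
    }
}
```

In `java.util.logging` the attached handlers have their own level as well, so a handler that stays at INFO would still drop the FINE records even after the logger level is lowered.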
-
writeLoggerP
@Nonnull public static ProcessorMetaSupplier writeLoggerP()
-
peekInputP
@Nonnull public static <T> ProcessorMetaSupplier peekInputP(@Nonnull FunctionEx<T,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<T> shouldLogFn, @Nonnull ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. For each item the wrapped processor removes from the inbox, the wrapping processor:
- uses the `shouldLogFn` predicate to see whether to log the item
- if the item passed, uses `toStringFn` to get a string representation of the item
- logs the string at the INFO level; the logger is `com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>`. The text is prefixed with "Input from X: ", where X is the ordinal of the edge the item is received from. Received watermarks are prefixed with just "Input: ".
Note: Watermarks are always logged. `Watermark` objects are not passed to `shouldLogFn` and `toStringFn`.
- Type Parameters:
`T` - input item type
- Parameters:
`toStringFn` - a function that returns the string representation of the item. You can use `PEEK_DEFAULT_TO_STRING`.
`shouldLogFn` - a function to filter the logged items. You can use `alwaysTrue()` as a pass-through filter when you don't need any filtering.
`wrapped` - the wrapped meta-supplier.
- See Also:
`peekOutputP(FunctionEx, PredicateEx, ProcessorMetaSupplier)`, `peekSnapshotP(FunctionEx, PredicateEx, ProcessorMetaSupplier)`
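The filter, format and log steps listed for `peekInputP` can be pictured with standard functional interfaces. This is a minimal sketch under stated assumptions, not the wrapper's actual code: `PeekInputSketch` and `logLines` are hypothetical names, `Predicate` and `Function` stand in for `PredicateEx` and `FunctionEx`, and the log lines are collected into a list instead of being sent to a logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class PeekInputSketch {
    // Hypothetical helper: returns the lines that would be logged for items
    // received from the edge with the given ordinal.
    public static <T> List<String> logLines(int ordinal, List<T> inboxItems,
            Predicate<T> shouldLogFn, Function<T, ? extends CharSequence> toStringFn) {
        List<String> lines = new ArrayList<>();
        for (T item : inboxItems) {
            if (!shouldLogFn.test(item)) {
                continue;                                       // step 1: filter
            }
            CharSequence text = toStringFn.apply(item);         // step 2: format
            lines.add("Input from " + ordinal + ": " + text);   // step 3: prefix and log
        }
        return lines;
    }

    // Logs only even numbers received on ordinal 0.
    public static List<String> demo() {
        return logLines(0, Arrays.asList(1, 2, 3, 4), n -> n % 2 == 0, n -> "n=" + n);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
        // Input from 0: n=2
        // Input from 0: n=4
    }
}
```

A selective `shouldLogFn` like the one in `demo()` keeps the log readable on busy edges, since `toStringFn` is only invoked for items that pass the filter.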
-
peekInputP
@Nonnull public static <T> ProcessorSupplier peekInputP(@Nonnull FunctionEx<T,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<T> shouldLogFn, @Nonnull ProcessorSupplier wrapped)
Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekInputP
@Nonnull public static <T> SupplierEx<Processor> peekInputP(@Nonnull FunctionEx<T,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<T> shouldLogFn, @Nonnull SupplierEx<Processor> wrapped)
Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekInputP
@Nonnull public static ProcessorMetaSupplier peekInputP(@Nonnull ProcessorMetaSupplier wrapped)
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.
-
peekInputP
@Nonnull public static ProcessorSupplier peekInputP(@Nonnull ProcessorSupplier wrapped)
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekInputP
@Nonnull public static SupplierEx<Processor> peekInputP(@Nonnull SupplierEx<Processor> wrapped)
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekOutputP
@Nonnull public static <T> ProcessorMetaSupplier peekOutputP(@Nonnull FunctionEx<? super T,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. For each item the wrapped processor adds to the outbox, the wrapping processor:
- uses the `shouldLogFn` predicate to see whether to log the item
- if the item passed, uses `toStringFn` to get a string representation of the item
- logs the string at the INFO level; the logger is `com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>`. The logged text is prefixed with "Output to X: ", where X is the ordinal of the edge the item is sent to.
Technically speaking, snapshot data is emitted to the same outbox as regular data, but this wrapper only logs the regular data. See `peekSnapshotP`.
Logging of Watermarks
There are two kinds of watermarks:
- Watermarks originated in the processor, prefixed in the logs with "Output to N: "
- Watermarks received on input, which are forwarded automatically. These are prefixed with "Output forwarded: "
`Watermark` objects are not passed to `shouldLogFn` or `toStringFn`.
- Type Parameters:
`T` - output item type
- Parameters:
`toStringFn` - a function that returns the string representation of the item. You can use `PEEK_DEFAULT_TO_STRING`.
`shouldLogFn` - a function to filter the logged items. You can use `alwaysTrue()` as a pass-through filter when you don't need any filtering.
`wrapped` - the wrapped meta-supplier.
- See Also:
`peekInputP(FunctionEx, PredicateEx, ProcessorMetaSupplier)`, `peekSnapshotP(FunctionEx, PredicateEx, ProcessorMetaSupplier)`
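The prefix rules for `peekOutputP` (regular items, watermarks originated in the processor, and forwarded watermarks) can be summarized in a small sketch. Everything here is illustrative: `PeekOutputSketch`, its methods and the `Wm` class are hypothetical stand-ins, and the textual form of a watermark in real logs is not specified by the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class PeekOutputSketch {
    // Hypothetical stand-in for a watermark; it only carries a timestamp.
    public static final class Wm {
        final long timestamp;
        public Wm(long timestamp) { this.timestamp = timestamp; }
        @Override public String toString() { return "Wm{ts=" + timestamp + "}"; }
    }

    // Regular items go through shouldLogFn and toStringFn before being prefixed.
    public static <T> Optional<String> itemLine(int ordinal, T item,
            Predicate<? super T> shouldLogFn,
            Function<? super T, ? extends CharSequence> toStringFn) {
        return shouldLogFn.test(item)
                ? Optional.of("Output to " + ordinal + ": " + toStringFn.apply(item))
                : Optional.empty();
    }

    // Watermarks bypass both functions; only the prefix differs by origin.
    public static String originatedWatermarkLine(int ordinal, Wm wm) {
        return "Output to " + ordinal + ": " + wm;
    }

    public static String forwardedWatermarkLine(Wm wm) {
        return "Output forwarded: " + wm;
    }

    public static List<String> demo() {
        List<String> lines = new ArrayList<>();
        Predicate<String> notEmpty = s -> !s.isEmpty();
        Function<String, String> upper = String::toUpperCase;
        itemLine(1, "x", notEmpty, upper).ifPresent(lines::add);
        itemLine(1, "", notEmpty, upper).ifPresent(lines::add); // filtered out, nothing logged
        lines.add(originatedWatermarkLine(1, new Wm(10)));
        lines.add(forwardedWatermarkLine(new Wm(20)));
        return lines;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The separate watermark methods reflect the statement that `Watermark` objects never reach `shouldLogFn` or `toStringFn`, so a restrictive filter cannot hide them.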
-
peekOutputP
@Nonnull public static <T> ProcessorSupplier peekOutputP(@Nonnull FunctionEx<? super T,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull ProcessorSupplier wrapped)
Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekOutputP
@Nonnull public static <T> SupplierEx<Processor> peekOutputP(@Nonnull FunctionEx<? super T,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull SupplierEx<Processor> wrapped)
Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekOutputP
@Nonnull public static ProcessorMetaSupplier peekOutputP(@Nonnull ProcessorMetaSupplier wrapped)
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.
-
peekOutputP
@Nonnull public static ProcessorSupplier peekOutputP(@Nonnull ProcessorSupplier wrapped)
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekOutputP
@Nonnull public static SupplierEx<Processor> peekOutputP(@Nonnull SupplierEx<Processor> wrapped)
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekSnapshotP
@Nonnull public static <K,V> ProcessorMetaSupplier peekSnapshotP(@Nonnull FunctionEx<? super java.util.Map.Entry<K,V>,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<? super java.util.Map.Entry<K,V>> shouldLogFn, @Nonnull ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. For each item the wrapped processor adds to the snapshot storage, the wrapping processor:
- uses the `shouldLogFn` predicate to see whether to log the item
- if the item passed, uses `toStringFn` to get a string representation of the item
- logs the string at the INFO level, the category being `com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>`
- Type Parameters:
`K` - type of the key emitted to the snapshot
`V` - type of the value emitted to the snapshot
- Parameters:
`toStringFn` - a function that returns the string representation of the item. You can use `PEEK_DEFAULT_TO_STRING`.
`shouldLogFn` - a function to filter the logged items. You can use `alwaysTrue()` as a pass-through filter when you don't need any filtering.
`wrapped` - the wrapped meta-supplier.
- See Also:
`peekInputP(FunctionEx, PredicateEx, ProcessorMetaSupplier)`, `peekOutputP(FunctionEx, PredicateEx, ProcessorMetaSupplier)`
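For `peekSnapshotP`, both functions operate on `Map.Entry<K,V>` values rather than on stream items. The sketch below shows what a filter and a formatter over snapshot entries might look like; `PeekSnapshotSketch` and `logLines` are hypothetical names, `Predicate` and `Function` stand in for `PredicateEx` and `FunctionEx`, and no log prefix is added because the description above does not specify one.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

public class PeekSnapshotSketch {
    // Hypothetical helper: applies the filter and formatter to each snapshot entry
    // and returns the strings that would be handed to the logger.
    public static <K, V> List<String> logLines(List<Map.Entry<K, V>> snapshotEntries,
            Predicate<? super Map.Entry<K, V>> shouldLogFn,
            Function<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<K, V> entry : snapshotEntries) {
            if (shouldLogFn.test(entry)) {
                lines.add(toStringFn.apply(entry).toString());
            }
        }
        return lines;
    }

    // Logs only the entries whose value exceeds 1, formatted as key=value.
    public static List<String> demo() {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>();
        entries.add(new AbstractMap.SimpleImmutableEntry<>("a", 1));
        entries.add(new AbstractMap.SimpleImmutableEntry<>("b", 2));
        return logLines(entries, e -> e.getValue() > 1, e -> e.getKey() + "=" + e.getValue());
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println); // b=2
    }
}
```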
-
peekSnapshotP
@Nonnull public static <K,V> ProcessorSupplier peekSnapshotP(@Nonnull FunctionEx<? super java.util.Map.Entry<K,V>,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<? super java.util.Map.Entry<K,V>> shouldLogFn, @Nonnull ProcessorSupplier wrapped)
Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekSnapshotP
@Nonnull public static <K,V> SupplierEx<Processor> peekSnapshotP(@Nonnull FunctionEx<? super java.util.Map.Entry<K,V>,? extends java.lang.CharSequence> toStringFn, @Nonnull PredicateEx<? super java.util.Map.Entry<K,V>> shouldLogFn, @Nonnull SupplierEx<Processor> wrapped)
Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekSnapshotP
@Nonnull public static SupplierEx<Processor> peekSnapshotP(@Nonnull SupplierEx<Processor> wrapped)
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekSnapshotP
@Nonnull public static ProcessorMetaSupplier peekSnapshotP(@Nonnull ProcessorMetaSupplier wrapped)
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.
-
peekSnapshotP
@Nonnull public static ProcessorSupplier peekSnapshotP(@Nonnull ProcessorSupplier wrapped)
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `ProcessorSupplier` instead of a meta-supplier.
-
-