Package com.hazelcast.jet.core.processor
Class DiagnosticProcessors
java.lang.Object
com.hazelcast.jet.core.processor.DiagnosticProcessors
Static utility class with factories of sinks and wrappers that log
the data flowing through the DAG. These processors are useful while
diagnosing the execution of Jet jobs. For other kinds of processors
refer to the package-level documentation.
- Since:
- Jet 3.0
-
Field Summary
| Modifier and Type | Field | Description |
| --- | --- | --- |
| `static final FunctionEx<Object, String>` | `PEEK_DEFAULT_TO_STRING` | A function that uses `Object.toString()` for non-arrays, `Arrays.toString()` for arrays of primitive types and `Arrays.deepToString()` for `Object[]`. |
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| `static <T> SupplierEx<Processor>` | `peekInputP(FunctionEx<T, ? extends CharSequence> toStringFn, PredicateEx<T> shouldLogFn, SupplierEx<Processor> wrapped)` | Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier. |
| `static <T> ProcessorMetaSupplier` | `peekInputP(FunctionEx<T, ? extends CharSequence> toStringFn, PredicateEx<T> shouldLogFn, ProcessorMetaSupplier wrapped)` | Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. |
| `static <T> ProcessorSupplier` | `peekInputP(FunctionEx<T, ? extends CharSequence> toStringFn, PredicateEx<T> shouldLogFn, ProcessorSupplier wrapped)` | Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier. |
| `static SupplierEx<Processor>` | `peekInputP(SupplierEx<Processor> wrapped)` | Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorMetaSupplier` | `peekInputP(ProcessorMetaSupplier wrapped)` | Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorSupplier` | `peekInputP(ProcessorSupplier wrapped)` | Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static <T> SupplierEx<Processor>` | `peekOutputP(FunctionEx<? super T, ? extends CharSequence> toStringFn, PredicateEx<? super T> shouldLogFn, SupplierEx<Processor> wrapped)` | Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier. |
| `static <T> ProcessorMetaSupplier` | `peekOutputP(FunctionEx<? super T, ? extends CharSequence> toStringFn, PredicateEx<? super T> shouldLogFn, ProcessorMetaSupplier wrapped)` | Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. |
| `static <T> ProcessorSupplier` | `peekOutputP(FunctionEx<? super T, ? extends CharSequence> toStringFn, PredicateEx<? super T> shouldLogFn, ProcessorSupplier wrapped)` | Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier. |
| `static SupplierEx<Processor>` | `peekOutputP(SupplierEx<Processor> wrapped)` | Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorMetaSupplier` | `peekOutputP(ProcessorMetaSupplier wrapped)` | Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorSupplier` | `peekOutputP(ProcessorSupplier wrapped)` | Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static <K, V> SupplierEx<Processor>` | `peekSnapshotP(FunctionEx<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn, PredicateEx<? super Map.Entry<K, V>> shouldLogFn, SupplierEx<Processor> wrapped)` | Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier. |
| `static <K, V> ProcessorMetaSupplier` | `peekSnapshotP(FunctionEx<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn, PredicateEx<? super Map.Entry<K, V>> shouldLogFn, ProcessorMetaSupplier wrapped)` | Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. |
| `static <K, V> ProcessorSupplier` | `peekSnapshotP(FunctionEx<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn, PredicateEx<? super Map.Entry<K, V>> shouldLogFn, ProcessorSupplier wrapped)` | Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier. |
| `static SupplierEx<Processor>` | `peekSnapshotP(SupplierEx<Processor> wrapped)` | Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorMetaSupplier` | `peekSnapshotP(ProcessorMetaSupplier wrapped)` | Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorSupplier` | `peekSnapshotP(ProcessorSupplier wrapped)` | Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. |
| `static ProcessorMetaSupplier` | `writeLoggerP()` | |
| `static <T> ProcessorMetaSupplier` | `writeLoggerP(FunctionEx<T, ? extends CharSequence> toStringFn)` | Returns a meta-supplier of processors for a sink vertex that logs all the data items it receives. |
-
Field Details
-
PEEK_DEFAULT_TO_STRING
A function that uses `Object.toString()` for non-arrays, `Arrays.toString()` for arrays of primitive types and `Arrays.deepToString()` for `Object[]`. Used for the `peek()` function in the DAG and Pipeline API.
- Since:
- 5.1
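The dispatch described for `PEEK_DEFAULT_TO_STRING` can be sketched with the plain JDK. This is an illustration only, not Hazelcast's implementation: the class name `PeekToStringSketch`, the `TO_STRING` constant, and the null handling are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.function.Function;

/** Plain-JDK sketch of the described dispatch; not Hazelcast's code. */
final class PeekToStringSketch {

    // Hypothetical stand-in for PEEK_DEFAULT_TO_STRING.
    static final Function<Object, String> TO_STRING = PeekToStringSketch::format;

    static String format(Object o) {
        // Assumption: the documentation does not say how null is treated.
        if (o == null) { return "null"; }
        // Object[] (including nested arrays) goes through deepToString().
        if (o instanceof Object[]) { return Arrays.deepToString((Object[]) o); }
        // Arrays of primitive types go through the matching Arrays.toString().
        if (o instanceof boolean[]) { return Arrays.toString((boolean[]) o); }
        if (o instanceof byte[]) { return Arrays.toString((byte[]) o); }
        if (o instanceof char[]) { return Arrays.toString((char[]) o); }
        if (o instanceof short[]) { return Arrays.toString((short[]) o); }
        if (o instanceof int[]) { return Arrays.toString((int[]) o); }
        if (o instanceof long[]) { return Arrays.toString((long[]) o); }
        if (o instanceof float[]) { return Arrays.toString((float[]) o); }
        if (o instanceof double[]) { return Arrays.toString((double[]) o); }
        // Everything else uses Object.toString().
        return o.toString();
    }

    public static void main(String[] args) {
        System.out.println(TO_STRING.apply(new int[] {1, 2, 3}));                // [1, 2, 3]
        System.out.println(TO_STRING.apply(new String[][] {{"a"}, {"b", "c"}})); // [[a], [b, c]]
        System.out.println(TO_STRING.apply("plain"));                            // plain
    }
}
```

The point of the array branches is that calling `toString()` directly on an array prints its type and identity hash rather than its contents.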
-
-
Method Details
-
writeLoggerP
@Nonnull public static <T> ProcessorMetaSupplier writeLoggerP(@Nonnull FunctionEx<T, ? extends CharSequence> toStringFn)
Returns a meta-supplier of processors for a sink vertex that logs all the data items it receives. The log category is `com.hazelcast.jet.impl.connector.WriteLoggerP.<vertexName>#<processorIndex>` and the level is INFO. `Watermark` items are always logged, but at FINE level; they are not passed to `toStringFn`.
The vertex logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development, when running Jet on a local machine.
- Type Parameters:
- `T` - stream item type
- Parameters:
- `toStringFn` - a function that returns a string representation of a stream item
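The logging rules above (category name, INFO for items, FINE for watermarks that bypass `toStringFn`) can be mimicked with `java.util.logging`. The sketch below is a stand-alone model, not the Jet processor: `WriteLoggerSketch`, its nested `Wm` class (a stand-in for Jet's `Watermark`), and the method names are hypothetical.

```java
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Stand-alone model of the documented logging rules; not the Jet processor. */
final class WriteLoggerSketch<T> {

    /** Hypothetical stand-in for Jet's Watermark. */
    static final class Wm {
        final long timestamp;
        Wm(long timestamp) { this.timestamp = timestamp; }
        @Override public String toString() { return "Wm(ts=" + timestamp + ")"; }
    }

    private final Logger logger;
    private final Function<? super T, ? extends CharSequence> toStringFn;

    WriteLoggerSketch(String vertexName, int processorIndex,
                      Function<? super T, ? extends CharSequence> toStringFn) {
        this.logger = Logger.getLogger(categoryFor(vertexName, processorIndex));
        this.toStringFn = toStringFn;
    }

    /** Builds the category in the documented <vertexName>#<processorIndex> shape. */
    static String categoryFor(String vertexName, int processorIndex) {
        return "com.hazelcast.jet.impl.connector.WriteLoggerP."
                + vertexName + "#" + processorIndex;
    }

    /** Logs one received object according to the documented rules. */
    void accept(Object item) {
        if (item instanceof Wm) {
            // Watermarks: always logged, at FINE, never passed to toStringFn.
            logger.log(Level.FINE, item.toString());
            return;
        }
        @SuppressWarnings("unchecked")
        T typed = (T) item;
        // Regular items: formatted by toStringFn and logged at INFO.
        logger.log(Level.INFO, toStringFn.apply(typed).toString());
    }

    public static void main(String[] args) {
        WriteLoggerSketch<Integer> sink = new WriteLoggerSketch<>("sink", 0, i -> "item-" + i);
        sink.accept(7);          // logged at INFO
        sink.accept(new Wm(42)); // logged at FINE, hidden under default JUL settings
    }
}
```

Under the default `java.util.logging` configuration only the INFO line is visible; seeing the watermark line requires enabling FINE for that category.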
-
writeLoggerP
-
peekInputP
@Nonnull public static <T> ProcessorMetaSupplier peekInputP(@Nonnull FunctionEx<T, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<T> shouldLogFn, @Nonnull ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. For each item the wrapped processor removes from the inbox, the wrapping processor:
- uses the `shouldLogFn` predicate to see whether to log the item
- if the item passed, uses `toStringFn` to get a string representation of the item
- logs the string at the INFO level; the logger is `com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>`. The text is prefixed with "Input from X: ", where X is the edge ordinal the item is received from. Received watermarks are prefixed with just "Input: ".
Note: Watermarks are always logged. `Watermark` objects are not passed to `shouldLogFn` or `toStringFn`.
- Type Parameters:
- `T` - input item type
- Parameters:
- `toStringFn` - a function that returns the string representation of the item. You can use `PEEK_DEFAULT_TO_STRING`.
- `shouldLogFn` - a function to filter the logged items. You can use `alwaysTrue()` as a pass-through filter when you don't need any filtering.
- `wrapped` - The wrapped meta-supplier.
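The three steps for input items (filter, format, prefix) can be written as a small pure function. This is a sketch of the described behavior using `java.util.function` types in place of `PredicateEx` and `FunctionEx`; `PeekInputSketch` and its method names are hypothetical, and the real wrapper writes to a logger rather than returning a string.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Models the documented filter/format/prefix steps; not the Jet wrapper. */
final class PeekInputSketch {

    /** Returns the line that would be logged for an item, or empty if it is filtered out. */
    static <T> Optional<String> logLineFor(int ordinal, T item,
                                           Predicate<T> shouldLogFn,
                                           Function<T, ? extends CharSequence> toStringFn) {
        // Step 1: the predicate decides whether the item is logged at all.
        if (!shouldLogFn.test(item)) {
            return Optional.empty();
        }
        // Steps 2 and 3: format the item, then prefix with the input edge ordinal.
        return Optional.of("Input from " + ordinal + ": " + toStringFn.apply(item));
    }

    /** Watermarks skip both functions and get the shorter prefix. */
    static String watermarkLine(Object watermark) {
        return "Input: " + watermark;
    }

    public static void main(String[] args) {
        System.out.println(logLineFor(0, 5, i -> i > 3, i -> "n=" + i)); // Optional[Input from 0: n=5]
        System.out.println(logLineFor(0, 2, i -> i > 3, i -> "n=" + i)); // Optional.empty
    }
}
```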
-
peekInputP
@Nonnull public static <T> ProcessorSupplier peekInputP(@Nonnull FunctionEx<T, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<T> shouldLogFn, @Nonnull ProcessorSupplier wrapped)
Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekInputP
@Nonnull public static <T> SupplierEx<Processor> peekInputP(@Nonnull FunctionEx<T, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<T> shouldLogFn, @Nonnull SupplierEx<Processor> wrapped)
Same as `peekInputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekInputP
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.
-
peekInputP
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekInputP
Convenience for `peekInputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekOutputP
@Nonnull public static <T> ProcessorMetaSupplier peekOutputP(@Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. For each item the wrapped processor adds to the outbox, the wrapping processor:
- uses the `shouldLogFn` predicate to see whether to log the item
- if the item passed, uses `toStringFn` to get a string representation of the item
- logs the string at the INFO level; the logger is `com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>`. The logged text is prefixed with "Output to X: ", where X is the edge ordinal the item is sent to.
Technically speaking, snapshot data is emitted to the same outbox as regular data, but this wrapper only logs the regular data. See `peekSnapshotP()`.
Logging of Watermarks
There are two kinds of watermarks:
- Watermarks originated in the processor, prefixed in the logs with "Output to N: "
- Watermarks received on input, which are forwarded automatically. These are prefixed with "Output forwarded: "
`Watermark` objects are not passed to `shouldLogFn` or `toStringFn`.
- Type Parameters:
- `T` - output item type
- Parameters:
- `toStringFn` - a function that returns the string representation of the item. You can use `PEEK_DEFAULT_TO_STRING`.
- `shouldLogFn` - a function to filter the logged items. You can use `alwaysTrue()` as a pass-through filter when you don't need any filtering.
- `wrapped` - The wrapped meta-supplier.
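To make the output-side rules concrete, the sketch below models an outbox wrapper that records the lines it would log: regular items and processor-originated watermarks get "Output to N: ", forwarded watermarks get "Output forwarded: ", and snapshot entries are not logged. Everything here is hypothetical scaffolding (`PeekOutputSketch`, `Wm`, the method names, and collecting lines in a list instead of a logger); it is not how Jet implements the wrapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Models the documented output-logging rules; not the Jet wrapper. */
final class PeekOutputSketch<T> {

    /** Hypothetical stand-in for Jet's Watermark. */
    static final class Wm {
        final long timestamp;
        Wm(long timestamp) { this.timestamp = timestamp; }
        @Override public String toString() { return "Wm(ts=" + timestamp + ")"; }
    }

    /** Lines that a real wrapper would send to the logger at INFO. */
    final List<String> logLines = new ArrayList<>();

    private final Function<? super T, ? extends CharSequence> toStringFn;
    private final Predicate<? super T> shouldLogFn;

    PeekOutputSketch(Function<? super T, ? extends CharSequence> toStringFn,
                     Predicate<? super T> shouldLogFn) {
        this.toStringFn = toStringFn;
        this.shouldLogFn = shouldLogFn;
    }

    /** A regular item, or a watermark the processor itself emits, sent to one ordinal. */
    void offer(int ordinal, Object item) {
        if (item instanceof Wm) {
            // Watermarks bypass shouldLogFn and toStringFn.
            logLines.add("Output to " + ordinal + ": " + item);
            return;
        }
        @SuppressWarnings("unchecked")
        T typed = (T) item;
        if (shouldLogFn.test(typed)) {
            logLines.add("Output to " + ordinal + ": " + toStringFn.apply(typed));
        }
    }

    /** A watermark received on input and forwarded automatically. */
    void forwardWatermark(Wm wm) {
        logLines.add("Output forwarded: " + wm);
    }

    /** Snapshot data shares the outbox but is deliberately not logged here. */
    void offerToSnapshot(Object key, Object value) {
        // no logging
    }

    public static void main(String[] args) {
        PeekOutputSketch<String> out = new PeekOutputSketch<>(s -> s.toUpperCase(), s -> !s.isEmpty());
        out.offer(0, "a");
        out.offer(0, new Wm(10));
        out.forwardWatermark(new Wm(5));
        out.offerToSnapshot("k", "v");
        out.logLines.forEach(System.out::println);
    }
}
```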
-
peekOutputP
@Nonnull public static <T> ProcessorSupplier peekOutputP(@Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull ProcessorSupplier wrapped)
Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekOutputP
@Nonnull public static <T> SupplierEx<Processor> peekOutputP(@Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull SupplierEx<Processor> wrapped)
Same as `peekOutputP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekOutputP
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.
-
peekOutputP
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekOutputP
Convenience for `peekOutputP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekSnapshotP
@Nonnull public static <K,V> ProcessorMetaSupplier peekSnapshotP(@Nonnull FunctionEx<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<? super Map.Entry<K, V>> shouldLogFn, @Nonnull ProcessorMetaSupplier wrapped)
Returns a meta-supplier that wraps the provided one and adds a logging layer to each processor it creates. For each item the wrapped processor adds to the snapshot storage, the wrapping processor:
- uses the `shouldLogFn` predicate to see whether to log the item
- if the item passed, uses `toStringFn` to get a string representation of the item
- logs the string at the INFO level, the category being `com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>`
- Type Parameters:
- `K` - type of the key emitted to the snapshot
- `V` - type of the value emitted to the snapshot
- Parameters:
- `toStringFn` - a function that returns the string representation of the item. You can use `PEEK_DEFAULT_TO_STRING`.
- `shouldLogFn` - a function to filter the logged items. You can use `alwaysTrue()` as a pass-through filter when you don't need any filtering.
- `wrapped` - The wrapped meta-supplier.
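For snapshot logging the functions operate on `Map.Entry<K, V>`, and the `? super` bounds mean a formatter written for a broader type (for example one that accepts any `Object`) can be reused. The following is a rough model of the filter-then-format steps with JDK functional interfaces; `PeekSnapshotSketch` is a hypothetical name, and the text above does not specify any log-line prefix, so none is shown.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Models the documented filter/format steps for snapshot entries; not the Jet wrapper. */
final class PeekSnapshotSketch {

    /** Returns the formatted entry if shouldLogFn accepts it, otherwise empty. */
    static <K, V> Optional<String> logTextFor(
            Map.Entry<K, V> entry,
            Function<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn,
            Predicate<? super Map.Entry<K, V>> shouldLogFn) {
        if (!shouldLogFn.test(entry)) {
            return Optional.empty();
        }
        return Optional.of(toStringFn.apply(entry).toString());
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> entry = new SimpleImmutableEntry<>("count", 3);

        // Entry-specific formatter and filter.
        System.out.println(logTextFor(entry, e -> e.getKey() + " -> " + e.getValue(),
                e -> e.getValue() > 0));

        // A formatter declared on Object also fits, thanks to the "? super" bound.
        Function<Object, String> generic = Object::toString;
        System.out.println(logTextFor(entry, generic, e -> true));
    }
}
```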
-
peekSnapshotP
@Nonnull public static <K,V> ProcessorSupplier peekSnapshotP(@Nonnull FunctionEx<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<? super Map.Entry<K, V>> shouldLogFn, @Nonnull ProcessorSupplier wrapped)
Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `ProcessorSupplier` instead of a meta-supplier.
-
peekSnapshotP
@Nonnull public static <K,V> SupplierEx<Processor> peekSnapshotP(@Nonnull FunctionEx<? super Map.Entry<K, V>, ? extends CharSequence> toStringFn, @Nonnull PredicateEx<? super Map.Entry<K, V>> shouldLogFn, @Nonnull SupplierEx<Processor> wrapped)
Same as `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)`, but accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekSnapshotP
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `SupplierEx` of processors instead of a meta-supplier.
-
peekSnapshotP
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function.
-
peekSnapshotP
Convenience for `peekSnapshotP(toStringFn, shouldLogFn, metaSupplier)` with a pass-through filter and `Object#toString` as the formatting function. This variant accepts a `ProcessorSupplier` instead of a meta-supplier.
-