Class Processors
Many of the processors deal with an aggregating operation over stream items. Prior to aggregation, items may be grouped by an arbitrary key and/or an event-timestamp-based window. There are two main aggregation setups: single-stage and two-stage.
Unless specified otherwise, all functions passed to member methods must be stateless.
Single-stage aggregation
This is the basic setup where all the aggregation steps happen in one vertex. The input must be properly partitioned and distributed. For non-aligned window aggregation (e.g., session-based or trigger-based) this is the only choice. In the case of aligned windows it is the best choice if the source is already partitioned by the grouping key, because the inbound edge will not have to be distributed. If the input stream needs repartitioning, this setup incurs heavier network traffic than the two-stage setup due to the need for a distributed-partitioned edge. On the other hand, it uses less memory because each member keeps track only of the keys belonging to its own partitions. This is the DAG outline for the case where upstream data is not localized by grouping key:

 -----------------
| upstream vertex |
 -----------------
         |
         | partitioned-distributed
         V
    -----------
   | aggregate |
    -----------
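The following is a minimal plain-Java sketch (not the Jet API; all names are hypothetical) of what the single aggregate vertex does conceptually: it keeps one accumulator per grouping key, updates it for every item it receives, and produces the result once the input is exhausted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of a single-stage grouped aggregation: one accumulator
// per key, updated item by item; the finish step here is the identity.
public class SingleStageSketch {
    static Map<String, Long> aggregate(List<String> items) {
        Map<String, Long> accumulators = new HashMap<>();
        for (String item : items) {
            // accumulate primitive: count items per key
            accumulators.merge(item, 1L, Long::sum);
        }
        // finish primitive is the identity here, so the accumulators are the result
        return accumulators;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(List.of("a", "b", "a")));
    }
}
```

Because each member only sees the keys routed to its own partitions, its accumulator map stays proportionally small, which is the memory advantage described above.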
Two-stage aggregation
In two-stage aggregation, the first stage applies just the accumulate aggregation primitive and the second stage does combine and finish. The essential property of this setup is that the edge leading to the first stage is local, incurring no network traffic, and only the edge from the first to the second stage is distributed. There is only one item per group traveling on the distributed edge. Compared to the single-stage setup this can dramatically reduce network traffic, but it needs more memory to keep track of all keys on each cluster member. This is the outline of the DAG:
 -----------------
| upstream vertex |
 -----------------
         |
         | partitioned-local
         V
   ------------
  | accumulate |
   ------------
         |
         | partitioned-distributed
         V
 ----------------
 | combine/finish |
 ----------------

The variants without a grouping key are equivalent to grouping by a single, global key. In that case the edge towards the final-stage vertex must be all-to-one and the local parallelism of the vertex must be one. Unless the volume of the aggregated data is small (e.g., some side branch off the main flow in the DAG), the best choice is this two-stage setup:
 -----------------
| upstream vertex |
 -----------------
         |
         | local, non-partitioned
         V
   ------------
  | accumulate |
   ------------
         |
         | distributed, all-to-one
         V
 ----------------
 | combine/finish |  localParallelism = 1
 ----------------

This will parallelize and distribute most of the processing, and the second-stage processor will receive just a single item from each upstream processor, doing very little work.
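The two stages above can be sketched in plain Java (not the Jet API; names are hypothetical). Stage 1 accumulates locally on each member; stage 2 receives one (key, accumulator) entry per key per member and combines them, so only the small combined state crosses the network:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of two-stage aggregation: local accumulate, then combine.
public class TwoStageSketch {
    static Map<String, Long> accumulate(List<String> localItems) {
        Map<String, Long> acc = new HashMap<>();
        localItems.forEach(k -> acc.merge(k, 1L, Long::sum));
        return acc;                      // one entry per key travels downstream
    }

    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> combined = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> combined.merge(k, v, Long::sum));
        }
        return combined;                 // combine + finish (identity) step
    }

    public static void main(String[] args) {
        Map<String, Long> member1 = accumulate(List.of("a", "a", "b"));
        Map<String, Long> member2 = accumulate(List.of("a", "c"));
        System.out.println(combine(List.of(member1, member2)));
    }
}
```

Note the trade-off the text describes: each member's combine stage must hold every key, whereas in the single-stage setup a member only holds the keys of its own partitions.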
Overview of factory methods for aggregate operations
Tumbling window is a special case of sliding window with sliding step = window size.
Since:
    Jet 3.0
Method Summary

static <K,A> SupplierEx<Processor> accumulateByFrameP(List<FunctionEx<?, ? extends K>> keyFns, List<ToLongFunctionEx<?>> timestampFns, TimestampKind timestampKind, SlidingWindowPolicy winPolicy, AggregateOperation<A, ?> aggrOp)
    Returns a supplier of processors for the first-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages).

static <K,A> SupplierEx<Processor> accumulateByFrameP(List<FunctionEx<?, ? extends K>> keyFns, List<ToLongFunctionEx<?>> timestampFns, TimestampKind timestampKind, SlidingWindowPolicy winPolicy, AggregateOperation<A, ?> aggrOp, byte watermarkKey)
    Returns a supplier of processors for the first-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages).

static <K,A> SupplierEx<Processor> accumulateByKeyP(List<FunctionEx<?, ? extends K>> getKeyFns, AggregateOperation<A, ?> aggrOp)
    Returns a supplier of processors for the first-stage vertex in a two-stage group-and-aggregate setup.

static <A,R> SupplierEx<Processor> accumulateP(AggregateOperation<A, R> aggrOp)
    Returns a supplier of processors for a vertex that performs the accumulation step of the provided aggregate operation on all the items it receives.

static <K,A,R,OUT> SupplierEx<Processor> aggregateByKeyP(List<FunctionEx<?, ? extends K>> keyFns, AggregateOperation<A, R> aggrOp, BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn)
    Returns a supplier of processors for a vertex that groups items by key and performs the provided aggregate operation on each group.

static <A,R> SupplierEx<Processor> aggregateP(AggregateOperation<A, R> aggrOp)
    Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives.

static <K,A,R,OUT> SupplierEx<Processor> aggregateToSessionWindowP(long sessionTimeout, long earlyResultsPeriod, List<ToLongFunctionEx<?>> timestampFns, List<FunctionEx<?, ? extends K>> keyFns, AggregateOperation<A, ? extends R> aggrOp, KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn)
    Returns a supplier of processors for a vertex that aggregates events into session windows.

static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(List<FunctionEx<?, ? extends K>> keyFns, List<ToLongFunctionEx<?>> timestampFns, TimestampKind timestampKind, SlidingWindowPolicy winPolicy, long earlyResultsPeriod, AggregateOperation<A, ? extends R> aggrOp, KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn)
    Returns a supplier of processors for a vertex that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages).

static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(List<FunctionEx<?, ? extends K>> keyFns, List<ToLongFunctionEx<?>> timestampFns, TimestampKind timestampKind, SlidingWindowPolicy winPolicy, long earlyResultsPeriod, AggregateOperation<A, ? extends R> aggrOp, KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, byte windowWatermarkKey)
    Returns a supplier of processors for a vertex that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages).

static <K,A,R,OUT> SupplierEx<Processor> combineByKeyP(AggregateOperation<A, R> aggrOp, BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn)
    Returns a supplier of processors for the second-stage vertex in a two-stage group-and-aggregate setup.

static <A,R> SupplierEx<Processor> combineP(AggregateOperation<A, R> aggrOp)
    Returns a supplier of processors for a vertex that performs the combining and finishing steps of the provided aggregate operation.

static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(SlidingWindowPolicy winPolicy, AggregateOperation<A, ? extends R> aggrOp, KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn)
    Returns a supplier of processors for the second-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages).

static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(SlidingWindowPolicy winPolicy, AggregateOperation<A, ? extends R> aggrOp, KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, byte windowWatermarkKey)
    Returns a supplier of processors for the second-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages) with the specified windowWatermarkKey.

static <T> SupplierEx<Processor> filterP(PredicateEx<? super T> filterFn)
    Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate.

static <C,S,T> ProcessorSupplier filterUsingServiceP(ServiceFactory<C, S> serviceFactory, BiPredicateEx<? super S, ? super T> filterFn)
    Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate.

static <T,R> SupplierEx<Processor> flatMapP(FunctionEx<? super T, ? extends Traverser<? extends R>> flatMapFn)
    Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser.

static <T,K,S,R> SupplierEx<Processor> flatMapStatefulP(long ttl, FunctionEx<? super T, ? extends K> keyFn, ToLongFunctionEx<? super T> timestampFn, Supplier<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn, TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn)
    Returns a supplier of processors for a vertex that performs a stateful flat-mapping of its input.

static <C,S,T,R> ProcessorSupplier flatMapUsingServiceP(ServiceFactory<C, S> serviceFactory, BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn)
    Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser.

static <T> SupplierEx<Processor> insertWatermarksP(FunctionEx<ProcessorSupplier.Context, EventTimePolicy<? super T>> eventTimePolicyProvider)
    Returns a supplier of processors for a vertex that inserts watermark items into the stream.

static <T> SupplierEx<Processor> insertWatermarksP(EventTimePolicy<? super T> eventTimePolicy)
    Returns a supplier of processors for a vertex that inserts watermark items into the stream.

static <T,R> SupplierEx<Processor> mapP(FunctionEx<? super T, ? extends R> mapFn)
    Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it.

static <T,K,S,R> SupplierEx<Processor> mapStatefulP(long ttl, FunctionEx<? super T, ? extends K> keyFn, ToLongFunctionEx<? super T> timestampFn, Supplier<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends R> statefulMapFn, TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn)
    Returns a supplier of processors for a vertex that performs a stateful mapping of its input.

static <C,S,T,K,R> ProcessorSupplier mapUsingServiceAsyncP(ServiceFactory<C, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, FunctionEx<T, K> extractKeyFn, BiFunctionEx<? super S, ? super T, CompletableFuture<R>> mapAsyncFn)
    Asynchronous version of mapUsingServiceP(com.hazelcast.jet.pipeline.ServiceFactory<C, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

static <C,S,T,R> ProcessorSupplier mapUsingServiceP(ServiceFactory<C, S> serviceFactory, BiFunctionEx<? super S, ? super T, ? extends R> mapFn)
    Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it.

static SupplierEx<Processor> noopP()
    Returns a supplier of a processor that swallows all its normal input (if any), does nothing with it, forwards the watermarks, produces no output and completes immediately.

static <T> SupplierEx<Processor> sortP(Comparator<T> comparator)
    Returns a supplier of processors for a vertex that sorts its input using a PriorityQueue and emits it in the complete phase.
-
Method Details
-
aggregateP
@Nonnull public static <A,R> SupplierEx<Processor> aggregateP(@Nonnull AggregateOperation<A, R> aggrOp)

Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives. After exhausting all its input, it emits a single item of type R: the result of the aggregate operation's finish primitive. The primitive may return null, in which case the vertex will not produce any output. Since the input to this vertex must be bounded, its primary use case is batch jobs.

This processor has state, but does not save it to the snapshot. On job restart, the state will be lost.

- Type Parameters:
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
  R - type of the finished result returned from aggrOp.finishAccumulationFn()
- Parameters:
  aggrOp - the aggregate operation to perform
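A minimal plain-Java sketch (not the Jet API; names are hypothetical) of the create/accumulate/finish lifecycle this vertex runs over its bounded input, including the null-finish case where nothing is emitted:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Conceptual sketch of aggregateP over a bounded input: one accumulator,
// every item folded in, then the finish primitive; empty Optional models
// "null finish result => no output".
public class AggregatePSketch {
    static <T, A, R> Optional<R> aggregate(List<T> input,
                                           Supplier<A> createFn,
                                           BiFunction<A, T, A> accumulateFn,
                                           Function<A, R> finishFn) {
        A acc = createFn.get();
        for (T item : input) {
            acc = accumulateFn.apply(acc, item);
        }
        return Optional.ofNullable(finishFn.apply(acc)); // empty => emit nothing
    }

    public static void main(String[] args) {
        Optional<Long> sum = aggregate(List.of(1L, 2L, 3L),
                () -> 0L, Long::sum, a -> a);
        System.out.println(sum);
    }
}
```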
-
accumulateP
@Nonnull public static <A,R> SupplierEx<Processor> accumulateP(@Nonnull AggregateOperation<A, R> aggrOp)

Returns a supplier of processors for a vertex that performs the accumulation step of the provided aggregate operation on all the items it receives. After exhausting all its input, it emits a single item of type A: the accumulator object. Since the input to this vertex must be bounded, its primary use case is batch jobs.

This processor has state, but does not save it to the snapshot. On job restart, the state will be lost.

- Type Parameters:
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
  R - type of the finished result returned from aggrOp.finishAccumulationFn()
- Parameters:
  aggrOp - the aggregate operation to perform
-
combineP
@Nonnull public static <A,R> SupplierEx<Processor> combineP(@Nonnull AggregateOperation<A, R> aggrOp)

Returns a supplier of processors for a vertex that performs the combining and finishing steps of the provided aggregate operation. It expects to receive the accumulator objects from the upstream accumulateP(com.hazelcast.jet.aggregate.AggregateOperation<A, R>) vertex and combines their state into a single accumulator. After exhausting all its input, it emits a single result of type R: the result of applying the finish primitive to the combined accumulator. The primitive may return null, in which case the vertex will not produce any output. Since the input to this vertex must be bounded, its primary use case is batch jobs.

This processor has state, but does not save it to the snapshot. On job restart, the state will be lost.

- Type Parameters:
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
  R - type of the finished result returned from aggrOp.finishAccumulationFn()
- Parameters:
  aggrOp - the aggregate operation to perform
-
aggregateByKeyP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> aggregateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn)

Returns a supplier of processors for a vertex that groups items by key and performs the provided aggregate operation on each group. After exhausting all its input it emits one item per distinct key. It computes the item to emit by passing each (key, result) pair to mapToOutputFn.

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

This processor has state, but does not save it to the snapshot. On job restart, the state will be lost.

- Type Parameters:
  K - type of key
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
  R - type of the result returned from aggrOp.finishAccumulationFn()
  OUT - type of the item to emit
- Parameters:
  keyFns - functions that compute the grouping key
  aggrOp - the aggregate operation
  mapToOutputFn - function that takes the key and the aggregation result and returns the output item
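The role of mapToOutputFn can be sketched in plain Java (not the Jet API; names are hypothetical): after the per-key results are finished, each (key, result) pair is mapped to the emitted item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Conceptual sketch: turn finished (key, result) pairs into output items.
public class AggregateByKeySketch {
    static <K, R, OUT> List<OUT> emit(Map<K, R> results,
                                      BiFunction<K, R, OUT> mapToOutputFn) {
        List<OUT> out = new ArrayList<>();
        results.forEach((k, r) -> out.add(mapToOutputFn.apply(k, r)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> results = new TreeMap<>(Map.of("a", 2L, "b", 1L));
        // one output item per distinct key
        System.out.println(emit(results, (k, r) -> k + "=" + r));
    }
}
```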
-
accumulateByKeyP
@Nonnull public static <K,A> SupplierEx<Processor> accumulateByKeyP(@Nonnull List<FunctionEx<?, ? extends K>> getKeyFns, @Nonnull AggregateOperation<A, ?> aggrOp)

Returns a supplier of processors for the first-stage vertex in a two-stage group-and-aggregate setup. The vertex groups items by the grouping key and applies the accumulate primitive to each group. After exhausting all its input it emits one Map.Entry<K, A> per distinct key.

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

This processor has state, but does not save it to the snapshot. On job restart, the state will be lost.

- Type Parameters:
  K - type of key
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
- Parameters:
  getKeyFns - functions that compute the grouping key
  aggrOp - the aggregate operation to perform
-
combineByKeyP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> combineByKeyP(@Nonnull AggregateOperation<A, R> aggrOp, @Nonnull BiFunctionEx<? super K, ? super R, OUT> mapToOutputFn)

Returns a supplier of processors for the second-stage vertex in a two-stage group-and-aggregate setup. Each processor applies the combine aggregation primitive to the entries received from several upstream instances of accumulateByKeyP(java.util.List<com.hazelcast.function.FunctionEx<?, ? extends K>>, com.hazelcast.jet.aggregate.AggregateOperation<A, ?>). After exhausting all its input it emits one item per distinct key. It computes the item to emit by passing each (key, result) pair to mapToOutputFn. Since the input to this vertex must be bounded, its primary use case is batch jobs.

This processor has state, but does not save it to the snapshot. On job restart, the state will be lost.

- Type Parameters:
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
  R - type of the finished result returned from aggrOp.finishAccumulationFn()
  OUT - type of the item to emit
- Parameters:
  aggrOp - the aggregate operation to perform
  mapToOutputFn - function that takes the key and the aggregation result and returns the output item
-
aggregateToSlidingWindowP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn)

Returns a supplier of processors for a vertex that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages). The vertex groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It emits sliding window results labeled with the timestamp denoting the window's end time (the exclusive upper bound of the timestamps belonging to the window).

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

When the vertex receives a watermark with a given wmVal, it emits the result of aggregation for all the positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. In the output there is one item per key per window position.

Behavior on job restart

This processor saves its state to the snapshot. After restart, it can continue accumulating where it left off.

After a restart in at-least-once mode, watermarks are allowed to go back in time. If such a watermark is received, some windows that were emitted in the previous execution will be re-emitted. These windows might miss events, as some of them had already been evicted before the snapshot was taken in the previous execution.
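The frame/window relationship described above can be sketched in plain Java (not the Jet API; names are hypothetical): frames are slideBy-wide partial accumulators keyed by their end timestamp, and a window result combines the windowSize/slideBy frames it covers.

```java
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch: combine the partial results (here: counts) of the
// frames that fall inside the window ending at windowEnd.
public class SlidingWindowSketch {
    static long windowResult(TreeMap<Long, Long> framesByEndTs,
                             long windowEnd, long windowSize, long slideBy) {
        long result = 0;
        for (long frameEnd = windowEnd - windowSize + slideBy;
                 frameEnd <= windowEnd; frameEnd += slideBy) {
            result += framesByEndTs.getOrDefault(frameEnd, 0L); // combine step
        }
        return result;                                          // finish = identity
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> frames =
                new TreeMap<>(Map.of(100L, 1L, 200L, 2L, 300L, 4L));
        // window of size 200 ending at 300 covers the frames ending at 200 and 300
        System.out.println(windowResult(frames, 300, 200, 100));
    }
}
```

A tumbling window is the degenerate case slideBy == windowSize, where each window combines exactly one frame.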
-
aggregateToSlidingWindowP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, byte windowWatermarkKey)

Returns a supplier of processors for a vertex that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages). The vertex groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It emits sliding window results labeled with the timestamp denoting the window's end time (the exclusive upper bound of the timestamps belonging to the window).

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

When the vertex receives a watermark with a given wmVal, it emits the result of aggregation for all the positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. In the output there is one item per key per window position.

Behavior on job restart

This processor saves its state to the snapshot. After restart, it can continue accumulating where it left off.

After a restart in at-least-once mode, watermarks are allowed to go back in time. If such a watermark is received, some windows that were emitted in the previous execution will be re-emitted. These windows might miss events, as some of them had already been evicted before the snapshot was taken in the previous execution.
-
accumulateByFrameP
@Nonnull public static <K,A> SupplierEx<Processor> accumulateByFrameP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ?> aggrOp)

Returns a supplier of processors for the first-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). The vertex groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It applies the accumulate aggregation primitive to each key-frame group.

The frame is identified by the timestamp denoting its end time (equal to the exclusive upper bound of its timestamp range). SlidingWindowPolicy.higherFrameTs(long) maps the event timestamp to the timestamp of the frame it belongs to.

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

When the processor receives a watermark with a given wmVal, it emits the current accumulated state of all frames with timestamp <= wmVal and deletes these frames from its storage. In the output there is one item per key per frame.

When a state snapshot is requested, the state is flushed to the second-stage processor and nothing is saved to the snapshot.

- Type Parameters:
  K - type of the grouping key
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
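The event-to-frame mapping can be sketched in plain Java (an assumption-laden sketch, not the real SlidingWindowPolicy, which also supports a frame offset): with frames aligned at multiples of the sliding step, an event timestamp maps to the end timestamp of the frame that owns it.

```java
// Conceptual sketch of higherFrameTs: the end of the half-open frame
// [floor(ts / slideBy) * slideBy, floor(ts / slideBy) * slideBy + slideBy)
// that contains the event timestamp.
public class FrameTsSketch {
    static long higherFrameTs(long timestamp, long slideBy) {
        // Math.floorDiv keeps the math correct for negative timestamps too
        return Math.floorDiv(timestamp, slideBy) * slideBy + slideBy;
    }

    public static void main(String[] args) {
        System.out.println(higherFrameTs(1234, 100)); // event at 1234 is owned by the frame ending at 1300
    }
}
```

Note that an event whose timestamp equals a frame boundary belongs to the next frame, because the frame end is exclusive.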
-
accumulateByFrameP
@Nonnull public static <K,A> SupplierEx<Processor> accumulateByFrameP(@Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ?> aggrOp, byte watermarkKey)

Returns a supplier of processors for the first-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). The vertex groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It applies the accumulate aggregation primitive to each key-frame group.

The frame is identified by the timestamp denoting its end time (equal to the exclusive upper bound of its timestamp range). SlidingWindowPolicy.higherFrameTs(long) maps the event timestamp to the timestamp of the frame it belongs to.

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

When the processor receives a keyed watermark with a given wmVal, it emits the current accumulated state of all frames with timestamp <= wmVal and deletes these frames from its storage. In the output there is one item per key per frame.

When a state snapshot is requested, the state is flushed to the second-stage processor and nothing is saved to the snapshot.

- Type Parameters:
  K - type of the grouping key
  A - type of accumulator returned from aggrOp.createAccumulatorFn()
-
combineToSlidingWindowP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn)

Returns a supplier of processors for the second-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). Each processor applies the combine aggregation primitive to the frames received from several upstream instances of accumulateByFrame().

When the processor receives a watermark with a given wmVal, it emits the result of aggregation for all positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. To compute the item to emit, it calls mapToOutputFn with the window's start and end timestamps, the key and the aggregation result. The window end time is the exclusive upper bound of the timestamps belonging to the window.

Behavior on job restart

This processor saves its state to the snapshot. After restart, it can continue accumulating where it left off.

After a restart in at-least-once mode, watermarks are allowed to go back in time. If such a watermark is received, some windows that were emitted in the previous execution will be re-emitted. These windows might miss events, as some of them had already been evicted before the snapshot was taken in the previous execution.

- Type Parameters:
  A - type of the accumulator
  R - type of the finished result returned from aggrOp.finishAccumulationFn()
  OUT - type of the item to emit
-
combineToSlidingWindowP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, byte windowWatermarkKey)

Returns a supplier of processors for the second-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages) with the specified windowWatermarkKey.

Each processor applies the combine aggregation primitive to the frames received from several upstream instances of accumulateByFrame().

When the processor receives a watermark with a given wmVal, it emits the result of aggregation for all positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. To compute the item to emit, it calls mapToOutputFn with the window's start and end timestamps, the key and the aggregation result. The window end time is the exclusive upper bound of the timestamps belonging to the window.

Behavior on job restart

This processor saves its state to the snapshot. After restart, it can continue accumulating where it left off.

After a restart in at-least-once mode, watermarks are allowed to go back in time. If such a watermark is received, some windows that were emitted in the previous execution will be re-emitted. These windows might miss events, as some of them had already been evicted before the snapshot was taken in the previous execution.

- Type Parameters:
  A - type of the accumulator
  R - type of the finished result returned from aggrOp.finishAccumulationFn()
  OUT - type of the item to emit
-
aggregateToSessionWindowP
@Nonnull public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSessionWindowP(long sessionTimeout, long earlyResultsPeriod, @Nonnull List<ToLongFunctionEx<?>> timestampFns, @Nonnull List<FunctionEx<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn)

Returns a supplier of processors for a vertex that aggregates events into session windows. Events and windows under different grouping keys are treated independently. Outputs objects of type WindowResult.

The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key-extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.

The functioning of this vertex is easiest to explain in terms of the event interval: the range [timestamp, timestamp + sessionTimeout). Initially an event causes a new session window to be created, covering exactly the event interval. A following event under the same key belongs to this window iff its interval overlaps it. The window is extended to cover the entire interval of the new event. The event may happen to belong to two existing windows if its interval bridges the gap between them; in that case they are combined into one.

Behavior on job restart

This processor saves its state to the snapshot. After restart, it can continue accumulating where it left off.

After a restart in at-least-once mode, watermarks are allowed to go back in time. The processor evicts state based on the watermarks it has received. If it receives a duplicate watermark, it might emit sessions with missing events, because they were already evicted. The sessions before and after the snapshot might overlap, which they normally don't.

- Type Parameters:
  K - type of the item's grouping key
  A - type of the container of the accumulated value
  R - type of the session window's result value
- Parameters:
  sessionTimeout - maximum gap between consecutive events in the same session window
  timestampFns - functions to extract the timestamp from the item
  keyFns - functions to extract the grouping key from the item
  aggrOp - the aggregate operation
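The event-interval rule above can be sketched in plain Java (not the Jet API; names are hypothetical): each event covers [timestamp, timestamp + sessionTimeout), and an overlapping interval extends the existing session window for that key.

```java
// Conceptual sketch of session extension: a window is a half-open interval
// [start, end); a new event either extends it or starts a fresh session.
public class SessionSketch {
    static long[] extend(long[] window, long eventTs, long sessionTimeout) {
        long lo = eventTs, hi = eventTs + sessionTimeout;
        if (window == null || lo >= window[1] || hi <= window[0]) {
            return new long[]{lo, hi};     // no overlap: a new session starts
        }
        // overlap: extend the window to cover the event's whole interval
        return new long[]{Math.min(window[0], lo), Math.max(window[1], hi)};
    }

    public static void main(String[] args) {
        long[] w = extend(null, 10, 100);  // first event: window [10, 110)
        w = extend(w, 50, 100);            // overlapping event: window [10, 150)
        System.out.println(w[0] + ".." + w[1]);
    }
}
```

Merging two existing windows bridged by one event would apply the same min/max union to both of them, which is what the text means by combining them into one.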
-
insertWatermarksP
@Nonnull public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull EventTimePolicy<? super T> eventTimePolicy) Returns a supplier of processors for a vertex that insertswatermark items
into the stream. The value of the watermark is determined by the suppliedEventTimePolicy
instance.This processor also drops late items. It never allows an event which is late with regard to already emitted watermark to pass.
The processor saves value of the last emitted watermark to snapshot. Different instances of this processor can be at different watermark at snapshot time. After restart all instances will start at watermark of the most-behind instance before the restart.
This might sound as it could break the monotonicity requirement, but thanks to watermark coalescing, watermarks are only delivered for downstream processing after they have been received from all upstream processors. Another side effect of this is, that a late event, which was dropped before restart, is not considered late after restart.
- Type Parameters:
  - T - the type of the stream item
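The late-item dropping described above can be sketched independently of Jet. Assuming a simple bounded-lag policy (watermark = top observed timestamp minus an allowed lag), an event older than the last emitted watermark never passes. The `WmSketch` class below is an illustrative stand-in, not the Jet implementation:

```java
import java.util.ArrayList;
import java.util.List;

class WmSketch {
    private final long lag;            // allowed event-time lag behind the top timestamp
    private long topTs = Long.MIN_VALUE;
    private long lastWm = Long.MIN_VALUE;

    WmSketch(long lag) {
        this.lag = lag;
    }

    // Processes one event timestamp, returning the forwarded items as strings.
    // An event late with regard to the already emitted watermark is dropped.
    List<String> accept(long ts) {
        List<String> out = new ArrayList<>();
        if (ts < lastWm) {
            return out;                // late item: dropped, never passes
        }
        out.add("event@" + ts);
        topTs = Math.max(topTs, ts);
        long wm = topTs - lag;
        if (wm > lastWm) {             // watermarks only move forward
            lastWm = wm;
            out.add("wm@" + wm);
        }
        return out;
    }
}
```

For example, after an event at timestamp 10 with lag 5, the watermark stands at 5, so a subsequent event at timestamp 3 is silently dropped.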
-
insertWatermarksP
@Nonnull public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull FunctionEx<ProcessorSupplier.Context, EventTimePolicy<? super T>> eventTimePolicyProvider)

Returns a supplier of processors for a vertex that inserts watermark items into the stream. The value of the watermark is determined by the EventTimePolicy instance obtained from the supplied provider.

This processor also drops late items: it never lets an event pass that is late with regard to an already emitted watermark.

The processor saves the value of the last emitted watermark to the snapshot. Different instances of this processor can be at different watermarks at snapshot time. After a restart, all instances will start at the watermark of the most-behind instance before the restart.

This might sound like it could break the monotonicity requirement, but thanks to watermark coalescing, watermarks are only delivered for downstream processing after they have been received from all upstream processors. Another side effect of this is that a late event which was dropped before the restart is not considered late after the restart.
- Type Parameters:
  - T - the type of the stream item
-
mapP
@Nonnull public static <T,R> SupplierEx<Processor> mapP(@Nonnull FunctionEx<? super T, ? extends R> mapFn)

Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it. If the result is null, it emits nothing; therefore this vertex can be used to implement filtering semantics as well.

This processor is stateless.
- Type Parameters:
  - T - type of received item
  - R - type of emitted item
- Parameters:
  - mapFn - a stateless mapping function
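The "null result emits nothing" contract can be illustrated with plain Java. This is a stdlib-only sketch of the semantics, not the processor itself; `MapPSketch` is a made-up name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MapPSketch {
    // Mirrors mapP's contract: apply mapFn to each item; a null result emits nothing,
    // which is how mapP doubles as a filter.
    static <T, R> List<R> mapAndFilter(List<T> items, Function<? super T, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            R mapped = mapFn.apply(item);
            if (mapped != null) {       // null result: item is filtered out
                out.add(mapped);
            }
        }
        return out;
    }
}
```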
-
filterP
Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate.

This processor is stateless.
- Type Parameters:
  - T - type of received item
- Parameters:
  - filterFn - a stateless predicate to test each received item against
-
flatMapP
@Nonnull public static <T,R> SupplierEx<Processor> flatMapP(@Nonnull FunctionEx<? super T, ? extends Traverser<? extends R>> flatMapFn)

Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser. The traverser must be null-terminated.

This processor is stateless.
- Type Parameters:
  - T - received item type
  - R - emitted item type
- Parameters:
  - flatMapFn - a stateless function that maps the received item to a traverser over output items. It must not return a null traverser, but it may return an empty one.
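A null-terminated traverser is drained by calling next() until it returns null. The following stdlib-only sketch shows how the processor drains the traverser produced for each item; the `Traverser` interface here is a simplified stand-in for Jet's, and `FlatMapSketch` is a made-up name:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class FlatMapSketch {
    // Simplified stand-in for com.hazelcast.jet.Traverser: next() yields items
    // until it returns null, which terminates the traversal.
    interface Traverser<T> {
        T next();
    }

    static <T> Traverser<T> traverseList(List<T> list) {
        Iterator<T> it = list.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Applies flatMapFn to each item and emits everything from the resulting
    // traverser, as flatMapP does.
    static <T, R> List<R> flatMap(List<T> items,
                                  Function<? super T, Traverser<? extends R>> flatMapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            Traverser<? extends R> t = flatMapFn.apply(item);
            for (R r; (r = t.next()) != null; ) {   // drain until null terminator
                out.add(r);
            }
        }
        return out;
    }
}
```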
-
mapStatefulP
@Nonnull public static <T,K,S,R> SupplierEx<Processor> mapStatefulP(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull ToLongFunctionEx<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> statefulMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn)

Returns a supplier of processors for a vertex that performs a stateful mapping of its input. createFn returns the object that holds the state. The processor passes this object along with each input item to statefulMapFn, which can update the object's state. There is a separate state object for each grouping key. The state object will be included in the state snapshot, so it survives job restarts; for this reason the object must be serializable. If the mapping function maps an item to null, it has the effect of filtering out that item.

If the given ttl is greater than zero, the processor will consider the state object stale once its time-to-live has expired. The time-to-live refers to the event time as kept by the watermark: each time it processes an event, the processor compares the state object's timestamp with the current watermark. If it is less than wm - ttl, it discards the state object. Otherwise it updates the timestamp with the current watermark.
- Type Parameters:
  - T - type of the input item
  - K - type of the key
  - S - type of the state object
  - R - type of the mapping function's result
- Parameters:
  - ttl - state object's time to live
  - keyFn - function to extract the key from an input item
  - createFn - supplier of the state object
  - statefulMapFn - the stateful mapping function
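The TTL rule can be sketched in isolation: a state object whose last-touched timestamp is below wm - ttl is discarded, otherwise its timestamp is refreshed to the current watermark. This is a stdlib-only illustration; `TtlSketch` is a made-up name, not Jet code:

```java
import java.util.HashMap;
import java.util.Map;

class TtlSketch {
    // Per key, the watermark value at which the key's state was last touched.
    private final Map<String, Long> stateTimestamps = new HashMap<>();
    private final long ttl;

    TtlSketch(long ttl) {
        this.ttl = ttl;
    }

    // On each event: state older than (currentWm - ttl) is considered stale and
    // discarded; otherwise its timestamp is refreshed to the current watermark.
    // Returns true if existing state survived, false if it was absent or evicted.
    boolean touch(String key, long currentWm) {
        Long ts = stateTimestamps.get(key);
        boolean survived = ts != null && ts >= currentWm - ttl;
        stateTimestamps.put(key, currentWm);   // refresh to the current watermark
        return survived;
    }
}
```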
-
flatMapStatefulP
@Nonnull public static <T,K,S,R> SupplierEx<Processor> flatMapStatefulP(long ttl, @Nonnull FunctionEx<? super T, ? extends K> keyFn, @Nonnull ToLongFunctionEx<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn)

Returns a supplier of processors for a vertex that performs a stateful flat-mapping of its input. createFn returns the object that holds the state. The processor passes this object along with each input item to statefulFlatMapFn, which can update the object's state. There is a separate state object for each grouping key. The state object will be included in the state snapshot, so it survives job restarts; for this reason the object must be serializable.

If the given ttl is greater than zero, the processor will consider the state object stale once its time-to-live has expired. The time-to-live refers to the event time as kept by the watermark: each time it processes an event, the processor compares the state object's timestamp with the current watermark. If it is less than wm - ttl, it discards the state object. Otherwise it updates the timestamp with the current watermark.
- Type Parameters:
  - T - type of the input item
  - K - type of the key
  - S - type of the state object
  - R - type of the mapping function's result
- Parameters:
  - ttl - state object's time to live
  - keyFn - function to extract the key from an input item
  - createFn - supplier of the state object
  - statefulFlatMapFn - the stateful flat-mapping function
-
mapUsingServiceP
@Nonnull public static <C,S,T,R> ProcessorSupplier mapUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn)

Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it. The mapping function receives another parameter: the service object, which Jet will create using the supplied serviceFactory.

If the mapping result is null, the vertex emits nothing; therefore it can be used to implement filtering semantics as well.

Unlike mapStatefulP(long, FunctionEx, ToLongFunctionEx, Supplier, TriFunction, TriFunction), which keeps one state object per grouping key, this method creates one service object per processor.

While it's allowed to store some local state in the service object, it won't be saved to the snapshot and will misbehave in a fault-tolerant stream processing job.
- Type Parameters:
  - C - type of context object
  - S - type of service object
  - T - type of received item
  - R - type of emitted item
- Parameters:
  - serviceFactory - the service factory
  - mapFn - a stateless mapping function
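The one-service-object-per-processor pattern can be sketched without Jet: a single shared service instance is passed to the mapping function for every item, and a null result filters the item out. `ServiceMapSketch` is a made-up illustration, not the Jet processor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class ServiceMapSketch {
    // Mirrors mapUsingServiceP: the same service instance serves all items of a
    // processor; a null mapping result filters the item out.
    static <S, T, R> List<R> mapUsingService(S service, List<T> items,
            BiFunction<? super S, ? super T, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            R result = mapFn.apply(service, item);   // same service for every item
            if (result != null) {                    // null result: item filtered out
                out.add(result);
            }
        }
        return out;
    }
}
```

A lookup table standing in for the service shows both behaviors: found keys are mapped, missing keys (null) are dropped.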
-
mapUsingServiceAsyncP
@Nonnull public static <C,S,T,K,R> ProcessorSupplier mapUsingServiceAsyncP(@Nonnull ServiceFactory<C, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull FunctionEx<T, K> extractKeyFn, @Nonnull BiFunctionEx<? super S, ? super T, CompletableFuture<R>> mapAsyncFn)

Asynchronous version of mapUsingServiceP(ServiceFactory, BiFunctionEx): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

The function can return a null future, and the future can complete with a null result: in both cases it acts just like a filter.

The extractKeyFn is used to extract the keys under which in-flight items are saved to the snapshot. If the input to this processor comes over a partitioned edge, you should use the same key. If it's a round-robin edge, you can use any key, for example Object::hashCode.
- Type Parameters:
  - C - type of context object
  - S - type of service object
  - T - type of received item
  - K - type of key
  - R - type of result item
- Parameters:
  - serviceFactory - the service factory
  - maxConcurrentOps - maximum number of concurrent async operations per processor
  - preserveOrder - whether the async responses are emitted in the order the items were received
  - extractKeyFn - a function to extract snapshot keys; used only if preserveOrder == false
  - mapAsyncFn - a stateless mapping function
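Two parts of this contract can be sketched with stdlib CompletableFutures: a null future or a null result acts as a filter, and at most maxConcurrentOps operations are in flight at once (bounded here with a semaphore). `AsyncMapSketch` is an illustration of the contract, not Jet's actual scheduler:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.BiFunction;

class AsyncMapSketch {
    // Applies an async mapping function, allowing at most maxConcurrentOps
    // in-flight operations. A null future or a null result filters the item.
    static <S, T, R> List<R> mapAsync(S service, List<T> items, int maxConcurrentOps,
            BiFunction<? super S, ? super T, CompletableFuture<R>> mapAsyncFn) {
        Semaphore permits = new Semaphore(maxConcurrentOps);
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            permits.acquireUninterruptibly();       // block until a slot is free
            CompletableFuture<R> f = mapAsyncFn.apply(service, item);
            if (f == null) {                        // null future: acts as a filter
                permits.release();
                continue;
            }
            f.whenComplete((r, e) -> permits.release());
            futures.add(f);
        }
        List<R> out = new ArrayList<>();
        for (CompletableFuture<R> f : futures) {    // collect in submission order
            R r = f.join();
            if (r != null) {                        // null result: acts as a filter
                out.add(r);
            }
        }
        return out;
    }
}
```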
-
filterUsingServiceP
@Nonnull public static <C,S,T> ProcessorSupplier filterUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiPredicateEx<? super S, ? super T> filterFn)

Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate. The predicate receives another parameter: the service object, which Jet will create using the supplied serviceFactory.

While it's allowed to store some local state in the service object, it won't be saved to the snapshot and will misbehave in a fault-tolerant stream processing job.
- Type Parameters:
  - C - type of context object
  - S - type of service object
  - T - type of received item
- Parameters:
  - serviceFactory - the service factory
  - filterFn - a stateless predicate to test each received item against
-
flatMapUsingServiceP
@Nonnull public static <C,S,T,R> ProcessorSupplier flatMapUsingServiceP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn)

Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser. The traverser must be null-terminated. The mapping function receives another parameter: the service object, which Jet will create using the supplied serviceFactory.

While it's allowed to store some local state in the service object, it won't be saved to the snapshot and will misbehave in a fault-tolerant stream processing job.
- Type Parameters:
  - C - type of context object
  - S - type of service object
  - T - type of input item
  - R - type of result item
- Parameters:
  - serviceFactory - the service factory
  - flatMapFn - a stateless function that maps the received item to a traverser over the output items
-
sortP
Returns a supplier of processors for a vertex that sorts its input using a PriorityQueue and emits it in the complete phase.

The output edge of this vertex should be distributed, monotonicOrder and allToOne, so that it preserves the ordering when merging the data from all upstream processors.
- Since:
- Jet 4.3
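The two steps can be sketched with the stdlib: each processor buffers its input in a PriorityQueue and drains it sorted in the complete phase, then a monotonic-order merge combines the already-sorted upstream outputs, as the all-to-one ordered edge would. `SortSketch` is an illustration, not Jet code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class SortSketch {
    // Buffer-then-drain: mimics sortP buffering input in a PriorityQueue and
    // emitting it sorted in the complete phase.
    static List<Integer> sortViaQueue(List<Integer> input) {
        PriorityQueue<Integer> pq = new PriorityQueue<>(input);
        List<Integer> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            out.add(pq.poll());
        }
        return out;
    }

    // Monotonic-order merge of already-sorted upstream outputs: always emit the
    // smallest head element among all streams, preserving the global ordering.
    static List<Integer> mergeSorted(List<List<Integer>> sortedStreams) {
        record Head(int value, int stream, int index) {}
        PriorityQueue<Head> heads =
                new PriorityQueue<>((a, b) -> Integer.compare(a.value(), b.value()));
        for (int s = 0; s < sortedStreams.size(); s++) {
            if (!sortedStreams.get(s).isEmpty()) {
                heads.add(new Head(sortedStreams.get(s).get(0), s, 0));
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head h = heads.poll();
            out.add(h.value());
            int next = h.index() + 1;
            List<Integer> stream = sortedStreams.get(h.stream());
            if (next < stream.size()) {
                heads.add(new Head(stream.get(next), h.stream(), next));
            }
        }
        return out;
    }
}
```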
-
noopP
Returns a supplier of a processor that swallows all its normal input (if any), does nothing with it, forwards the watermarks, produces no output and completes immediately. It also swallows any restored snapshot data.
-