public final class Processors extends Object
Many of the processors deal with an aggregating operation over stream items. Prior to aggregation items may be grouped by an arbitrary key and/or an event timestamp-based window. There are two main aggregation setups: single-stage and two-stage.
Unless specified otherwise, all functions passed to member methods must be stateless.
In the single-stage setup a single vertex receives all the items over a partitioned, distributed edge and performs the complete aggregate operation. This is the outline of the DAG:

 -----------------
| upstream vertex |
 -----------------
         |
         | partitioned-distributed
         V
    -------------
   |  aggregate  |
    -------------

In the two-stage setup the first-stage vertex applies the accumulate
aggregation
primitive and the second stage does combine
and finish
. The essential property
. The essential property
of this setup is that the edge leading to the first stage is local,
incurring no network traffic, and only the edge from the first to the
second stage is distributed. There is only one item per group traveling on
the distributed edge. Compared to the single-stage setup this can
dramatically reduce network traffic, but it needs more memory to keep
track of all keys on each cluster member. This is the outline of the DAG:
 -----------------
| upstream vertex |
 -----------------
         |
         | partitioned-local
         V
    ------------
   | accumulate |
    ------------
         |
         | partitioned-distributed
         V
  ----------------
 | combine/finish |
  ----------------

The variants without a grouping key are equivalent to grouping by a single, global key. In that case the edge towards the final-stage vertex must be all-to-one and the local parallelism of the vertex must be one. Unless the volume of the aggregated data is small (e.g., some side branch off the main flow in the DAG), the best choice is this two-stage setup:
 -----------------
| upstream vertex |
 -----------------
         |
         | local, non-partitioned
         V
    ------------
   | accumulate |
    ------------
         |
         | distributed, all-to-one
         V
  ----------------
 | combine/finish |  localParallelism = 1
  ----------------

This will parallelize and distribute most of the processing, and the second-stage processor will receive just a single item from each upstream processor, doing very little work.
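The two-stage flow above can be sketched in plain Java. This is only an illustration of the dataflow, not the Jet API; the `TwoStageSketch` class and its word-count aggregate operation are hypothetical:

```java
import java.util.*;

// Illustrative sketch (plain Java, not the Jet API) of the two-stage setup:
// each member first applies the accumulate primitive locally per key; the
// combine/finish stage then receives one accumulator per key per member.
public class TwoStageSketch {

    // Stage 1: accumulate (runs locally on each member, no network traffic)
    static Map<String, Long> accumulate(List<String> localItems) {
        Map<String, Long> acc = new HashMap<>();
        for (String item : localItems) {
            acc.merge(item, 1L, Long::sum); // accumulate primitive: count
        }
        return acc;
    }

    // Stage 2: combine + finish (only one entry per key travels per member)
    static Map<String, Long> combineAndFinish(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum)); // combine
        }
        return result; // finish is the identity for a plain count
    }
}
```

Note how the distributed hop carries one accumulator per key per member rather than one item per event, which is the network saving described above.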
Tumbling window is a special case of sliding window with sliding step = window size.
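As a quick illustration of that relationship, the hypothetical helper below (not part of the Jet API) lists the end timestamps of the window positions covering an event, assuming window ends aligned at multiples of the slide step; with slide step equal to window size it always finds exactly one window:

```java
import java.util.*;

// Hypothetical helper (not part of the Jet API) listing the end timestamps of
// all sliding-window positions that cover an event. With slideStep ==
// windowSize (a tumbling window) every event falls into exactly one window.
public class WindowPositions {
    static List<Long> windowEndsCovering(long eventTs, long windowSize, long slideStep) {
        List<Long> ends = new ArrayList<>();
        // smallest aligned end timestamp strictly greater than the event
        long firstEnd = Math.floorDiv(eventTs, slideStep) * slideStep + slideStep;
        for (long end = firstEnd; end <= eventTs + windowSize; end += slideStep) {
            ends.add(end); // window [end - windowSize, end) contains eventTs
        }
        return ends;
    }
}
```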
| Modifier and Type | Method and Description |
|---|---|
| static <K,A> DistributedSupplier<Processor> | accumulateByFrameP(List<DistributedFunction<?,? extends K>> keyFns, List<DistributedToLongFunction<?>> timestampFns, TimestampKind timestampKind, SlidingWindowPolicy winPolicy, AggregateOperation<A,?> aggrOp) Returns a supplier of processors for the first-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). |
| static <K,A> DistributedSupplier<Processor> | accumulateByKeyP(List<DistributedFunction<?,? extends K>> getKeyFns, AggregateOperation<A,?> aggrOp) Returns a supplier of processors for the first-stage vertex in a two-stage group-and-aggregate setup. |
| static <A,R> DistributedSupplier<Processor> | accumulateP(AggregateOperation<A,R> aggrOp) Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives. |
| static <K,A,R,OUT> DistributedSupplier<Processor> | aggregateByKeyP(List<DistributedFunction<?,? extends K>> keyFns, AggregateOperation<A,R> aggrOp, DistributedBiFunction<? super K,? super R,OUT> mapToOutputFn) Returns a supplier of processors for a vertex that groups items by key and performs the provided aggregate operation on each group. |
| static <A,R> DistributedSupplier<Processor> | aggregateP(AggregateOperation<A,R> aggrOp) Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives. |
| static <K,A,R,OUT> DistributedSupplier<Processor> | aggregateToSessionWindowP(long sessionTimeout, List<DistributedToLongFunction<?>> timestampFns, List<DistributedFunction<?,? extends K>> keyFns, AggregateOperation<A,R> aggrOp, KeyedWindowResultFunction<? super K,? super R,OUT> mapToOutputFn) Returns a supplier of processors for a vertex that aggregates events into session windows. |
| static <K,A,R,OUT> DistributedSupplier<Processor> | aggregateToSlidingWindowP(List<DistributedFunction<?,? extends K>> keyFns, List<DistributedToLongFunction<?>> timestampFns, TimestampKind timestampKind, SlidingWindowPolicy winPolicy, AggregateOperation<A,R> aggrOp, KeyedWindowResultFunction<? super K,? super R,OUT> mapToOutputFn) Returns a supplier of processors for a vertex that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages). |
| static <K,A,R,OUT> DistributedSupplier<Processor> | combineByKeyP(AggregateOperation<A,R> aggrOp, DistributedBiFunction<? super K,? super R,OUT> mapToOutputFn) Returns a supplier of processors for the second-stage vertex in a two-stage group-and-aggregate setup. |
| static <A,R> DistributedSupplier<Processor> | combineP(AggregateOperation<A,R> aggrOp) Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives. |
| static <K,A,R,OUT> DistributedSupplier<Processor> | combineToSlidingWindowP(SlidingWindowPolicy winPolicy, AggregateOperation<A,R> aggrOp, KeyedWindowResultFunction<? super K,? super R,OUT> mapToOutputFn) Returns a supplier of processors for the second-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). |
| static <T> DistributedSupplier<Processor> | filterP(DistributedPredicate<T> filterFn) Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate. |
| static <C,T> ProcessorSupplier | filterUsingContextP(ContextFactory<C> contextFactory, DistributedBiPredicate<? super C,? super T> filterFn) Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate. |
| static <T,R> DistributedSupplier<Processor> | flatMapP(DistributedFunction<T,? extends Traverser<? extends R>> flatMapFn) Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser. |
| static <C,T,R> ProcessorSupplier | flatMapUsingContextP(ContextFactory<C> contextFactory, DistributedBiFunction<? super C,? super T,? extends Traverser<? extends R>> flatMapFn) Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser. |
| static <T> DistributedSupplier<Processor> | insertWatermarksP(WatermarkGenerationParams<? super T> wmGenParams) Returns a supplier of processors for a vertex that inserts watermark items into the stream. |
| static <T,R> DistributedSupplier<Processor> | mapP(DistributedFunction<T,R> mapFn) Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it. |
| static <C,T,R> ProcessorSupplier | mapUsingContextP(ContextFactory<C> contextFactory, DistributedBiFunction<? super C,? super T,? extends R> mapFn) Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it. |
| static DistributedSupplier<Processor> | noopP() Returns a supplier of processors that swallow all input (if any), do nothing with it, and produce no output. |
| static <T,K,A,R,OUT> DistributedSupplier<Processor> | rollingAggregateP(DistributedFunction<? super T,? extends K> keyFn, AggregateOperation1<? super T,A,? extends R> aggrOp, DistributedTriFunction<? super T,? super K,? super R,? extends OUT> mapToOutputFn) Returns a supplier of processors for a vertex that performs a rolling aggregation. |
@Nonnull public static <A,R> DistributedSupplier<Processor> aggregateP(@Nonnull AggregateOperation<A,R> aggrOp)
Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives. After exhausting all its input it emits a single item of type R: the result of the aggregate operation.
Since the input to this vertex must be bounded, its primary use case is batch jobs.
This processor has state, but does not save it to snapshot. On job restart, the state will be lost.
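For reference, the four primitives an AggregateOperation supplies can be sketched in plain Java. The `AvgAggregate` class below is a hypothetical averaging aggregate, not the Jet API itself:

```java
// Plain-Java sketch of the four primitives an AggregateOperation supplies,
// shown for a hypothetical averaging aggregate (not the Jet API itself).
public class AvgAggregate {
    // createAccumulatorFn: a fresh {sum, count} accumulator
    static long[] create() { return new long[]{0, 0}; }

    // accumulate primitive: fold one item into the accumulator
    static void accumulate(long[] acc, long item) {
        acc[0] += item; // running sum
        acc[1] += 1;    // running count
    }

    // combine primitive: merge another partial accumulator into this one
    static void combine(long[] left, long[] right) {
        left[0] += right[0];
        left[1] += right[1];
    }

    // finishAccumulationFn: turn the accumulator into the final result
    static double finish(long[] acc) {
        return (double) acc[0] / acc[1];
    }
}
```

A single-stage vertex applies create, accumulate and finish itself; the two-stage setups split accumulate from combine/finish across two vertices.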
Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull public static <A,R> DistributedSupplier<Processor> accumulateP(@Nonnull AggregateOperation<A,R> aggrOp)
Returns a supplier of processors for a vertex that performs the provided aggregate operation on all the items it receives. After exhausting all its input it emits a single item of type R: the result of the aggregate operation.
Since the input to this vertex must be bounded, its primary use case is batch jobs.
This processor has state, but does not save it to snapshot. On job restart, the state will be lost.
Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull public static <A,R> DistributedSupplier<Processor> combineP(@Nonnull AggregateOperation<A,R> aggrOp)
Returns a supplier of processors for a vertex that applies the combine aggregation primitive to the items received from several upstream instances of accumulateP(). After exhausting all its input it emits a single item of type R: the result of the aggregate operation.
Since the input to this vertex must be bounded, its primary use case is batch jobs.
This processor has state, but does not save it to snapshot. On job restart, the state will be lost.
Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull public static <K,A,R,OUT> DistributedSupplier<Processor> aggregateByKeyP(@Nonnull List<DistributedFunction<?,? extends K>> keyFns, @Nonnull AggregateOperation<A,R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,OUT> mapToOutputFn)
Returns a supplier of processors for a vertex that groups items by key and performs the provided aggregate operation on each group. After exhausting all its input it emits one item per distinct key, computed by passing each (key, result) pair to mapToOutputFn.
The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.
This processor has state, but does not save it to snapshot. On job restart, the state will be lost.
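The group-and-aggregate flow can be mimicked with plain Java streams. This is an illustration only; the `GroupAndAggregate` class and its word-count logic are hypothetical, not the Jet API:

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java analogue (hypothetical, not the Jet API) of group-and-aggregate
// with a mapToOutputFn: group by key, aggregate each group, then turn each
// (key, result) pair into an output item.
public class GroupAndAggregate {
    static List<String> aggregateByKey(List<String> items) {
        Map<String, Long> counts = items.stream()
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
        return counts.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue()) // mapToOutputFn
                .sorted()                                  // stable output order
                .collect(Collectors.toList());
    }
}
```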
Type Parameters:
K - type of key
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the result returned from aggrOp.finishAccumulationFn()
OUT - type of the item to emit
Parameters:
keyFns - functions that compute the grouping key
aggrOp - the aggregate operation
mapToOutputFn - function that takes the key and the aggregation result and returns the output item

@Nonnull public static <K,A> DistributedSupplier<Processor> accumulateByKeyP(@Nonnull List<DistributedFunction<?,? extends K>> getKeyFns, @Nonnull AggregateOperation<A,?> aggrOp)
Returns a supplier of processors for the first-stage vertex in a two-stage group-and-aggregate setup. The vertex groups items by the grouping key and applies the accumulate aggregation primitive to each group. After exhausting all its input it emits one Map.Entry<K, A> per distinct key.
The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.
This processor has state, but does not save it to snapshot. On job restart, the state will be lost.
Type Parameters:
K - type of key
A - type of accumulator returned from aggrOp.createAccumulatorFn()
Parameters:
getKeyFns - functions that compute the grouping key
aggrOp - the aggregate operation to perform

@Nonnull public static <K,A,R,OUT> DistributedSupplier<Processor> combineByKeyP(@Nonnull AggregateOperation<A,R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,OUT> mapToOutputFn)
Returns a supplier of processors for the second-stage vertex in a two-stage group-and-aggregate setup. The vertex applies the combine aggregation primitive to the entries received from several upstream instances of accumulateByKeyP(). After exhausting all its input it emits one item per distinct key. It computes the item to emit by passing each (key, result) pair to mapToOutputFn.
Since the input to this vertex must be bounded, its primary use case is batch jobs.
This processor has state, but does not save it to snapshot. On job restart, the state will be lost.
Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
OUT - type of the item to emit
Parameters:
aggrOp - the aggregate operation to perform
mapToOutputFn - function that takes the key and the aggregation result and returns the output item

@Nonnull public static <K,A,R,OUT> DistributedSupplier<Processor> aggregateToSlidingWindowP(@Nonnull List<DistributedFunction<?,? extends K>> keyFns, @Nonnull List<DistributedToLongFunction<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A,R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K,? super R,OUT> mapToOutputFn)
Returns a supplier of processors for a vertex that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages). The vertex groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It emits sliding window results labeled with the timestamp denoting the window's end time (the exclusive upper bound of the timestamps belonging to the window).
The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.
When the vertex receives a watermark with a given wmVal, it emits the result of aggregation for all the positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. The type of emitted items is TimestampedEntry<K, A>, so there is one item per key per window position.
Behavior on job restart
This processor saves its state to snapshot. After restart, it can
continue accumulating where it left off.
After a restart in at-least-once mode, watermarks are allowed to go back in time. If such a watermark is received, some windows that were emitted in previous execution will be re-emitted. These windows might miss events as some of them had already been evicted before the snapshot was done in previous execution.
@Nonnull public static <K,A> DistributedSupplier<Processor> accumulateByFrameP(@Nonnull List<DistributedFunction<?,? extends K>> keyFns, @Nonnull List<DistributedToLongFunction<?>> timestampFns, @Nonnull TimestampKind timestampKind, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A,?> aggrOp)
Returns a supplier of processors for the first-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). The vertex groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It applies the accumulate aggregation primitive to each key-frame group.
The frame is identified by the timestamp denoting its end time (equal to the exclusive upper bound of its timestamp range). SlidingWindowPolicy.higherFrameTs(long) maps the event timestamp to the timestamp of the frame it belongs to.
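That frame mapping can be sketched as follows. This is a hypothetical re-implementation, not the actual SlidingWindowPolicy code; it assumes frames aligned at multiples of the frame length and labeled by their exclusive upper bound:

```java
// Hypothetical re-implementation (not the actual SlidingWindowPolicy code) of
// the frame mapping described above, assuming frames aligned at multiples of
// the frame length and labeled by their exclusive upper bound.
public class FramePolicySketch {
    static long higherFrameTs(long timestamp, long frameLength) {
        // smallest aligned timestamp strictly greater than the event timestamp
        return Math.floorDiv(timestamp, frameLength) * frameLength + frameLength;
    }
}
```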
The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.
When the processor receives a watermark with a given wmVal, it emits the current accumulated state of all frames with timestamp <= wmVal and deletes these frames from its storage. The type of emitted items is TimestampedEntry<K, A>, so there is one item per key per frame.
When a state snapshot is requested, the state is flushed to second-stage processor and nothing is saved to snapshot.
Type Parameters:
K - type of the grouping key
A - type of accumulator returned from aggrOp.createAccumulatorFn()
@Nonnull public static <K,A,R,OUT> DistributedSupplier<Processor> combineToSlidingWindowP(@Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A,R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K,? super R,OUT> mapToOutputFn)
Returns a supplier of processors for the second-stage vertex in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). Each processor applies the combine aggregation primitive to the frames received from several upstream instances of accumulateByFrameP().
When the processor receives a watermark with a given wmVal, it emits the result of aggregation for all positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. To compute the item to emit, it calls mapToOutputFn with the window's start and end timestamps, the key and the aggregation result. The window end time is the exclusive upper bound of the timestamps belonging to the window.
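Computing one window result from per-frame partials can be sketched in plain Java. This is illustrative only, not the Jet API; a counting aggregate is assumed, so the combine primitive is addition and finish is the identity:

```java
import java.util.*;

// Illustrative sketch (not the Jet API) of computing one sliding-window
// result from per-frame partial results. A counting aggregate is assumed,
// so the combine primitive is addition and finish is the identity.
public class WindowFromFrames {
    // frameCounts maps a frame's end timestamp to its partial count
    static long windowResult(Map<Long, Long> frameCounts, long windowEnd,
                             long windowSize, long frameLength) {
        long combined = 0;
        for (long end = windowEnd; end > windowEnd - windowSize; end -= frameLength) {
            combined += frameCounts.getOrDefault(end, 0L); // combine primitive
        }
        return combined; // finish primitive (identity for a count)
    }
}
```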
Behavior on job restart
This processor saves its state to snapshot. After restart, it can
continue accumulating where it left off.
After a restart in at-least-once mode, watermarks are allowed to go back in time. If such a watermark is received, some windows that were emitted in previous execution will be re-emitted. These windows might miss events as some of them had already been evicted before the snapshot was done in previous execution.
Type Parameters:
A - type of the accumulator
R - type of the finished result returned from aggrOp.finishAccumulationFn()
OUT - type of the item to emit

@Nonnull public static <K,A,R,OUT> DistributedSupplier<Processor> aggregateToSessionWindowP(long sessionTimeout, @Nonnull List<DistributedToLongFunction<?>> timestampFns, @Nonnull List<DistributedFunction<?,? extends K>> keyFns, @Nonnull AggregateOperation<A,R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K,? super R,OUT> mapToOutputFn)
Returns a supplier of processors for a vertex that aggregates events into session windows. For each completed session it emits one output item, computed by mapToOutputFn (for example, a WindowResult).
The vertex accepts input from one or more inbound edges. The type of items may be different on each edge. For each edge a separate key extracting function must be supplied and the aggregate operation must contain a separate accumulation function for each edge.
The functioning of this vertex is easiest to explain in terms of the event interval: the range [timestamp, timestamp + sessionTimeout]. Initially an event causes a new session window to be created, covering exactly the event interval. A following event under the same key belongs to this window iff its interval overlaps it. The window is extended to cover the entire interval of the new event. The event may happen to belong to two existing windows if its interval bridges the gap between them; in that case they are combined into one.
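The merging rule can be sketched in plain Java. The `SessionSketch` class below is an illustration that processes events in timestamp order; the real processor also accepts out-of-order events, which is when the gap-bridging merge of two existing windows occurs:

```java
import java.util.*;

// Plain-Java sketch of the session rule above: an event covers the interval
// [ts, ts + sessionTimeout]; windows with overlapping intervals merge. This
// illustration processes events in timestamp order only.
public class SessionSketch {
    // Returns the merged session windows for one key as {start, end} pairs.
    static List<long[]> sessions(List<Long> timestamps, long sessionTimeout) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> windows = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = windows.isEmpty() ? null : windows.get(windows.size() - 1);
            if (last != null && last[1] >= ts) {
                last[1] = Math.max(last[1], ts + sessionTimeout); // extend window
            } else {
                windows.add(new long[]{ts, ts + sessionTimeout}); // new window
            }
        }
        return windows;
    }
}
```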
Behavior on job restart
This processor saves its state to snapshot. After restart, it can
continue accumulating where it left off.
After a restart in at-least-once mode, watermarks are allowed to go back in time. The processor evicts state based on the watermarks it has received. If it receives a duplicate watermark, it might emit sessions with missing events, because those events were already evicted. The sessions before and after the snapshot might overlap, which they normally don't.
Type Parameters:
K - type of the item's grouping key
A - type of the container of the accumulated value
R - type of the session window's result value
Parameters:
sessionTimeout - maximum gap between consecutive events in the same session window
timestampFns - functions to extract the timestamp from the item
keyFns - functions to extract the grouping key from the item
aggrOp - the aggregate operation

@Nonnull public static <T> DistributedSupplier<Processor> insertWatermarksP(@Nonnull WatermarkGenerationParams<? super T> wmGenParams)
Returns a supplier of processors for a vertex that inserts watermark items into the stream. The value of the watermark is determined by the supplied WatermarkPolicy instance.
This processor also drops late items: it never allows an event that is late with regard to an already emitted watermark to pass.
The processor saves the value of the last emitted watermark to the snapshot. Different instances of this processor can be at different watermarks at snapshot time. After a restart, all instances will start at the watermark of the most-behind instance before the restart.
This might sound as if it could break the monotonicity requirement, but thanks to watermark coalescing, watermarks are only delivered for downstream processing after they have been received from all upstream processors. A side effect of this is that a late event which was dropped before the restart is not considered late after the restart.
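The coalescing rule can be sketched as follows. The `WmCoalescerSketch` class is a simplified model, not Jet's actual implementation:

```java
import java.util.*;

// Simplified model (not Jet's actual implementation) of watermark coalescing:
// a watermark value is forwarded downstream only once every upstream processor
// has reached it, i.e. the effective watermark is the minimum across upstreams.
public class WmCoalescerSketch {
    private final long[] upstreamWms;

    WmCoalescerSketch(int upstreamCount) {
        upstreamWms = new long[upstreamCount];
        Arrays.fill(upstreamWms, Long.MIN_VALUE);
    }

    // Records a watermark from one upstream; returns the coalesced watermark.
    long observe(int upstreamIndex, long wmValue) {
        upstreamWms[upstreamIndex] = Math.max(upstreamWms[upstreamIndex], wmValue);
        long min = Long.MAX_VALUE;
        for (long wm : upstreamWms) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

Under this model, restarting all instances at the most-behind watermark cannot move the coalesced (minimum) watermark backwards for downstream processors.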
Type Parameters:
T - the type of the stream item

@Nonnull public static <T,R> DistributedSupplier<Processor> mapP(@Nonnull DistributedFunction<T,R> mapFn)

Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it. If the result is null, it emits nothing. Therefore this vertex can be used to implement filtering semantics as well.
This processor is stateless.
Type Parameters:
T - type of received item
R - type of emitted item
Parameters:
mapFn - a stateless mapping function

@Nonnull public static <T> DistributedSupplier<Processor> filterP(@Nonnull DistributedPredicate<T> filterFn)

Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate.
This processor is stateless.
Type Parameters:
T - type of received item
Parameters:
filterFn - a stateless predicate to test each received item against

@Nonnull public static <T,R> DistributedSupplier<Processor> flatMapP(@Nonnull DistributedFunction<T,? extends Traverser<? extends R>> flatMapFn)

Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser.
This processor is stateless.
Type Parameters:
T - received item type
R - emitted item type
Parameters:
flatMapFn - a stateless function that maps the received item to a traverser over output items

@Nonnull public static <C,T,R> ProcessorSupplier mapUsingContextP(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C,? super T,? extends R> mapFn)

Returns a supplier of processors for a vertex which, for each received item, emits the result of applying the given mapping function to it. The mapping function receives a context object created by the contextFactory.
If the mapping result is null, the vertex emits nothing. Therefore it can be used to implement filtering semantics as well.
Unlike rollingAggregateP(...) (with the "Keyed" part), this method creates one context object per processor (or per member, if shared).
While it's allowed to store some local state in the context object, it won't be saved to the snapshot and will misbehave in a fault-tolerant stream processing job.
Type Parameters:
C - type of context object
T - type of received item
R - type of emitted item
Parameters:
contextFactory - the context factory
mapFn - a stateless mapping function

@Nonnull public static <C,T> ProcessorSupplier filterUsingContextP(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiPredicate<? super C,? super T> filterFn)
Returns a supplier of processors for a vertex that emits the same items it receives, but only those that pass the given predicate. The predicate receives a context object created by the contextFactory.
While it's allowed to store some local state in the context object, it won't be saved to the snapshot and will misbehave in a fault-tolerant stream processing job.
Type Parameters:
C - type of context object
T - type of received item
Parameters:
contextFactory - the context factory
filterFn - a stateless predicate to test each received item against

@Nonnull public static <C,T,R> ProcessorSupplier flatMapUsingContextP(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C,? super T,? extends Traverser<? extends R>> flatMapFn)

Returns a supplier of processors for a vertex that applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser. The mapping function receives a context object created by the contextFactory.
While it's allowed to store some local state in the context object, it won't be saved to the snapshot and will misbehave in a fault-tolerant stream processing job.
Type Parameters:
C - type of context object
T - received item type
R - emitted item type
Parameters:
contextFactory - the context factory
flatMapFn - a stateless function that maps the received item to a traverser over the output items

@Nonnull public static <T,K,A,R,OUT> DistributedSupplier<Processor> rollingAggregateP(@Nonnull DistributedFunction<? super T,? extends K> keyFn, @Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp, @Nonnull DistributedTriFunction<? super T,? super K,? super R,? extends OUT> mapToOutputFn)

Returns a supplier of processors for a vertex that performs a rolling aggregation: as each item arrives, it updates the accumulator under the item's key and computes the output item by passing the input item, the key and the current aggregation result to mapToOutputFn. If the result of applying mapToOutputFn is null, the vertex emits nothing. Therefore it can be used to implement filtering semantics as well.
This vertex saves the state to snapshot so the state of the accumulators will survive a job restart.
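The per-item behavior can be sketched in plain Java. The `RollingSketch` class is illustrative, not the Jet API; String keys and a running-total aggregate are assumed:

```java
import java.util.*;

// Illustrative sketch (not the Jet API) of a rolling aggregation: each input
// item updates the accumulator under its key and immediately yields an output
// item built from the current result.
public class RollingSketch {
    private final Map<String, Long> totals = new HashMap<>();

    // Returns the output item for this input; the real vertex would drop
    // a null result from mapToOutputFn instead of emitting it.
    String process(String key, long value) {
        long result = totals.merge(key, value, Long::sum); // update accumulator
        return key + "=" + result;                         // mapToOutputFn
    }
}
```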
Type Parameters:
T - type of the input item
K - type of the key
A - type of the accumulator
R - type of the output item
Parameters:
keyFn - function that computes the grouping key
aggrOp - the aggregate operation to perform
mapToOutputFn - function that takes the input item, the key and the aggregation result and returns the output item

@Nonnull public static DistributedSupplier<Processor> noopP()

Returns a supplier of processors that swallow all input (if any), do nothing with it, and produce no output.
Copyright © 2018 Hazelcast, Inc. All rights reserved.