public final class Processors extends Object
Contains factory methods for suppliers of commonly used processors. For a broader overview, see the package-level documentation.
Many of the processors deal with an aggregating operation over stream items. Prior to aggregation, items may be grouped by an arbitrary key and/or by an event-timestamp-based window. There are two main aggregation setups: single-stage and two-stage.
In the single-stage setup a single vertex performs the complete aggregation. The edge leading to it must be partitioned-distributed so that all items with the same key arrive at the same processor. This is the outline of the DAG:

     -----------------
    | upstream vertex |
     -----------------
            |
            | partitioned-distributed
            V
        -----------
       | aggregate |
        -----------

In the two-stage setup the first stage applies the accumulate aggregation primitive and the second stage does combine and finish. The essential property of this setup is that the edge leading to the first stage is local, incurring no network traffic, and only the edge from the first to the second stage is distributed. There is only one item per group traveling on the distributed edge. Compared to the single-stage setup this can dramatically reduce network traffic, but it needs more memory to keep track of all keys on each cluster member. This is the outline of the DAG:

     -----------------
    | upstream vertex |
     -----------------
            |
            | partitioned-local
            V
       ------------
      | accumulate |
       ------------
            |
            | partitioned-distributed
            V
     ----------------
    | combine/finish |
     ----------------

The variants without a grouping key are equivalent to grouping by a single, global key. In that case the edge towards the final-stage vertex must be all-to-one and the local parallelism of the vertex must be one. Unless the volume of the aggregated data is small (e.g., some side branch off the main flow in the DAG), the best choice is this two-stage setup:

     -----------------
    | upstream vertex |
     -----------------
            |
            | local, non-partitioned
            V
       ------------
      | accumulate |
       ------------
            |
            | distributed, all-to-one
            V
     ----------------
    | combine/finish |   localParallelism = 1
     ----------------

This will parallelize and distribute most of the processing, and the second-stage processor will receive just a single item from each upstream processor, doing very little work.
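The division of labor between the two stages can be sketched in plain Java. All names below are hypothetical stand-ins (not the Jet API) for the primitives of an AggregateOperation that computes an average: create/accumulate run in the stage-1 processors, combine/finish in the stage-2 processor.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class TwoStageAverage {
    // Accumulator: a running (sum, count) pair.
    static final class Acc { long sum; long count; }

    static final Supplier<Acc> create = Acc::new;
    static final BiFunction<Acc, Long, Acc> accumulate = (a, item) -> {
        a.sum += item; a.count++; return a;
    };
    static final BiFunction<Acc, Acc, Acc> combine = (a, b) -> {
        a.sum += b.sum; a.count += b.count; return a;
    };
    static final Function<Acc, Double> finish = a -> (double) a.sum / a.count;

    // Each "stage-1 processor" accumulates its local partition; the single
    // "stage-2 processor" combines the partial accumulators and finishes.
    static double aggregateTwoStage(List<List<Long>> partitions) {
        Acc combined = create.get();
        for (List<Long> partition : partitions) {
            Acc partial = create.get();              // local to one stage-1 processor
            for (long item : partition) {
                partial = accumulate.apply(partial, item);
            }
            combined = combine.apply(combined, partial); // one item per partition crosses the distributed edge
        }
        return finish.apply(combined);
    }

    public static void main(String[] args) {
        // Average of 1..4, computed as two partitions of two items each.
        System.out.println(aggregateTwoStage(List.of(List.of(1L, 2L), List.of(3L, 4L)))); // 2.5
    }
}
```

Note that only the small accumulator objects travel between the stages, which is why the two-stage setup saves network traffic.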
| | single-stage | stage 1/2 | stage 2/2 |
|---|---|---|---|
| batch, no grouping | aggregate() | accumulate() | combine() |
| batch, group by key | aggregateByKey() | accumulateByKey() | combineByKey() |
| stream, group by key and aligned window | aggregateToSlidingWindow() | accumulateByFrame() | combineToSlidingWindow() |
| stream, group by key and session window | aggregateToSessionWindow() | N/A | N/A |
A tumbling window is a special case of the sliding window with sliding step = window size. To achieve the effect of aggregation without a grouping key, specify constantKey() as the key-extracting function.
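As an illustration of the tumbling-window note above, here is a small plain-Java helper (hypothetical, not part of the Jet API) that lists the end timestamps of every sliding-window position containing a given event timestamp. With step equal to size it always yields exactly one window, i.e., tumbling behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowPositions {
    // End timestamps (exclusive upper bounds) of all window positions that
    // contain the event timestamp, assuming frames aligned at timestamp 0.
    static List<Long> containingWindowEnds(long ts, long windowSize, long slideStep) {
        List<Long> ends = new ArrayList<>();
        // Smallest multiple of slideStep strictly greater than ts.
        long firstEnd = Math.floorDiv(ts, slideStep) * slideStep + slideStep;
        for (long end = firstEnd; end <= ts + windowSize; end += slideStep) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // Sliding window of size 10, step 5: event at t=23 falls into two positions.
        System.out.println(containingWindowEnds(23, 10, 5));  // [25, 30]
        // Tumbling window: step == size, so each event falls into exactly one window.
        System.out.println(containingWindowEnds(23, 10, 10)); // [30]
    }
}
```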
static <T,A,R> DistributedSupplier<Processor> accumulate(AggregateOperation<T,A,R> aggregateOperation)
    Returns a supplier of the first-stage processor in a two-stage aggregation setup. It applies the accumulate aggregation primitive to all the items it receives.

static <T,K,A> DistributedSupplier<Processor> accumulateByFrame(DistributedFunction<? super T,K> getKeyF, DistributedToLongFunction<? super T> getTimestampF, TimestampKind timestampKind, WindowDefinition windowDef, AggregateOperation<? super T,A,?> aggregateOperation)
    Returns a supplier of the first-stage processor in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages).

static <T,K,A> DistributedSupplier<Processor> accumulateByKey(DistributedFunction<? super T,K> getKeyF, AggregateOperation<? super T,A,?> aggregateOperation)
    Returns a supplier of the first-stage processor in a two-stage group-and-aggregate setup.

static <T,A,R> DistributedSupplier<Processor> aggregate(AggregateOperation<T,A,R> aggregateOperation)
    Returns a supplier of processor that performs the provided aggregate operation on all the items it receives.

static <T,K,A,R> DistributedSupplier<Processor> aggregateByKey(DistributedFunction<? super T,K> getKeyF, AggregateOperation<? super T,A,R> aggregateOperation)
    Returns a supplier of processor that groups items by key and performs the provided aggregate operation on each group.

static <T,K,A,R> DistributedSupplier<Processor> aggregateToSessionWindow(long sessionTimeout, DistributedToLongFunction<? super T> getTimestampF, DistributedFunction<? super T,K> getKeyF, AggregateOperation<? super T,A,R> aggregateOperation)
    Returns a supplier of processor that aggregates events into session windows.

static <T,K,A,R> DistributedSupplier<Processor> aggregateToSlidingWindow(DistributedFunction<? super T,K> getKeyF, DistributedToLongFunction<? super T> getTimestampF, TimestampKind timestampKind, WindowDefinition windowDef, AggregateOperation<? super T,A,R> aggregateOperation)
    Returns a supplier of processor that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages).

static <T,A,R> DistributedSupplier<Processor> combine(AggregateOperation<T,A,R> aggregateOperation)
    Returns a supplier of the second-stage processor in a two-stage aggregation setup. It combines the accumulators received from upstream accumulate() processors and finishes the aggregation.

static <A,R> DistributedSupplier<Processor> combineByKey(AggregateOperation<?,A,R> aggregateOperation)
    Returns a supplier of the second-stage processor in a two-stage group-and-aggregate setup.

static <K,A,R> DistributedSupplier<Processor> combineToSlidingWindow(WindowDefinition windowDef, AggregateOperation<?,A,R> aggregateOperation)
    Returns a supplier of the second-stage processor in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages).

static <T> DistributedSupplier<Processor> filter(DistributedPredicate<T> predicate)
    Returns a supplier of processor which emits the same items it receives, but only those that pass the given predicate.

static <T,R> DistributedSupplier<Processor> flatMap(DistributedFunction<T,? extends Traverser<? extends R>> mapper)
    Returns a supplier of processor which applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser.

static <T> DistributedSupplier<Processor> insertWatermarks(DistributedToLongFunction<T> getTimestampF, DistributedSupplier<WatermarkPolicy> newWmPolicyF, WatermarkEmissionPolicy wmEmitPolicy)
    Returns a supplier of processor that inserts watermark items into a data (sub)stream.

static <T,R> DistributedSupplier<Processor> map(DistributedFunction<T,R> mapper)
    Returns a supplier of processor which, for each received item, emits the result of applying the given mapping function to it.

static DistributedSupplier<Processor> nonCooperative(DistributedSupplier<Processor> wrapped)
    Decorates a Supplier<Processor> into one that will declare its processors non-cooperative.

static ProcessorSupplier nonCooperative(ProcessorSupplier wrapped)
    Decorates a ProcessorSupplier with one that will declare all its processors non-cooperative.

static DistributedSupplier<Processor> noop()
    Returns a supplier of processor that consumes all its input (if any) and does nothing with it.
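All of these factories return a supplier of processors rather than a ready processor instance. A plain-Java sketch of why (the Processor interface here is a hypothetical stand-in, not the Jet one): the runtime creates one processor per parallel worker, and each instance needs its own private state.

```java
import java.util.function.Supplier;

public class SupplierPerWorker {
    // Hypothetical minimal processor: consumes items, exposes a result.
    interface Processor { void process(long item); long result(); }

    // Factory in the style of the methods above: returns a supplier so the
    // runtime can create a fresh, independently stateful processor per worker.
    static Supplier<Processor> summingProcessor() {
        return () -> new Processor() {
            long sum;                                 // state private to one worker
            public void process(long item) { sum += item; }
            public long result() { return sum; }
        };
    }

    public static void main(String[] args) {
        Supplier<Processor> supplier = summingProcessor();
        Processor worker1 = supplier.get();
        Processor worker2 = supplier.get();           // independent state
        worker1.process(5);
        worker2.process(7);
        System.out.println(worker1.result() + " " + worker2.result()); // 5 7
    }
}
```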
@Nonnull public static <T,K,A,R> DistributedSupplier<Processor> aggregateByKey(@Nonnull DistributedFunction<? super T,K> getKeyF, @Nonnull AggregateOperation<? super T,A,R> aggregateOperation)
Returns a supplier of processor that groups items by key and performs the provided aggregate operation on each group. After exhausting all its input it emits one Map.Entry<K, R> per observed key.
Since the input to this processor must be bounded, its primary use case is batch jobs.

Type Parameters:
    T - type of received item
    K - type of key
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
    R - type of the finished result returned from aggregateOperation.finishAccumulationF()
Parameters:
    getKeyF - computes the key from the entry
    aggregateOperation - the aggregate operation to perform

@Nonnull public static <T,K,A> DistributedSupplier<Processor> accumulateByKey(@Nonnull DistributedFunction<? super T,K> getKeyF, @Nonnull AggregateOperation<? super T,A,?> aggregateOperation)
Returns a supplier of the first-stage processor in a two-stage group-and-aggregate setup. The processor groups items by key and applies the accumulate aggregation primitive to each group. After exhausting all its input it emits one Map.Entry<K, A> per observed key.
Since the input to this processor must be bounded, its primary use case is batch jobs.

Type Parameters:
    T - type of received item
    K - type of key
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
Parameters:
    getKeyF - computes the key from the entry
    aggregateOperation - the aggregate operation to perform

@Nonnull public static <A,R> DistributedSupplier<Processor> combineByKey(@Nonnull AggregateOperation<?,A,R> aggregateOperation)
Returns a supplier of the second-stage processor in a two-stage group-and-aggregate setup. It applies the combine aggregation primitive to the entries received from several upstream instances of accumulateByKey(). After exhausting all its input it emits one Map.Entry<K, R> per observed key.
Since the input to this processor must be bounded, its primary use case is batch jobs.

Type Parameters:
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
    R - type of the finished result returned from aggregateOperation.finishAccumulationF()
Parameters:
    aggregateOperation - the aggregate operation to perform

@Nonnull public static <T,A,R> DistributedSupplier<Processor> aggregate(@Nonnull AggregateOperation<T,A,R> aggregateOperation)
Returns a supplier of processor that performs the provided aggregate operation on all the items it receives. After exhausting all its input it emits a single item of type R, the result of the aggregate operation.
Since the input to this processor must be bounded, its primary use case is batch jobs.

Type Parameters:
    T - type of received item
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
    R - type of the finished result returned from aggregateOperation.finishAccumulationF()
Parameters:
    aggregateOperation - the aggregate operation to perform

@Nonnull public static <T,A,R> DistributedSupplier<Processor> accumulate(@Nonnull AggregateOperation<T,A,R> aggregateOperation)
Returns a supplier of the first-stage processor in a two-stage aggregation setup. It applies the accumulate aggregation primitive to all the items it receives. After exhausting all its input it emits a single item of type A, the accumulator holding the partial result.
Since the input to this processor must be bounded, its primary use case is batch jobs.

Type Parameters:
    T - type of received item
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
    R - type of the finished result returned from aggregateOperation.finishAccumulationF()
Parameters:
    aggregateOperation - the aggregate operation to perform

@Nonnull public static <T,A,R> DistributedSupplier<Processor> combine(@Nonnull AggregateOperation<T,A,R> aggregateOperation)
Returns a supplier of the second-stage processor in a two-stage aggregation setup. It applies the combine aggregation primitive to the accumulators received from upstream instances of accumulate() and, after exhausting all its input, applies the finish primitive and emits a single item of type R, the result of the aggregate operation.
Since the input to this processor must be bounded, its primary use case is batch jobs.

Type Parameters:
    T - type of received item
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
    R - type of the finished result returned from aggregateOperation.finishAccumulationF()
Parameters:
    aggregateOperation - the aggregate operation to perform

@Nonnull public static <T,K,A,R> DistributedSupplier<Processor> aggregateToSlidingWindow(@Nonnull DistributedFunction<? super T,K> getKeyF, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull TimestampKind timestampKind, @Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<? super T,A,R> aggregateOperation)
Returns a supplier of processor that aggregates events into a sliding window in a single stage (see the class Javadoc for an explanation of aggregation stages). The processor groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It emits sliding window results labeled with the timestamp denoting the window's end time. This timestamp is equal to the exclusive upper bound of timestamps belonging to the window.

When the processor receives a watermark with a given wmVal, it emits the result of aggregation for all positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. The type of emitted items is TimestampedEntry<K, R>, so there is one item per key per window position.
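The watermark-triggered emission described above can be sketched in plain Java. The windowResults helper below is hypothetical (not the Jet implementation): frames hold partial sums keyed by their end timestamps, and a watermark wmVal triggers every window position with end <= wmVal, each computed by combining the frames it covers.

```java
import java.util.Map;
import java.util.TreeMap;

public class FrameCombiner {
    // Combine frames (frame end -> partial sum) into window results
    // (window end -> total) for every window position with end <= wmVal.
    static TreeMap<Long, Long> windowResults(TreeMap<Long, Long> frames,
                                             long size, long step, long wmVal) {
        TreeMap<Long, Long> results = new TreeMap<>();
        for (long winEnd = step; winEnd <= wmVal; winEnd += step) {
            long sum = 0;
            // A frame with end f belongs to the window iff winEnd - size < f <= winEnd.
            for (long partial : frames.subMap(winEnd - size, false, winEnd, true).values()) {
                sum += partial;
            }
            results.put(winEnd, sum);
        }
        return results;
    }

    public static void main(String[] args) {
        // Window size 10, sliding step 5; three frames with partial sums 2, 3, 4.
        TreeMap<Long, Long> frames = new TreeMap<>(Map.of(5L, 2L, 10L, 3L, 15L, 4L));
        System.out.println(windowResults(frames, 10, 5, 15)); // {5=2, 10=5, 15=7}
    }
}
```

Note how adjacent window positions reuse the same frames, which is the point of accumulating per frame rather than per window.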
@Nonnull public static <T,K,A> DistributedSupplier<Processor> accumulateByFrame(@Nonnull DistributedFunction<? super T,K> getKeyF, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull TimestampKind timestampKind, @Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<? super T,A,?> aggregateOperation)
Returns a supplier of the first-stage processor in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). The processor groups items by the grouping key (as obtained from the given key-extracting function) and by frame, which is a range of timestamps equal to the sliding step. It applies the accumulate aggregation primitive to each key-frame group.

The frame is identified by the timestamp denoting its end time (equal to the exclusive upper bound of its timestamp range). WindowDefinition.higherFrameTs(long) maps the event timestamp to the timestamp of the frame it belongs to.

When the processor receives a watermark with a given wmVal, it emits the current accumulated state of all frames with timestamp <= wmVal and deletes these frames from its storage. The type of emitted items is TimestampedEntry<K, A>, so there is one item per key per frame.

Type Parameters:
    T - input item type
    K - type of key returned from getKeyF
    A - type of accumulator returned from aggregateOperation.createAccumulatorF()
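The frame-assignment rule described above (what WindowDefinition.higherFrameTs(long) computes) can be sketched as follows; this is an illustrative re-implementation, assuming frames aligned at timestamp 0 and labeled by their exclusive upper bound.

```java
public class FrameAssignment {
    // End timestamp of the frame that the event timestamp belongs to,
    // for frames of length `step` labeled by their exclusive upper bound.
    static long higherFrameTs(long timestamp, long step) {
        return Math.floorDiv(timestamp, step) * step + step;
    }

    public static void main(String[] args) {
        System.out.println(higherFrameTs(23, 10)); // 30: event 23 lies in frame [20, 30)
        System.out.println(higherFrameTs(30, 10)); // 40: the bound is exclusive, so 30 lies in [30, 40)
    }
}
```

Math.floorDiv keeps the rule correct for negative timestamps as well, which plain integer division would not.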
@Nonnull public static <K,A,R> DistributedSupplier<Processor> combineToSlidingWindow(@Nonnull WindowDefinition windowDef, @Nonnull AggregateOperation<?,A,R> aggregateOperation)
Returns a supplier of the second-stage processor in a two-stage sliding window aggregation setup (see the class Javadoc for an explanation of aggregation stages). It applies the combine aggregation primitive to frames received from several upstream instances of accumulateByFrame(). It emits sliding window results labeled with the timestamp denoting the window's end time. This timestamp is equal to the exclusive upper bound of timestamps belonging to the window.

When the processor receives a watermark with a given wmVal, it emits the result of aggregation for all positions of the sliding window with windowTimestamp <= wmVal. It computes the window result by combining the partial results of the frames belonging to it and finally applying the finish aggregation primitive. After this it deletes from storage all the frames that trail behind the emitted windows. The type of emitted items is TimestampedEntry<K, R>, so there is one item per key per window position.

Type Parameters:
    A - type of the accumulator
    R - type of the finished result returned from aggregateOperation.finishAccumulationF()
@Nonnull public static <T,K,A,R> DistributedSupplier<Processor> aggregateToSessionWindow(long sessionTimeout, @Nonnull DistributedToLongFunction<? super T> getTimestampF, @Nonnull DistributedFunction<? super T,K> getKeyF, @Nonnull AggregateOperation<? super T,A,R> aggregateOperation)
Returns a supplier of processor that aggregates events into session windows.
The functioning of this processor is easiest to explain in terms of the event interval: the range [timestamp, timestamp + sessionTimeout]. Initially, an event causes a new session window to be created, covering exactly the event interval. A following event under the same key belongs to this window iff its interval overlaps it; the window is then extended to cover the entire interval of the new event. The event may happen to belong to two existing windows if its interval bridges the gap between them; in that case the windows are combined into one.
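The event-interval rule above can be sketched in plain Java. The addEvent helper is hypothetical (not the Jet implementation) and tracks the open windows of one grouping key as [start, end] pairs.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindows {
    static final long SESSION_TIMEOUT = 10;

    // Fold one event into the open windows of a single grouping key:
    // every window overlapping the event interval [ts, ts + timeout] is
    // absorbed into one merged window covering all of them.
    static List<long[]> addEvent(List<long[]> windows, long ts) {
        long lo = ts, hi = ts + SESSION_TIMEOUT;
        List<long[]> result = new ArrayList<>();
        for (long[] w : windows) {
            if (w[1] >= lo && w[0] <= hi) {   // overlaps the event interval
                lo = Math.min(lo, w[0]);
                hi = Math.max(hi, w[1]);
            } else {
                result.add(w);                 // unrelated window, keep as-is
            }
        }
        result.add(new long[]{lo, hi});
        return result;
    }

    public static void main(String[] args) {
        List<long[]> windows = new ArrayList<>();
        windows = addEvent(windows, 0);   // creates window [0, 10]
        windows = addEvent(windows, 15);  // no overlap: second window [15, 25]
        windows = addEvent(windows, 8);   // interval [8, 18] bridges both: merged into [0, 25]
        System.out.println(windows.size() + " window(s): ["
                + windows.get(0)[0] + ", " + windows.get(0)[1] + "]"); // 1 window(s): [0, 25]
    }
}
```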
Type Parameters:
    T - type of the stream event
    K - type of the item's grouping key
    A - type of the container of the accumulated value
    R - type of the session window's result value
Parameters:
    sessionTimeout - maximum gap between consecutive events in the same session window
    getTimestampF - function to extract the timestamp from the item
    getKeyF - function to extract the grouping key from the item
    aggregateOperation - contains aggregation logic

@Nonnull public static <T> DistributedSupplier<Processor> insertWatermarks(@Nonnull DistributedToLongFunction<T> getTimestampF, @Nonnull DistributedSupplier<WatermarkPolicy> newWmPolicyF, @Nonnull WatermarkEmissionPolicy wmEmitPolicy)
Returns a supplier of processor that inserts watermark items into a data (sub)stream. The value of the watermark is determined by a separate policy object of type WatermarkPolicy.

Type Parameters:
    T - the type of the stream item

@Nonnull public static <T,R> DistributedSupplier<Processor> map(@Nonnull DistributedFunction<T,R> mapper)
Returns a supplier of processor which, for each received item, emits the result of applying the given mapping function to it. If the result is null, it emits nothing. Therefore this processor can be used to implement filtering semantics as well.

Type Parameters:
    T - type of received item
    R - type of emitted item
Parameters:
    mapper - the mapping function

@Nonnull public static <T> DistributedSupplier<Processor> filter(@Nonnull DistributedPredicate<T> predicate)
Returns a supplier of processor which emits the same items it receives, but only those that pass the given predicate.

Type Parameters:
    T - type of received item
Parameters:
    predicate - the predicate to test each received item against

@Nonnull public static <T,R> DistributedSupplier<Processor> flatMap(@Nonnull DistributedFunction<T,? extends Traverser<? extends R>> mapper)
Returns a supplier of processor which applies the provided item-to-traverser mapping function to each received item and emits all the items from the resulting traverser.

Type Parameters:
    T - received item type
    R - emitted item type
Parameters:
    mapper - function that maps the received item to a traverser over output items

@Nonnull public static DistributedSupplier<Processor> noop()

Returns a supplier of processor that consumes all its input (if any) and does nothing with it.
@Nonnull public static ProcessorSupplier nonCooperative(@Nonnull ProcessorSupplier wrapped)

Decorates a ProcessorSupplier with one that will declare all its processors non-cooperative. The wrapped supplier must return processors that are instanceof AbstractProcessor.

@Nonnull public static DistributedSupplier<Processor> nonCooperative(@Nonnull DistributedSupplier<Processor> wrapped)
Decorates a Supplier<Processor> into one that will declare its processors non-cooperative. The wrapped supplier must return processors that are instanceof AbstractProcessor.

Copyright © 2017 Hazelcast, Inc. All Rights Reserved.