A - the type of the accumulator
R - the type of the final result
public interface AggregateOperation<A,R> extends Serializable
Check AggregateOperations to find the one
you need and, if you don't find it there, construct one by using the
aggregate operation builder and reading the
description below.
Jet aggregates the data by updating a mutable container,
called the accumulator, with the data from each stream item.
It does this by applying the accumulate primitive
to the accumulator and a given item. Jet provides some accumulator
objects in the accumulator package
that you can reuse, and you can also write your own if needed. The
accumulator must be serializable because Jet may need to send it to
another member to be combined with other accumulators.
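To illustrate the serializability requirement, here is a minimal sketch of a custom accumulator that survives a serialization round trip. `SumCountAccumulator` and `roundTrip` are hypothetical names that do not come from Jet, and plain Java serialization is only an assumed stand-in for whatever mechanism Jet uses to move accumulators between members.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical custom accumulator (not part of Jet's accumulator package).
final class SumCountAccumulator implements Serializable {
    private static final long serialVersionUID = 1L;
    long sum;
    long count;

    // Mutates the accumulator to account for one more item.
    void add(long value) {
        sum += value;
        count++;
    }
}

final class SerializableAccumulatorSketch {
    // Writes the accumulator to a byte array and reads it back, which
    // imitates sending it to another member (assumption: Java serialization).
    static SumCountAccumulator roundTrip(SumCountAccumulator acc) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(acc);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SumCountAccumulator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("accumulator is not serializable", e);
        }
    }

    public static void main(String[] args) {
        SumCountAccumulator acc = new SumCountAccumulator();
        acc.add(3);
        acc.add(4);
        SumCountAccumulator copy = roundTrip(acc);
        System.out.println(copy.sum + " / " + copy.count);
    }
}
```

If a field of the accumulator were not serializable, `roundTrip` would fail here rather than at the moment the accumulator has to leave its member.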
After it processes all the items in a batch/window, Jet transforms the
accumulator into the final result by applying the finish primitive.
Since it is a distributed/parallel computation engine, Jet will create
several independent processing units to perform the same aggregation,
and it must combine their partial results before applying the finish primitive and emitting the final result. This is the role of the
combine primitive.
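The interplay of create, accumulate, combine and finish can be sketched in plain Java, without the Jet API. The sketch assumes an averaging aggregation with a `long[]{sum, count}` accumulator; the class, field and method names are hypothetical, and the `java.util.function` types stand in for Jet's `Distributed*` functional interfaces.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class AveragingSketch {
    // create: a fresh accumulator holding {sum, count}.
    static final Supplier<long[]> CREATE = () -> new long[2];

    // accumulate: mutate the accumulator with one item.
    static final BiConsumer<long[], Long> ACCUMULATE = (acc, item) -> {
        acc[0] += item;
        acc[1]++;
    };

    // combine: merge the right-hand accumulator into the left-hand one.
    static final BiConsumer<long[], long[]> COMBINE = (left, right) -> {
        left[0] += right[0];
        left[1] += right[1];
    };

    // finish: turn the accumulator into the final result.
    static final Function<long[], Double> FINISH =
            acc -> acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];

    // One independent processing unit: accumulates its own share of the items.
    static long[] accumulateAll(List<Long> items) {
        long[] acc = CREATE.get();
        for (Long item : items) {
            ACCUMULATE.accept(acc, item);
        }
        return acc;
    }

    // Combines the partial results of all units, then applies finish once.
    static double average(List<List<Long>> partitions) {
        long[] total = CREATE.get();
        for (List<Long> partition : partitions) {
            COMBINE.accept(total, accumulateAll(partition));
        }
        return FINISH.apply(total);
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = Arrays.asList(
                Arrays.asList(1L, 2L, 3L),
                Arrays.asList(4L, 5L));
        System.out.println(average(partitions)); // prints 3.0
    }
}
```

Note that finish runs only after all partial accumulators are combined; averaging the per-partition averages instead would give a different, incorrect result.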
Finally, AggregateOperation also defines the deduct primitive, which allows Jet to efficiently aggregate infinite
stream data over a sliding window by evicting old data from the
existing accumulator instead of building a new one from scratch each time
the window slides forward. Providing a deduct primitive that makes
the computation more efficient than rebuilding the accumulator from scratch
isn't always possible. Therefore it is optional.
Depending on usage, the data items may come from one or more inbound
streams, and the AggregateOperation must provide a separate
accumulate primitive for each of them. If you are creating the
aggregating pipeline stage using the builder object, then you'll identify each contributing stream to the
AggregateOperation using the tags you got from the
builder.
If, on the other hand, you are calling one of the direct methods such
as stage.aggregate2(), then you'll deal with specializations of this interface
such as AggregateOperation2 and you'll identify the input stages by
their index, with index zero corresponding to the stage you're calling the
method on and the higher indices corresponding to the stages you pass in as
arguments.
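The idea of one accumulate primitive per contributing stream can be sketched in plain Java as follows. This is not the Jet API: `Totals`, `ACCUMULATE_0`, `ACCUMULATE_1` and `aggregate` are hypothetical names, and the two lists stand in for the stage at index 0 and the stage at index 1.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

final class TwoInputSketch {
    // Hypothetical accumulator fed by two contributing streams.
    static final class Totals {
        long visits;   // updated by items from stream 0
        long revenue;  // updated by items from stream 1
    }

    // Number of contributing streams this sketch handles.
    static final int ARITY = 2;

    // Accumulate primitive for index 0: each item is a page visit.
    static final BiConsumer<Totals, String> ACCUMULATE_0 =
            (acc, pageVisit) -> acc.visits++;

    // Accumulate primitive for index 1: each item is a purchase amount.
    static final BiConsumer<Totals, Long> ACCUMULATE_1 =
            (acc, amount) -> acc.revenue += amount;

    // Feeds both streams into the same accumulator, each through its own primitive.
    static Totals aggregate(List<String> stream0, List<Long> stream1) {
        Totals acc = new Totals();
        for (String pageVisit : stream0) {
            ACCUMULATE_0.accept(acc, pageVisit);
        }
        for (Long amount : stream1) {
            ACCUMULATE_1.accept(acc, amount);
        }
        return acc;
    }

    public static void main(String[] args) {
        Totals totals = aggregate(
                Arrays.asList("/home", "/cart", "/checkout"),
                Arrays.asList(10L, 20L));
        System.out.println(totals.visits + " visits, revenue " + totals.revenue);
    }
}
```

Each stream keeps its own item type, while both primitives mutate the same accumulator type; that shared accumulator is what lets a single finish primitive produce one result from both inputs.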
This is a summary of all the primitives involved:
- create a new accumulator object
- accumulate the data of an item by mutating the accumulator
- combine the contents of the right-hand accumulator into the left-hand one
- deduct the contents of the right-hand accumulator from the left-hand one (undo the effects of combine)
- finish accumulation by transforming the accumulator object into the final result
| Modifier and Type | Method and Description |
|---|---|
| `<T> DistributedBiConsumer<? super A,? super T>` | `accumulateFn(int index)`: A primitive that updates the accumulator state to account for a new item. |
| `default <T> DistributedBiConsumer<? super A,? super T>` | `accumulateFn(Tag<T> tag)`: A primitive that updates the accumulator state to account for a new item. |
| `int` | `arity()`: Returns the number of contributing streams this operation is set up to handle. |
| `DistributedBiConsumer<? super A,? super A>` | `combineFn()`: A primitive that accepts two accumulators and updates the state of the left-hand one by combining it with the state of the right-hand one. |
| `DistributedSupplier<A>` | `createFn()`: A primitive that returns a new accumulator. |
| `DistributedBiConsumer<? super A,? super A>` | `deductFn()`: A primitive that accepts two accumulators and updates the state of the left-hand one by deducting the state of the right-hand one from it. |
| `DistributedFunction<? super A,R>` | `finishFn()`: A primitive that finishes the accumulation process by transforming the accumulator object into the final result. |
| `AggregateOperation<A,R>` | `withAccumulateFns(DistributedBiConsumer... accumulateFns)`: Returns a copy of this aggregate operation, but with all the accumulate primitives replaced with the ones supplied here. |
| `default <T> AggregateOperation1<T,A,R>` | `withCombiningAccumulateFn(DistributedFunction<T,A> getAccFn)`: Returns a copy of this aggregate operation, but with the accumulate primitive replaced with one that expects to find accumulator objects in the input and will combine them all into a single accumulator of the same type. |
| `static <A> AggregateOperationBuilder<A>` | `withCreate(DistributedSupplier<A> createFn)`: Returns a builder object, initialized with the supplied create primitive, that can be used to construct the definition of an aggregate operation in a step-by-step manner. |
| `<R_NEW> AggregateOperation<A,R_NEW>` | `withFinishFn(DistributedFunction<? super A,R_NEW> finishFn)`: Returns a copy of this aggregate operation, but with the finish primitive replaced with the supplied one. |
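int arity()
Returns the number of contributing streams this operation is set up to handle. The index passed to accumulateFn(int) must be less than this number.

@Nonnull DistributedSupplier<A> createFn()
A primitive that returns a new accumulator. If the deduct primitive is defined, the accumulator object must properly implement equals(). See deductFn() for an explanation.

@Nonnull default <T> DistributedBiConsumer<? super A,? super T> accumulateFn(@Nonnull Tag<T> tag)
A primitive that updates the accumulator state to account for a new item.

@Nonnull <T> DistributedBiConsumer<? super A,? super T> accumulateFn(int index)
A primitive that updates the accumulator state to account for a new item.

@Nullable DistributedBiConsumer<? super A,? super A> combineFn()
A primitive that accepts two accumulators and updates the state of the left-hand one by combining it with the state of the right-hand one. This method may return null.

@Nullable DistributedBiConsumer<? super A,? super A> deductFn()
A primitive that accepts two accumulators and updates the state of the left-hand one by deducting the state of the right-hand one from it.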
The effect of this primitive must be the opposite of combine so that
combine(acc, x);
deduct(acc, x);
leaves acc in the same state as it was before the two
operations.
This primitive is only used in sliding window aggregation and even in that case it is optional, but its presence may significantly reduce the computational cost. With it, the current sliding window can be obtained from the previous one by deducting the trailing frame and combining the leading frame; without it, each window must be recomputed from all its constituent frames. The finer the sliding step, the more pronounced the difference in computation effort will be.
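The difference between the two strategies can be sketched in plain Java. The sketch assumes a summing aggregation in which every frame has already been reduced to a single number; `Sum`, `withDeduct` and `withoutDeduct` are hypothetical names, and the windowing logic is a simplification rather than Jet's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    // Hypothetical mutable accumulator holding a running sum.
    static final class Sum {
        long value;

        Sum(long value) {
            this.value = value;
        }
    }

    static void combine(Sum left, Sum right) {
        left.value += right.value;
    }

    // Exact opposite of combine.
    static void deduct(Sum left, Sum right) {
        left.value -= right.value;
    }

    // Without deduct: every window is recomputed from all its frames.
    static List<Long> withoutDeduct(long[] frames, int framesPerWindow) {
        List<Long> results = new ArrayList<>();
        for (int end = framesPerWindow; end <= frames.length; end++) {
            Sum window = new Sum(0);
            for (int i = end - framesPerWindow; i < end; i++) {
                combine(window, new Sum(frames[i]));
            }
            results.add(window.value);
        }
        return results;
    }

    // With deduct: each slide costs one deduct and one combine,
    // regardless of how many frames the window spans.
    static List<Long> withDeduct(long[] frames, int framesPerWindow) {
        List<Long> results = new ArrayList<>();
        if (frames.length < framesPerWindow) {
            return results;
        }
        Sum window = new Sum(0);
        for (int i = 0; i < framesPerWindow; i++) {
            combine(window, new Sum(frames[i]));
        }
        results.add(window.value);
        for (int end = framesPerWindow; end < frames.length; end++) {
            deduct(window, new Sum(frames[end - framesPerWindow])); // trailing frame leaves
            combine(window, new Sum(frames[end]));                  // leading frame enters
            results.add(window.value);
        }
        return results;
    }

    public static void main(String[] args) {
        long[] frames = {1, 2, 3, 4, 5};
        System.out.println(withoutDeduct(frames, 3)); // prints [6, 9, 12]
        System.out.println(withDeduct(frames, 3));    // prints [6, 9, 12]
    }
}
```

Both methods produce the same windows only because `deduct` exactly undoes `combine`; an aggregation such as maximum has no such inverse, which is why the primitive is optional.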
If this method returns non-null, then createFn() must
return an accumulator which properly implements equals(). After calling deductFn, Jet will use equals()
to determine whether the accumulator is now "empty" (i.e., equal to a
fresh instance), which signals that the current window contains no more
items with the associated grouping key and the entry must be removed
from the results.
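The role of equals() in this check can be sketched in plain Java. `CountAcc`, `create`, `deduct` and `slide` are hypothetical names, and the map-based bookkeeping is an assumed simplification of how per-key window state might be held, not Jet's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

final class EmptyAccumulatorSketch {
    // Hypothetical counting accumulator with value-based equality.
    static final class CountAcc {
        long count;

        CountAcc(long count) {
            this.count = count;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof CountAcc && ((CountAcc) o).count == count;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(count);
        }
    }

    // create primitive: a fresh, "empty" accumulator.
    static CountAcc create() {
        return new CountAcc(0);
    }

    // deduct primitive: removes the right-hand state from the left-hand one.
    static void deduct(CountAcc left, CountAcc right) {
        left.count -= right.count;
    }

    // Deducts the trailing frame from the window, key by key, and drops every
    // key whose accumulator has become equal to a fresh instance.
    static void slide(Map<String, CountAcc> window, Map<String, CountAcc> trailingFrame) {
        for (Map.Entry<String, CountAcc> entry : trailingFrame.entrySet()) {
            CountAcc acc = window.get(entry.getKey());
            if (acc == null) {
                continue;
            }
            deduct(acc, entry.getValue());
            if (acc.equals(create())) {
                window.remove(entry.getKey());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, CountAcc> window = new HashMap<>();
        window.put("a", new CountAcc(2));
        window.put("b", new CountAcc(1));
        Map<String, CountAcc> trailingFrame = new HashMap<>();
        trailingFrame.put("a", new CountAcc(1));
        trailingFrame.put("b", new CountAcc(1));
        slide(window, trailingFrame);
        System.out.println(window.keySet()); // prints [a]
    }
}
```

Without the `equals()` override, the comparison would fall back to object identity, no accumulator would ever equal a fresh instance, and emptied keys would stay in the results.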
@Nonnull DistributedFunction<? super A,R> finishFn()
A primitive that finishes the accumulation process by transforming the accumulator object into the final result.

@Nonnull AggregateOperation<A,R> withAccumulateFns(DistributedBiConsumer... accumulateFns)
Returns a copy of this aggregate operation, but with all the accumulate primitives replaced with the ones supplied here. The argument at position i replaces the primitive at index i, as returned by accumulateFn(int).

@Nonnull <R_NEW> AggregateOperation<A,R_NEW> withFinishFn(@Nonnull DistributedFunction<? super A,R_NEW> finishFn)
Returns a copy of this aggregate operation, but with the finish primitive replaced with the supplied one.
R_NEW - the new aggregation result type
finishFn - the new finish primitive

@Nonnull default <T> AggregateOperation1<T,A,R> withCombiningAccumulateFn(@Nonnull DistributedFunction<T,A> getAccFn)
Returns a copy of this aggregate operation, but with the accumulate primitive replaced with one that expects to find accumulator objects in the input and will combine them all into a single accumulator of the same type.
T - the type of stream item
getAccFn - the function that extracts the accumulator from the stream item

@Nonnull static <A> AggregateOperationBuilder<A> withCreate(@Nonnull DistributedSupplier<A> createFn)
Returns a builder object, initialized with the supplied create primitive, that can be used to construct the definition of an aggregate operation in a step-by-step manner.
The same builder is used to construct both fixed- and variable-arity aggregate operations:
- For fixed arity, use andAccumulate0(), optionally followed by .andAccumulate1(), .andAccumulate2(). The return type of these methods changes as the static types of the contributing streams are captured.
- For variable arity, use andAccumulate(tag).
The andFinish() method returns the constructed aggregate operation. Its static type receives all the type parameters captured in the above method calls. If your aggregate operation doesn't need a finishing transformation (the accumulator itself is the result value), you can call the shorthand andIdentityFinish().
A - the type of the accumulator
createFn - the create primitive
Returns a builder object with only the create primitive defined.

Copyright © 2018 Hazelcast, Inc. All rights reserved.