Interface AggregateOperation<A,R>
- Type Parameters:
A - the type of the accumulator
R - the type of the final result
- All Superinterfaces:
java.io.Serializable
- All Known Subinterfaces:
AggregateOperation1<T,A,R>, AggregateOperation2<T0,T1,A,R>, AggregateOperation3<T0,T1,T2,A,R>
public interface AggregateOperation<A,R> extends java.io.Serializable
Contains primitives needed to compute an aggregated result of data processing. Check out AggregateOperations to find the one you need and, if you don't find it there, construct one by using the aggregate operation builder and reading the description below.

Jet aggregates the data by updating a mutable container, called the accumulator, with the data from each stream item. It does this by applying the accumulate primitive to the accumulator and a given item. Jet provides some accumulator objects in the accumulator package that you can reuse, and you can also write your own if needed. The accumulator must be serializable because Jet may need to send it to another member to be combined with other accumulators, or store it in a state snapshot.

After it processes all the items in a batch/window, Jet transforms the accumulator into the final result by applying the finish primitive.

Since it is a distributed/parallel computation engine, Jet will create several independent processing units to perform the same aggregation, and it must combine their partial results before applying the finish primitive and emitting the final result. This is the role of the combine primitive.

Finally, AggregateOperation also defines the deduct primitive, which allows Jet to efficiently aggregate infinite stream data into a sliding window by evicting old data from the existing accumulator instead of building a new one from scratch each time the window slides forward. It isn't always possible to provide a deduct primitive that is more efficient than rebuilding the accumulator from scratch, so this primitive is optional.

Depending on usage, the data items may come from one or more inbound streams, and the AggregateOperation must provide a separate accumulate primitive for each of them. If you are creating the aggregating pipeline stage using the builder object, then you'll identify each contributing stream to the AggregateOperation using the tags you got from the builder.

If, on the other hand, you are calling one of the direct methods such as stage.aggregate2(), then you'll deal with specializations of this interface such as AggregateOperation2, and you'll identify the input stages by their index: index zero corresponds to the stage you're calling the method on, and the higher indices correspond to the stages you pass in as arguments.

This is a summary of all the primitives involved:
- create a new accumulator object
- accumulate the data of an item by mutating the accumulator
- combine the contents of the right-hand accumulator into the left-hand one, optional
- deduct the contents of the right-hand accumulator from the left-hand one (undo the effects of combine), optional
- export: calculate the result value from an accumulator while preserving the accumulator state for further accumulation. Used for aggregations with speculative results or for rolling aggregations
- finish: calculate the result value from an accumulator. Because the accumulator will no longer be used after this conversion, it's allowed, for example, to use the identity() function

All the functions must be stateless and cooperative.
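The following is a minimal, dependency-free sketch of how the create, accumulate, combine and finish primitives cooperate when two processing units each aggregate part of the input. It is not Jet code: plain java.util.function types stand in for Jet's SupplierEx, BiConsumerEx and FunctionEx, and the names AggregationLifecycle and AvgAcc are hypothetical.

```java
import java.io.Serializable;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class AggregationLifecycle {

    // Mutable, serializable accumulator holding a running sum and count.
    static final class AvgAcc implements Serializable {
        private static final long serialVersionUID = 1L;
        long sum;
        long count;
    }

    // create: return a fresh accumulator
    static final Supplier<AvgAcc> CREATE = AvgAcc::new;

    // accumulate: fold one item into the accumulator by mutating it
    static final BiConsumer<AvgAcc, Long> ACCUMULATE = (acc, item) -> {
        acc.sum += item;
        acc.count++;
    };

    // combine: merge the right-hand accumulator into the left-hand one;
    // the right-hand accumulator is only read
    static final BiConsumer<AvgAcc, AvgAcc> COMBINE = (left, right) -> {
        left.sum += right.sum;
        left.count += right.count;
    };

    // finish: turn the accumulator into the result (0.0 for no items is an
    // arbitrary choice made for this sketch; the result is never null)
    static final Function<AvgAcc, Double> FINISH =
            acc -> acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;

    // Simulates two independent processing units, each seeing part of the input.
    static double averageOf(List<Long> partA, List<Long> partB) {
        AvgAcc accA = CREATE.get();
        for (Long item : partA) {
            ACCUMULATE.accept(accA, item);
        }
        AvgAcc accB = CREATE.get();
        for (Long item : partB) {
            ACCUMULATE.accept(accB, item);
        }
        COMBINE.accept(accA, accB);   // merge the partial results
        return FINISH.apply(accA);    // finish runs once, on the merged accumulator
    }

    public static void main(String[] args) {
        System.out.println(averageOf(List.of(1L, 2L), List.of(3L, 4L))); // 2.5
    }
}
```

Averaging the partial averages of the two units would give the wrong answer for unevenly sized parts. Keeping sum and count in the accumulator and combining before finishing avoids that.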
- Since:
Jet 3.0
Method Summary
- <T> BiConsumerEx<? super A,? super T> accumulateFn(int index)
  A primitive that updates the accumulator state to account for a new item.
- default <T> BiConsumerEx<? super A,? super T> accumulateFn(Tag<T> tag)
  A primitive that updates the accumulator state to account for a new item.
- <R_NEW> AggregateOperation<A,R_NEW> andThen(FunctionEx<? super R,? extends R_NEW> thenFn)
  Returns a copy of this aggregate operation, but with the export and finish primitives composed with the supplied thenFn.
- int arity()
  Returns the number of contributing streams this operation is set up to handle.
- BiConsumerEx<? super A,? super A> combineFn()
  A primitive that accepts two accumulators and updates the state of the left-hand one by combining it with the state of the right-hand one.
- SupplierEx<A> createFn()
  A primitive that returns a new accumulator.
- BiConsumerEx<? super A,? super A> deductFn()
  A primitive that accepts two accumulators and updates the state of the left-hand one by deducting the state of the right-hand one from it.
- FunctionEx<? super A,? extends R> exportFn()
  A primitive that transforms the accumulator into a result of the aggregation.
- default FunctionEx<? super A,? extends R> finishFn()
  A primitive that transforms the accumulator into a result of the aggregation.
- AggregateOperation<A,R> withAccumulateFns(BiConsumerEx... accumulateFns)
  Returns a copy of this aggregate operation, but with all the accumulate primitives replaced with the ones supplied here.
- default <T> AggregateOperation1<T,A,R> withCombiningAccumulateFn(FunctionEx<T,A> getAccFn)
  Returns a copy of this aggregate operation, but with the accumulate primitive replaced with one that expects to find accumulator objects in the input items and combines them all into a single accumulator of the same type.
- static <A> AggregateOperationBuilder<A> withCreate(SupplierEx<A> createFn)
  Returns a builder object, initialized with the supplied create primitive, that can be used to construct the definition of an aggregate operation in a step-by-step manner.
- AggregateOperation<A,A> withIdentityFinish()
  Returns a copy of this aggregate operation, but with the finish primitive replaced with the identity function.

Method Detail
-
arity
int arity()
Returns the number of contributing streams this operation is set up to handle. The index passed to accumulateFn(int) must be less than this number.
-
createFn
@Nonnull SupplierEx<A> createFn()
A primitive that returns a new accumulator. If the deduct primitive is defined, the accumulator object must properly implement equals(). See deductFn() for an explanation.

The accumulator produced by the supplier must be serializable. For performance, you should prefer Hazelcast custom serialization.

The supplier must be stateless and cooperative.
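As an illustration of the equals() requirement, here is a hypothetical accumulator (SumAcc) with value-based equals() and hashCode(). It implements plain java.io.Serializable only to stay self-contained. For performance, the documentation recommends Hazelcast custom serialization instead.

```java
import java.io.Serializable;
import java.util.function.Supplier;

public class EqualsAwareAccumulator {

    // Hypothetical running-sum accumulator. It is Serializable and implements
    // value-based equals()/hashCode(), so an accumulator whose contents were
    // fully deducted compares equal to a freshly created one.
    static final class SumAcc implements Serializable {
        private static final long serialVersionUID = 1L;
        long sum;

        @Override
        public boolean equals(Object o) {
            return o instanceof SumAcc && ((SumAcc) o).sum == sum;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(sum);
        }
    }

    // create: stateless, returns a new accumulator on every call
    static final Supplier<SumAcc> CREATE = SumAcc::new;

    static boolean freshInstancesAreDistinctButEqual() {
        SumAcc a = CREATE.get();
        SumAcc b = CREATE.get();
        return a != b && a.equals(b);
    }

    static boolean nonEmptyDiffersFromFresh() {
        SumAcc a = CREATE.get();
        a.sum = 42;
        return !a.equals(CREATE.get());
    }

    public static void main(String[] args) {
        System.out.println(freshInstancesAreDistinctButEqual() + " " + nonEmptyDiffersFromFresh()); // true true
    }
}
```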
-
accumulateFn
@Nonnull default <T> BiConsumerEx<? super A,? super T> accumulateFn(@Nonnull Tag<T> tag)
A primitive that updates the accumulator state to account for a new item. The tag argument identifies which of the contributing streams the returned function will handle. If asked for a tag that isn't registered with it, it will throw an exception.

The function must be stateless and cooperative.
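A rough model of tag-based dispatch follows. Tag and TaggedOp are hypothetical stand-ins written for this sketch, not Jet's classes. The documentation only says that an exception is thrown, so throwing IllegalArgumentException for an unregistered tag is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class TaggedAccumulate {

    // Hypothetical stand-in for a stream tag; compared by identity.
    static final class Tag<T> {
        final String name;

        Tag(String name) {
            this.name = name;
        }
    }

    // Hypothetical operation holding one accumulate primitive per registered tag.
    static final class TaggedOp<A> {
        private final Map<Tag<?>, BiConsumer<A, Object>> fns = new HashMap<>();

        @SuppressWarnings("unchecked")
        <T> TaggedOp<A> register(Tag<T> tag, BiConsumer<A, T> fn) {
            fns.put(tag, (acc, item) -> fn.accept(acc, (T) item));
            return this;
        }

        <T> BiConsumer<A, T> accumulateFn(Tag<T> tag) {
            BiConsumer<A, Object> fn = fns.get(tag);
            if (fn == null) {
                // The exception type is an assumption of this sketch.
                throw new IllegalArgumentException("Unregistered tag: " + tag.name);
            }
            return fn::accept;
        }
    }

    // Accumulator {total word length, sum of numbers} fed by two tagged streams.
    static long[] demo() {
        Tag<String> words = new Tag<>("words");
        Tag<Integer> numbers = new Tag<>("numbers");
        TaggedOp<long[]> op = new TaggedOp<long[]>()
                .register(words, (acc, s) -> acc[0] += s.length())
                .register(numbers, (acc, n) -> acc[1] += n);
        long[] state = new long[2];
        op.accumulateFn(words).accept(state, "jet");
        op.accumulateFn(numbers).accept(state, 5);
        return state;
    }

    static boolean unregisteredTagThrows() {
        TaggedOp<long[]> op = new TaggedOp<>();
        try {
            op.accumulateFn(new Tag<String>("unknown"));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long[] state = demo();
        System.out.println(state[0] + " " + state[1] + " " + unregisteredTagThrows()); // 3 5 true
    }
}
```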
-
accumulateFn
@Nonnull <T> BiConsumerEx<? super A,? super T> accumulateFn(int index)
A primitive that updates the accumulator state to account for a new item. The argument identifies the index of the contributing stream the returned function will handle. If asked for an index that isn't registered with it, it will throw an exception.

The function must be stateless and cooperative.
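The index-based variant can be sketched the same way. Index 0 stands for the stage the method is called on, and index 1 stands for the stage passed as an argument. The order/payment streams and the IndexOutOfBoundsException are assumptions of this sketch. Here arity() is simply the number of registered accumulate functions.

```java
import java.util.List;
import java.util.function.BiConsumer;

public class IndexedAccumulate {

    // Hypothetical arity-2 operation. Accumulator: long[2] = {order count, payment total}.
    // Index 0: the stage the method is called on (orders);
    // index 1: the stage passed as an argument (payments).
    static final List<BiConsumer<long[], Object>> ACCUMULATE_FNS = List.of(
            (acc, order) -> acc[0]++,
            (acc, payment) -> acc[1] += (Integer) payment);

    static int arity() {
        return ACCUMULATE_FNS.size();
    }

    static BiConsumer<long[], Object> accumulateFn(int index) {
        // The index must be less than arity(); the exception type is an assumption.
        if (index < 0 || index >= arity()) {
            throw new IndexOutOfBoundsException("No accumulate primitive at index " + index);
        }
        return ACCUMULATE_FNS.get(index);
    }

    static long[] demo() {
        long[] state = new long[2];
        accumulateFn(0).accept(state, "order-1");
        accumulateFn(0).accept(state, "order-2");
        accumulateFn(1).accept(state, 40);
        return state;
    }

    static boolean indexTooHighThrows() {
        try {
            accumulateFn(arity());
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long[] state = demo();
        System.out.println(state[0] + " " + state[1] + " " + indexTooHighThrows()); // 2 40 true
    }
}
```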
-
combineFn
@Nullable BiConsumerEx<? super A,? super A> combineFn()
A primitive that accepts two accumulators and updates the state of the left-hand one by combining it with the state of the right-hand one. The right-hand accumulator remains unchanged. In some cases, such as single-stage batch or tumbling window aggregation, it is not needed and may be null.

The function must be stateless and cooperative.
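Here is a small sketch of a combine primitive over list accumulators. It merges the partial results of three hypothetical processing units and checks that the right-hand accumulator is left untouched. java.util.function.BiConsumer stands in for BiConsumerEx.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class CombineExample {

    // combine for a list accumulator: append the right-hand contents to the
    // left-hand list. Only the left-hand accumulator is modified.
    static final BiConsumer<List<String>, List<String>> COMBINE =
            (left, right) -> left.addAll(right);

    // Merges the partial results of three hypothetical processing units into the first one.
    static List<String> mergePartials() {
        List<String> unit1 = new ArrayList<>(List.of("a", "b"));
        List<String> unit2 = new ArrayList<>(List.of("c"));
        List<String> unit3 = new ArrayList<>(List.of("d", "e"));
        COMBINE.accept(unit1, unit2);
        COMBINE.accept(unit1, unit3);
        return unit1;
    }

    static boolean rightHandUnchanged() {
        List<String> left = new ArrayList<>(List.of("a"));
        List<String> right = new ArrayList<>(List.of("b"));
        COMBINE.accept(left, right);
        return right.equals(List.of("b"));
    }

    public static void main(String[] args) {
        System.out.println(mergePartials() + " " + rightHandUnchanged()); // [a, b, c, d, e] true
    }
}
```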
-
deductFn
@Nullable BiConsumerEx<? super A,? super A> deductFn()
A primitive that accepts two accumulators and updates the state of the left-hand one by deducting the state of the right-hand one from it. The right-hand accumulator remains unchanged.

The effect of this primitive must be the opposite of combine so that

    combine(acc, x);
    deduct(acc, x);

leaves acc in the same state as it was before the two operations.

This primitive is only used in sliding window aggregation and even in that case it is optional, but its presence may significantly reduce the computational cost. With it, the current sliding window can be obtained from the previous one by deducting the trailing frame and combining the leading frame; without it, each window must be recomputed from all its constituent frames. The finer the sliding step, the more pronounced the difference in computation effort will be.

If this method returns non-null, then createFn() must return an accumulator which properly implements equals(). After calling deductFn, Jet will use equals() to determine whether the accumulator is now "empty" (i.e., equal to a fresh instance), which signals that the current window contains no more items with the associated grouping key and the entry must be removed from the results. For example:

    acc = create();
    combine(acc, x);
    deduct(acc, x);
    assert acc.equals(create()) : "improper combine/deduct behavior";

The function must be stateless and cooperative.
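To make the cost argument concrete, the sketch below slides a two-frame window over four hypothetical frame accumulators. At each step it deducts the trailing frame and combines the leading one. It also checks the combine/deduct invariant through equals(). The SumCount type and its add() helper are assumptions made for the example, not part of Jet.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class SlidingWindowDeduct {

    // Hypothetical sum/count accumulator with value-based equals(), as required
    // when a deduct primitive is defined.
    static final class SumCount implements Serializable {
        private static final long serialVersionUID = 1L;
        long sum;
        long count;

        // Convenience used to build per-frame accumulators in this sketch.
        SumCount add(long value) {
            sum += value;
            count++;
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SumCount)) {
                return false;
            }
            SumCount other = (SumCount) o;
            return other.sum == sum && other.count == count;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(sum) + Long.hashCode(count);
        }
    }

    static final Supplier<SumCount> CREATE = SumCount::new;
    static final BiConsumer<SumCount, SumCount> COMBINE = (l, r) -> {
        l.sum += r.sum;
        l.count += r.count;
    };
    static final BiConsumer<SumCount, SumCount> DEDUCT = (l, r) -> {
        l.sum -= r.sum;
        l.count -= r.count;
    };

    // Window sums for a window `width` frames wide sliding one frame per step
    // (assumes frames.length >= width). Each step deducts the trailing frame and
    // combines the leading one instead of recombining every frame in the window.
    static long[] slidingSums(SumCount[] frames, int width) {
        long[] sums = new long[frames.length - width + 1];
        SumCount window = CREATE.get();
        for (int i = 0; i < width; i++) {
            COMBINE.accept(window, frames[i]);
        }
        sums[0] = window.sum;
        for (int i = width; i < frames.length; i++) {
            DEDUCT.accept(window, frames[i - width]);
            COMBINE.accept(window, frames[i]);
            sums[i - width + 1] = window.sum;
        }
        return sums;
    }

    // combine followed by deduct must leave an accumulator equal to a fresh one.
    static boolean combineThenDeductIsEmpty(SumCount x) {
        SumCount acc = CREATE.get();
        COMBINE.accept(acc, x);
        DEDUCT.accept(acc, x);
        return acc.equals(CREATE.get());
    }

    static SumCount[] sampleFrames() {
        return new SumCount[] {
                new SumCount().add(1),
                new SumCount().add(2).add(3),
                new SumCount().add(4),
                new SumCount().add(5)
        };
    }

    public static void main(String[] args) {
        SumCount[] frames = sampleFrames();
        System.out.println(Arrays.toString(slidingSums(frames, 2))); // [6, 9, 9]
        System.out.println(combineThenDeductIsEmpty(frames[1]));     // true
    }
}
```

With a window of w frames, each slide costs one deduct and one combine instead of w combines. The saving grows as the step gets finer relative to the window size.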
-
exportFn
@Nonnull FunctionEx<? super A,? extends R> exportFn()
A primitive that transforms the accumulator into a result of the aggregation. Unlike the finish primitive, this operation must not:
- mutate the accumulator: it must remain ready to accumulate more items
- return a result that shares mutable data with the accumulator: accumulating more items to the accumulator must not change the result
For example, when accumulating into an ArrayList, you must copy it before returning it. If the elements of the list are mutated, they must be copied as well.

The returned function must never return null. In other words, for any accumulator it must return a non-null exported value.

The function must be stateless and cooperative.
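Here is a sketch of an export primitive that respects both rules. It assumes a hypothetical accumulator made of mutable one-element long arrays. Copying only the list would not be enough here, because its elements keep changing, so the values are copied out as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ExportCopy {

    // Hypothetical accumulator: a list of mutable counters (one-element long arrays)
    // that later accumulation may keep incrementing.
    // export copies both the list and the element values, so the result shares
    // no mutable state with the accumulator and is never null.
    static final Function<List<long[]>, List<Long>> EXPORT = acc -> {
        List<Long> result = new ArrayList<>(acc.size());
        for (long[] counter : acc) {
            result.add(counter[0]);
        }
        return result;
    };

    static boolean exportSurvivesFurtherAccumulation() {
        List<long[]> acc = new ArrayList<>();
        acc.add(new long[] {1});
        List<Long> exported = EXPORT.apply(acc);
        acc.get(0)[0]++;          // mutate an element of the accumulator
        acc.add(new long[] {7});  // and grow the accumulator itself
        return exported.equals(List.of(1L));
    }

    public static void main(String[] args) {
        System.out.println(exportSurvivesFurtherAccumulation()); // true
    }
}
```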
-
finishFn
@Nonnull default FunctionEx<? super A,? extends R> finishFn()
A primitive that transforms the accumulator into a result of the aggregation. This is a relaxed version of the export primitive: the accumulator is guaranteed to be no longer used after this operation. For example, when accumulating into an ArrayList, you can return the accumulator list directly without copying it.

The returned function must never return null. In other words, for any accumulator it must return a non-null finished value.

The function must be stateless and cooperative.
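The contrast with export can be shown on a plain ArrayList accumulator. This minimal sketch uses java.util.function.Function instead of FunctionEx. Its export returns a copy, while its finish hands back the accumulator itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class FinishWithoutCopy {

    // export must hand out a copy because accumulation may continue afterwards.
    static final Function<ArrayList<String>, List<String>> EXPORT = acc -> new ArrayList<>(acc);

    // finish may return the accumulator itself: it is not used after this call.
    static final Function<ArrayList<String>, List<String>> FINISH = acc -> acc;

    static boolean demo() {
        ArrayList<String> acc = new ArrayList<>(List.of("x", "y"));
        List<String> exported = EXPORT.apply(acc);
        List<String> finished = FINISH.apply(acc);
        return exported != acc && exported.equals(acc) && finished == acc;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```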
-
withAccumulateFns
@Nonnull AggregateOperation<A,R> withAccumulateFns(BiConsumerEx... accumulateFns)
Returns a copy of this aggregate operation, but with all the accumulate primitives replaced with the ones supplied here. The argument at position i replaces the primitive at index i, as returned by accumulateFn(int).

The functions must be stateless and cooperative.
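Here is a minimal model of this copy-and-replace behavior. It assumes a hypothetical immutable Op class with positional accumulate functions and is not Jet's implementation. The original operation keeps its primitive, and only the returned copy uses the replacement.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class ReplaceAccumulateFns {

    // Hypothetical immutable operation with positional accumulate primitives.
    static final class Op<A> {
        final Supplier<A> createFn;
        final List<BiConsumer<A, Object>> accumulateFns;

        Op(Supplier<A> createFn, List<BiConsumer<A, Object>> accumulateFns) {
            this.createFn = createFn;
            this.accumulateFns = List.copyOf(accumulateFns);
        }

        // Returns a copy in which replacements[i] replaces the primitive at index i;
        // this operation is left unchanged.
        @SafeVarargs
        final Op<A> withAccumulateFns(BiConsumer<A, Object>... replacements) {
            return new Op<>(createFn, Arrays.asList(replacements));
        }
    }

    static long[] demo() {
        BiConsumer<long[], Object> countItems = (acc, item) -> acc[0]++;
        BiConsumer<long[], Object> sumLengths = (acc, item) -> acc[0] += item.toString().length();
        Op<long[]> counting = new Op<>(() -> new long[1], List.of(countItems));
        Op<long[]> lengthSumming = counting.withAccumulateFns(sumLengths);

        long[] a = counting.createFn.get();
        long[] b = lengthSumming.createFn.get();
        for (String s : List.of("ab", "cde")) {
            counting.accumulateFns.get(0).accept(a, s);
            lengthSumming.accumulateFns.get(0).accept(b, s);
        }
        return new long[] {a[0], b[0]};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [2, 5]
    }
}
```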
-
withIdentityFinish
@Nonnull AggregateOperation<A,A> withIdentityFinish()
Returns a copy of this aggregate operation, but with the finish primitive replaced with the identity function. It will return the accumulator object as-is. The returned aggregate operation does not support the export primitive.
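The following sketch models withIdentityFinish() on a hypothetical single-input Op class. The copy keeps create and accumulate but swaps finish for Function.identity(), so its result type becomes the accumulator type. The class and the counting() example are assumptions made for illustration.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class IdentityFinishExample {

    // Hypothetical single-input operation with only the primitives this sketch needs.
    static final class Op<T, A, R> {
        final Supplier<A> createFn;
        final BiConsumer<A, T> accumulateFn;
        final Function<A, R> finishFn;

        Op(Supplier<A> createFn, BiConsumer<A, T> accumulateFn, Function<A, R> finishFn) {
            this.createFn = createFn;
            this.accumulateFn = accumulateFn;
            this.finishFn = finishFn;
        }

        // Copy of this operation whose finish returns the accumulator as-is.
        Op<T, A, A> withIdentityFinish() {
            return new Op<>(createFn, accumulateFn, Function.identity());
        }

        R aggregate(Iterable<T> items) {
            A acc = createFn.get();
            for (T item : items) {
                accumulateFn.accept(acc, item);
            }
            return finishFn.apply(acc);
        }
    }

    // Counts items in a long[1] accumulator and finishes to a Long.
    static Op<String, long[], Long> counting() {
        return new Op<>(() -> new long[1], (acc, s) -> acc[0]++, acc -> acc[0]);
    }

    static boolean demo() {
        List<String> items = List.of("a", "b", "c");
        Long count = counting().aggregate(items);
        long[] rawAcc = counting().withIdentityFinish().aggregate(items);
        return count == 3L && rawAcc[0] == 3;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```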
-
withCombiningAccumulateFn
@Nonnull default <T> AggregateOperation1<T,A,R> withCombiningAccumulateFn(@Nonnull FunctionEx<T,A> getAccFn)
Returns a copy of this aggregate operation, but with the accumulate primitive replaced with one that expects to find accumulator objects in the input items and combines them all into a single accumulator of the same type. It's used in the second aggregation stage of a two-stage aggregation setup: the first stage emits its accumulators to the second stage.

The function must be stateless and cooperative.
- Type Parameters:
T - the type of stream item
- Parameters:
getAccFn - the function that extracts the accumulator from the stream item
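Here is a sketch of the second-stage accumulate primitive in a two-stage setup. It assumes hypothetical first-stage output items of type Map.Entry<String, long[]> that carry partial accumulators. The helper combiningAccumulateFn is invented for the example: it applies getAccFn to each item and combines the extracted accumulator into the stage's own.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class TwoStageCombining {

    // Primitives of a hypothetical counting operation with a long[1] accumulator.
    static final Supplier<long[]> CREATE = () -> new long[1];
    static final BiConsumer<long[], long[]> COMBINE = (left, right) -> left[0] += right[0];

    // Second-stage accumulate: extract the accumulator carried by the item and
    // combine it into this stage's accumulator.
    static <T> BiConsumer<long[], T> combiningAccumulateFn(Function<T, long[]> getAccFn) {
        return (acc, item) -> COMBINE.accept(acc, getAccFn.apply(item));
    }

    static long secondStageTotal() {
        // Output of two hypothetical first-stage processors: (processor id, partial accumulator)
        List<Map.Entry<String, long[]>> firstStageOutput = List.of(
                Map.entry("processor-1", new long[] {4}),
                Map.entry("processor-2", new long[] {6}));
        BiConsumer<long[], Map.Entry<String, long[]>> accumulate =
                combiningAccumulateFn(Map.Entry::getValue);
        long[] total = CREATE.get();
        for (Map.Entry<String, long[]> item : firstStageOutput) {
            accumulate.accept(total, item);
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(secondStageTotal()); // 10
    }
}
```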
-
andThen
@Nonnull <R_NEW> AggregateOperation<A,R_NEW> andThen(FunctionEx<? super R,? extends R_NEW> thenFn)
Returns a copy of this aggregate operation, but with the export and finish primitives composed with the supplied thenFn. This replaces exportFn with exportFn.andThen(thenFn), and likewise for finishFn. The main use case is to transform the result of an existing (library-provided) aggregate operation.

The given function must be stateless and cooperative.
- Type Parameters:
R_NEW - the type of the returned aggregate operation's result
- Parameters:
thenFn - the function to apply to the results of the export and finish primitives
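The composition described here maps directly onto java.util.function.Function.andThen(). The following minimal sketch uses hypothetical export and finish functions for a counting operation with a long[1] accumulator.

```java
import java.util.function.Function;

public class AndThenExample {

    // export and finish of a hypothetical existing operation that counts items in a long[1].
    static final Function<long[], Long> EXPORT_FN = acc -> acc[0];
    static final Function<long[], Long> FINISH_FN = acc -> acc[0];

    // The transformation to apply to whatever result the operation produces.
    static final Function<Long, String> THEN_FN = count -> count + " items";

    // Composition as described: exportFn.andThen(thenFn) and finishFn.andThen(thenFn).
    static final Function<long[], String> NEW_EXPORT_FN = EXPORT_FN.andThen(THEN_FN);
    static final Function<long[], String> NEW_FINISH_FN = FINISH_FN.andThen(THEN_FN);

    public static void main(String[] args) {
        long[] acc = {3};
        System.out.println(NEW_EXPORT_FN.apply(acc)); // 3 items
        System.out.println(NEW_FINISH_FN.apply(acc)); // 3 items
    }
}
```

Because only export and finish are wrapped, the accumulator type and the create, accumulate, combine and deduct primitives stay exactly as they were.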
-
withCreate
@Nonnull static <A> AggregateOperationBuilder<A> withCreate(@Nonnull SupplierEx<A> createFn)
Returns a builder object, initialized with the supplied create primitive, that can be used to construct the definition of an aggregate operation in a step-by-step manner.

The same builder is used to construct both fixed- and variable-arity aggregate operations:
- For fixed arity use andAccumulate0(), optionally followed by .andAccumulate1() and .andAccumulate2(). The return type of these methods changes as the static types of the contributing streams are captured.
- For variable arity use andAccumulate(tag).
The andExportFinish() method returns the constructed aggregate operation. Its static type receives all the type parameters captured in the above method calls. For optimization purposes you may want to specify a finish primitive that is different from export, for example one that returns the accumulator itself without copying. In that case you'll use builder.andExport(exportFn).andFinish(finishFn).

The given function must be stateless and cooperative.
- Type Parameters:
A - the type of the accumulator
- Parameters:
createFn - the create primitive
- Returns:
the builder object whose static type represents the fact that it has just the create primitive defined
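To show the step-by-step shape of such a builder, here is a deliberately tiny, hypothetical version that covers only the single-input case (withCreate, andAccumulate, andCombine, andExportFinish). It is not Jet's AggregateOperationBuilder. The class names and the way types are captured are assumptions made for this sketch.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class MiniBuilder {

    // Hypothetical finished single-input operation.
    static final class Op<T, A, R> {
        final Supplier<A> createFn;
        final BiConsumer<A, T> accumulateFn;
        final BiConsumer<A, A> combineFn; // may be null
        final Function<A, R> exportFn;
        final Function<A, R> finishFn;

        Op(Supplier<A> createFn, BiConsumer<A, T> accumulateFn, BiConsumer<A, A> combineFn,
           Function<A, R> exportFn, Function<A, R> finishFn) {
            this.createFn = createFn;
            this.accumulateFn = accumulateFn;
            this.combineFn = combineFn;
            this.exportFn = exportFn;
            this.finishFn = finishFn;
        }
    }

    // Step 1: only the create primitive is known.
    static final class WithCreate<A> {
        final Supplier<A> createFn;

        WithCreate(Supplier<A> createFn) {
            this.createFn = createFn;
        }

        // Captures the static type T of the single input stream.
        <T> WithAccumulate<T, A> andAccumulate(BiConsumer<A, T> accumulateFn) {
            return new WithAccumulate<>(createFn, accumulateFn, null);
        }
    }

    // Step 2: create and accumulate are known; combine is optional.
    static final class WithAccumulate<T, A> {
        final Supplier<A> createFn;
        final BiConsumer<A, T> accumulateFn;
        final BiConsumer<A, A> combineFn;

        WithAccumulate(Supplier<A> createFn, BiConsumer<A, T> accumulateFn, BiConsumer<A, A> combineFn) {
            this.createFn = createFn;
            this.accumulateFn = accumulateFn;
            this.combineFn = combineFn;
        }

        WithAccumulate<T, A> andCombine(BiConsumer<A, A> combineFn) {
            return new WithAccumulate<>(createFn, accumulateFn, combineFn);
        }

        // Uses one function as both export and finish, so it must not expose
        // mutable accumulator state.
        <R> Op<T, A, R> andExportFinish(Function<A, R> exportFinishFn) {
            return new Op<>(createFn, accumulateFn, combineFn, exportFinishFn, exportFinishFn);
        }
    }

    static <A> WithCreate<A> withCreate(Supplier<A> createFn) {
        return new WithCreate<>(createFn);
    }

    // Builds a string-concatenating operation step by step.
    static Op<String, StringBuilder, String> concatenating() {
        return MiniBuilder.<StringBuilder>withCreate(StringBuilder::new)
                .<String>andAccumulate((sb, s) -> sb.append(s))
                .andCombine((left, right) -> left.append(right))
                .andExportFinish(StringBuilder::toString);
    }

    static String demo() {
        Op<String, StringBuilder, String> op = concatenating();
        StringBuilder left = op.createFn.get();
        op.accumulateFn.accept(left, "a");
        op.accumulateFn.accept(left, "b");
        StringBuilder right = op.createFn.get();
        op.accumulateFn.accept(right, "c");
        op.combineFn.accept(left, right);
        return op.finishFn.apply(left);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // abc
    }
}
```

The explicit type witnesses in concatenating() are needed because, in a chained call, Java cannot infer the accumulator and item types from implicitly typed lambdas alone. StringBuilder::toString returns a new String, so it is safe to use as both export and finish.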