Class AggregateOperations

Utility class with factory methods for several useful aggregate operations. See the Javadoc on AggregateOperation. You can also create your own aggregate operation using the builder object.
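For illustration, here is a minimal sketch of the builder approach (not part of this class's API surface): it hand-builds an equivalent of counting() using AggregateOperation.withCreate() and LongAccumulator from com.hazelcast.jet.accumulator. The deduct primitive is optional but enables O(1) sliding-window computation.

    // Sketch: a hand-built equivalent of counting()
    AggregateOperation1<Object, LongAccumulator, Long> myCounting = AggregateOperation
            .withCreate(LongAccumulator::new)                                 // fresh accumulator per group
            .andAccumulate((LongAccumulator acc, Object item) -> acc.add(1)) // count each item
            .andCombine(LongAccumulator::add)                                 // merge partial counts
            .andDeduct(LongAccumulator::subtract)                             // undo a combine (optional)
            .andExportFinish(LongAccumulator::get);                           // extract the result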
- Since:
- Jet 3.0
-
Method Summary

static <T0, T1, A0, A1, R0, R1> AggregateOperation2<T0, T1, Tuple2<A0, A1>, Tuple2<R0, R1>> aggregateOperation2(AggregateOperation1<? super T0, A0, ? extends R0> op0, AggregateOperation1<? super T1, A1, ? extends R1> op1)
    Convenience for aggregateOperation2(aggrOp0, aggrOp1, finishFn) that outputs a Tuple2(result0, result1).

static <T0, A0, R0, T1, A1, R1, R> AggregateOperation2<T0, T1, Tuple2<A0, A1>, R> aggregateOperation2(AggregateOperation1<? super T0, A0, ? extends R0> op0, AggregateOperation1<? super T1, A1, ? extends R1> op1, BiFunctionEx<? super R0, ? super R1, ? extends R> exportFinishFn)
    Returns an aggregate operation that is a composite of two independent aggregate operations, each one accepting its own input.

static <T0, T1, T2, A0, A1, A2, R0, R1, R2> AggregateOperation3<T0, T1, T2, Tuple3<A0, A1, A2>, Tuple3<R0, R1, R2>> aggregateOperation3(AggregateOperation1<? super T0, A0, ? extends R0> op0, AggregateOperation1<? super T1, A1, ? extends R1> op1, AggregateOperation1<? super T2, A2, ? extends R2> op2)
    Convenience for aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn) that outputs a Tuple3(result0, result1, result2).

static <T0, T1, T2, A0, A1, A2, R0, R1, R2, R> AggregateOperation3<T0, T1, T2, Tuple3<A0, A1, A2>, R> aggregateOperation3(AggregateOperation1<? super T0, A0, ? extends R0> op0, AggregateOperation1<? super T1, A1, ? extends R1> op1, AggregateOperation1<? super T2, A2, ? extends R2> op2, TriFunction<? super R0, ? super R1, ? super R2, ? extends R> exportFinishFn)
    Returns an aggregate operation that is a composite of three independent aggregate operations, each one accepting its own input.

static <T, A0, A1, R0, R1, R> AggregateOperation1<T, Tuple2<A0, A1>, R> allOf(AggregateOperation1<? super T, A0, ? extends R0> op0, AggregateOperation1<? super T, A1, ? extends R1> op1, BiFunctionEx<? super R0, ? super R1, ? extends R> exportFinishFn)
    Returns an aggregate operation that is a composite of two aggregate operations.

static <T, A0, A1, A2, R0, R1, R2> AggregateOperation1<T, Tuple3<A0, A1, A2>, Tuple3<R0, R1, R2>> allOf(AggregateOperation1<? super T, A0, ? extends R0> op0, AggregateOperation1<? super T, A1, ? extends R1> op1, AggregateOperation1<? super T, A2, ? extends R2> op2)
    Convenience for allOf(AggregateOperation1, AggregateOperation1, AggregateOperation1, TriFunction) wrapping the three results in a Tuple3.

static <T, A0, A1, A2, R0, R1, R2, R> AggregateOperation1<T, Tuple3<A0, A1, A2>, R> allOf(AggregateOperation1<? super T, A0, ? extends R0> op0, AggregateOperation1<? super T, A1, ? extends R1> op1, AggregateOperation1<? super T, A2, ? extends R2> op2, TriFunction<? super R0, ? super R1, ? super R2, ? extends R> exportFinishFn)
    Returns an aggregate operation that is a composite of three aggregate operations.

static <T, A0, A1, R0, R1> AggregateOperation1<T, Tuple2<A0, A1>, Tuple2<R0, R1>> allOf(AggregateOperation1<? super T, A0, R0> op1, AggregateOperation1<? super T, A1, R1> op2)
    Convenience for allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx) wrapping the two results in a Tuple2.

static <T> AllOfAggregationBuilder<T> allOfBuilder()
    Returns a builder object that helps you create a composite of multiple aggregate operations.

static <T> AggregateOperation1<T, LongDoubleAccumulator, Double> averagingDouble(ToDoubleFunctionEx<? super T> getDoubleValueFn)
    Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item.

static <T> AggregateOperation1<T, LongLongAccumulator, Double> averagingLong(ToLongFunctionEx<? super T> getLongValueFn)
    Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item.

static <T> AggregateOperation1<T, PriorityQueue<T>, List<T>> bottomN(int n, ComparatorEx<? super T> comparator)
    Returns an aggregate operation that finds the bottom n items according to the given comparator.

static CoAggregateOperationBuilder coAggregateOperationBuilder()
    Returns a builder object that offers a step-by-step fluent API to create an aggregate operation that accepts multiple inputs.

static AggregateOperation1<CharSequence, StringBuilder, String> concatenating()
    Returns an aggregate operation that takes string items and concatenates them into a single string.

static AggregateOperation1<CharSequence, StringBuilder, String> concatenating(CharSequence delimiter)
    Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.

static AggregateOperation1<CharSequence, StringBuilder, String> concatenating(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
    Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.

static <T> AggregateOperation1<T, LongAccumulator, Long> counting()
    Returns an aggregate operation that counts the items it observes.

static <T, A, R> AggregateOperation1<T, A, R> filtering(PredicateEx<? super T> filterFn, AggregateOperation1<? super T, A, ? extends R> downstream)
    Adapts an aggregate operation so that it accumulates only the items passing the filterFn and ignores others.

static <T, U, A, R> AggregateOperation1<T, A, R> flatMapping(FunctionEx<? super T, ? extends Traverser<? extends U>> flatMapFn, AggregateOperation1<? super U, A, ? extends R> downstream)
    Adapts an aggregate operation that takes items of type U to one that takes items of type T, by exploding each T into a sequence of Us and then accumulating all of them.

static <T, K> AggregateOperation1<T, Map<K, List<T>>, Map<K, List<T>>> groupingBy(FunctionEx<? super T, ? extends K> keyFn)
    Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is a list of the items with that key.

static <T, K, R, A, M extends Map<K, R>> AggregateOperation1<T, Map<K, A>, M> groupingBy(FunctionEx<? super T, ? extends K> keyFn, SupplierEx<M> createMapFn, AggregateOperation1<? super T, A, R> downstream)
    Returns an AggregateOperation1 that accumulates the items into a Map (as obtained from createMapFn) where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

static <T, K, A, R> AggregateOperation1<T, Map<K, A>, Map<K, R>> groupingBy(FunctionEx<? super T, ? extends K> keyFn, AggregateOperation1<? super T, A, R> downstream)
    Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

static <T> AggregateOperation1<T, LinTrendAccumulator, Double> linearTrend(ToLongFunctionEx<T> getXFn, ToLongFunctionEx<T> getYFn)
    Returns an aggregate operation that computes a linear trend over the items.

static <T, U, A, R> AggregateOperation1<T, A, R> mapping(FunctionEx<? super T, ? extends U> mapFn, AggregateOperation1<? super U, A, ? extends R> downstream)
    Adapts an aggregate operation that takes items of type U to one that takes items of type T, by applying the given mapping function to each item.

static <T> AggregateOperation1<T, MutableReference<T>, T> maxBy(ComparatorEx<? super T> comparator)
    Returns an aggregate operation that computes the greatest item according to the given comparator.

static <T> AggregateOperation1<T, MutableReference<T>, T> minBy(ComparatorEx<? super T> comparator)
    Returns an aggregate operation that computes the least item according to the given comparator.

static <T> AggregateOperation1<T, PickAnyAccumulator<T>, T> pickAny()
    Returns an aggregate operation whose result is an arbitrary item it observed, or null if it observed no items.

static <T, A> AggregateOperation1<T, MutableReference<A>, A> reducing(A emptyAccValue, FunctionEx<? super T, ? extends A> toAccValueFn, BinaryOperatorEx<A> combineAccValuesFn, BinaryOperatorEx<A> deductAccValueFn)
    Returns an aggregate operation that constructs the result through the process of immutable reduction: the initial accumulated value is emptyAccValue.

static <T> AggregateOperation1<T, ArrayList<T>, List<T>> sorting(ComparatorEx<? super T> comparator)
    Returns an aggregate operation that accumulates all input items into an ArrayList and sorts it with the given comparator.

static <T> AggregateOperation1<T, DoubleAccumulator, Double> summingDouble(ToDoubleFunctionEx<? super T> getDoubleValueFn)
    Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item.

static <T> AggregateOperation1<T, LongAccumulator, Long> summingLong(ToLongFunctionEx<? super T> getLongValueFn)
    Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item.

static <T, A, R> Aggregator<T, R> toAggregator(AggregateOperation1<? super T, A, ? extends R> aggrOp)
    Adapts this aggregate operation to be used for IMap.aggregate(Aggregator) calls.

static <T, C extends Collection<T>> AggregateOperation1<T, C, C> toCollection(SupplierEx<C> createCollectionFn)
    Returns an aggregate operation that accumulates the items into a Collection.

static <T, A, R> Collector<T, A, R> toCollector(AggregateOperation1<? super T, A, ? extends R> aggrOp)
    Adapts this aggregate operation to a collector which can be passed to Stream.collect(Collector).

static <T> AggregateOperation1<T, List<T>, List<T>> toList()
    Returns an aggregate operation that accumulates the items into an ArrayList.

static <T, K, U> AggregateOperation1<T, Map<K, U>, Map<K, U>> toMap(FunctionEx<? super T, ? extends K> keyFn, FunctionEx<? super T, ? extends U> valueFn)
    Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

static <T, K, U> AggregateOperation1<T, Map<K, U>, Map<K, U>> toMap(FunctionEx<? super T, ? extends K> keyFn, FunctionEx<? super T, ? extends U> valueFn, BinaryOperatorEx<U> mergeFn)
    Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

static <T, K, U, M extends Map<K, U>> AggregateOperation1<T, M, M> toMap(FunctionEx<? super T, ? extends K> keyFn, FunctionEx<? super T, ? extends U> valueFn, BinaryOperatorEx<U> mergeFn, SupplierEx<M> createMapFn)
    Returns an aggregate operation that accumulates elements into a user-supplied Map instance.

static <T> AggregateOperation1<T, PriorityQueue<T>, List<T>> topN(int n, ComparatorEx<? super T> comparator)
    Returns an aggregate operation that finds the top n items according to the given comparator.

static <T> AggregateOperation1<T, Set<T>, Set<T>> toSet()
    Returns an aggregate operation that accumulates the items into a HashSet.
-
Method Details
-
counting
@Nonnull public static <T> AggregateOperation1<T, LongAccumulator, Long> counting()

Returns an aggregate operation that counts the items it observes. The result is of type long.

This sample takes a stream of words and finds the number of occurrences of each word in it:

    BatchStage<String> words = pipeline.readFrom(wordSource);
    BatchStage<Entry<String, Long>> wordFrequencies =
            words.groupingKey(wholeItem()).aggregate(counting());
-
summingLong
@Nonnull public static <T> AggregateOperation1<T, LongAccumulator, Long> summingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)

Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item.

This sample takes a stream of lines of text and outputs a single long number telling how many words there were in the stream:

    BatchStage<String> linesOfText = pipeline.readFrom(textSource);
    BatchStage<Long> numberOfWordsInText = linesOfText
            .map(line -> line.split("\\W+"))
            .aggregate(summingLong(wordsInLine -> wordsInLine.length));

Note: if the sum exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.

- Type Parameters:
    T - type of the input item
- Parameters:
    getLongValueFn - function that extracts the long values you want to sum. It must be stateless and cooperative.
-
summingDouble
@Nonnull public static <T> AggregateOperation1<T, DoubleAccumulator, Double> summingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)

Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item.

This sample takes a stream of purchase events and outputs a single double value that tells the total sum of money spent in them:

    BatchStage<Purchase> purchases = pipeline.readFrom(purchaseSource);
    BatchStage<Double> purchaseVolume =
            purchases.aggregate(summingDouble(Purchase::amount));

- Type Parameters:
    T - type of the input item
- Parameters:
    getDoubleValueFn - function that extracts the double values you want to sum. It must be stateless and cooperative.
-
minBy
@Nonnull public static <T> AggregateOperation1<T, MutableReference<T>, T> minBy(@Nonnull ComparatorEx<? super T> comparator)

Returns an aggregate operation that computes the least item according to the given comparator.

This sample takes a stream of people and finds the youngest person in it:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Person> youngestPerson =
            people.aggregate(minBy(ComparatorEx.comparing(Person::age)));

NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

If several items tie for the least one, this aggregate operation will choose any one to return and may choose a different one each time.

Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

- Type Parameters:
    T - type of the input item
- Parameters:
    comparator - comparator to compare the items. It must be stateless and cooperative.
-
maxBy
@Nonnull public static <T> AggregateOperation1<T, MutableReference<T>, T> maxBy(@Nonnull ComparatorEx<? super T> comparator)

Returns an aggregate operation that computes the greatest item according to the given comparator.

This sample takes a stream of people and finds the oldest person in it:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Person> oldestPerson =
            people.aggregate(maxBy(ComparatorEx.comparing(Person::age)));

NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

If several items tie for the greatest one, this aggregate operation will choose any one to return and may choose a different one each time.

Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

- Type Parameters:
    T - type of the input item
- Parameters:
    comparator - comparator to compare the items. It must be stateless and cooperative.
-
topN
@Nonnull public static <T> AggregateOperation1<T, PriorityQueue<T>, List<T>> topN(int n, @Nonnull ComparatorEx<? super T> comparator)

Returns an aggregate operation that finds the top n items according to the given comparator. It outputs a sorted list with the top item in the first position.

This sample takes a stream of people and finds the ten oldest persons in it:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<List<Person>> oldestDudes =
            people.aggregate(topN(10, ComparatorEx.comparing(Person::age)));

Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

- Type Parameters:
    T - type of the input item
- Parameters:
    n - number of top items to find
    comparator - compares the items. It must be stateless and cooperative.
-
bottomN
@Nonnull public static <T> AggregateOperation1<T, PriorityQueue<T>, List<T>> bottomN(int n, @Nonnull ComparatorEx<? super T> comparator)

Returns an aggregate operation that finds the bottom n items according to the given comparator. It outputs a sorted list with the bottom item in the first position.

This sample takes a stream of people and finds the ten youngest persons in it:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<List<Person>> youngestDudes =
            people.aggregate(bottomN(10, ComparatorEx.comparing(Person::age)));

Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

- Type Parameters:
    T - type of the input item
- Parameters:
    n - number of bottom items to find
    comparator - compares the items. It must be stateless and cooperative.
-
averagingLong
@Nonnull public static <T> AggregateOperation1<T, LongLongAccumulator, Double> averagingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)

Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item. It outputs the result as a double.

This sample takes a stream of people and finds their mean age:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Double> meanAge = people.aggregate(averagingLong(Person::age));

If the aggregate operation does not observe any input, its result is NaN.

NOTE: this operation accumulates the sum and the count as separate long variables and combines them at the end into the mean value. If either of these variables exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.

- Type Parameters:
    T - type of the input item
- Parameters:
    getLongValueFn - function that extracts the long value from the item. It must be stateless and cooperative.
-
averagingDouble
@Nonnull public static <T> AggregateOperation1<T, LongDoubleAccumulator, Double> averagingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)

Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item. It outputs the result as a double.

This sample takes a stream of people and finds their mean age:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Double> meanAge = people.aggregate(averagingDouble(Person::age));

If the aggregate operation does not observe any input, its result is NaN.

- Type Parameters:
    T - type of the input item
- Parameters:
    getDoubleValueFn - function that extracts the double value from the item. It must be stateless and cooperative.
-
linearTrend
@Nonnull public static <T> AggregateOperation1<T, LinTrendAccumulator, Double> linearTrend(@Nonnull ToLongFunctionEx<T> getXFn, @Nonnull ToLongFunctionEx<T> getYFn)

Returns an aggregate operation that computes a linear trend over the items. It will produce a double-valued coefficient that approximates the rate of change of y as a function of x, where x and y are long quantities obtained by applying the two provided functions to each item.

This sample takes an infinite stream of trade events and outputs the current rate of price change using a sliding window:

    StreamStage<Trade> trades = pipeline
            .readFrom(tradeSource)
            .withTimestamps(Trade::getTimestamp, SECONDS.toMillis(1));
    StreamStage<WindowResult<Double>> priceTrend = trades
            .window(WindowDefinition.sliding(MINUTES.toMillis(5), SECONDS.toMillis(1)))
            .aggregate(linearTrend(Trade::getTimestamp, Trade::getPrice));

With the trade price given in cents and the timestamp in milliseconds, the output will be in cents per millisecond. Make sure you apply a scaling factor if you want another, more natural unit of measure.

If this aggregate operation does not observe any input, its result is NaN.

- Type Parameters:
    T - type of the input item
- Parameters:
    getXFn - a function to extract x from the input. It must be stateless and cooperative.
    getYFn - a function to extract y from the input. It must be stateless and cooperative.
-
concatenating
public static AggregateOperation1<CharSequence, StringBuilder, String> concatenating()

Returns an aggregate operation that takes string items and concatenates them into a single string.

This sample outputs a string that you get by reading down the first column of the input text:

    BatchStage<String> linesOfText = pipeline.readFrom(textSource);
    BatchStage<String> lineStarters = linesOfText
            .map(line -> line.charAt(0))
            .map(Object::toString)
            .aggregate(concatenating());
-
concatenating
public static AggregateOperation1<CharSequence, StringBuilder, String> concatenating(CharSequence delimiter)

Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.

This sample outputs a single line of text that contains all the upper-cased and title-cased words of the input text:

    BatchStage<String> linesOfText = pipeline.readFrom(textSource);
    BatchStage<String> upcaseWords = linesOfText
            .map(line -> line.split("\\W+"))
            .flatMap(Traversers::traverseArray)
            .filter(word -> word.matches("\\p{Lu}.*"))
            .aggregate(concatenating(" "));
-
concatenating
public static AggregateOperation1<CharSequence, StringBuilder, String> concatenating(CharSequence delimiter, CharSequence prefix, CharSequence suffix)

Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string. The resulting string will start with the given prefix and end with the given suffix.

This sample outputs a single item, a JSON array of all the upper-cased and title-cased words of the input text (note the argument order: delimiter first, then prefix and suffix):

    BatchStage<String> linesOfText = pipeline.readFrom(textSource);
    BatchStage<String> upcaseWords = linesOfText
            .map(line -> line.split("\\W+"))
            .flatMap(Traversers::traverseArray)
            .filter(word -> word.matches("\\p{Lu}.*"))
            .aggregate(concatenating("', '", "['", "']"));
-
mapping
public static <T, U, A, R> AggregateOperation1<T, A, R> mapping(@Nonnull FunctionEx<? super T, ? extends U> mapFn, @Nonnull AggregateOperation1<? super U, A, ? extends R> downstream)

Adapts an aggregate operation that takes items of type U to one that takes items of type T, by applying the given mapping function to each item. Normally you should just apply the mapping in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).

In addition to mapping, you can apply filtering as well by returning null for an item you want filtered out.

This sample takes a stream of people and builds two sorted lists from it, one with all the names and one with all the surnames:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Tuple2<List<String>, List<String>>> sortedNames =
            people.aggregate(allOf(
                    mapping(Person::getFirstName, sorting(ComparatorEx.naturalOrder())),
                    mapping(Person::getLastName, sorting(ComparatorEx.naturalOrder()))));
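As a sketch of the null-based filtering mentioned above (Person.getNickname() is a hypothetical accessor that may return null):

    // Items mapped to null are dropped, so only people with a nickname are counted.
    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Long> nicknamedCount = people.aggregate(
            mapping(Person::getNickname, counting()));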
- Type Parameters:
    T - type of the input item
    U - input type of the downstream aggregate operation
    A - downstream operation's accumulator type
    R - downstream operation's result type
- Parameters:
    mapFn - the function to apply to the input items. It must be stateless and cooperative.
    downstream - the downstream aggregate operation
- See Also:
    filtering(com.hazelcast.function.PredicateEx<? super T>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A, ? extends R>)
    flatMapping(com.hazelcast.function.FunctionEx<? super T, ? extends com.hazelcast.jet.Traverser<? extends U>>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>)
-
filtering
public static <T, A, R> AggregateOperation1<T, A, R> filtering(@Nonnull PredicateEx<? super T> filterFn, @Nonnull AggregateOperation1<? super T, A, ? extends R> downstream)

Adapts an aggregate operation so that it accumulates only the items passing the filterFn and ignores others. Normally you should just apply the filter in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).

This sample takes a stream of people and outputs two numbers, the average height of kids and grown-ups:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Tuple2<Double, Double>> avgHeightByAge = people.aggregate(allOf(
            filtering((Person p) -> p.getAge() < 18, averagingLong(Person::getHeight)),
            filtering((Person p) -> p.getAge() >= 18, averagingLong(Person::getHeight))
    ));

- Type Parameters:
    T - type of the input item
    A - downstream operation's accumulator type
    R - downstream operation's result type
- Parameters:
    filterFn - the filtering function. It must be stateless and cooperative.
    downstream - the downstream aggregate operation
- Since:
    Jet 3.1
- See Also:
    mapping(com.hazelcast.function.FunctionEx<? super T, ? extends U>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>)
    flatMapping(com.hazelcast.function.FunctionEx<? super T, ? extends com.hazelcast.jet.Traverser<? extends U>>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>)
-
flatMapping
public static <T, U, A, R> AggregateOperation1<T, A, R> flatMapping(@Nonnull FunctionEx<? super T, ? extends Traverser<? extends U>> flatMapFn, @Nonnull AggregateOperation1<? super U, A, ? extends R> downstream)

Adapts an aggregate operation that takes items of type U to one that takes items of type T, by exploding each T into a sequence of Us and then accumulating all of them. Normally you should just apply the flat-mapping in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).

The traverser your function returns must be non-null and null-terminated.

This sample takes a stream of people and outputs two numbers, the mean age of all the people and the mean age of people listed as someone's kid:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    people.aggregate(allOf(
            averagingLong(Person::getAge),
            flatMapping((Person p) -> traverseIterable(p.getChildren()),
                    averagingLong(Person::getAge))
    ));

- Type Parameters:
    T - type of the input item
    U - input type of the downstream aggregate operation
    A - downstream operation's accumulator type
    R - downstream operation's result type
- Parameters:
    flatMapFn - the flat-mapping function to apply. It must be stateless and cooperative.
    downstream - the downstream aggregate operation
- Since:
    Jet 3.1
- See Also:
-
toCollection
public static <T, C extends Collection<T>> AggregateOperation1<T, C, C> toCollection(@Nonnull SupplierEx<C> createCollectionFn)

Returns an aggregate operation that accumulates the items into a Collection. It creates empty, mutable collections as needed by calling the provided createCollectionFn.

This sample takes a stream of words and outputs a single sorted set of all the long words (above 5 letters):

    BatchStage<String> words = pipeline.readFrom(wordSource);
    BatchStage<SortedSet<String>> sortedLongWords = words
            .filter(w -> w.length() > 5)
            .aggregate(toCollection(TreeSet::new));

Note: if you use a collection that preserves the insertion order, keep in mind that Jet doesn't aggregate the items in any specified order.

- Type Parameters:
    T - type of the input item
    C - the type of the collection
- Parameters:
    createCollectionFn - a Supplier of empty, mutable Collections. It must be stateless and cooperative.
-
toList
public static <T> AggregateOperation1<T, List<T>, List<T>> toList()

Returns an aggregate operation that accumulates the items into an ArrayList.

This sample takes a stream of words and outputs a single list of all the long words (above 5 letters):

    BatchStage<String> words = pipeline.readFrom(wordSource);
    BatchStage<List<String>> longWords = words
            .filter(w -> w.length() > 5)
            .aggregate(toList());

Note: accumulating all the data into an in-memory list shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.

- Type Parameters:
    T - type of the input item
-
toSet
public static <T> AggregateOperation1<T, Set<T>, Set<T>> toSet()

Returns an aggregate operation that accumulates the items into a HashSet.

This sample takes a stream of people and outputs a single set of all the distinct cities they live in:

    pipeline.readFrom(personSource)
            .map(Person::getCity)
            .aggregate(toSet());

Note: accumulating all the data into an in-memory set shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.

- Type Parameters:
    T - type of the input item
-
toMap
public static <T, K, U> AggregateOperation1<T, Map<K, U>, Map<K, U>> toMap(FunctionEx<? super T, ? extends K> keyFn, FunctionEx<? super T, ? extends U> valueFn)

Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

This aggregate operation does not tolerate duplicate keys and will throw an IllegalStateException if it detects them. If your data contains duplicates, use toMap(keyFn, valueFn, mergeFn).

The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}:

    BatchStage<Map<String, Double>> readings = pipeline
            .readFrom(sensorData)
            .aggregate(toMap(
                    SensorReading::getSensorId,
                    SensorReading::getValue));

Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.

- Type Parameters:
    T - type of the input item
    K - type of the key
    U - type of the value
- Parameters:
    keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
    valueFn - a function to extract the value from the input item. It must be stateless and cooperative.
- See Also:
-
toMap
public static <T, K, U> AggregateOperation1<T, Map<K, U>, Map<K, U>> toMap(FunctionEx<? super T, ? extends K> keyFn, FunctionEx<? super T, ? extends U> valueFn, BinaryOperatorEx<U> mergeFn)

Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.

The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}. Multiple readings from the same sensor get summed up:

    BatchStage<Map<String, Double>> readings = pipeline
            .readFrom(sensorData)
            .aggregate(toMap(
                    SensorReading::getSensorId,
                    SensorReading::getValue,
                    Double::sum));

Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.

The given functions must be stateless and cooperative.

- Type Parameters:
    T - type of the input item
    K - the type of key
    U - the output type of the value mapping function
- Parameters:
    keyFn - a function to extract the key from input item
    valueFn - a function to extract value from input item
    mergeFn - the function used to resolve collisions between values associated with the same key, will be passed to Map.merge(Object, Object, java.util.function.BiFunction)
- See Also:
-
toMap
public static <T, K, U, M extends Map<K, U>> AggregateOperation1<T, M, M> toMap(FunctionEx<? super T, ? extends K> keyFn, FunctionEx<? super T, ? extends U> valueFn, BinaryOperatorEx<U> mergeFn, SupplierEx<M> createMapFn)

Returns an aggregate operation that accumulates elements into a user-supplied Map instance. The keys and values are the result of applying the provided mapping functions to the input elements.

This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.

The following sample takes a stream of sensor readings and outputs a single ObjectToLongHashMap of {sensor ID -> reading}. Multiple readings from the same sensor get summed up:

    BatchStage<Map<String, Long>> readings = pipeline
            .readFrom(sensorData)
            .aggregate(toMap(
                    SensorReading::getSensorId,
                    SensorReading::getValue,
                    Long::sum,
                    ObjectToLongHashMap::new));

The given functions must be stateless and cooperative.

- Type Parameters:
    T - type of the input item
    K - the output type of the key mapping function
    U - the output type of the value mapping function
    M - the type of the resulting Map
- Parameters:
    keyFn - a function to extract the key from input item
    valueFn - a function to extract value from input item
    mergeFn - a merge function, used to resolve collisions between values associated with the same key, as supplied to Map.merge(Object, Object, java.util.function.BiFunction)
    createMapFn - a function which returns a new, empty Map into which the results will be inserted
- See Also:
-
groupingBy
public static <T, K> AggregateOperation1<T, Map<K, List<T>>, Map<K, List<T>>> groupingBy(FunctionEx<? super T, ? extends K> keyFn)

Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is a list of the items with that key.

This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key.

This sample takes a stream of persons and classifies them first by country and then by gender. It outputs a stream of map entries where the key is the country and the value is a map from gender to the list of people of that gender from that country:

    BatchStage<Person> people = pipeline.readFrom(personSource);
    BatchStage<Entry<String, Map<String, List<Person>>>> byCountryAndGender =
            people.groupingKey(Person::getCountry)
                  .aggregate(groupingBy(Person::getGender));

This aggregate operation has a similar effect to the dedicated groupingKey() pipeline transform, so you may wonder why not use it in all cases, not just cascaded grouping. To see the difference, check out these two snippets:

    BatchStage<Person> people = pipeline.readFrom(personSource);

    // Snippet 1
    BatchStage<Entry<String, List<Person>>> byCountry1 =
            people.groupingKey(Person::getCountry)
                  .aggregate(toList());

    // Snippet 2
    BatchStage<Map<String, List<Person>>> byCountry2 =
            people.aggregate(groupingBy(Person::getCountry));

Notice that snippet 1 outputs a stream of map entries whereas snippet 2 outputs a single map. To produce the single map, Jet must do all the work on a single thread and hold all the data on a single cluster member, so you lose the advantage of distributed computation. By contrast, snippet 1 allows Jet to partition the input by the grouping key and split the work across the cluster. This is why you should prefer a groupingKey stage if you have just one level of grouping.

- Type Parameters:
    T - type of the input item
    K - the output type of the key mapping function
- Parameters:
    keyFn - a function to extract the key from input item. It must be stateless and cooperative.
- See Also:
-
groupingBy
public static <T, K, A, R> AggregateOperation1<T, Map<K, A>, Map<K, R>> groupingBy(FunctionEx<? super T, ? extends K> keyFn, AggregateOperation1<? super T, A, R> downstream)

Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key. For the difference between this operation and the groupingKey() pipeline transform, see the documentation on groupingBy(keyFn).

This sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category:

    BatchStage<Person> people = pipeline.readFrom(personSource);
    BatchStage<Entry<String, Map<String, Long>>> countByCountryAndGender =
            people.groupingKey(Person::getCountry)
                  .aggregate(groupingBy(Person::getGender, counting()));

- Type Parameters:
    T - type of the input item
    K - the output type of the key mapping function
    R - the type of the downstream aggregation result
    A - downstream aggregation's accumulator type
- Parameters:
    keyFn - a function to extract the key from input item. It must be stateless and cooperative.
    downstream - the downstream aggregate operation
- See Also:
-
groupingBy
public static <T, K, R, A, M extends Map<K, R>> AggregateOperation1<T, Map<K, A>, M> groupingBy(FunctionEx<? super T, ? extends K> keyFn, SupplierEx<M> createMapFn, AggregateOperation1<? super T, A, R> downstream)

Returns an AggregateOperation1 that accumulates the items into a Map (as obtained from createMapFn) where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key. For the difference between this operation and the groupingKey() pipeline transform, see the documentation on groupingBy(keyFn).

The following sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category. It uses the EnumMap to optimize memory usage:

    BatchStage<Person> people = pipeline.readFrom(personSource);
    BatchStage<Entry<String, Map<Gender, Long>>> countByCountryAndGender =
            people.groupingKey(Person::getCountry)
                  .aggregate(groupingBy(
                          Person::getGender,
                          () -> new EnumMap<>(Gender.class),
                          counting()));

- Type Parameters:
    T - type of the input item
    K - the output type of the key mapping function
    R - the type of the downstream aggregation result
    A - downstream aggregation's accumulator type
    M - output type of the resulting Map
- Parameters:
    keyFn - a function to extract the key from input item. It must be stateless and cooperative.
    createMapFn - a function which returns a new, empty Map into which the results will be inserted. It must be stateless and cooperative.
    downstream - the downstream aggregate operation
- See Also:
-
reducing
@Nonnull public static <T, A> AggregateOperation1<T, MutableReference<A>, A> reducing(@Nonnull A emptyAccValue, @Nonnull FunctionEx<? super T, ? extends A> toAccValueFn, @Nonnull BinaryOperatorEx<A> combineAccValuesFn, @Nullable BinaryOperatorEx<A> deductAccValueFn)

Returns an aggregate operation that constructs the result through the process of immutable reduction:

- The initial accumulated value is emptyAccValue.
- For each input item, compute the new value: newVal = combineAccValues(currVal, toAccValue(item))

combineAccValuesFn must be associative because it will also be used to combine partial results, as well as commutative because the encounter order of items is unspecified.

The optional deductAccValueFn allows Jet to compute the sliding window in O(1) time. It must undo the effects of a previous combineAccValuesFn call:

    A accVal;  // has some pre-existing value
    A itemAccVal = toAccValueFn.apply(item);
    A combined = combineAccValuesFn.apply(accVal, itemAccVal);
    A deducted = deductAccValueFn.apply(combined, itemAccVal);
    assert deducted.equals(accVal);

This sample takes a stream of product orders and outputs a single long number which is the sum total of all the ordered amounts. The aggregate operation it implements is equivalent to summingLong(com.hazelcast.function.ToLongFunctionEx<? super T>):

    BatchStage<Order> orders = pipeline.readFrom(orderSource);
    BatchStage<Long> totalAmount = orders.aggregate(reducing(
            0L,
            Order::getAmount,
            Math::addExact,
            Math::subtractExact
    ));

The given functions must be stateless and cooperative.

- Type Parameters:
    T - type of the input item
    A - type of the accumulated value
- Parameters:
    emptyAccValue - the reducing operation's emptyAccValue element
    toAccValueFn - transforms the stream item into its accumulated value
    combineAccValuesFn - combines two accumulated values into one
    deductAccValueFn - deducts the right-hand accumulated value from the left-hand one (optional)
-
pickAny
@Nonnull public static <T> AggregateOperation1<T, PickAnyAccumulator<T>, T> pickAny()

Returns an aggregate operation whose result is an arbitrary item it observed, or null if it observed no items.

The implementation of StageWithWindow.distinct() uses this operation and, if needed, you can use it directly for the same purpose.

This sample takes a stream of people and outputs a stream of people that have distinct last names (same as calling groupingKey(keyFn).distinct()):

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<Entry<String, Person>> distinctByLastName =
            people.groupingKey(Person::getLastName)
                  .aggregate(pickAny());

NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

- Type Parameters:
    T - type of the input item
-
sorting
public static <T> AggregateOperation1<T, ArrayList<T>, List<T>> sorting(@Nonnull ComparatorEx<? super T> comparator)

Returns an aggregate operation that accumulates all input items into an ArrayList and sorts it with the given comparator. If you have Comparable items that you want to sort in their natural order, use ComparatorEx.naturalOrder().

This sample takes a stream of people and outputs a single list of people sorted by their last name:

    BatchStage<Person> people = pipeline.readFrom(peopleSource);
    BatchStage<List<Person>> sorted = people.aggregate(
            sorting(ComparatorEx.comparing(Person::getLastName)));

- Type Parameters:
    T - the type of input items
- Parameters:
    comparator - the comparator to use for sorting. It must be stateless and cooperative.
-
allOf
@Nonnull public static <T, A0, A1, R0, R1, R> AggregateOperation1<T, Tuple2<A0, A1>, R> allOf(@Nonnull AggregateOperation1<? super T, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T, A1, ? extends R1> op1, @Nonnull BiFunctionEx<? super R0, ? super R1, ? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of two aggregate operations. It allows you to calculate both aggregations over the same items at once.

This sample takes a stream of orders and outputs a single tuple containing the orders with the smallest and the largest amount:

    BatchStage<Order> orders = pipeline.readFrom(orderSource);
    BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
            minBy(ComparatorEx.comparing(Order::getAmount)),
            maxBy(ComparatorEx.comparing(Order::getAmount)),
            Tuple2::tuple2
    ));

- Type Parameters:
    T - type of input items
    A0 - 1st accumulator type
    A1 - 2nd accumulator type
    R0 - 1st result type
    R1 - 2nd result type
    R - final result type
- Parameters:
    op0 - 1st operation
    op1 - 2nd operation
    exportFinishFn - function combining the two results into a single target instance. It must be stateless and cooperative.
- Returns:
    the composite operation
-
allOf
@Nonnull public static <T, A0, A1, R0, R1> AggregateOperation1<T, Tuple2<A0, A1>, Tuple2<R0, R1>> allOf(@Nonnull AggregateOperation1<? super T, A0, R0> op1, @Nonnull AggregateOperation1<? super T, A1, R1> op2)

Convenience for allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx) wrapping the two results in a Tuple2.
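A minimal usage sketch, adapted from the three-argument variant's sample above (the two results arrive wrapped in a Tuple2 instead of being combined by a finishing function):

    BatchStage<Order> orders = pipeline.readFrom(orderSource);
    BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
            minBy(ComparatorEx.comparing(Order::getAmount)),
            maxBy(ComparatorEx.comparing(Order::getAmount))
    ));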
-
allOf
@Nonnull public static <T, A0, A1, A2, R0, R1, R2, R> AggregateOperation1<T, Tuple3<A0, A1, A2>, R> allOf(@Nonnull AggregateOperation1<? super T, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T, A1, ? extends R1> op1, @Nonnull AggregateOperation1<? super T, A2, ? extends R2> op2, @Nonnull TriFunction<? super R0, ? super R1, ? super R2, ? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of three aggregate operations. It allows you to calculate all three over the same items at once.

This sample takes a stream of orders and outputs a single tuple containing the average amount ordered and the orders with the smallest and the largest amount:

    BatchStage<Order> orders = pipeline.readFrom(orderSource);
    BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes =
            orders.aggregate(allOf(
                    averagingLong(Order::getAmount),
                    minBy(ComparatorEx.comparing(Order::getAmount)),
                    maxBy(ComparatorEx.comparing(Order::getAmount)),
                    Tuple3::tuple3
            ));

- Type Parameters:
    T - type of input items
    A0 - 1st accumulator type
    A1 - 2nd accumulator type
    A2 - 3rd accumulator type
    R0 - 1st result type
    R1 - 2nd result type
    R2 - 3rd result type
    R - final result type
- Parameters:
    op0 - 1st operation
    op1 - 2nd operation
    op2 - 3rd operation
    exportFinishFn - function combining the three results into a single target instance. It must be stateless and cooperative.
- Returns:
    the composite operation
-
allOf
@Nonnull public static <T, A0, A1, A2, R0, R1, R2> AggregateOperation1<T, Tuple3<A0, A1, A2>, Tuple3<R0, R1, R2>> allOf(@Nonnull AggregateOperation1<? super T, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T, A1, ? extends R1> op1, @Nonnull AggregateOperation1<? super T, A2, ? extends R2> op2)

Convenience for allOf(AggregateOperation1, AggregateOperation1, AggregateOperation1, TriFunction) wrapping the three results in a Tuple3.
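A minimal usage sketch, adapted from the four-argument variant's sample above (the three results arrive wrapped in a Tuple3):

    BatchStage<Order> orders = pipeline.readFrom(orderSource);
    BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes =
            orders.aggregate(allOf(
                    averagingLong(Order::getAmount),
                    minBy(ComparatorEx.comparing(Order::getAmount)),
                    maxBy(ComparatorEx.comparing(Order::getAmount))
            ));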
-
allOfBuilder
public static <T> AllOfAggregationBuilder<T> allOfBuilder()

Returns a builder object that helps you create a composite of multiple aggregate operations. The resulting aggregate operation will perform all of the constituent operations at the same time, and you can retrieve individual results from the ItemsByTag object you'll get in the output.

The builder object is primarily intended to build a composite of four or more aggregate operations. For up to three operations, prefer the simpler and more type-safe variants allOf(op1, op2) and allOf(op1, op2, op3).

In the following sample we'll construct a composite aggregate operation that takes a stream of orders and finds the extremes in terms of ordered amount. Here's the input stage:

    BatchStage<Order> orders = pipeline.readFrom(orderSource);

Now we construct the aggregate operation using the builder:

    AllOfAggregationBuilder<Order> builder = allOfBuilder();
    Tag<Order> minTag = builder.add(minBy(ComparatorEx.comparing(Order::getAmount)));
    Tag<Order> maxTag = builder.add(maxBy(ComparatorEx.comparing(Order::getAmount)));
    AggregateOperation1<Order, ?, ItemsByTag> aggrOp = builder.build();

Finally, we apply the aggregate operation and use the tags we got above to extract the components:

    BatchStage<ItemsByTag> extremes = orders.aggregate(aggrOp);
    BatchStage<Tuple2<Order, Order>> extremesAsTuple =
            extremes.map(ibt -> tuple2(ibt.get(minTag), ibt.get(maxTag)));

- Type Parameters:
    T - type of input items
-
aggregateOperation2
public static <T0, A0, R0, T1, A1, R1, R> AggregateOperation2<T0, T1, Tuple2<A0, A1>, R> aggregateOperation2(@Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1, @Nonnull BiFunctionEx<? super R0, ? super R1, ? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of two independent aggregate operations, each one accepting its own input. You need this kind of operation in a two-way co-aggregating pipeline stage such as BatchStage.aggregate2(BatchStage, AggregateOperation2): stage0.aggregate2(stage1, compositeAggrOp). Before using this method, see if you can instead use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it's simpler and doesn't require you to pre-compose the aggregate operations.

This method is suitable when you can express your computation as two independent aggregate operations where you combine their final results. If you need an operation that combines the two inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder (a sketch follows at the end of this entry).

As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:

    BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
    BatchStage<Payment> payments = pipeline.readFrom(paymentSource);

We want to find out how many page clicks each user did before buying a product. We can do it like this:

    BatchStage<Entry<Long, Double>> visitsPerPurchase = pageVisits
            .groupingKey(PageVisit::userId)
            .aggregate2(
                    payments.groupingKey(Payment::userId),
                    aggregateOperation2(counting(), counting(),
                            (numPageVisits, numPayments) -> 1.0 * numPageVisits / numPayments
                    ));

The output stage's stream contains a Map.Entry where the key is the user ID and the value is the ratio of page visits to payments for that user.

- Type Parameters:
    T0 - type of items in the first stage
    A0 - type of the first aggregate operation's accumulator
    R0 - type of the first aggregate operation's result
    T1 - type of items in the second stage
    A1 - type of the second aggregate operation's accumulator
    R1 - type of the second aggregate operation's result
    R - type of the result
- Parameters:
    op0 - the aggregate operation that will receive the first stage's input
    op1 - the aggregate operation that will receive the second stage's input
    exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.
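As a sketch of the builder approach mentioned above: this hand-builds a two-input operation whose accumulation phase combines both inputs into a single sum. The accessors PageVisit.duration() and Payment.amount() are hypothetical; AggregateOperation.withCreate() and LongAccumulator are real Jet classes.

    // Sketch, assuming duration() and amount() return long values
    AggregateOperation2<PageVisit, Payment, LongAccumulator, Long> combinedSum =
            AggregateOperation
                    .withCreate(LongAccumulator::new)
                    .andAccumulate0((LongAccumulator acc, PageVisit v) -> acc.add(v.duration()))
                    .andAccumulate1((LongAccumulator acc, Payment p) -> acc.add(p.amount()))
                    .andCombine(LongAccumulator::add)
                    .andExportFinish(LongAccumulator::get);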
-
aggregateOperation2
public static <T0, T1, A0, A1, R0, R1> AggregateOperation2<T0, T1, Tuple2<A0, A1>, Tuple2<R0, R1>> aggregateOperation2(@Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1)

Convenience for aggregateOperation2(aggrOp0, aggrOp1, finishFn) that outputs a Tuple2(result0, result1).

- Type Parameters:
    T0 - type of items in the first stage
    A0 - type of the first aggregate operation's accumulator
    R0 - type of the first aggregate operation's result
    T1 - type of items in the second stage
    A1 - type of the second aggregate operation's accumulator
    R1 - type of the second aggregate operation's result
- Parameters:
    op0 - the aggregate operation that will receive the first stage's input
    op1 - the aggregate operation that will receive the second stage's input
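A minimal usage sketch, adapted from the three-argument variant's sample above (the two counts arrive as a Tuple2 instead of being combined by a finishing function):

    BatchStage<Entry<Long, Tuple2<Long, Long>>> visitsAndPurchases = pageVisits
            .groupingKey(PageVisit::userId)
            .aggregate2(
                    payments.groupingKey(Payment::userId),
                    aggregateOperation2(counting(), counting()));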
-
aggregateOperation3
public static <T0, T1, T2, A0, A1, A2, R0, R1, R2, R> AggregateOperation3<T0, T1, T2, Tuple3<A0, A1, A2>, R> aggregateOperation3(@Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1, @Nonnull AggregateOperation1<? super T2, A2, ? extends R2> op2, @Nonnull TriFunction<? super R0, ? super R1, ? super R2, ? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of three independent aggregate operations, each one accepting its own input. You need this kind of operation in the three-way co-aggregating pipeline stage BatchStage.aggregate3(BatchStage, BatchStage, AggregateOperation3): stage0.aggregate3(stage1, stage2, compositeAggrOp). Before using this method, see if you can instead use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it's simpler and doesn't require you to pre-compose the aggregate operations.

This method is suitable when you can express your computation as three independent aggregate operations where you combine their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

As a quick example, let's say you have three data streams coming from an online store, consisting of user actions: page visits, add-to-cart actions and payments:

    BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
    BatchStage<AddToCart> addToCarts = pipeline.readFrom(addToCartSource);
    BatchStage<Payment> payments = pipeline.readFrom(paymentSource);

We want to get these metrics for each user: how many page clicks they did before buying a product, and how many products they bought per purchase. We could do it like this:

    BatchStage<Entry<Integer, Tuple2<Double, Double>>> userStats = pageVisits
            .groupingKey(PageVisit::userId)
            .aggregate3(
                    addToCarts.groupingKey(AddToCart::userId),
                    payments.groupingKey(Payment::userId),
                    aggregateOperation3(counting(), counting(), counting(),
                            (numPageVisits, numAddToCarts, numPayments) ->
                                    tuple2(1.0 * numPageVisits / numPayments,
                                           1.0 * numAddToCarts / numPayments)
                    ));

- Type Parameters:
    T0 - type of items in the first stage
    A0 - type of the first aggregate operation's accumulator
    R0 - type of the first aggregate operation's result
    T1 - type of items in the second stage
    A1 - type of the second aggregate operation's accumulator
    R1 - type of the second aggregate operation's result
    T2 - type of items in the third stage
    A2 - type of the third aggregate operation's accumulator
    R2 - type of the third aggregate operation's result
    R - type of the result
- Parameters:
    op0 - the aggregate operation that will receive the first stage's input
    op1 - the aggregate operation that will receive the second stage's input
    op2 - the aggregate operation that will receive the third stage's input
    exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.
-
aggregateOperation3
public static <T0, T1, T2, A0, A1, A2, R0, R1, R2> AggregateOperation3<T0, T1, T2, Tuple3<A0, A1, A2>, Tuple3<R0, R1, R2>> aggregateOperation3(@Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0, @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1, @Nonnull AggregateOperation1<? super T2, A2, ? extends R2> op2)

Convenience for aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn) that outputs a Tuple3(result0, result1, result2).

- Type Parameters:
    T0 - type of items in the first stage
    A0 - type of the first aggregate operation's accumulator
    R0 - type of the first aggregate operation's result
    T1 - type of items in the second stage
    A1 - type of the second aggregate operation's accumulator
    R1 - type of the second aggregate operation's result
    T2 - type of items in the third stage
    A2 - type of the third aggregate operation's accumulator
    R2 - type of the third aggregate operation's result
- Parameters:
    op0 - the aggregate operation that will receive the first stage's input
    op1 - the aggregate operation that will receive the second stage's input
    op2 - the aggregate operation that will receive the third stage's input
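A minimal usage sketch, adapted from the four-argument variant's sample above (the three counts arrive as a Tuple3):

    BatchStage<Entry<Integer, Tuple3<Long, Long, Long>>> userActionCounts = pageVisits
            .groupingKey(PageVisit::userId)
            .aggregate3(
                    addToCarts.groupingKey(AddToCart::userId),
                    payments.groupingKey(Payment::userId),
                    aggregateOperation3(counting(), counting(), counting()));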
-
coAggregateOperationBuilder
public static CoAggregateOperationBuilder coAggregateOperationBuilder()

Returns a builder object that offers a step-by-step fluent API to create an aggregate operation that accepts multiple inputs. You must supply this kind of operation to a co-aggregating pipeline stage. You need this builder if you're using stage.aggregateBuilder(). Before deciding to use it, consider using stage.aggregateBuilder(aggrOp0) because it will allow you to directly pass the aggregate operation for each joined stage, without requiring you to build a composite operation through this builder. Finally, if you're co-aggregating two or three streams, prefer the simpler and more type-safe variants: aggregateOperation2(com.hazelcast.jet.aggregate.AggregateOperation1<? super T0, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T1, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>) and aggregateOperation3(com.hazelcast.jet.aggregate.AggregateOperation1<? super T0, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T1, A1, ? extends R1>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T2, A2, ? extends R2>, com.hazelcast.jet.function.TriFunction<? super R0, ? super R1, ? super R2, ? extends R>).

This builder is suitable when you can express your computation as independent aggregate operations on each input where you combine only their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:

    BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
    BatchStage<Payment> payments = pipeline.readFrom(paymentSource);

We want to find out how many page clicks each user did before buying a product, and we want to do it using stage.aggregateBuilder(). Note that there will be two builders at play: the stage builder, which joins the pipeline stages, and the aggregate operation builder (obtained from this method).

First we obtain the stage builder and add our pipeline stages:

    GroupAggregateBuilder1<PageVisit, Long> stageBuilder =
            pageVisits.groupingKey(PageVisit::userId).aggregateBuilder();
    Tag<PageVisit> visitTag_in = stageBuilder.tag0();
    Tag<Payment> payTag_in = stageBuilder.add(payments.groupingKey(Payment::userId));

Now we have the tags we need to build the aggregate operation, and while building it we get new tags to get the results of the operation:

    CoAggregateOperationBuilder opBuilder = coAggregateOperationBuilder();
    Tag<Long> visitTag = opBuilder.add(visitTag_in, counting());
    Tag<Long> payTag = opBuilder.add(payTag_in, counting());

We use these tags in the exportFinishFn we specify at the end:

    AggregateOperation<Object[], Double> aggrOp =
            opBuilder.build(ibt -> 1.0 * ibt.get(visitTag) / ibt.get(payTag));

And now we're ready to construct the output stage:

    BatchStage<Entry<Long, Double>> visitsPerPurchase = stageBuilder.build(aggrOp);

The output stage's stream contains Map.Entry items where the key is the user ID and the value is the ratio of page visits to payments for that user.

-
toCollector
@Nonnull public static <T, A, R> Collector<T, A, R> toCollector(AggregateOperation1<? super T, A, ? extends R> aggrOp)

Adapts this aggregate operation to a collector which can be passed to Stream.collect(Collector).

This can be useful when you want to combine java.util.stream with Jet aggregations. For example, the code below can be used to do multiple aggregations in a single pass over the same data set:

    Stream<Person> personStream = people.stream();
    personStream.collect(
            AggregateOperations.toCollector(
                    AggregateOperations.allOf(
                            AggregateOperations.counting(),
                            AggregateOperations.averagingLong(p -> p.getAge())
                    )
            )
    );
-
toAggregator
@Nonnull public static <T, A, R> Aggregator<T, R> toAggregator(AggregateOperation1<? super T, A, ? extends R> aggrOp)

Adapts this aggregate operation to be used for IMap.aggregate(Aggregator) calls.

Using IMap aggregations can be desirable when you want to make use of indices when doing aggregations and want to use the Jet aggregations API instead of writing a custom Aggregator.

For example, the following aggregation can be used to group people by their age and find the counts for each group:

    IMap<Integer, Person> map = jet.getMap("people");
    Map<Integer, Long> counts = map.aggregate(
            AggregateOperations.toAggregator(
                    AggregateOperations.groupingBy(
                            e -> e.getValue().getAge(),
                            AggregateOperations.counting()
                    )
            )
    );
-