public final class AggregateOperations extends Object

Utility class with factory methods for several useful aggregate operations. See the Javadoc on AggregateOperation. You can also create your own aggregate operation using the builder object.

| Modifier and Type | Method and Description |
|---|---|
| static <T0,T1,A0,A1,R0,R1> AggregateOperation2<T0,T1,Tuple2<A0,A1>,Tuple2<R0,R1>> | aggregateOperation2(AggregateOperation1<? super T0,A0,? extends R0> op0, AggregateOperation1<? super T1,A1,? extends R1> op1) Convenience for aggregateOperation2(aggrOp0, aggrOp1, finishFn) that outputs a Tuple2(result0, result1). |
| static <T0,A0,R0,T1,A1,R1,R> AggregateOperation2<T0,T1,Tuple2<A0,A1>,R> | aggregateOperation2(AggregateOperation1<? super T0,A0,? extends R0> op0, AggregateOperation1<? super T1,A1,? extends R1> op1, BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn) Returns an aggregate operation that is a composite of two independent aggregate operations, each one accepting its own input. |
| static <T0,T1,T2,A0,A1,A2,R0,R1,R2> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> | aggregateOperation3(AggregateOperation1<? super T0,A0,? extends R0> op0, AggregateOperation1<? super T1,A1,? extends R1> op1, AggregateOperation1<? super T2,A2,? extends R2> op2) Convenience for aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn) that outputs a Tuple3(result0, result1, result2). |
| static <T0,T1,T2,A0,A1,A2,R0,R1,R2,R> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,R> | aggregateOperation3(AggregateOperation1<? super T0,A0,? extends R0> op0, AggregateOperation1<? super T1,A1,? extends R1> op1, AggregateOperation1<? super T2,A2,? extends R2> op2, TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn) Returns an aggregate operation that is a composite of three independent aggregate operations, each one accepting its own input. |
| static <T,A0,A1,A2,R0,R1,R2> AggregateOperation1<T,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> | allOf(AggregateOperation1<? super T,A0,? extends R0> op0, AggregateOperation1<? super T,A1,? extends R1> op1, AggregateOperation1<? super T,A2,? extends R2> op2) Convenience for allOf(AggregateOperation1, AggregateOperation1, AggregateOperation1, TriFunction) wrapping the three results in a Tuple3. |
| static <T,A0,A1,A2,R0,R1,R2,R> AggregateOperation1<T,Tuple3<A0,A1,A2>,R> | allOf(AggregateOperation1<? super T,A0,? extends R0> op0, AggregateOperation1<? super T,A1,? extends R1> op1, AggregateOperation1<? super T,A2,? extends R2> op2, TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn) Returns an aggregate operation that is a composite of three aggregate operations. |
| static <T,A0,A1,R0,R1,R> AggregateOperation1<T,Tuple2<A0,A1>,R> | allOf(AggregateOperation1<? super T,A0,? extends R0> op0, AggregateOperation1<? super T,A1,? extends R1> op1, BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn) Returns an aggregate operation that is a composite of two aggregate operations. |
| static <T,A0,A1,R0,R1> AggregateOperation1<T,Tuple2<A0,A1>,Tuple2<R0,R1>> | allOf(AggregateOperation1<? super T,A0,R0> op1, AggregateOperation1<? super T,A1,R1> op2) Convenience for allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx) wrapping the two results in a Tuple2. |
| static <T> AllOfAggregationBuilder<T> | allOfBuilder() Returns a builder object that helps you create a composite of multiple aggregate operations. |
| static <T> AggregateOperation1<T,LongDoubleAccumulator,Double> | averagingDouble(ToDoubleFunctionEx<? super T> getDoubleValueFn) Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item. |
| static <T> AggregateOperation1<T,LongLongAccumulator,Double> | averagingLong(ToLongFunctionEx<? super T> getLongValueFn) Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item. |
| static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> | bottomN(int n, ComparatorEx<? super T> comparator) Returns an aggregate operation that finds the bottom n items according to the given comparator. |
| static CoAggregateOperationBuilder | coAggregateOperationBuilder() Returns a builder object that offers a step-by-step fluent API to create an aggregate operation that accepts multiple inputs. |
| static AggregateOperation1<CharSequence,StringBuilder,String> | concatenating() Returns an aggregate operation that takes string items and concatenates them into a single string. |
| static AggregateOperation1<CharSequence,StringBuilder,String> | concatenating(CharSequence delimiter) Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string. |
| static AggregateOperation1<CharSequence,StringBuilder,String> | concatenating(CharSequence delimiter, CharSequence prefix, CharSequence suffix) Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string. |
| static <T> AggregateOperation1<T,LongAccumulator,Long> | counting() Returns an aggregate operation that counts the items it observes. |
| static <T,A,R> AggregateOperation1<T,A,R> | filtering(PredicateEx<? super T> filterFn, AggregateOperation1<? super T,A,? extends R> downstream) Adapts an aggregate operation so that it accumulates only the items passing the filterFn and ignores others. |
| static <T,U,A,R> AggregateOperation1<T,A,R> | flatMapping(FunctionEx<? super T,? extends Traverser<? extends U>> flatMapFn, AggregateOperation1<? super U,A,? extends R> downstream) Adapts an aggregate operation that takes items of type U to one that takes items of type T, by exploding each T into a sequence of Us and then accumulating all of them. |
| static <T,K> AggregateOperation1<T,Map<K,List<T>>,Map<K,List<T>>> | groupingBy(FunctionEx<? super T,? extends K> keyFn) Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is a list of the items with that key. |
| static <T,K,A,R> AggregateOperation1<T,Map<K,A>,Map<K,R>> | groupingBy(FunctionEx<? super T,? extends K> keyFn, AggregateOperation1<? super T,A,R> downstream) Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key. |
| static <T,K,R,A,M extends Map<K,R>> AggregateOperation1<T,Map<K,A>,M> | groupingBy(FunctionEx<? super T,? extends K> keyFn, SupplierEx<M> createMapFn, AggregateOperation1<? super T,A,R> downstream) Returns an AggregateOperation1 that accumulates the items into a Map (as obtained from createMapFn) where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key. |
| static <T> AggregateOperation1<T,LinTrendAccumulator,Double> | linearTrend(ToLongFunctionEx<T> getXFn, ToLongFunctionEx<T> getYFn) Returns an aggregate operation that computes a linear trend over the items. |
| static <T,U,A,R> AggregateOperation1<T,A,R> | mapping(FunctionEx<? super T,? extends U> mapFn, AggregateOperation1<? super U,A,? extends R> downstream) Adapts an aggregate operation that takes items of type U to one that takes items of type T, by applying the given mapping function to each item. |
| static <T> AggregateOperation1<T,MutableReference<T>,T> | maxBy(ComparatorEx<? super T> comparator) Returns an aggregate operation that computes the greatest item according to the given comparator. |
| static <T> AggregateOperation1<T,MutableReference<T>,T> | minBy(ComparatorEx<? super T> comparator) Returns an aggregate operation that computes the least item according to the given comparator. |
| static <T> AggregateOperation1<T,PickAnyAccumulator<T>,T> | pickAny() Returns an aggregate operation whose result is an arbitrary item it observed, or null if it observed no items. |
| static <T,A> AggregateOperation1<T,MutableReference<A>,A> | reducing(A emptyAccValue, FunctionEx<? super T,? extends A> toAccValueFn, BinaryOperatorEx<A> combineAccValuesFn, BinaryOperatorEx<A> deductAccValueFn) Returns an aggregate operation that constructs the result through the process of immutable reduction; the initial accumulated value is emptyAccValue. |
| static <T> AggregateOperation1<T,ArrayList<T>,List<T>> | sorting(ComparatorEx<? super T> comparator) Returns an aggregate operation that accumulates all input items into an ArrayList and sorts it with the given comparator. |
| static <T> AggregateOperation1<T,DoubleAccumulator,Double> | summingDouble(ToDoubleFunctionEx<? super T> getDoubleValueFn) Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item. |
| static <T> AggregateOperation1<T,LongAccumulator,Long> | summingLong(ToLongFunctionEx<? super T> getLongValueFn) Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item. |
| static <T,A,R> Aggregator<T,R> | toAggregator(AggregateOperation1<? super T,A,? extends R> aggrOp) Adapts this aggregate operation to be used for IMap.aggregate(Aggregator) calls. |
| static <T,C extends Collection<T>> AggregateOperation1<T,C,C> | toCollection(SupplierEx<C> createCollectionFn) Returns an aggregate operation that accumulates the items into a Collection. |
| static <T,A,R> Collector<T,A,R> | toCollector(AggregateOperation1<? super T,A,? extends R> aggrOp) Adapts this aggregate operation to a collector which can be passed to Stream.collect(Collector). |
| static <T> AggregateOperation1<T,List<T>,List<T>> | toList() Returns an aggregate operation that accumulates the items into an ArrayList. |
| static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> | toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn) Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions. |
| static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> | toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn) Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions. |
| static <T,K,U,M extends Map<K,U>> AggregateOperation1<T,M,M> | toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn, SupplierEx<M> createMapFn) Returns an aggregate operation that accumulates elements into a user-supplied Map instance. |
| static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> | topN(int n, ComparatorEx<? super T> comparator) Returns an aggregate operation that finds the top n items according to the given comparator. |
| static <T> AggregateOperation1<T,Set<T>,Set<T>> | toSet() Returns an aggregate operation that accumulates the items into a HashSet. |
@Nonnull public static <T> AggregateOperation1<T,LongAccumulator,Long> counting()
Returns an aggregate operation that counts the items it observes. The result is of type long.
 This sample takes a stream of words and finds the number of occurrences of each word in it:
 BatchStage<String> words = pipeline.readFrom(wordSource);
 BatchStage<Entry<String, Long>> wordFrequencies =
     words.groupingKey(wholeItem()).aggregate(counting());
 @Nonnull public static <T> AggregateOperation1<T,LongAccumulator,Long> summingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)
Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item.
 
 This sample takes a stream of lines of text and outputs a single long number telling how many words there were in the stream:
 
 BatchStage<String> linesOfText = pipeline.readFrom(textSource);
 BatchStage<Long> numberOfWordsInText =
         linesOfText
                 .map(line -> line.split("\\W+"))
                 .aggregate(summingLong(wordsInLine -> wordsInLine.length));
Note: if the sum exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.

Type Parameters:
T - type of the input item
Parameters:
getLongValueFn - function that extracts the long values you want to sum. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,DoubleAccumulator,Double> summingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item.
 
 This sample takes a stream of purchase events and outputs a single
 double value that tells the total sum of money spent in
 them:
 
 BatchStage<Purchase> purchases = pipeline.readFrom(purchaseSource);
 BatchStage<Double> purchaseVolume =
         purchases.aggregate(summingDouble(Purchase::amount));
Type Parameters:
T - type of the input item
Parameters:
getDoubleValueFn - function that extracts the double values you want to sum. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,MutableReference<T>,T> minBy(@Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that computes the least item according to the given comparator.
 This sample takes a stream of people and finds the youngest person in it:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Person> youngestPerson =
         people.aggregate(minBy(ComparatorEx.comparing(Person::age)));
If this aggregate operation does not observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since
 a null cannot travel through a Jet pipeline, you will not get
 any output in that case.
 If several items tie for the least one, this aggregate operation will choose any one to return and may choose a different one each time.
 Implementation note: this aggregate operation does not
 implement the deduct primitive.
 This has performance implications for sliding
 window aggregation.
Type Parameters:
T - type of the input item
Parameters:
comparator - comparator to compare the items. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,MutableReference<T>,T> maxBy(@Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that computes the greatest item according to the given comparator.
 This sample takes a stream of people and finds the oldest person in it:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Person> oldestPerson =
         people.aggregate(maxBy(ComparatorEx.comparing(Person::age)));
If this aggregate operation does not observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since
 a null cannot travel through a Jet pipeline, you will not get
 any output in that case.
 If several items tie for the greatest one, this aggregate operation will choose any one to return and may choose a different one each time.
 Implementation note: this aggregate operation does not
 implement the deduct primitive.
 This has performance implications for sliding
 window aggregation.
Type Parameters:
T - type of the input item
Parameters:
comparator - comparator to compare the items. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> topN(int n, @Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that finds the top n items according to the given comparator. It outputs a sorted list with the top item in the first position.
This sample takes a stream of people and finds the ten oldest people in it:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<List<Person>> oldestDudes =
         people.aggregate(topN(10, ComparatorEx.comparing(Person::age)));
Implementation note: this aggregate operation does not implement the deduct primitive.
 This has performance implications for sliding
window aggregation.

Type Parameters:
T - type of the input item
Parameters:
n - number of top items to find
comparator - compares the items. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> bottomN(int n, @Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that finds the bottom n items according to the given comparator. It outputs a sorted list with the bottom item in the first position.
This sample takes a stream of people and finds the ten youngest people in it:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<List<Person>> youngestDudes =
         people.aggregate(bottomN(10, ComparatorEx.comparing(Person::age)));
Implementation note: this aggregate operation does not implement the deduct primitive.
 This has performance implications for sliding
window aggregation.

Type Parameters:
T - type of the input item
Parameters:
n - number of bottom items to find
comparator - compares the items. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,LongLongAccumulator,Double> averagingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)
Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item. It outputs the result as a double.
 This sample takes a stream of people and finds their mean age:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Double> meanAge = people.aggregate(averagingLong(Person::age));
 
 If the aggregate operation does not observe any input, its result is
 NaN.
 
 NOTE: this operation accumulates the sum and the
 count as separate long variables and combines them at the end
 into the mean value. If either of these variables exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.
Type Parameters:
T - type of the input item
Parameters:
getLongValueFn - function that extracts the long value from the item. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,LongDoubleAccumulator,Double> averagingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item. It outputs the result as a double.
 This sample takes a stream of people and finds their mean age:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Double> meanAge = people.aggregate(averagingDouble(Person::age));
 
 If the aggregate operation does not observe any input, its result is
 NaN.
Type Parameters:
T - type of the input item
Parameters:
getDoubleValueFn - function that extracts the double value from the item. It must be stateless and cooperative.

@Nonnull public static <T> AggregateOperation1<T,LinTrendAccumulator,Double> linearTrend(@Nonnull ToLongFunctionEx<T> getXFn, @Nonnull ToLongFunctionEx<T> getYFn)
Returns an aggregate operation that computes a linear trend over the items: a double-valued coefficient that approximates the rate of change of y as a function of x, where x and y are long quantities obtained by applying the two provided functions to each item.
 This sample takes an infinite stream of trade events and outputs the current rate of price change using a sliding window:
 StreamStage<Trade> trades = pipeline
         .readFrom(tradeSource)
         .withTimestamps(Trade::getTimestamp, SECONDS.toMillis(1));
 StreamStage<WindowResult<Double>> priceTrend = trades
     .window(WindowDefinition.sliding(MINUTES.toMillis(5), SECONDS.toMillis(1)))
     .aggregate(linearTrend(Trade::getTimestamp, Trade::getPrice));
 
 If this aggregate operation does not observe any input, its result is
 NaN.
Type Parameters:
T - type of the input item
Parameters:
getXFn - a function to extract x from the input. It must be stateless and cooperative.
getYFn - a function to extract y from the input. It must be stateless and cooperative.

public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating()

Returns an aggregate operation that takes string items and concatenates them into a single string.
This sample outputs a string that you get by reading down the first column of the input text:
 BatchStage<String> linesOfText = pipeline.readFrom(textSource);
 BatchStage<String> lineStarters = linesOfText
         .map(line -> line.charAt(0))
         .map(Object::toString)
         .aggregate(concatenating());
 public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating(CharSequence delimiter)
Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.
 This sample outputs a single line of text that contains all the upper-cased and title-cased words of the input text:
 BatchStage<String> linesOfText = pipeline.readFrom(textSource);
 BatchStage<String> upcaseWords = linesOfText
         .map(line -> line.split("\\W+"))
         .flatMap(Traversers::traverseArray)
         .filter(word -> word.matches("\\p{Lu}.*"))
         .aggregate(concatenating(" "));
 public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.
 The resulting string will start with the given prefix and end
 with the given suffix.
 This sample outputs a single item, a JSON array of all the upper-cased and title-cased words of the input text:
 BatchStage<String> linesOfText = pipeline.readFrom(textSource);
 BatchStage<String> upcaseWords = linesOfText
         .map(line -> line.split("\\W+"))
         .flatMap(Traversers::traverseArray)
         .filter(word -> word.matches("\\p{Lu}.*"))
         .aggregate(concatenating("', '", "['", "']"));
 public static <T,U,A,R> AggregateOperation1<T,A,R> mapping(@Nonnull FunctionEx<? super T,? extends U> mapFn, @Nonnull AggregateOperation1<? super U,A,? extends R> downstream)
Adapts an aggregate operation that takes items of type U to one
 that takes items of type T, by applying the given mapping
 function to each item. Normally you should just apply the mapping in a
 stage before the aggregation, but this adapter is useful when
 simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).
 
 In addition to mapping, you can apply filtering as well by returning
 null for an item you want filtered out.
 
This sample takes a stream of people and builds two sorted lists from it, one with all the names and one with all the surnames:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Tuple2<List<String>, List<String>>> sortedNames =
     people.aggregate(allOf(
         mapping(Person::getFirstName, sorting(ComparatorEx.naturalOrder())),
         mapping(Person::getLastName, sorting(ComparatorEx.naturalOrder()))));
Type Parameters:
T - type of the input item
U - input type of the downstream aggregate operation
A - downstream operation's accumulator type
R - downstream operation's result type
Parameters:
mapFn - the function to apply to the input items. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See Also:
filtering(com.hazelcast.function.PredicateEx<? super T>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A, ? extends R>),
flatMapping(com.hazelcast.function.FunctionEx<? super T, ? extends com.hazelcast.jet.Traverser<? extends U>>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>)

public static <T,A,R> AggregateOperation1<T,A,R> filtering(@Nonnull PredicateEx<? super T> filterFn, @Nonnull AggregateOperation1<? super T,A,? extends R> downstream)
Adapts an aggregate operation so that it accumulates only the items passing the filterFn and ignores others. Normally you should
 just apply the filter in a stage before the aggregation, but this
 adapter is useful when simultaneously performing several aggregate
 operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).
 This sample takes a stream of people and outputs two numbers, the average height of kids and grown-ups:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Tuple2<Double, Double>> avgHeightByAge = people.aggregate(allOf(
     filtering((Person p) -> p.getAge() < 18, averagingLong(Person::getHeight)),
     filtering((Person p) -> p.getAge() >= 18, averagingLong(Person::getHeight))
 ));
Type Parameters:
T - type of the input item
A - downstream operation's accumulator type
R - downstream operation's result type
Parameters:
filterFn - the filtering function. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See Also:
mapping(com.hazelcast.function.FunctionEx<? super T, ? extends U>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>),
flatMapping(com.hazelcast.function.FunctionEx<? super T, ? extends com.hazelcast.jet.Traverser<? extends U>>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>)

public static <T,U,A,R> AggregateOperation1<T,A,R> flatMapping(@Nonnull FunctionEx<? super T,? extends Traverser<? extends U>> flatMapFn, @Nonnull AggregateOperation1<? super U,A,? extends R> downstream)
Adapts an aggregate operation that takes items of type U to one
 that takes items of type T, by exploding each T into a
 sequence of Us and then accumulating all of them. Normally you
 should just apply the flat-mapping in a stage before the aggregation,
 but this adapter is useful when simultaneously performing several
 aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).
 The traverser your function returns must be non-null and null-terminated.
This sample takes a stream of people and outputs two numbers, the mean age of all the people and the mean age of people listed as someone's kid:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 people.aggregate(allOf(
     averagingLong(Person::getAge),
     flatMapping((Person p) -> traverseIterable(p.getChildren()),
             averagingLong(Person::getAge))
 ));
Type Parameters:
T - type of the input item
U - input type of the downstream aggregate operation
A - downstream operation's accumulator type
R - downstream operation's result type
Parameters:
flatMapFn - the flat-mapping function to apply. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See Also:
mapping(com.hazelcast.function.FunctionEx<? super T, ? extends U>, com.hazelcast.jet.aggregate.AggregateOperation1<? super U, A, ? extends R>),
filtering(com.hazelcast.function.PredicateEx<? super T>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A, ? extends R>)

public static <T,C extends Collection<T>> AggregateOperation1<T,C,C> toCollection(@Nonnull SupplierEx<C> createCollectionFn)
Returns an aggregate operation that accumulates the items into a Collection. It creates empty, mutable collections as needed by calling
 the provided createCollectionFn.
 This sample takes a stream of words and outputs a single sorted set of all the long words (above 5 letters):
 BatchStage<String> words = pipeline.readFrom(wordSource);
 BatchStage<SortedSet<String>> sortedLongWords = words
         .filter(w -> w.length() > 5)
         .aggregate(toCollection(TreeSet::new));
Type Parameters:
T - type of the input item
C - the type of the collection
Parameters:
createCollectionFn - a Supplier of empty, mutable Collections. It must be stateless and cooperative.

public static <T> AggregateOperation1<T,List<T>,List<T>> toList()
Returns an aggregate operation that accumulates the items into an ArrayList.
 This sample takes a stream of words and outputs a single list of all the long words (above 5 letters):
 BatchStage<String> words = pipeline.readFrom(wordSource);
 BatchStage<List<String>> longWords = words
         .filter(w -> w.length() > 5)
         .aggregate(toList());
Type Parameters:
T - type of the input item

public static <T> AggregateOperation1<T,Set<T>,Set<T>> toSet()
Returns an aggregate operation that accumulates the items into a HashSet.
 This sample takes a stream of people and outputs a single set of all the distinct cities they live in:
 pipeline.readFrom(personSource)
         .map(Person::getCity)
         .aggregate(toSet());
Type Parameters:
T - type of the input item

public static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn)
Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided
 mapping functions.
 
 This aggregate operation does not tolerate duplicate keys and will throw
 an IllegalStateException if it detects them. If your data
 contains duplicates, use toMap(keyFn, valueFn, mergeFn).
 
The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}:
 BatchStage<Map<String, Double>> readings = pipeline
         .readFrom(sensorData)
         .aggregate(toMap(
                 SensorReading::getSensorId,
                 SensorReading::getValue));
Type Parameters:
T - type of the input item
K - type of the key
U - type of the value
Parameters:
keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
valueFn - a function to extract the value from the input item. It must be stateless and cooperative.
See Also:
toMap(FunctionEx, FunctionEx, BinaryOperatorEx),
toMap(FunctionEx, FunctionEx, BinaryOperatorEx, SupplierEx),
groupingBy(FunctionEx)

public static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn)
Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying
 the provided mapping functions.
 
 This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the
 values after valueFn has already been applied.
 
The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}. Multiple readings from the same sensor get summed up:
 BatchStage<Map<String, Double>> readings = pipeline
         .readFrom(sensorData)
         .aggregate(toMap(
                 SensorReading::getSensorId,
                 SensorReading::getValue,
                 Double::sum));
 The given functions must be stateless and cooperative.
Type Parameters:
T - type of the input item
K - the type of key
U - the output type of the value mapping function
Parameters:
keyFn - a function to extract the key from input item
valueFn - a function to extract value from input item
mergeFn - the function used to resolve collisions between values associated with the same key, will be passed to Map.merge(Object, Object, java.util.function.BiFunction)
See Also:
toMap(FunctionEx, FunctionEx),
toMap(FunctionEx, FunctionEx, BinaryOperatorEx, SupplierEx)

public static <T,K,U,M extends Map<K,U>> AggregateOperation1<T,M,M> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn, SupplierEx<M> createMapFn)
Returns an aggregate operation that accumulates elements into a user-supplied Map instance. The keys and values are the result
 of applying the provided mapping functions to the input elements.
 
 This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the
 values after valueFn has already been applied.
 
 The following sample takes a stream of sensor readings and outputs a
 single ObjectToLongHashMap of {sensor ID -> reading}. Multiple
 readings from the same sensor get summed up:
 
 BatchStage<Map<String, Long>> readings = pipeline
         .readFrom(sensorData)
         .aggregate(toMap(
                 SensorReading::getSensorId,
                 SensorReading::getValue,
                 Long::sum,
                 ObjectToLongHashMap::new));
 The given functions must be stateless and cooperative.
Type Parameters:
T - type of the input item
K - the output type of the key mapping function
U - the output type of the value mapping function
M - the type of the resulting Map
Parameters:
keyFn - a function to extract the key from input item
valueFn - a function to extract value from input item
mergeFn - a merge function, used to resolve collisions between values associated with the same key, as supplied to Map.merge(Object, Object, java.util.function.BiFunction)
createMapFn - a function which returns a new, empty Map into which the results will be inserted
See Also:
toMap(FunctionEx, FunctionEx),
toMap(FunctionEx, FunctionEx, BinaryOperatorEx)

public static <T,K> AggregateOperation1<T,Map<K,List<T>>,Map<K,List<T>>> groupingBy(FunctionEx<? super T,? extends K> keyFn)
Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn
 and the value is a list of the items with that key.
 This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key.
This sample takes a stream of persons and classifies them first by country and then by gender. It outputs a stream of map entries where the key is the country and the value is a map from gender to the list of people of that gender from that country:
 BatchStage<Person> people = pipeline.readFrom(personSource);
 BatchStage<Entry<String, Map<String, List<Person>>>> byCountryAndGender =
         people.groupingKey(Person::getCountry)
               .aggregate(groupingBy(Person::getGender));
     
This operation is similar to the groupingKey() pipeline transform so you may wonder why not use it in all cases, not just cascaded grouping. To see the difference, check out these two snippets:
 
 BatchStage<Person> people = pipeline.readFrom(personSource);
 // Snippet 1
 BatchStage<Entry<String, List<Person>>> byCountry1 =
         people.groupingKey(Person::getCountry)
               .aggregate(toList());
 // Snippet 2
 BatchStage<Map<String, List<Person>>> byCountry2 =
         people.aggregate(groupingBy(Person::getCountry));
In short, prefer the groupingKey stage if you have just one level of grouping.

Type Parameters:
T - type of the input item
K - the output type of the key mapping function
Parameters:
keyFn - a function to extract the key from input item. It must be stateless and cooperative.
See Also:
groupingBy(FunctionEx, AggregateOperation1),
groupingBy(FunctionEx, SupplierEx, AggregateOperation1),
toMap(FunctionEx, FunctionEx)

public static <T,K,A,R> AggregateOperation1<T,Map<K,A>,Map<K,R>> groupingBy(FunctionEx<? super T,? extends K> keyFn, AggregateOperation1<? super T,A,R> downstream)
Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn
 and the value is the result of applying the downstream aggregate
 operation to the items with that key.
 
 This operation is primarily useful when you need a cascaded group-by
 where you further classify the members of each group by a secondary key.
 For the difference between this operation and the groupingKey() pipeline transform,
 see the documentation on groupingBy(keyFn).
 
This sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category:
 BatchStage<Person> people = pipeline.readFrom(personSource);
 BatchStage<Entry<String, Map<String, Long>>> countByCountryAndGender =
         people.groupingKey(Person::getCountry)
               .aggregate(groupingBy(Person::getGender, counting()));
Type Parameters:
T - type of the input item
K - the output type of the key mapping function
R - the type of the downstream aggregation result
A - downstream aggregation's accumulator type
Parameters:
keyFn - a function to extract the key from input item. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See Also:
groupingBy(FunctionEx),
groupingBy(FunctionEx, SupplierEx, AggregateOperation1),
toMap(FunctionEx, FunctionEx)

public static <T,K,R,A,M extends Map<K,R>> AggregateOperation1<T,Map<K,A>,M> groupingBy(FunctionEx<? super T,? extends K> keyFn, SupplierEx<M> createMapFn, AggregateOperation1<? super T,A,R> downstream)
Returns an AggregateOperation1 that accumulates the items into a
 Map (as obtained from createMapFn) where the key is the
 result of applying keyFn and the value is the result of
 applying the downstream aggregate operation to the items with that key.
 
 This operation is primarily useful when you need a cascaded group-by
 where you further classify the members of each group by a secondary key.
 For the difference between this operation and the groupingKey() pipeline transform,
 see the documentation on groupingBy(keyFn).
 
 The following sample takes a stream of people, classifies them by country
 and gender, and reports the number of people in each category. It uses
 the EnumMap to optimize memory usage:
 
 BatchStage<Person> people = pipeline.readFrom(personSource);
 BatchStage<Entry<String, Map<Gender, Long>>> countByCountryAndGender =
         people.groupingKey(Person::getCountry)
               .aggregate(groupingBy(
                       Person::getGender,
                       () -> new EnumMap<>(Gender.class),
                       counting()));
Type Parameters:
T - type of the input item
K - the output type of the key mapping function
R - the type of the downstream aggregation result
A - downstream aggregation's accumulator type
M - output type of the resulting Map
Parameters:
keyFn - a function to extract the key from input item. It must be stateless and cooperative.
createMapFn - a function which returns a new, empty Map into which the results will be inserted. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See Also:
groupingBy(FunctionEx),
groupingBy(FunctionEx, AggregateOperation1),
toMap(FunctionEx, FunctionEx)

@Nonnull public static <T,A> AggregateOperation1<T,MutableReference<A>,A> reducing(@Nonnull A emptyAccValue, @Nonnull FunctionEx<? super T,? extends A> toAccValueFn, @Nonnull BinaryOperatorEx<A> combineAccValuesFn, @Nullable BinaryOperatorEx<A> deductAccValueFn)
Returns an aggregate operation that constructs the result through the process of immutable reduction:

1. The initial accumulated value is emptyAccValue.
2. For each input item, a new accumulated value is computed:
   newVal = combineAccValues(currVal, toAccValue(item))
 combineAccValuesFn must be associative because
 it will also be used to combine partial results, as well as
 commutative because the encounter order of items is
 unspecified.
 
 The optional deductAccValueFn allows Jet to compute the sliding
 window in O(1) time. It must undo the effects of a previous combineAccValuesFn call:
 
     A accVal;  // has some pre-existing value
     A itemAccVal = toAccValueFn.apply(item);
     A combined = combineAccValuesFn.apply(accVal, itemAccVal);
     A deducted = deductAccValueFn.apply(combined, itemAccVal);
     assert deducted.equals(accVal);
 
 
 This sample takes a stream of product orders and outputs a single long number which is the sum total of all the ordered amounts. The
 aggregate operation it implements is equivalent to summingLong(com.hazelcast.function.ToLongFunctionEx<? super T>):
 
 BatchStage<Order> orders = pipeline.readFrom(orderSource);
 BatchStage<Long> totalAmount = orders.aggregate(reducing(
         0L,
         Order::getAmount,
         Math::addExact,
         Math::subtractExact
 ));
 The given functions must be stateless and cooperative.
Type Parameters:
T - type of the input item
A - type of the accumulated value
Parameters:
emptyAccValue - the reducing operation's emptyAccValue element
toAccValueFn - transforms the stream item into its accumulated value
combineAccValuesFn - combines two accumulated values into one
deductAccValueFn - deducts the right-hand accumulated value from the left-hand one (optional)

@Nonnull public static <T> AggregateOperation1<T,PickAnyAccumulator<T>,T> pickAny()
Returns an aggregate operation whose result is an arbitrary item it observed, or null if it observed no items.
 
 The implementation of StageWithWindow.distinct() uses this
 operation and, if needed, you can use it directly for the same purpose.
 
 This sample takes a stream of people and outputs a stream of people that
have distinct last names (same as calling groupingKey(keyFn).distinct()):
 
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<Entry<String, Person>> distinctByLastName =
         people.groupingKey(Person::getLastName)
               .aggregate(pickAny());
If this aggregate operation does not observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

Type Parameters:
T - type of the input item

public static <T> AggregateOperation1<T,ArrayList<T>,List<T>> sorting(@Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that accumulates all input items into an ArrayList and sorts it with the given comparator. If you have
 Comparable items that you want to sort in their natural order, use
 ComparatorEx.naturalOrder().
 This sample takes a stream of people and outputs a single list of people sorted by their last name:
 BatchStage<Person> people = pipeline.readFrom(peopleSource);
 BatchStage<List<Person>> sorted = people.aggregate(
     sorting(ComparatorEx.comparing(Person::getLastName)));
Type Parameters:
T - the type of input items
Parameters:
comparator - the comparator to use for sorting. It must be stateless and cooperative.

@Nonnull public static <T,A0,A1,R0,R1,R> AggregateOperation1<T,Tuple2<A0,A1>,R> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of two aggregate operations. Both operations see the same input items and the exportFinishFn combines their two results into the final output.
This sample takes a stream of orders and outputs a single tuple containing the orders with the smallest and the largest amount:
 BatchStage<Order> orders = pipeline.readFrom(orderSource);
 BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
         minBy(ComparatorEx.comparing(Order::getAmount)),
         maxBy(ComparatorEx.comparing(Order::getAmount)),
         Tuple2::tuple2
 ));
Type Parameters:
T - type of input items
A0 - 1st accumulator type
A1 - 2nd accumulator type
R0 - 1st result type
R1 - 2nd result type
R - final result type
Parameters:
op0 - 1st operation
op1 - 2nd operation
exportFinishFn - function combining the two results into a single target instance. It must be stateless and cooperative.

@Nonnull public static <T,A0,A1,R0,R1> AggregateOperation1<T,Tuple2<A0,A1>,Tuple2<R0,R1>> allOf(@Nonnull AggregateOperation1<? super T,A0,R0> op1, @Nonnull AggregateOperation1<? super T,A1,R1> op2)
Convenience for allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx) wrapping the two results in a Tuple2.
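For illustration, a minimal sketch of this variant, reusing the Order type and orderSource names assumed from the samples above:

 BatchStage<Order> orders = pipeline.readFrom(orderSource);
 // With no finishing function, the two results arrive wrapped in a Tuple2.
 BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
         minBy(ComparatorEx.comparing(Order::getAmount)),
         maxBy(ComparatorEx.comparing(Order::getAmount))
 ));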
@Nonnull public static <T,A0,A1,A2,R0,R1,R2,R> AggregateOperation1<T,Tuple3<A0,A1,A2>,R> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T,A2,? extends R2> op2, @Nonnull TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of three aggregate operations. All three operations see the same input items and the exportFinishFn combines their results into the final output.
This sample takes a stream of orders and outputs a single tuple containing the average amount ordered and the orders with the smallest and the largest amount:
 BatchStage<Order> orders = pipeline.readFrom(orderSource);
 BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes =
     orders.aggregate(allOf(
         averagingLong(Order::getAmount),
         minBy(ComparatorEx.comparing(Order::getAmount)),
         maxBy(ComparatorEx.comparing(Order::getAmount)),
         Tuple3::tuple3
 ));
Type Parameters:
T - type of input items
A0 - 1st accumulator type
A1 - 2nd accumulator type
A2 - 3rd accumulator type
R0 - 1st result type
R1 - 2nd result type
R2 - 3rd result type
R - final result type
Parameters:
op0 - 1st operation
op1 - 2nd operation
op2 - 3rd operation
exportFinishFn - function combining the three results into a single target instance. It must be stateless and cooperative.

@Nonnull public static <T,A0,A1,A2,R0,R1,R2> AggregateOperation1<T,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T,A2,? extends R2> op2)
Convenience for allOf(AggregateOperation1, AggregateOperation1, AggregateOperation1, TriFunction) wrapping the three results in a Tuple3.
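A minimal sketch of this variant, again reusing the hypothetical Order type from the samples above:

 BatchStage<Order> orders = pipeline.readFrom(orderSource);
 // With no finishing function, the three results arrive wrapped in a Tuple3.
 BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes = orders.aggregate(allOf(
         averagingLong(Order::getAmount),
         minBy(ComparatorEx.comparing(Order::getAmount)),
         maxBy(ComparatorEx.comparing(Order::getAmount))
 ));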
@Nonnull public static <T> AllOfAggregationBuilder<T> allOfBuilder()

Returns a builder object that helps you create a composite of multiple aggregate operations. The builder associates a Tag with each operation you add; use these tags to retrieve the individual results from the ItemsByTag object you'll get in the output.
 
 The builder object is primarily intended to build a composite of four or
 more aggregate operations. For up to three operations, prefer the simpler
 and more type-safe variants allOf(op1, op2) and allOf(op1, op2, op3).
 
In the following sample we'll construct a composite aggregate operation that takes a stream of orders and finds the extremes in terms of ordered amount. Here's the input stage:
 BatchStage<Order> orders = pipeline.readFrom(orderSource);
 
 AllOfAggregationBuilder<Order> builder = allOfBuilder();
 Tag<Order> minTag = builder.add(minBy(ComparatorEx.comparing(Order::getAmount)));
 Tag<Order> maxTag = builder.add(maxBy(ComparatorEx.comparing(Order::getAmount)));
 AggregateOperation1<Order, ?, ItemsByTag> aggrOp = builder.build();
 
 BatchStage<ItemsByTag> extremes = orders.aggregate(aggrOp);
 BatchStage<Tuple2<Order, Order>> extremesAsTuple =
         extremes.map(ibt -> tuple2(ibt.get(minTag), ibt.get(maxTag)));
Type Parameters:
T - type of input items

public static <T0,A0,R0,T1,A1,R1,R> AggregateOperation2<T0,T1,Tuple2<A0,A1>,R> aggregateOperation2(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
Returns an aggregate operation that is a composite of two independent aggregate operations, each one accepting its own input. Use it in a two-way co-aggregating pipeline stage such as BatchStage.aggregate2(BatchStage, AggregateOperation2): stage0.aggregate2(stage1, compositeAggrOp). Before using this method, see if you can instead use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it's simpler and doesn't require you to pre-compose the aggregate operations.
 This method is suitable when you can express your computation as two independent aggregate operations where you combine their final results. If you need an operation that combines the two inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.
As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:
 BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
 BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
 
 BatchStage<Entry<Long, Double>> visitsPerPurchase = pageVisits
     .groupingKey(PageVisit::userId)
     .aggregate2(
         payments.groupingKey(Payment::userId),
         aggregateOperation2(counting(), counting(),
             (numPageVisits, numPayments) -> 1.0 * numPageVisits / numPayments
         ));
The output is a stream of Map.Entry where the key is the user ID and the value is the ratio of page visits to payments for that user.

Type Parameters:
T0 - type of items in the first stage
A0 - type of the first aggregate operation's accumulator
R0 - type of the first aggregate operation's result
T1 - type of items in the second stage
A1 - type of the second aggregate operation's accumulator
R1 - type of the second aggregate operation's result
R - type of the result
Parameters:
op0 - the aggregate operation that will receive the first stage's input
op1 - the aggregate operation that will receive the second stage's input
exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.

public static <T0,T1,A0,A1,R0,R1> AggregateOperation2<T0,T1,Tuple2<A0,A1>,Tuple2<R0,R1>> aggregateOperation2(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1)
Convenience for aggregateOperation2(aggrOp0, aggrOp1, finishFn) that outputs a Tuple2(result0, result1).

Type Parameters:
T0 - type of items in the first stage
A0 - type of the first aggregate operation's accumulator
R0 - type of the first aggregate operation's result
T1 - type of items in the second stage
A1 - type of the second aggregate operation's accumulator
R1 - type of the second aggregate operation's result
Parameters:
op0 - the aggregate operation that will receive the first stage's input
op1 - the aggregate operation that will receive the second stage's input
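For illustration, a minimal sketch of this variant, reusing the PageVisit and Payment types from the sample above:

 BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
 BatchStage<Payment> payments = pipeline.readFrom(paymentSource);

 // With no finishing function, each user's two counts arrive as a Tuple2.
 BatchStage<Entry<Long, Tuple2<Long, Long>>> countsPerUser = pageVisits
     .groupingKey(PageVisit::userId)
     .aggregate2(
         payments.groupingKey(Payment::userId),
         aggregateOperation2(counting(), counting()));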
public static <T0,T1,T2,A0,A1,A2,R0,R1,R2,R> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,R> aggregateOperation3(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T2,A2,? extends R2> op2, @Nonnull TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)

Returns an aggregate operation that is a composite of three independent aggregate operations, each one accepting its own input. Use it in a three-way co-aggregating pipeline stage such as BatchStage.aggregate3(BatchStage, BatchStage, AggregateOperation3): stage0.aggregate3(stage1, stage2, compositeAggrOp). Before using this method, see if you can instead use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it's simpler and doesn't require you to pre-compose the aggregate operations.
 This method is suitable when you can express your computation as three independent aggregate operations where you combine their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.
As a quick example, let's say you have three data streams coming from an online store, consisting of user actions: page visits, add-to-cart actions and payments:
 BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
 BatchStage<AddToCart> addToCarts = pipeline.readFrom(addToCartSource);
 BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
 
 BatchStage<Entry<Integer, Tuple2<Double, Double>>> userStats = pageVisits
     .groupingKey(PageVisit::userId)
     .aggregate3(
         addToCarts.groupingKey(AddToCart::userId),
         payments.groupingKey(Payment::userId),
         aggregateOperation3(counting(), counting(), counting(),
             (numPageVisits, numAddToCarts, numPayments) ->
                 tuple2(1.0 * numPageVisits / numPayments,
                        1.0 * numAddToCarts / numPayments
                 )
         ));
Type Parameters:
T0 - type of items in the first stage
A0 - type of the first aggregate operation's accumulator
R0 - type of the first aggregate operation's result
T1 - type of items in the second stage
A1 - type of the second aggregate operation's accumulator
R1 - type of the second aggregate operation's result
T2 - type of items in the third stage
A2 - type of the third aggregate operation's accumulator
R2 - type of the third aggregate operation's result
R - type of the result
Parameters:
op0 - the aggregate operation that will receive the first stage's input
op1 - the aggregate operation that will receive the second stage's input
op2 - the aggregate operation that will receive the third stage's input
exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.

public static <T0,T1,T2,A0,A1,A2,R0,R1,R2> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> aggregateOperation3(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T2,A2,? extends R2> op2)
Convenience for aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn) that outputs a Tuple3(result0, result1, result2).

Type Parameters:
T0 - type of items in the first stage
A0 - type of the first aggregate operation's accumulator
R0 - type of the first aggregate operation's result
T1 - type of items in the second stage
A1 - type of the second aggregate operation's accumulator
R1 - type of the second aggregate operation's result
T2 - type of items in the third stage
A2 - type of the third aggregate operation's accumulator
R2 - type of the third aggregate operation's result
Parameters:
op0 - the aggregate operation that will receive the first stage's input
op1 - the aggregate operation that will receive the second stage's input
op2 - the aggregate operation that will receive the third stage's input
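A minimal sketch of this variant, reusing the three stream types from the sample above:

 BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
 BatchStage<AddToCart> addToCarts = pipeline.readFrom(addToCartSource);
 BatchStage<Payment> payments = pipeline.readFrom(paymentSource);

 // With no finishing function, the three counts arrive as a Tuple3.
 BatchStage<Entry<Integer, Tuple3<Long, Long, Long>>> countsPerUser = pageVisits
     .groupingKey(PageVisit::userId)
     .aggregate3(
         addToCarts.groupingKey(AddToCart::userId),
         payments.groupingKey(Payment::userId),
         aggregateOperation3(counting(), counting(), counting()));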
@Nonnull public static CoAggregateOperationBuilder coAggregateOperationBuilder()

Returns a builder object that offers a step-by-step fluent API to create an aggregate operation that accepts multiple inputs. You need this builder if you're using the stage.aggregateBuilder(). Before deciding to use it, consider using stage.aggregateBuilder(aggrOp0) because it will allow you to directly pass the aggregate operation for each joined stage, without requiring you to build a composite operation through this builder. Finally, if you're co-aggregating two or three streams, prefer the simpler and more type-safe variants: aggregateOperation2(com.hazelcast.jet.aggregate.AggregateOperation1<? super T0, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T1, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>) and aggregateOperation3(com.hazelcast.jet.aggregate.AggregateOperation1<? super T0, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T1, A1, ? extends R1>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T2, A2, ? extends R2>, com.hazelcast.jet.function.TriFunction<? super R0, ? super R1, ? super R2, ? extends R>).
 This builder is suitable when you can express your computation as independent aggregate operations on each input where you combine only their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.
As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:
 BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
 BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
To co-aggregate these two streams, we use stage.aggregateBuilder(). Note that there will be two builders at play:
 the stage builder, which joins the pipeline stages, and the
 aggregate operation builder (obtained from this method). First
 we obtain the stage builder and add our pipeline stages:
 
 GroupAggregateBuilder1<PageVisit, Long> stageBuilder =
         pageVisits.groupingKey(PageVisit::userId).aggregateBuilder();
 Tag<PageVisit> visitTag_in = stageBuilder.tag0();
 Tag<Payment> payTag_in = stageBuilder.add(payments.groupingKey(Payment::userId));
 
 CoAggregateOperationBuilder opBuilder = coAggregateOperationBuilder();
 Tag<Long> visitTag = opBuilder.add(visitTag_in, counting());
 Tag<Long> payTag = opBuilder.add(payTag_in, counting());
We use these tags to retrieve the individual results from the ItemsByTag inside the exportFinishFn we specify at the end:
 
 AggregateOperation<Object[], Double> aggrOp =
         opBuilder.build(ibt -> 1.0 * ibt.get(visitTag) / ibt.get(payTag));
 
 BatchStage<Entry<Long, Double>> visitsPerPurchase = stageBuilder.build(aggrOp);
The output is a stream of Map.Entrys where the key is the user ID and the value is the ratio of page visits to payments for that user.

@Nonnull public static <T,A,R> Collector<T,A,R> toCollector(AggregateOperation1<? super T,A,? extends R> aggrOp)
Adapts this aggregate operation to a collector which can be passed to Stream.collect(Collector).
This can be useful when you want to combine java.util.stream with Jet aggregations. For example, the following performs multiple aggregations in a single pass over the same data set:
   Stream<Person> personStream = people.stream();
   personStream.collect(
     AggregateOperations.toCollector(
       AggregateOperations.allOf(
         AggregateOperations.counting(),
         AggregateOperations.averagingLong(p -> p.getAge())
       )
     )
   );
 @Nonnull public static <T,A,R> Aggregator<T,R> toAggregator(AggregateOperation1<? super T,A,? extends R> aggrOp)
Adapts this aggregate operation to be used for IMap.aggregate(Aggregator) calls.
 
 Using IMap aggregations can be desirable when you want to make
 use of indices when doing aggregations and
 want to use the Jet aggregations API instead of writing a custom
 Aggregator.
 
For example, the following aggregation can be used to group people by their age and find the counts for each group.
   IMap<Integer, Person> map = jet.getMap("people");
   Map<Integer, Long> counts = map.aggregate(
     AggregateOperations.toAggregator(
       AggregateOperations.groupingBy(
        e -> e.getValue().getAge(), AggregateOperations.counting()
       )
     )
   );
 