Class AggregateOperations


  • public final class AggregateOperations
    extends java.lang.Object
    Utility class with factory methods for several useful aggregate operations. See the Javadoc on AggregateOperation. You can also create your own aggregate operation using the builder object returned by AggregateOperation.withCreate().
    Since:
    Jet 3.0
    • Method Detail

      • counting

        @Nonnull
        public static <T> AggregateOperation1<T, LongAccumulator, java.lang.Long> counting()
        Returns an aggregate operation that counts the items it observes. The result is of type long.

        This sample takes a stream of words and finds the number of occurrences of each word in it:

        
         BatchStage<String> words = pipeline.readFrom(wordSource);
         BatchStage<Entry<String, Long>> wordFrequencies =
             words.groupingKey(wholeItem()).aggregate(counting());
         
      • summingLong

        @Nonnull
        public static <T> AggregateOperation1<T, LongAccumulator, java.lang.Long> summingLong(
                @Nonnull ToLongFunctionEx<? super T> getLongValueFn)
        Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item.

        This sample takes a stream of lines of text and outputs a single long number telling how many words there were in the stream:

        
         BatchStage<String> linesOfText = pipeline.readFrom(textSource);
         BatchStage<Long> numberOfWordsInText =
                 linesOfText
                         .map(line -> line.split("\\W+"))
                         .aggregate(summingLong(wordsInLine -> wordsInLine.length));
         
        Note: if the sum exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.
        Type Parameters:
        T - type of the input item
        Parameters:
        getLongValueFn - function that extracts the long values you want to sum. It must be stateless and cooperative.
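        To picture the overflow note above, here is a plain-Java sketch of an overflow-checked sum with separate accumulate and combine steps. It does not use Jet: the class SummingLongSketch and its methods are hypothetical, and the use of Math.addExact is an assumption consistent with the documented ArithmeticException, not a copy of Jet's LongAccumulator.

```java
import java.util.List;
import java.util.function.ToLongFunction;

/** Plain-Java sketch of an overflow-checked summing aggregation (not Jet code). */
final class SummingLongSketch {

    /** Mutable accumulator holding the running sum. */
    static final class SumAcc {
        long sum;

        /** Accumulate step: add one value, failing on overflow. */
        void add(long value) {
            sum = Math.addExact(sum, value); // throws ArithmeticException on overflow
        }

        /** Combine step: merge another partial sum into this one. */
        void combine(SumAcc other) {
            sum = Math.addExact(sum, other.sum);
        }
    }

    /** Sums getLongValueFn(item) over the items using two partial accumulators. */
    static <T> long sum(List<T> items, ToLongFunction<? super T> getLongValueFn) {
        SumAcc left = new SumAcc();
        SumAcc right = new SumAcc();
        for (int i = 0; i < items.size(); i++) {
            // Alternate items between two accumulators to mimic parallel workers
            (i % 2 == 0 ? left : right).add(getLongValueFn.applyAsLong(items.get(i)));
        }
        left.combine(right);
        return left.sum;
    }

    public static void main(String[] args) {
        List<String[]> lines = List.of("to be or".split("\\W+"), "not to be".split("\\W+"));
        System.out.println(sum(lines, words -> words.length)); // prints 6
    }
}
```

        Because overflow is checked in the combine step as well, the job fails even when each partial sum fits in a long but their total does not.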
      • summingDouble

        @Nonnull
        public static <T> AggregateOperation1<T, DoubleAccumulator, java.lang.Double> summingDouble(
                @Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
        Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item.

        This sample takes a stream of purchase events and outputs a single double value that tells the total sum of money spent in them:

        
         BatchStage<Purchase> purchases = pipeline.readFrom(purchaseSource);
         BatchStage<Double> purchaseVolume =
                 purchases.aggregate(summingDouble(Purchase::amount));
         
        Type Parameters:
        T - type of the input item
        Parameters:
        getDoubleValueFn - function that extracts the double values you want to sum. It must be stateless and cooperative.
      • minBy

        @Nonnull
        public static <T> AggregateOperation1<T, MutableReference<T>, T> minBy(
                @Nonnull ComparatorEx<? super T> comparator)
        Returns an aggregate operation that computes the least item according to the given comparator.

        This sample takes a stream of people and finds the youngest person in it:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<Person> youngestPerson =
                 people.aggregate(minBy(ComparatorEx.comparing(Person::age)));
         
        NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(AggregateOperation1) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

        If several items tie for the least one, this aggregate operation will choose any one to return and may choose a different one each time.

        Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

        Type Parameters:
        T - type of the input item
        Parameters:
        comparator - comparator to compare the items. It must be stateless and cooperative.
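        The null result on empty input and the unspecified tie-breaking both follow from how a minimum is accumulated and combined. The plain-Java sketch below assumes a simple design: a hypothetical Ref holder (standing in for MutableReference) that starts out null and is replaced only by a strictly smaller item. It is an illustration, not Jet's implementation.

```java
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch of a minBy-style aggregation (not Jet's implementation). */
final class MinBySketch {

    /** Hypothetical stand-in for a mutable reference accumulator; starts empty (null). */
    static final class Ref<T> {
        T value;
    }

    /** Accumulate: keep the new item only if it is strictly less than the current one. */
    static <T> void accumulate(Ref<T> acc, T item, Comparator<? super T> cmp) {
        if (acc.value == null || cmp.compare(item, acc.value) < 0) {
            acc.value = item;
        }
    }

    /** Combine: merge a partial result into acc using the same rule. */
    static <T> void combine(Ref<T> acc, Ref<T> other, Comparator<? super T> cmp) {
        if (other.value != null) {
            accumulate(acc, other.value, cmp);
        }
    }

    /** Runs the steps over a list; returns null when the list is empty. */
    static <T> T minBy(List<T> items, Comparator<? super T> cmp) {
        Ref<T> acc = new Ref<>();
        for (T item : items) {
            accumulate(acc, item, cmp);
        }
        return acc.value; // export: null if nothing was observed
    }

    public static void main(String[] args) {
        List<String> fruits = List.of("pear", "fig", "apple");
        String shortest = minBy(fruits, Comparator.comparingInt(String::length));
        System.out.println(shortest); // prints fig

        List<String> none = List.of();
        String nothing = minBy(none, Comparator.naturalOrder());
        System.out.println(nothing); // prints null
    }
}
```

        Within a single sequence this sketch keeps the first of several tied items, but once partial results from different workers are combined, which tied item survives depends on the combine order. A maxBy sketch is the same code called with cmp.reversed().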
      • maxBy

        @Nonnull
        public static <T> AggregateOperation1<T, MutableReference<T>, T> maxBy(
                @Nonnull ComparatorEx<? super T> comparator)
        Returns an aggregate operation that computes the greatest item according to the given comparator.

        This sample takes a stream of people and finds the oldest person in it:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<Person> oldestPerson =
                 people.aggregate(maxBy(ComparatorEx.comparing(Person::age)));
         
        NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(AggregateOperation1) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

        If several items tie for the greatest one, this aggregate operation will choose any one to return and may choose a different one each time.

        Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

        Type Parameters:
        T - type of the input item
        Parameters:
        comparator - comparator to compare the items. It must be stateless and cooperative.
      • topN

        @Nonnull
        public static <T> AggregateOperation1<T, java.util.PriorityQueue<T>, java.util.List<T>> topN(
                int n,
                @Nonnull ComparatorEx<? super T> comparator)
        Returns an aggregate operation that finds the top n items according to the given comparator. It outputs a sorted list with the top item in the first position.

        This sample takes a stream of people and finds the ten oldest people in it:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<List<Person>> oldestDudes =
                 people.aggregate(topN(10, ComparatorEx.comparing(Person::age)));
         
        Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
        Type Parameters:
        T - type of the input item
        Parameters:
        n - number of top items to find
        comparator - compares the items. It must be stateless and cooperative.
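        The accumulator type PriorityQueue<T> suggests a bounded-heap approach. The plain-Java sketch below assumes that design: it keeps at most n items in a min-heap ordered by the comparator, evicts the least item whenever the heap grows past n, and sorts the survivors greatest-first at export time. Class and method names are hypothetical; this is not Jet's code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch of a top-n aggregation using a bounded min-heap. */
final class TopNSketch {

    /** Accumulate: add the item, then drop the least one if the heap holds more than n. */
    static <T> void accumulate(PriorityQueue<T> heap, T item, int n) {
        heap.add(item);
        if (heap.size() > n) {
            heap.poll(); // removes the least item according to the heap's comparator
        }
    }

    /** Combine: fold another partial heap into this one. */
    static <T> void combine(PriorityQueue<T> heap, PriorityQueue<T> other, int n) {
        for (T item : other) {
            accumulate(heap, item, n);
        }
    }

    /** Export: a list sorted with the greatest item first. */
    static <T> List<T> export(PriorityQueue<T> heap, Comparator<? super T> cmp) {
        List<T> result = new ArrayList<>(heap);
        result.sort(cmp.reversed());
        return result;
    }

    static <T> List<T> topN(List<T> items, int n, Comparator<? super T> cmp) {
        PriorityQueue<T> heap = new PriorityQueue<>(cmp);
        for (T item : items) {
            accumulate(heap, item, n);
        }
        return export(heap, cmp);
    }

    public static void main(String[] args) {
        List<Integer> ages = List.of(31, 7, 64, 45, 18, 52);
        List<Integer> oldestThree = topN(ages, 3, Comparator.naturalOrder());
        System.out.println(oldestThree); // prints [64, 52, 45]
    }
}
```

        The heap never holds more than n + 1 items, so memory stays bounded regardless of input size. bottomN can be pictured the same way by passing cmp.reversed(), which puts the least item first.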
      • bottomN

        @Nonnull
        public static <T> AggregateOperation1<T, java.util.PriorityQueue<T>, java.util.List<T>> bottomN(
                int n,
                @Nonnull ComparatorEx<? super T> comparator)
        Returns an aggregate operation that finds the bottom n items according to the given comparator. It outputs a sorted list with the bottom item in the first position.

        This sample takes a stream of people and finds the ten youngest people in it:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<List<Person>> youngestDudes =
                 people.aggregate(bottomN(10, ComparatorEx.comparing(Person::age)));
         
        Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
        Type Parameters:
        T - type of the input item
        Parameters:
        n - number of bottom items to find
        comparator - compares the items. It must be stateless and cooperative.
      • averagingLong

        @Nonnull
        public static <T> AggregateOperation1<T, LongLongAccumulator, java.lang.Double> averagingLong(
                @Nonnull ToLongFunctionEx<? super T> getLongValueFn)
        Returns an aggregate operation that finds the arithmetic mean (the average) of the long values it obtains by applying getLongValueFn to each item. It outputs the result as a double.

        This sample takes a stream of people and finds their mean age:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<Double> meanAge = people.aggregate(averagingLong(Person::age));
         

        If the aggregate operation does not observe any input, its result is NaN.

        NOTE: this operation accumulates the sum and the count as separate long variables and combines them at the end into the mean value. If either of these variables exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.

        Type Parameters:
        T - type of the input item
        Parameters:
        getLongValueFn - function that extracts the long value from the item. It must be stateless and cooperative.
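        The NaN result and the overflow note both follow from keeping the count and the sum in separate long fields and dividing only at the end. A plain-Java sketch of that idea, with hypothetical class and method names (it is not Jet's LongLongAccumulator):

```java
import java.util.List;
import java.util.function.ToLongFunction;

/** Plain-Java sketch of a mean aggregation over long values (not Jet code). */
final class AveragingLongSketch {

    /** Accumulator holding the count and the sum as two separate longs. */
    static final class CountSumAcc {
        long count;
        long sum;

        void accumulate(long value) {
            count = Math.incrementExact(count);
            sum = Math.addExact(sum, value); // fails with ArithmeticException on overflow
        }

        void combine(CountSumAcc other) {
            count = Math.addExact(count, other.count);
            sum = Math.addExact(sum, other.sum);
        }

        /** Export: with no input this computes 0.0 / 0.0, which is NaN. */
        double export() {
            return (double) sum / count;
        }
    }

    static <T> double average(List<T> items, ToLongFunction<? super T> getLongValueFn) {
        CountSumAcc acc = new CountSumAcc();
        for (T item : items) {
            acc.accumulate(getLongValueFn.applyAsLong(item));
        }
        return acc.export();
    }

    public static void main(String[] args) {
        List<Long> ages = List.of(30L, 41L, 52L);
        System.out.println(average(ages, age -> age)); // prints 41.0
    }
}
```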
      • averagingDouble

        @Nonnull
        public static <T> AggregateOperation1<T, LongDoubleAccumulator, java.lang.Double> averagingDouble(
                @Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
        Returns an aggregate operation that finds the arithmetic mean (the average) of the double values it obtains by applying getDoubleValueFn to each item. It outputs the result as a double.

        This sample takes a stream of people and finds their mean age:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<Double> meanAge = people.aggregate(averagingDouble(Person::age));
         

        If the aggregate operation does not observe any input, its result is NaN.

        Type Parameters:
        T - type of the input item
        Parameters:
        getDoubleValueFn - function that extracts the double value from the item. It must be stateless and cooperative.
      • linearTrend

        @Nonnull
        public static <T> AggregateOperation1<T, LinTrendAccumulator, java.lang.Double> linearTrend(
                @Nonnull ToLongFunctionEx<T> getXFn,
                @Nonnull ToLongFunctionEx<T> getYFn)
        Returns an aggregate operation that computes a linear trend over the items. It will produce a double-valued coefficient that approximates the rate of change of y as a function of x, where x and y are long quantities obtained by applying the two provided functions to each item.

        This sample takes an infinite stream of trade events and outputs the current rate of price change using a sliding window:

        
         StreamStage<Trade> trades = pipeline
                 .readFrom(tradeSource)
                 .withTimestamps(Trade::getTimestamp, SECONDS.toMillis(1));
         StreamStage<WindowResult<Double>> priceTrend = trades
             .window(WindowDefinition.sliding(MINUTES.toMillis(5), SECONDS.toMillis(1)))
             .aggregate(linearTrend(Trade::getTimestamp, Trade::getPrice));
         
        With the trade price given in cents and the timestamp in milliseconds, the output will be in cents per millisecond. Make sure you apply a scaling factor if you want another, more natural unit of measure.

        If this aggregate operation does not observe any input, its result is NaN.

        Type Parameters:
        T - type of the input item
        Parameters:
        getXFn - a function to extract x from the input. It must be stateless and cooperative.
        getYFn - a function to extract y from the input. It must be stateless and cooperative.
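        This page does not spell out the formula. A common way to obtain such a rate-of-change coefficient is the ordinary least-squares slope, slope = (n·Σxy − Σx·Σy) / (n·Σx² − (Σx)²), which needs only five running sums and therefore fits the accumulate/combine model. The sketch below assumes that formula; it is an illustration, not a transcription of LinTrendAccumulator. Using BigInteger for the sums is also an assumption made here to avoid overflow of products such as x·y.

```java
import java.math.BigInteger;

/** Sketch of a least-squares slope accumulator (assumed formula, not Jet's code). */
final class LinearTrendSketch {
    private long n;
    private BigInteger sumX = BigInteger.ZERO;
    private BigInteger sumY = BigInteger.ZERO;
    private BigInteger sumXY = BigInteger.ZERO;
    private BigInteger sumX2 = BigInteger.ZERO;

    /** Accumulate one (x, y) observation. */
    void accumulate(long x, long y) {
        BigInteger bx = BigInteger.valueOf(x);
        BigInteger by = BigInteger.valueOf(y);
        n++;
        sumX = sumX.add(bx);
        sumY = sumY.add(by);
        sumXY = sumXY.add(bx.multiply(by));
        sumX2 = sumX2.add(bx.multiply(bx));
    }

    /** Combine: all five quantities are plain sums, so partial results simply add up. */
    void combine(LinearTrendSketch other) {
        n += other.n;
        sumX = sumX.add(other.sumX);
        sumY = sumY.add(other.sumY);
        sumXY = sumXY.add(other.sumXY);
        sumX2 = sumX2.add(other.sumX2);
    }

    /** Export: the least-squares slope, or NaN when it is undefined. */
    double export() {
        BigInteger bn = BigInteger.valueOf(n);
        BigInteger numerator = bn.multiply(sumXY).subtract(sumX.multiply(sumY));
        BigInteger denominator = bn.multiply(sumX2).subtract(sumX.multiply(sumX));
        if (denominator.signum() == 0) {
            return Double.NaN; // no input, or all x values equal
        }
        return numerator.doubleValue() / denominator.doubleValue();
    }

    public static void main(String[] args) {
        LinearTrendSketch trend = new LinearTrendSketch();
        // x = timestamp in ms, y = price in cents: the price rises 2 cents every 1000 ms
        trend.accumulate(0, 100);
        trend.accumulate(1000, 102);
        trend.accumulate(2000, 104);
        System.out.println(trend.export()); // prints 0.002
    }
}
```

        With x in milliseconds and y in cents, as in the trade sample, a slope of 0.002 means 0.002 cents per millisecond, that is 2 cents per second.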
      • concatenating

        public static AggregateOperation1<java.lang.CharSequence, java.lang.StringBuilder, java.lang.String> concatenating()
        Returns an aggregate operation that takes string items and concatenates them into a single string.

        This sample outputs a string that you get by reading down the first column of the input text:

        
         BatchStage<String> linesOfText = pipeline.readFrom(textSource);
         BatchStage<String> lineStarters = linesOfText
                 .map(line -> line.charAt(0))
                 .map(Object::toString)
                 .aggregate(concatenating());
         
      • concatenating

        public static AggregateOperation1<java.lang.CharSequence, java.lang.StringBuilder, java.lang.String> concatenating(
                java.lang.CharSequence delimiter)
        Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.

        This sample outputs a single line of text that contains all the upper-cased and title-cased words of the input text:

        
         BatchStage<String> linesOfText = pipeline.readFrom(textSource);
         BatchStage<String> upcaseWords = linesOfText
                 .map(line -> line.split("\\W+"))
                 .flatMap(Traversers::traverseArray)
                 .filter(word -> word.matches("\\p{Lu}.*"))
                 .aggregate(concatenating(" "));
         
      • concatenating

        public static AggregateOperation1<java.lang.CharSequence, java.lang.StringBuilder, java.lang.String> concatenating(
                java.lang.CharSequence delimiter,
                java.lang.CharSequence prefix,
                java.lang.CharSequence suffix)
        Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string. The resulting string will start with the given prefix and end with the given suffix.

        This sample outputs a single item, a JSON array of all the upper-cased and title-cased words of the input text:

        
         BatchStage<String> linesOfText = pipeline.readFrom(textSource);
         BatchStage<String> upcaseWords = linesOfText
                 .map(line -> line.split("\\W+"))
                 .flatMap(Traversers::traverseArray)
                 .filter(word -> word.matches("\\p{Lu}.*"))
                 .aggregate(concatenating("\", \"", "[\"", "\"]"));
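        The delimiter, prefix and suffix semantics described above match the JDK's java.util.StringJoiner, whose constructor takes its arguments in the same order (delimiter first). The sketch below uses it only to show the shape of the output; the class ConcatenatingSketch is hypothetical and says nothing about how Jet implements the operation.

```java
import java.util.List;
import java.util.StringJoiner;

/** Sketch of delimiter/prefix/suffix concatenation using the JDK's StringJoiner. */
final class ConcatenatingSketch {

    static String concatenate(List<? extends CharSequence> items,
                              CharSequence delimiter, CharSequence prefix, CharSequence suffix) {
        // StringJoiner's constructor also takes (delimiter, prefix, suffix) in this order
        StringJoiner joiner = new StringJoiner(delimiter, prefix, suffix);
        for (CharSequence item : items) {
            joiner.add(item);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> words = List.of("Alice", "Bob");
        // Build a JSON array of strings
        System.out.println(concatenate(words, "\", \"", "[\"", "\"]")); // prints ["Alice", "Bob"]
    }
}
```

        The result is valid JSON only as long as the items contain no quotes or backslashes, which holds for words produced by splitting on \W+.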
         
      • toCollection

        public static <T, C extends java.util.Collection<T>> AggregateOperation1<T, C, C> toCollection(
                @Nonnull SupplierEx<C> createCollectionFn)
        Returns an aggregate operation that accumulates the items into a Collection. It creates empty, mutable collections as needed by calling the provided createCollectionFn.

        This sample takes a stream of words and outputs a single sorted set of all the long words (more than 5 letters):

        
         BatchStage<String> words = pipeline.readFrom(wordSource);
         BatchStage<SortedSet<String>> sortedLongWords = words
                 .filter(w -> w.length() > 5)
                 .aggregate(toCollection(TreeSet::new));
         
        Note: if you use a collection that preserves the insertion order, keep in mind that Jet doesn't aggregate the items in any specified order.
        Type Parameters:
        T - type of the input item
        C - the type of the collection
        Parameters:
        createCollectionFn - a Supplier of empty, mutable Collections. It must be stateless and cooperative.
      • toList

        public static <T> AggregateOperation1<T, java.util.List<T>, java.util.List<T>> toList()
        Returns an aggregate operation that accumulates the items into an ArrayList.

        This sample takes a stream of words and outputs a single list of all the long words (more than 5 letters):

        
         BatchStage<String> words = pipeline.readFrom(wordSource);
         BatchStage<List<String>> longWords = words
                 .filter(w -> w.length() > 5)
                 .aggregate(toList());
         
        Note: accumulating all the data into an in-memory list shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.
        Type Parameters:
        T - type of the input item
      • toSet

        public static <T> AggregateOperation1<T, java.util.Set<T>, java.util.Set<T>> toSet()
        Returns an aggregate operation that accumulates the items into a HashSet.

        This sample takes a stream of people and outputs a single set of all the distinct cities they live in:

        
         pipeline.readFrom(personSource)
                 .map(Person::getCity)
                 .aggregate(toSet());
         
        Note: accumulating all the data into an in-memory set shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.
        Type Parameters:
        T - type of the input item
      • toMap

        public static <T, K, U> AggregateOperation1<T, java.util.Map<K, U>, java.util.Map<K, U>> toMap(
                FunctionEx<? super T, ? extends K> keyFn,
                FunctionEx<? super T, ? extends U> valueFn)
        Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

        This aggregate operation does not tolerate duplicate keys and will throw an IllegalStateException if it detects them. If your data contains duplicates, use toMap(keyFn, valueFn, mergeFn).

        The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}:

        
         BatchStage<Map<String, Double>> readings = pipeline
                 .readFrom(sensorData)
                 .aggregate(toMap(
                         SensorReading::getSensorId,
                         SensorReading::getValue));
         
        Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.
        Type Parameters:
        T - type of the input item
        K - type of the key
        U - type of the value
        Parameters:
        keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
        valueFn - a function to extract the value from the input item. It must be stateless and cooperative.
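        The duplicate-key rule can be illustrated with a plain-Java sketch that checks each key before inserting it. The class and method names are hypothetical and this is not Jet's code; it only mirrors the documented behavior of throwing IllegalStateException on a repeated key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch of a toMap aggregation that rejects duplicate keys (not Jet's code). */
final class StrictToMapSketch {

    static <T, K, U> Map<K, U> toMap(List<T> items,
                                     Function<? super T, ? extends K> keyFn,
                                     Function<? super T, ? extends U> valueFn) {
        Map<K, U> map = new HashMap<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            if (map.containsKey(key)) {
                // A repeated key is an error: there is no merge function to resolve it
                throw new IllegalStateException("Duplicate key: " + key);
            }
            map.put(key, valueFn.apply(item));
        }
        return map;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> readings = List.of(
                Map.entry("sensor-1", 20.5), Map.entry("sensor-2", 19.0));
        Map<String, Double> bySensor = toMap(readings, Map.Entry::getKey, Map.Entry::getValue);
        System.out.println(bySensor.get("sensor-1")); // prints 20.5
    }
}
```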
        See Also:
        toMap(FunctionEx, FunctionEx, BinaryOperatorEx), toMap(FunctionEx, FunctionEx, BinaryOperatorEx, SupplierEx), groupingBy(FunctionEx)
      • toMap

        public static <T, K, U> AggregateOperation1<T, java.util.Map<K, U>, java.util.Map<K, U>> toMap(
                FunctionEx<? super T, ? extends K> keyFn,
                FunctionEx<? super T, ? extends U> valueFn,
                BinaryOperatorEx<U> mergeFn)
        Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

        This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.

        The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}. Multiple readings from the same sensor get summed up:

        
         BatchStage<Map<String, Double>> readings = pipeline
                 .readFrom(sensorData)
                 .aggregate(toMap(
                         SensorReading::getSensorId,
                         SensorReading::getValue,
                         Double::sum));
         
        Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.

        The given functions must be stateless and cooperative.

        Type Parameters:
        T - type of the input item
        K - the output type of the key mapping function
        U - the output type of the value mapping function
        Parameters:
        keyFn - a function to extract the key from the input item
        valueFn - a function to extract the value from the input item
        mergeFn - the function used to resolve collisions between values associated with the same key, will be passed to Map.merge(Object, Object, java.util.function.BiFunction)
        See Also:
        toMap(FunctionEx, FunctionEx), toMap(FunctionEx, FunctionEx, BinaryOperatorEx, SupplierEx)
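        As the mergeFn parameter description says, collisions are resolved through Map.merge. The plain-Java sketch below shows the accumulate and combine steps under the assumption that two partial maps are combined by merging every entry of one into the other with the same function. The class and method names are hypothetical; this is not Jet's code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Sketch of a toMap aggregation that resolves duplicate keys with Map.merge. */
final class MergingToMapSketch {

    /** Accumulate: insert the mapped value, merging with any existing value for the key. */
    static <T, K, U> void accumulate(Map<K, U> acc, T item,
                                     Function<? super T, ? extends K> keyFn,
                                     Function<? super T, ? extends U> valueFn,
                                     BinaryOperator<U> mergeFn) {
        acc.merge(keyFn.apply(item), valueFn.apply(item), mergeFn);
    }

    /** Combine: fold a partial map into acc with the same merge function. */
    static <K, U> void combine(Map<K, U> acc, Map<K, U> other, BinaryOperator<U> mergeFn) {
        other.forEach((key, value) -> acc.merge(key, value, mergeFn));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> readings = List.of(
                Map.entry("s1", 1.5), Map.entry("s2", 2.0), Map.entry("s1", 2.5));
        Map<String, Double> left = new HashMap<>();
        Map<String, Double> right = new HashMap<>();
        for (int i = 0; i < readings.size(); i++) {
            // Alternate items between two partial maps to mimic parallel workers
            Map<String, Double> target = (i % 2 == 0) ? left : right;
            accumulate(target, readings.get(i), Map.Entry::getKey, Map.Entry::getValue, Double::sum);
        }
        combine(left, right, Double::sum);
        System.out.println(left.get("s1")); // prints 4.0
    }
}
```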
      • toMap

        public static <T, K, U, M extends java.util.Map<K, U>> AggregateOperation1<T, M, M> toMap(
                FunctionEx<? super T, ? extends K> keyFn,
                FunctionEx<? super T, ? extends U> valueFn,
                BinaryOperatorEx<U> mergeFn,
                SupplierEx<M> createMapFn)
        Returns an aggregate operation that accumulates elements into a user-supplied Map instance. The keys and values are the result of applying the provided mapping functions to the input elements.

        This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.

        The following sample takes a stream of sensor readings and outputs a single TreeMap of {sensor ID -> reading}, sorted by sensor ID. Multiple readings from the same sensor get summed up:

        
         BatchStage<Map<String, Double>> readings = pipeline
                 .readFrom(sensorData)
                 .aggregate(toMap(
                         SensorReading::getSensorId,
                         SensorReading::getValue,
                         Double::sum,
                         TreeMap::new));
         

        The given functions must be stateless and cooperative.

        Type Parameters:
        T - type of the input item
        K - the output type of the key mapping function
        U - the output type of the value mapping function
        M - the type of the resulting Map
        Parameters:
        keyFn - a function to extract the key from the input item
        valueFn - a function to extract the value from the input item
        mergeFn - a merge function, used to resolve collisions between values associated with the same key, as supplied to Map.merge(Object, Object, java.util.function.BiFunction)
        createMapFn - a function which returns a new, empty Map into which the results will be inserted
        See Also:
        toMap(FunctionEx, FunctionEx), toMap(FunctionEx, FunctionEx, BinaryOperatorEx)
      • groupingBy

        public static <T, K> AggregateOperation1<T, java.util.Map<K, java.util.List<T>>, java.util.Map<K, java.util.List<T>>> groupingBy(
                FunctionEx<? super T, ? extends K> keyFn)
        Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is a list of the items with that key.

        This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key.

        This sample takes a stream of people and classifies them first by country and then by gender. It outputs a stream of map entries where the key is the country and the value is a map from gender to the list of people of that gender from that country:

        
         BatchStage<Person> people = pipeline.readFrom(personSource);
         BatchStage<Entry<String, Map<String, List<Person>>>> byCountryAndGender =
                 people.groupingKey(Person::getCountry)
                       .aggregate(groupingBy(Person::getGender));
             
        This aggregate operation has an effect similar to the dedicated groupingKey() pipeline transform, so you may wonder why you shouldn't use it in all cases, not just for cascaded grouping. To see the difference, compare these two snippets:
        
         BatchStage<Person> people = pipeline.readFrom(personSource);
        
         // Snippet 1
         BatchStage<Entry<String, List<Person>>> byCountry1 =
                 people.groupingKey(Person::getCountry)
                       .aggregate(toList());
        
         // Snippet 2
         BatchStage<Map<String, List<Person>>> byCountry2 =
                 people.aggregate(groupingBy(Person::getCountry));
         
        Notice that snippet 1 outputs a stream of map entries whereas snippet 2 outputs a single map. To produce the single map, Jet must do all the work on a single thread and hold all the data on a single cluster member, so you lose the advantage of distributed computation. By contrast, snippet 1 allows Jet to partition the input by the grouping key and split the work across the cluster. This is why you should prefer a groupingKey stage if you have just one level of grouping.
        Type Parameters:
        T - type of the input item
        K - the output type of the key mapping function
        Parameters:
        keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
        See Also:
        groupingBy(FunctionEx, AggregateOperation1), groupingBy(FunctionEx, SupplierEx, AggregateOperation1), toMap(FunctionEx, FunctionEx)
      • groupingBy

        public static <T, K, A, R> AggregateOperation1<T, java.util.Map<K, A>, java.util.Map<K, R>> groupingBy(
                FunctionEx<? super T, ? extends K> keyFn,
                AggregateOperation1<? super T, A, R> downstream)
        Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

        This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key. For the difference between this operation and the groupingKey() pipeline transform, see the documentation on groupingBy(keyFn).

        This sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category:

        
         BatchStage<Person> people = pipeline.readFrom(personSource);
         BatchStage<Entry<String, Map<String, Long>>> countByCountryAndGender =
                 people.groupingKey(Person::getCountry)
                       .aggregate(groupingBy(Person::getGender, counting()));
         
        Type Parameters:
        T - type of the input item
        K - the output type of the key mapping function
        R - the type of the downstream aggregation result
        A - downstream aggregation's accumulator type
        Parameters:
        keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
        downstream - the downstream aggregate operation
        See Also:
        groupingBy(FunctionEx), groupingBy(FunctionEx, SupplierEx, AggregateOperation1), toMap(FunctionEx, FunctionEx)
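        A cascaded group-by with a downstream operation has a close single-JVM analogue in the JDK: Collectors.groupingBy(classifier, downstream). The sketch below uses it to show the shape of the result for the country/gender example. The Person class here is hypothetical, and nothing in this sketch runs on Jet.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** JDK-streams analogue of a two-level group-by with a counting downstream. */
final class GroupingBySketch {

    /** Hypothetical person type used only for this illustration. */
    static final class Person {
        final String country;
        final String gender;

        Person(String country, String gender) {
            this.country = country;
            this.gender = gender;
        }

        String getCountry() { return country; }
        String getGender() { return gender; }
    }

    /** Outer key: country; inner map: gender -> number of people. */
    static Map<String, Map<String, Long>> countByCountryAndGender(List<Person> people) {
        return people.stream()
                .collect(Collectors.groupingBy(Person::getCountry,
                        Collectors.groupingBy(Person::getGender, Collectors.counting())));
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("FR", "f"), new Person("FR", "m"),
                new Person("FR", "f"), new Person("DE", "m"));
        System.out.println(countByCountryAndGender(people).get("FR").get("f")); // prints 2
    }
}
```

        In the Jet sample the outer level is handled by the groupingKey() stage, so only the inner Collectors.groupingBy call corresponds to this aggregate operation.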
      • groupingBy

        public static <T, K, R, A, M extends java.util.Map<K, R>> AggregateOperation1<T, java.util.Map<K, A>, M> groupingBy(
                FunctionEx<? super T, ? extends K> keyFn,
                SupplierEx<M> createMapFn,
                AggregateOperation1<? super T, A, R> downstream)
        Returns an AggregateOperation1 that accumulates the items into a Map (as obtained from createMapFn) where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

        This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key. For the difference between this operation and the groupingKey() pipeline transform, see the documentation on groupingBy(keyFn).

        The following sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category. It uses an EnumMap to reduce memory usage:

        
         BatchStage<Person> people = pipeline.readFrom(personSource);
         BatchStage<Entry<String, Map<Gender, Long>>> countByCountryAndGender =
                 people.groupingKey(Person::getCountry)
                       .aggregate(groupingBy(
                               Person::getGender,
                               () -> new EnumMap<>(Gender.class),
                               counting()));
         
        Type Parameters:
        T - type of the input item
        K - the output type of the key mapping function
        R - the type of the downstream aggregation result
        A - downstream aggregation's accumulator type
        M - output type of the resulting Map
        Parameters:
        keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
        createMapFn - a function which returns a new, empty Map into which the results will be inserted. It must be stateless and cooperative.
        downstream - the downstream aggregate operation
        See Also:
        groupingBy(FunctionEx), groupingBy(FunctionEx, AggregateOperation1), toMap(FunctionEx, FunctionEx)
      • reducing

        @Nonnull
        public static <T, A> AggregateOperation1<T, MutableReference<A>, A> reducing(
                @Nonnull A emptyAccValue,
                @Nonnull FunctionEx<? super T, ? extends A> toAccValueFn,
                @Nonnull BinaryOperatorEx<A> combineAccValuesFn,
                @Nullable BinaryOperatorEx<A> deductAccValueFn)
        Returns an aggregate operation that constructs the result through the process of immutable reduction:
        1. The initial accumulated value is emptyAccValue.
        2. For each input item, compute the new value: newVal = combineAccValuesFn(currVal, toAccValueFn(item)).

        combineAccValuesFn must be associative, because it will also be used to combine partial results, and commutative, because the encounter order of items is unspecified.
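        To make the two steps and the associativity requirement concrete, here is a plain-Java model (not Jet's implementation) of the fold over emptyAccValue, toAccValueFn and combineAccValuesFn. It also simulates two hypothetical processors that each reduce a disjoint part of the input before their partial results are combined. Each partition starts from emptyAccValue, which is why that value should act as an identity for combineAccValuesFn. The helper names reduce and reduceInTwoPartitions are illustrative only.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of immutable reduction; not Jet's implementation.
class ImmutableReduction {

    // Step 1: start from emptyAccValue. Step 2: fold each item in with combineAccValuesFn.
    static <T, A> A reduce(List<T> items, A emptyAccValue,
                           Function<? super T, ? extends A> toAccValueFn,
                           BinaryOperator<A> combineAccValuesFn) {
        A acc = emptyAccValue;
        for (T item : items) {
            acc = combineAccValuesFn.apply(acc, toAccValueFn.apply(item));
        }
        return acc;
    }

    // Simulates two processors, each reducing a disjoint subset of the items,
    // followed by a combine step that merges their partial results.
    static <T, A> A reduceInTwoPartitions(List<T> items, int splitAt, A emptyAccValue,
                                          Function<? super T, ? extends A> toAccValueFn,
                                          BinaryOperator<A> combineAccValuesFn) {
        A left = reduce(items.subList(0, splitAt), emptyAccValue, toAccValueFn, combineAccValuesFn);
        A right = reduce(items.subList(splitAt, items.size()), emptyAccValue, toAccValueFn, combineAccValuesFn);
        return combineAccValuesFn.apply(left, right);
    }
}
```

With an associative operation such as addition, the single-pass and partitioned results agree. Subtraction is not associative, so the two paths give different answers, which is exactly the failure the requirement above rules out.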

        The optional deductAccValueFn allows Jet to update a sliding window in O(1) time as it slides. It must undo the effects of a previous combineAccValuesFn call:

             A accVal;  // has some pre-existing value
             A itemAccVal = toAccValueFn.apply(item);
             A combined = combineAccValuesFn.apply(accVal, itemAccVal);
             A deducted = deductAccValueFn.apply(combined, itemAccVal);
             assert deducted.equals(accVal);
         

        This sample takes a stream of product orders and outputs a single long number: the sum total of all the ordered amounts. The aggregate operation it implements is equivalent to summingLong(Order::getAmount):

        
         BatchStage<Order> orders = pipeline.readFrom(orderSource);
         BatchStage<Long> totalAmount = orders.aggregate(reducing(
                 0L,
                 Order::getAmount,
                 Math::addExact,
                 Math::subtractExact
         ));
         

        The given functions must be stateless and cooperative.

        Type Parameters:
        T - type of the input item
        A - type of the accumulated value
        Parameters:
        emptyAccValue - the initial accumulated value; it should be the identity element of combineAccValuesFn, since every partial accumulation starts from it
        toAccValueFn - transforms the stream item into its accumulated value
        combineAccValuesFn - combines two accumulated values into one
        deductAccValueFn - deducts the right-hand accumulated value from the left-hand one (optional)
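        The deduction invariant described above is what makes an incremental sliding-window update possible: when the window advances by one frame, the entering frame's value is combined in and the leaving frame's value is deducted, rather than recombining every frame in the window. Below is a plain-Java sketch of that update for a sum, using Math::addExact and Math::subtractExact as in the sample. The slidingAggregates helper and the per-frame long values are illustrative assumptions and do not describe how Jet stores window state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative incremental sliding-window aggregation; not Jet's internals.
class SlidingWindowSketch {

    // frameValues[i] is the accumulated value of frame i (e.g. the sum of its items).
    // Returns one aggregate per window of windowSize consecutive frames.
    static List<Long> slidingAggregates(long[] frameValues, int windowSize, Long emptyAccValue,
                                        BinaryOperator<Long> combine, BinaryOperator<Long> deduct) {
        List<Long> results = new ArrayList<>();
        Long acc = emptyAccValue;
        for (int i = 0; i < frameValues.length; i++) {
            // The newest frame enters the window.
            acc = combine.apply(acc, frameValues[i]);
            if (i >= windowSize) {
                // The oldest frame leaves: one deduction instead of recombining the window.
                acc = deduct.apply(acc, frameValues[i - windowSize]);
            }
            if (i >= windowSize - 1) {
                results.add(acc);
            }
        }
        return results;
    }
}
```

Each slide costs one combine and one deduct regardless of the window size. Without a deduct function, a window of n frames would have to be recombined from its n frame values.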
      • pickAny

        @Nonnull
        public static <T> AggregateOperation1<T, PickAnyAccumulator<T>, T> pickAny()
        Returns an aggregate operation whose result is an arbitrary item it observed, or null if it observed no items.

        The implementation of StageWithWindow.distinct() uses this operation and, if needed, you can use it directly for the same purpose.

        This sample takes a stream of people and outputs a stream of people that have distinct last names (the same as calling groupingKey(keyFn).distinct()):

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<Entry<String, Person>> distinctByLastName =
                 people.groupingKey(Person::getLastName)
                       .aggregate(pickAny());
         
        NOTE: if this aggregate operation doesn't observe any items, its result is null. Since the non-keyed BatchStage.aggregate(AggregateOperation1) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.
        Type Parameters:
        T - type of the input item
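        Semantically, grouping by last name and applying pickAny yields one arbitrary person per last name. A sequential plain-Java sketch of that result uses Map.putIfAbsent. It keeps the first person seen for each key, which is just one valid choice: pickAny itself makes no promise about which item wins. The Person class is a hypothetical stand-in.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sequential model of groupingKey(keyFn).aggregate(pickAny()); not Jet code.
class DistinctByKey {
    static final class Person {
        final String firstName;
        final String lastName;

        Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        String getLastName() { return lastName; }
    }

    // Keeps one person per last name. Here the first one encountered wins,
    // but pickAny does not guarantee which item is kept.
    static Map<String, Person> distinctByLastName(List<Person> people) {
        Map<String, Person> result = new LinkedHashMap<>();
        for (Person p : people) {
            result.putIfAbsent(p.getLastName(), p);
        }
        return result;
    }
}
```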
      • sorting

        public static <T> AggregateOperation1<T, java.util.ArrayList<T>, java.util.List<T>> sorting(
                @Nonnull ComparatorEx<? super T> comparator)
        Returns an aggregate operation that accumulates all input items into an ArrayList and sorts it with the given comparator. If you have Comparable items that you want to sort in their natural order, use ComparatorEx.naturalOrder().

        This sample takes a stream of people and outputs a single list of people sorted by their last name:

        
         BatchStage<Person> people = pipeline.readFrom(peopleSource);
         BatchStage<List<Person>> sorted = people.aggregate(
             sorting(ComparatorEx.comparing(Person::getLastName)));
         
        Type Parameters:
        T - the type of input items
        Parameters:
        comparator - the comparator to use for sorting. It must be stateless and cooperative.
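        Judging by its accumulator type (ArrayList) and the description above, sorting can be modelled in plain Java as three steps: each partial accumulator simply collects items, partial accumulators are merged by concatenation, and a single sort happens at the end. This is an assumption-level sketch of the operation's shape, not Jet's source; the method names create, accumulate, combine and finish are illustrative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative accumulate/combine/finish model of a sorting aggregation.
class SortingSketch {
    static <T> ArrayList<T> create() { return new ArrayList<>(); }

    static <T> void accumulate(ArrayList<T> acc, T item) { acc.add(item); }

    // Merging partials is a plain concatenation; order does not matter yet.
    static <T> void combine(ArrayList<T> acc, ArrayList<T> other) { acc.addAll(other); }

    // Sorting once at the end makes the result independent of encounter order.
    // The copy leaves the accumulator itself untouched.
    static <T> List<T> finish(ArrayList<T> acc, Comparator<? super T> comparator) {
        List<T> result = new ArrayList<>(acc);
        result.sort(comparator);
        return result;
    }
}
```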
      • allOf

        @Nonnull
        public static <T, A0, A1, R0, R1, R> AggregateOperation1<T, Tuple2<A0, A1>, R> allOf(
                @Nonnull AggregateOperation1<? super T, A0, ? extends R0> op0,
                @Nonnull AggregateOperation1<? super T, A1, ? extends R1> op1,
                @Nonnull BiFunctionEx<? super R0, ? super R1, ? extends R> exportFinishFn)
        Returns an aggregate operation that is a composite of two aggregate operations. It allows you to calculate both aggregations over the same items at once.

        This sample takes a stream of orders and outputs a single tuple containing the orders with the smallest and the largest amount:

        
         BatchStage<Order> orders = pipeline.readFrom(orderSource);
         BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
                 minBy(ComparatorEx.comparing(Order::getAmount)),
                 maxBy(ComparatorEx.comparing(Order::getAmount)),
                 Tuple2::tuple2
         ));
         
        Type Parameters:
        T - type of input items
        A0 - 1st accumulator type
        A1 - 2nd accumulator type
        R0 - 1st result type
        R1 - 2nd result type
        R - final result type
        Parameters:
        op0 - 1st operation
        op1 - 2nd operation
        exportFinishFn - function combining the two results into a single target instance. It must be stateless and cooperative.
        Returns:
        the composite operation
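        The benefit of allOf is that each item is visited once and fed to both constituent operations, whose results are then merged by exportFinishFn. Below is a plain-Java sketch of that single pass for the min/max sample. Order is a hypothetical stand-in with a long amount, and the two-element array plays the role of the Tuple2 result.

```java
import java.util.List;

// Single-pass model of allOf(minBy(...), maxBy(...), Tuple2::tuple2); not Jet code.
class MinMaxInOnePass {
    static final class Order {
        final long amount;

        Order(long amount) { this.amount = amount; }

        long getAmount() { return amount; }
    }

    // Returns {smallest, largest} by amount; in this sketch an empty list gives {null, null}.
    static Order[] extremes(List<Order> orders) {
        Order min = null;   // accumulator of the first operation
        Order max = null;   // accumulator of the second operation
        for (Order o : orders) {
            // Each item is offered to both accumulators in the same pass.
            if (min == null || o.getAmount() < min.getAmount()) min = o;
            if (max == null || o.getAmount() > max.getAmount()) max = o;
        }
        return new Order[] {min, max};   // plays the role of exportFinishFn
    }
}
```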
      • allOf

        @Nonnull
        public static <T, A0, A1, A2, R0, R1, R2, R> AggregateOperation1<T, Tuple3<A0, A1, A2>, R> allOf(
                @Nonnull AggregateOperation1<? super T, A0, ? extends R0> op0,
                @Nonnull AggregateOperation1<? super T, A1, ? extends R1> op1,
                @Nonnull AggregateOperation1<? super T, A2, ? extends R2> op2,
                @Nonnull TriFunction<? super R0, ? super R1, ? super R2, ? extends R> exportFinishFn)
        Returns an aggregate operation that is a composite of three aggregate operations. It allows you to calculate all three over the same items at once.

        This sample takes a stream of orders and outputs a single tuple containing the average amount ordered and the orders with the smallest and the largest amount:

        
         BatchStage<Order> orders = pipeline.readFrom(orderSource);
         BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes =
             orders.aggregate(allOf(
                 averagingLong(Order::getAmount),
                 minBy(ComparatorEx.comparing(Order::getAmount)),
                 maxBy(ComparatorEx.comparing(Order::getAmount)),
                 Tuple3::tuple3
             ));
         
        Type Parameters:
        T - type of input items
        A0 - 1st accumulator type
        A1 - 2nd accumulator type
        A2 - 3rd accumulator type
        R0 - 1st result type
        R1 - 2nd result type
        R2 - 3rd result type
        R - final result type
        Parameters:
        op0 - 1st operation
        op1 - 2nd operation
        op2 - 3rd operation
        exportFinishFn - function combining the three results into a single target instance. It must be stateless and cooperative.
        Returns:
        the composite operation
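        Because the composite's accumulator is a Tuple3 of the constituent accumulators, merging two partial composites (for example from different processors) presumably reduces to merging each component on its own. The sketch below models that for the average/min/max sample, with amounts as plain longs. AvgMinMaxAcc is a hypothetical class, and the (sum, count) pair is an assumed representation of the averaging accumulator; returning NaN for an empty accumulator is a choice made in this sketch.

```java
// Model of a three-component composite accumulator; names are illustrative.
class AvgMinMaxAcc {
    long sum;                        // component 0: averaging state (sum and count)
    long count;
    long min = Long.MAX_VALUE;       // component 1: minimum so far
    long max = Long.MIN_VALUE;       // component 2: maximum so far

    void accumulate(long amount) {
        sum = Math.addExact(sum, amount);
        count++;
        min = Math.min(min, amount);
        max = Math.max(max, amount);
    }

    // Merging two partial composites merges each component independently.
    void combine(AvgMinMaxAcc other) {
        sum = Math.addExact(sum, other.sum);
        count += other.count;
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
    }

    // In this sketch, an empty accumulator has no meaningful average.
    double average() { return count == 0 ? Double.NaN : (double) sum / count; }
}
```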
      • allOfBuilder

        @Nonnull
        public static <T> AllOfAggregationBuilder<T> allOfBuilder()
        Returns a builder object that helps you create a composite of multiple aggregate operations. The resulting aggregate operation will perform all of the constituent operations at the same time and you can retrieve individual results from the ItemsByTag object you'll get in the output.

        The builder object is primarily intended to build a composite of four or more aggregate operations. For up to three operations, prefer the simpler and more type-safe variants allOf(op1, op2) and allOf(op1, op2, op3).

        In the following sample we'll construct a composite aggregate operation that takes a stream of orders and finds the extremes in terms of ordered amount. Here's the input stage:

        
         BatchStage<Order> orders = pipeline.readFrom(orderSource);
         
        Now we construct the aggregate operation using the builder:
        
         AllOfAggregationBuilder<Order> builder = allOfBuilder();
         Tag<Order> minTag = builder.add(minBy(ComparatorEx.comparing(Order::getAmount)));
         Tag<Order> maxTag = builder.add(maxBy(ComparatorEx.comparing(Order::getAmount)));
         AggregateOperation1<Order, ?, ItemsByTag> aggrOp = builder.build();
         
        Finally, we apply the aggregate operation and use the tags we got above to extract the components:
        
         BatchStage<ItemsByTag> extremes = orders.aggregate(aggrOp);
         BatchStage<Tuple2<Order, Order>> extremesAsTuple =
                 extremes.map(ibt -> tuple2(ibt.get(minTag), ibt.get(maxTag)));
         
        Type Parameters:
        T - type of input items
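        The tags returned by builder.add carry the result type, which is what lets ibt.get(minTag) return an Order without a cast. A minimal plain-Java sketch of such a tag-keyed container follows. TypedTag and ResultsByTag are hypothetical classes written for illustration; they are not Jet's Tag and ItemsByTag.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tag type: an identity-based key that remembers the value type T.
final class TypedTag<T> {
    final String name;

    TypedTag(String name) { this.name = name; }
}

// Hypothetical result container: values are stored untyped, and the tag's
// type parameter restores the static type on retrieval.
final class ResultsByTag {
    private final Map<TypedTag<?>, Object> values = new HashMap<>();

    <T> void put(TypedTag<T> tag, T value) { values.put(tag, value); }

    @SuppressWarnings("unchecked")
    <T> T get(TypedTag<T> tag) { return (T) values.get(tag); }
}
```

The unchecked cast is safe as long as values are only inserted through put, which ties each value's type to its tag.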
      • aggregateOperation2

        public static <T0, A0, R0, T1, A1, R1, R> AggregateOperation2<T0, T1, Tuple2<A0, A1>, R> aggregateOperation2(
                @Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0,
                @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1,
                @Nonnull BiFunctionEx<? super R0, ? super R1, ? extends R> exportFinishFn)
        Returns an aggregate operation that is a composite of two independent aggregate operations, each one accepting its own input. You need this kind of operation in a two-way co-aggregating pipeline stage such as stage0.aggregate2(stage1, compositeAggrOp) (see BatchStage.aggregate2(BatchStage, AggregateOperation2)). Before using this method, see if you can instead use stage0.aggregate2(aggrOp0, stage1, aggrOp1), which is simpler and doesn't require you to pre-compose the aggregate operations.

        This method is suitable when you can express your computation as two independent aggregate operations where you combine their final results. If you need an operation that combines the two inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

        As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:

        
         BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
         BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
         
        We want to find out how many page clicks each user made before buying a product. We can do it like this:
        
         BatchStage<Entry<Long, Double>> visitsPerPurchase = pageVisits
             .groupingKey(PageVisit::userId)
             .aggregate2(
                 payments.groupingKey(Payment::userId),
                 aggregateOperation2(counting(), counting(),
                     (numPageVisits, numPayments) -> 1.0 * numPageVisits / numPayments
                 ));
         
        The output stage's stream contains a Map.Entry where the key is the user ID and the value is the ratio of page visits to payments for that user.
        Type Parameters:
        T0 - type of items in the first stage
        A0 - type of the first aggregate operation's accumulator
        R0 - type of the first aggregate operation's result
        T1 - type of items in the second stage
        A1 - type of the second aggregate operation's accumulator
        R1 - type of the second aggregate operation's result
        R - type of the result
        Parameters:
        op0 - the aggregate operation that will receive the first stage's input
        op1 - the aggregate operation that will receive the second stage's input
        exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.
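        Conceptually, a keyed two-way co-aggregation keeps one composite accumulator per key with a separate slot for each input: items from the first stage only touch slot 0, items from the second only slot 1, and exportFinishFn sees both final results. Below is a plain-Java sketch for the page-visit/payment sample, with user IDs as plain longs standing in for PageVisit::userId and Payment::userId (an assumption). In this sketch, a user who has page visits but no payments ends up with numPayments == 0, so the ratio evaluates to Infinity in Java double arithmetic; guard the division if that case matters to you.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed two-way co-aggregation; not Jet code.
class CoAggregationSketch {

    // visitUserIds and paymentUserIds stand in for the two input stages,
    // already reduced to their grouping keys.
    static Map<Long, Double> visitsPerPurchase(List<Long> visitUserIds, List<Long> paymentUserIds) {
        // One composite accumulator per key: slot 0 counts visits, slot 1 counts payments.
        Map<Long, long[]> accs = new HashMap<>();
        for (Long userId : visitUserIds) {
            accs.computeIfAbsent(userId, k -> new long[2])[0]++;
        }
        for (Long userId : paymentUserIds) {
            accs.computeIfAbsent(userId, k -> new long[2])[1]++;
        }
        // exportFinishFn: combine the two independent results into one value per key.
        Map<Long, Double> result = new HashMap<>();
        for (Map.Entry<Long, long[]> e : accs.entrySet()) {
            long numPageVisits = e.getValue()[0];
            long numPayments = e.getValue()[1];
            result.put(e.getKey(), 1.0 * numPageVisits / numPayments);
        }
        return result;
    }
}
```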
      • aggregateOperation2

        public static <T0, T1, A0, A1, R0, R1> AggregateOperation2<T0, T1, Tuple2<A0, A1>, Tuple2<R0, R1>> aggregateOperation2(
                @Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0,
                @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1)
        Convenience for aggregateOperation2(op0, op1, exportFinishFn) that outputs a Tuple2(result0, result1).
        Type Parameters:
        T0 - type of items in the first stage
        A0 - type of the first aggregate operation's accumulator
        R0 - type of the first aggregate operation's result
        T1 - type of items in the second stage
        A1 - type of the second aggregate operation's accumulator
        R1 - type of the second aggregate operation's result
        Parameters:
        op0 - the aggregate operation that will receive the first stage's input
        op1 - the aggregate operation that will receive the second stage's input
      • aggregateOperation3

        public static <T0, T1, T2, A0, A1, A2, R0, R1, R2, R> AggregateOperation3<T0, T1, T2, Tuple3<A0, A1, A2>, R> aggregateOperation3(
                @Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0,
                @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1,
                @Nonnull AggregateOperation1<? super T2, A2, ? extends R2> op2,
                @Nonnull TriFunction<? super R0, ? super R1, ? super R2, ? extends R> exportFinishFn)
        Returns an aggregate operation that is a composite of three independent aggregate operations, each one accepting its own input. You need this kind of operation in the three-way co-aggregating pipeline stage stage0.aggregate3(stage1, stage2, compositeAggrOp) (see BatchStage.aggregate3(BatchStage, BatchStage, AggregateOperation3)). Before using this method, see if you can instead use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2), which is simpler and doesn't require you to pre-compose the aggregate operations.

        This method is suitable when you can express your computation as three independent aggregate operations where you combine their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

        As a quick example, let's say you have three data streams coming from an online store, consisting of user actions: page visits, add-to-cart actions and payments:

        
         BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
         BatchStage<AddToCart> addToCarts = pipeline.readFrom(addToCartSource);
         BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
         
        We want to get these metrics for each user: how many page clicks they made before buying a product, and how many products they added to the cart per purchase. We could do it like this:
        
         BatchStage<Entry<Integer, Tuple2<Double, Double>>> userStats = pageVisits
             .groupingKey(PageVisit::userId)
             .aggregate3(
                 addToCarts.groupingKey(AddToCart::userId),
                 payments.groupingKey(Payment::userId),
                 aggregateOperation3(counting(), counting(), counting(),
                     (numPageVisits, numAddToCarts, numPayments) ->
                         tuple2(1.0 * numPageVisits / numPayments,
                                1.0 * numAddToCarts / numPayments
                         )
                 ));
         
        Type Parameters:
        T0 - type of items in the first stage
        A0 - type of the first aggregate operation's accumulator
        R0 - type of the first aggregate operation's result
        T1 - type of items in the second stage
        A1 - type of the second aggregate operation's accumulator
        R1 - type of the second aggregate operation's result
        T2 - type of items in the third stage
        A2 - type of the third aggregate operation's accumulator
        R2 - type of the third aggregate operation's result
        R - type of the result
        Parameters:
        op0 - the aggregate operation that will receive the first stage's input
        op1 - the aggregate operation that will receive the second stage's input
        op2 - the aggregate operation that will receive the third stage's input
        exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.
      • aggregateOperation3

        public static <T0, T1, T2, A0, A1, A2, R0, R1, R2> AggregateOperation3<T0, T1, T2, Tuple3<A0, A1, A2>, Tuple3<R0, R1, R2>> aggregateOperation3(
                @Nonnull AggregateOperation1<? super T0, A0, ? extends R0> op0,
                @Nonnull AggregateOperation1<? super T1, A1, ? extends R1> op1,
                @Nonnull AggregateOperation1<? super T2, A2, ? extends R2> op2)
        Convenience for aggregateOperation3(op0, op1, op2, exportFinishFn) that outputs a Tuple3(result0, result1, result2).
        Type Parameters:
        T0 - type of items in the first stage
        A0 - type of the first aggregate operation's accumulator
        R0 - type of the first aggregate operation's result
        T1 - type of items in the second stage
        A1 - type of the second aggregate operation's accumulator
        R1 - type of the second aggregate operation's result
        T2 - type of items in the third stage
        A2 - type of the third aggregate operation's accumulator
        R2 - type of the third aggregate operation's result
        Parameters:
        op0 - the aggregate operation that will receive the first stage's input
        op1 - the aggregate operation that will receive the second stage's input
        op2 - the aggregate operation that will receive the third stage's input
      • toCollector

        @Nonnull
        public static <T, A, R> java.util.stream.Collector<T, A, R> toCollector(AggregateOperation1<? super T, A, ? extends R> aggrOp)
        Adapts the given aggregate operation to a Collector that can be passed to Stream.collect(Collector).

        This can be useful when you want to combine java.util.stream with Jet aggregations. For example, the code below performs multiple aggregations in a single pass over the same data set:

        
           Stream<Person> personStream = people.stream();
           personStream.collect(
             AggregateOperations.toCollector(
               AggregateOperations.allOf(
                 AggregateOperations.counting(),
                 AggregateOperations.averagingLong(p -> p.getAge())
               )
             )
           );
         
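        For comparison, the JDK's own Collector.of factory takes a supplier, an accumulator, a combiner and a finisher, which roughly correspond to an aggregate operation's create, accumulate, combine and finish primitives; that correspondence is what makes an adapter like toCollector plausible. The sketch below builds a plain-JDK collector that computes the count and the average age in one pass, similar in spirit to the sample above. It does not use Jet, and the CountAndAverage class and the List<Integer> of ages are illustrative assumptions.

```java
import java.util.List;
import java.util.stream.Collector;

// Plain-JDK collector computing count and average in a single pass; not Jet code.
class CountAndAverage {

    // Mutable accumulator layout: acc[0] = count, acc[1] = sum of ages.
    static Collector<Integer, long[], double[]> countAndAverage() {
        return Collector.of(
                () -> new long[2],                                   // create
                (acc, age) -> { acc[0]++; acc[1] += age; },          // accumulate
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; }, // combine partials
                acc -> new double[] {                                // finish: {count, average}
                        acc[0],
                        acc[0] == 0 ? Double.NaN : (double) acc[1] / acc[0]});
    }

    static double[] compute(List<Integer> ages) {
        return ages.stream().collect(countAndAverage());
    }
}
```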
      • toAggregator

        @Nonnull
        public static <T, A, R> Aggregator<T, R> toAggregator(AggregateOperation1<? super T, A, ? extends R> aggrOp)
        Adapts the given aggregate operation for use in IMap.aggregate(Aggregator) calls.

        Using IMap aggregations can be desirable when you want to make use of indexes during the aggregation while still using the Jet aggregations API instead of writing a custom Aggregator.

        For example, the following aggregation can be used to group people by their age and find the counts for each group.

        
           IMap<Integer, Person> map = jet.getMap("people");
           Map<Integer, Long> counts = map.aggregate(
             AggregateOperations.toAggregator(
               AggregateOperations.groupingBy(
                 e -> e.getValue().getAge(), AggregateOperations.counting()
               )
             )
           );