Class AggregateOperations

java.lang.Object
com.hazelcast.jet.aggregate.AggregateOperations

public final class AggregateOperations extends Object
Utility class with factory methods for several useful aggregate operations. See the Javadoc on AggregateOperation. You can also create your own aggregate operation using the builder object.
Since:
Jet 3.0
  • Method Details

    • counting

      @Nonnull public static <T> AggregateOperation1<T,LongAccumulator,Long> counting()
      Returns an aggregate operation that counts the items it observes. The result is of type long.

      This sample takes a stream of words and finds the number of occurrences of each word in it:

      
       BatchStage<String> words = pipeline.readFrom(wordSource);
       BatchStage<Entry<String, Long>> wordFrequencies =
           words.groupingKey(wholeItem()).aggregate(counting());
       
    • summingLong

      @Nonnull public static <T> AggregateOperation1<T,LongAccumulator,Long> summingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)
      Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item.

      This sample takes a stream of lines of text and outputs a single long number telling how many words there were in the stream:

      
       BatchStage<String> linesOfText = pipeline.readFrom(textSource);
       BatchStage<Long> numberOfWordsInText =
               linesOfText
                       .map(line -> line.split("\\W+"))
                       .aggregate(summingLong(wordsInLine -> wordsInLine.length));
       
      Note: if the sum exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.
      Type Parameters:
      T - type of the input item
      Parameters:
      getLongValueFn - function that extracts the long values you want to sum. It must be stateless and cooperative.
    • summingDouble

      @Nonnull public static <T> AggregateOperation1<T,DoubleAccumulator,Double> summingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
      Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item.

      This sample takes a stream of purchase events and outputs a single double value that tells the total sum of money spent in them:

      
       BatchStage<Purchase> purchases = pipeline.readFrom(purchaseSource);
       BatchStage<Double> purchaseVolume =
               purchases.aggregate(summingDouble(Purchase::amount));
       
      Type Parameters:
      T - type of the input item
      Parameters:
      getDoubleValueFn - function that extracts the double values you want to sum. It must be stateless and cooperative.
    • minBy

      @Nonnull public static <T> AggregateOperation1<T,MutableReference<T>,T> minBy(@Nonnull ComparatorEx<? super T> comparator)
      Returns an aggregate operation that computes the least item according to the given comparator.

      This sample takes a stream of people and finds the youngest person in it:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Person> youngestPerson =
               people.aggregate(minBy(ComparatorEx.comparing(Person::age)));
       
      NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

      If several items tie for the least one, this aggregate operation will choose any one to return and may choose a different one each time.

      Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

      Type Parameters:
      T - type of the input item
      Parameters:
      comparator - comparator to compare the items. It must be stateless and cooperative.
    • maxBy

      @Nonnull public static <T> AggregateOperation1<T,MutableReference<T>,T> maxBy(@Nonnull ComparatorEx<? super T> comparator)
      Returns an aggregate operation that computes the greatest item according to the given comparator.

      This sample takes a stream of people and finds the oldest person in it:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Person> oldestPerson =
               people.aggregate(maxBy(ComparatorEx.comparing(Person::age)));
       
      NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.

      If several items tie for the greatest one, this aggregate operation will choose any one to return and may choose a different one each time.

      Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.

      Type Parameters:
      T - type of the input item
      Parameters:
      comparator - comparator to compare the items. It must be stateless and cooperative.
    • topN

      @Nonnull public static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> topN(int n, @Nonnull ComparatorEx<? super T> comparator)
      Returns an aggregate operation that finds the top n items according to the given comparator. It outputs a sorted list with the top item in the first position.

      This sample takes a stream of people and finds ten oldest persons in it:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<List<Person>> oldestDudes =
               people.aggregate(topN(10, ComparatorEx.comparing(Person::age)));
       
      Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
      Type Parameters:
      T - type of the input item
      Parameters:
      n - number of top items to find
      comparator - compares the items. It must be stateless and cooperative.
    • bottomN

      @Nonnull public static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> bottomN(int n, @Nonnull ComparatorEx<? super T> comparator)
      Returns an aggregate operation that finds the bottom n items according to the given comparator. It outputs a sorted list with the bottom item in the first position.

      This sample takes a stream of people and finds ten youngest persons in it:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<List<Person>> youngestDudes =
               people.aggregate(bottomN(10, ComparatorEx.comparing(Person::age)));
       
      Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
      Type Parameters:
      T - type of the input item
      Parameters:
      n - number of bottom items to find
      comparator - compares the items. It must be stateless and cooperative.
    • averagingLong

      @Nonnull public static <T> AggregateOperation1<T,LongLongAccumulator,Double> averagingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)
      Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item. It outputs the result as a double.

      This sample takes a stream of people and finds their mean age:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Double> meanAge = people.aggregate(averagingLong(Person::age));
       

      If the aggregate operation does not observe any input, its result is NaN.

      NOTE: this operation accumulates the sum and the count as separate long variables and combines them at the end into the mean value. If either of these variables exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.

      Type Parameters:
      T - type of the input item
      Parameters:
      getLongValueFn - function that extracts the long value from the item. It must be stateless and cooperative.
    • averagingDouble

      @Nonnull public static <T> AggregateOperation1<T,LongDoubleAccumulator,Double> averagingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
      Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item. It outputs the result as a double.

      This sample takes a stream of people and finds their mean age:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Double> meanAge = people.aggregate(averagingDouble(Person::age));
       

      If the aggregate operation does not observe any input, its result is NaN.

      Type Parameters:
      T - type of the input item
      Parameters:
      getDoubleValueFn - function that extracts the double value from the item. It must be stateless and cooperative.
    • linearTrend

      @Nonnull public static <T> AggregateOperation1<T,LinTrendAccumulator,Double> linearTrend(@Nonnull ToLongFunctionEx<T> getXFn, @Nonnull ToLongFunctionEx<T> getYFn)
      Returns an aggregate operation that computes a linear trend over the items. It will produce a double-valued coefficient that approximates the rate of change of y as a function of x, where x and y are long quantities obtained by applying the two provided functions to each item.

      This sample takes an infinite stream of trade events and outputs the current rate of price change using a sliding window:

      
       StreamStage<Trade> trades = pipeline
               .readFrom(tradeSource)
               .withTimestamps(Trade::getTimestamp, SECONDS.toMillis(1));
       StreamStage<WindowResult<Double>> priceTrend = trades
           .window(WindowDefinition.sliding(MINUTES.toMillis(5), SECONDS.toMillis(1)))
           .aggregate(linearTrend(Trade::getTimestamp, Trade::getPrice));
       
      With the trade price given in cents and the timestamp in milliseconds, the output will be in cents per millisecond. Make sure you apply a scaling factor if you want another, more natural unit of measure.

      If this aggregate operation does not observe any input, its result is NaN.

      Type Parameters:
      T - type of the input item
      Parameters:
      getXFn - a function to extract x from the input. It must be stateless and cooperative.
      getYFn - a function to extract y from the input. It must be stateless and cooperative.
    • concatenating

      public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating()
      Returns an aggregate operation that takes string items and concatenates them into a single string.

      This sample outputs a string that you get by reading down the first column of the input text:

      
       BatchStage<String> linesOfText = pipeline.readFrom(textSource);
       BatchStage<String> lineStarters = linesOfText
               .map(line -> line.charAt(0))
               .map(Object::toString)
               .aggregate(concatenating());
       
    • concatenating

      public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating(CharSequence delimiter)
      Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.

      This sample outputs a single line of text that contains all the upper-cased and title-cased words of the input text:

      
       BatchStage<String> linesOfText = pipeline.readFrom(textSource);
       BatchStage<String> upcaseWords = linesOfText
               .map(line -> line.split("\\W+"))
               .flatMap(Traversers::traverseArray)
               .filter(word -> word.matches("\\p{Lu}.*"))
               .aggregate(concatenating(" "));
       
    • concatenating

      public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
      Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string. The resulting string will start with the given prefix and end with the given suffix.

      This sample outputs a single item, a JSON array of all the upper-cased and title-cased words of the input text:

      
       BatchStage<String> linesOfText = pipeline.readFrom(textSource);
       BatchStage<String> upcaseWords = linesOfText
               .map(line -> line.split("\\W+"))
               .flatMap(Traversers::traverseArray)
               .filter(word -> word.matches("\\p{Lu}.*"))
               .aggregate(concatenating("['", "', '", "']"));
       
    • mapping

      public static <T, U, A, R> AggregateOperation1<T,A,R> mapping(@Nonnull FunctionEx<? super T,? extends U> mapFn, @Nonnull AggregateOperation1<? super U,A,? extends R> downstream)
      Adapts an aggregate operation that takes items of type U to one that takes items of type T, by applying the given mapping function to each item. Normally you should just apply the mapping in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).

      In addition to mapping, you can apply filtering as well by returning null for an item you want filtered out.

      This sample takes a stream of people and builds two sorted lists from it, one with all the names and one with all the surnames:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Tuple2<List<String>, List<String>>> sortedNames =
           people.aggregate(allOf(
               mapping(Person::getFirstName, sorting(ComparatorEx.naturalOrder())),
               mapping(Person::getLastName, sorting(ComparatorEx.naturalOrder()))));
       
      Type Parameters:
      T - type of the input item
      U - input type of the downstream aggregate operation
      A - downstream operation's accumulator type
      R - downstream operation's result type
      Parameters:
      mapFn - the function to apply to the input items. It must be stateless and cooperative.
      downstream - the downstream aggregate operation
      See Also:
    • filtering

      public static <T, A, R> AggregateOperation1<T,A,R> filtering(@Nonnull PredicateEx<? super T> filterFn, @Nonnull AggregateOperation1<? super T,A,? extends R> downstream)
      Adapts an aggregate operation so that it accumulates only the items passing the filterFn and ignores others. Normally you should just apply the filter in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).

      This sample takes a stream of people and outputs two numbers, the average height of kids and grown-ups:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Tuple2<Double, Double>> avgHeightByAge = people.aggregate(allOf(
           filtering((Person p) -> p.getAge() < 18, averagingLong(Person::getHeight)),
           filtering((Person p) -> p.getAge() >= 18, averagingLong(Person::getHeight))
       ));
       
      Type Parameters:
      T - type of the input item
      A - downstream operation's accumulator type
      R - downstream operation's result type
      Parameters:
      filterFn - the filtering function. It must be stateless and cooperative.
      downstream - the downstream aggregate operation
      Since:
      Jet 3.1
      See Also:
    • flatMapping

      public static <T, U, A, R> AggregateOperation1<T,A,R> flatMapping(@Nonnull FunctionEx<? super T,? extends Traverser<? extends U>> flatMapFn, @Nonnull AggregateOperation1<? super U,A,? extends R> downstream)
      Adapts an aggregate operation that takes items of type U to one that takes items of type T, by exploding each T into a sequence of Us and then accumulating all of them. Normally you should just apply the flat-mapping in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>).

      The traverser your function returns must be non-null and null-terminated.

      This sample takes a stream of people and outputs two numbers, the mean age of all the people and the mean age of people listed as someone's kid:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       people.aggregate(allOf(
           averagingLong(Person::getAge),
           flatMapping((Person p) -> traverseIterable(p.getChildren()),
                   averagingLong(Person::getAge))
       ));
       
      Type Parameters:
      T - type of the input item
      U - input type of the downstream aggregate operation
      A - downstream operation's accumulator type
      R - downstream operation's result type
      Parameters:
      flatMapFn - the flat-mapping function to apply. It must be stateless and cooperative.
      downstream - the downstream aggregate operation
      Since:
      Jet 3.1
      See Also:
    • toCollection

      public static <T, C extends Collection<T>> AggregateOperation1<T,C,C> toCollection(@Nonnull SupplierEx<C> createCollectionFn)
      Returns an aggregate operation that accumulates the items into a Collection. It creates empty, mutable collections as needed by calling the provided createCollectionFn.

      This sample takes a stream of words and outputs a single sorted set of all the long words (above 5 letters):

      
       BatchStage<String> words = pipeline.readFrom(wordSource);
       BatchStage<SortedSet<String>> sortedLongWords = words
               .filter(w -> w.length() > 5)
               .aggregate(toCollection(TreeSet::new));
       
      Note: if you use a collection that preserves the insertion order, keep in mind that Jet doesn't aggregate the items in any specified order.
      Type Parameters:
      T - type of the input item
      C - the type of the collection
      Parameters:
      createCollectionFn - a Supplier of empty, mutable Collections. It must be stateless and cooperative.
    • toList

      public static <T> AggregateOperation1<T,List<T>,List<T>> toList()
      Returns an aggregate operation that accumulates the items into an ArrayList.

      This sample takes a stream of words and outputs a single list of all the long words (above 5 letters):

      
       BatchStage<String> words = pipeline.readFrom(wordSource);
       BatchStage<List<String>> longWords = words
               .filter(w -> w.length() > 5)
               .aggregate(toList());
       
      Note: accumulating all the data into an in-memory list shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.
      Type Parameters:
      T - type of the input item
    • toSet

      public static <T> AggregateOperation1<T,Set<T>,Set<T>> toSet()
      Returns an aggregate operation that accumulates the items into a HashSet.

      This sample takes a stream of people and outputs a single set of all the distinct cities they live in:

      
       pipeline.readFrom(personSource)
               .map(Person::getCity)
               .aggregate(toSet());
       
      Note: accumulating all the data into an in-memory set shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.
      Type Parameters:
      T - type of the input item
    • toMap

      public static <T, K, U> AggregateOperation1<T,Map<K,U>,Map<K,U>> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn)
      Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

      This aggregate operation does not tolerate duplicate keys and will throw an IllegalStateException if it detects them. If your data contains duplicates, use toMap(keyFn, valueFn, mergeFn).

      The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}:

      
       BatchStage<Map<String, Double>> readings = pipeline
               .readFrom(sensorData)
               .aggregate(toMap(
                       SensorReading::getSensorId,
                       SensorReading::getValue));
       
      Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.
      Type Parameters:
      T - type of the input item
      K - type of the key
      U - type of the value
      Parameters:
      keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
      valueFn - a function to extract the value from the input item. It must be stateless and cooperative.
      See Also:
    • toMap

      public static <T, K, U> AggregateOperation1<T,Map<K,U>,Map<K,U>> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn)
      Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.

      This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.

      The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}. Multiple readings from the same sensor get summed up:

      
       BatchStage<Map<String, Double>> readings = pipeline
               .readFrom(sensorData)
               .aggregate(toMap(
                       SensorReading::getSensorId,
                       SensorReading::getValue,
                       Double::sum));
       
      Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - type of the input item
      K - the type of key
      U - the output type of the value mapping function
      Parameters:
      keyFn - a function to extract the key from input item
      valueFn - a function to extract value from input item
      mergeFn - the function used to resolve collisions between values associated with the same key, will be passed to Map.merge(Object, Object, java.util.function.BiFunction)
      See Also:
    • toMap

      public static <T, K, U, M extends Map<K, U>> AggregateOperation1<T,M,M> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn, SupplierEx<M> createMapFn)
      Returns an aggregate operation that accumulates elements into a user-supplied Map instance. The keys and values are the result of applying the provided mapping functions to the input elements.

      This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.

      The following sample takes a stream of sensor readings and outputs a single ObjectToLongHashMap of {sensor ID -> reading}. Multiple readings from the same sensor get summed up:

      
       BatchStage<Map<String, Long>> readings = pipeline
               .readFrom(sensorData)
               .aggregate(toMap(
                       SensorReading::getSensorId,
                       SensorReading::getValue,
                       Long::sum,
                       ObjectToLongHashMap::new));
       

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - type of the input item
      K - the output type of the key mapping function
      U - the output type of the value mapping function
      M - the type of the resulting Map
      Parameters:
      keyFn - a function to extract the key from input item
      valueFn - a function to extract value from input item
      mergeFn - a merge function, used to resolve collisions between values associated with the same key, as supplied to Map.merge(Object, Object, java.util.function.BiFunction)
      createMapFn - a function which returns a new, empty Map into which the results will be inserted
      See Also:
    • groupingBy

      public static <T, K> AggregateOperation1<T,Map<K,List<T>>,Map<K,List<T>>> groupingBy(FunctionEx<? super T,? extends K> keyFn)
      Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is a list of the items with that key.

      This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key.

      This sample takes a stream of persons and classifies them first by country and then by gender. It outputs a stream of map entries where the key is the country and the value is a map from gender to the list of people of that gender from that country:

      
       BatchStage<Person> people = pipeline.readFrom(personSource);
       BatchStage<Entry<String, Map<String, List<Person>>>> byCountryAndGender =
               people.groupingKey(Person::getCountry)
                     .aggregate(groupingBy(Person::getGender));
           
       }
      This aggregate operation has a similar effect to the dedicated groupingKey() pipeline transform so you may wonder why not use it in all cases, not just cascaded grouping. To see the difference, check out these two snippets:
      
       BatchStage<Person> people = pipeline.readFrom(personSource);
      
       // Snippet 1
       BatchStage<Entry<String, List<Person>>> byCountry1 =
               people.groupingKey(Person::getCountry)
                     .aggregate(toList());
      
       // Snippet 2
       BatchStage<Map<String, List<Person>>> byCountry2 =
               people.aggregate(groupingBy(Person::getCountry));
       
      Notice that snippet 1 outputs a stream of map entries whereas snippet 2 outputs a single map. To produce the single map, Jet must do all the work on a single thread and hold all the data on a single cluster member, so you lose the advantage of distributed computation. By contrast, snippet 1 allows Jet to partition the input by the grouping key and split the work across the cluster. This is why you should prefer a groupingKey stage if you have just one level of grouping.
      Type Parameters:
      T - type of the input item
      K - the output type of the key mapping function
      Parameters:
      keyFn - a function to extract the key from input item. It must be stateless and cooperative.
      See Also:
    • groupingBy

      public static <T, K, A, R> AggregateOperation1<T,Map<K,A>,Map<K,R>> groupingBy(FunctionEx<? super T,? extends K> keyFn, AggregateOperation1<? super T,A,R> downstream)
      Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

      This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key. For the difference between this operation and the groupingKey() pipeline transform, see the documentation on groupingBy(keyFn).

      This sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category:

      
       BatchStage<Person> people = pipeline.readFrom(personSource);
       BatchStage<Entry<String, Map<String, Long>>> countByCountryAndGender =
               people.groupingKey(Person::getCountry)
                     .aggregate(groupingBy(Person::getGender, counting()));
       
      Type Parameters:
      T - type of the input item
      K - the output type of the key mapping function
      R - the type of the downstream aggregation result
      A - downstream aggregation's accumulator type
      Parameters:
      keyFn - a function to extract the key from input item. It must be stateless and cooperative.
      downstream - the downstream aggregate operation
      See Also:
    • groupingBy

      public static <T, K, R, A, M extends Map<K, R>> AggregateOperation1<T,Map<K,A>,M> groupingBy(FunctionEx<? super T,? extends K> keyFn, SupplierEx<M> createMapFn, AggregateOperation1<? super T,A,R> downstream)
      Returns an AggregateOperation1 that accumulates the items into a Map (as obtained from createMapFn) where the key is the result of applying keyFn and the value is the result of applying the downstream aggregate operation to the items with that key.

      This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key. For the difference between this operation and the groupingKey() pipeline transform, see the documentation on groupingBy(keyFn).

      The following sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category. It uses the EnumMap to optimize memory usage:

      
       BatchStage<Person> people = pipeline.readFrom(personSource);
       BatchStage<Entry<String, Map<Gender, Long>>> countByCountryAndGender =
               people.groupingKey(Person::getCountry)
                     .aggregate(groupingBy(
                             Person::getGender,
                             () -> new EnumMap<>(Gender.class),
                             counting()));
       
      Type Parameters:
      T - type of the input item
      K - the output type of the key mapping function
      R - the type of the downstream aggregation result
      A - downstream aggregation's accumulator type
      M - output type of the resulting Map
      Parameters:
      keyFn - a function to extract the key from input item. It must be stateless and cooperative.
      createMapFn - a function which returns a new, empty Map into which the results will be inserted. It must be stateless and cooperative.
      downstream - the downstream aggregate operation
      See Also:
    • reducing

      @Nonnull public static <T, A> AggregateOperation1<T,MutableReference<A>,A> reducing(@Nonnull A emptyAccValue, @Nonnull FunctionEx<? super T,? extends A> toAccValueFn, @Nonnull BinaryOperatorEx<A> combineAccValuesFn, @Nullable BinaryOperatorEx<A> deductAccValueFn)
      Returns an aggregate operation that constructs the result through the process of immutable reduction:
      1. The initial accumulated value is emptyAccValue.
      2. For each input item, compute the new value: newVal = combineAccValues(currVal, toAccValue(item))
      combineAccValuesFn must be associative because it will also be used to combine partial results, as well as commutative because the encounter order of items is unspecified.

      The optional deductAccValueFn allows Jet to compute the sliding window in O(1) time. It must undo the effects of a previous combineAccValuesFn call:

           A accVal;  // has some pre-existing value
           A itemAccVal = toAccValueFn.apply(item);
           A combined = combineAccValuesFn.apply(accVal, itemAccVal);
           A deducted = deductAccValueFn.apply(combined, itemAccVal);
           assert deducted.equals(accVal);
       

      This sample takes a stream of product orders and outputs a single long number which is the sum total of all the ordered amounts. The aggregate operation it implements is equivalent to summingLong(com.hazelcast.function.ToLongFunctionEx<? super T>):

      
       BatchStage<Order> orders = pipeline.readFrom(orderSource);
       BatchStage<Long> totalAmount = orders.aggregate(reducing(
               0L,
               Order::getAmount,
               Math::addExact,
               Math::subtractExact
       ));
       

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - type of the input item
      A - type of the accumulated value
      Parameters:
      emptyAccValue - the reducing operation's emptyAccValue element
      toAccValueFn - transforms the stream item into its accumulated value
      combineAccValuesFn - combines two accumulated values into one
      deductAccValueFn - deducts the right-hand accumulated value from the left-hand one (optional)
    • pickAny

      @Nonnull public static <T> AggregateOperation1<T,PickAnyAccumulator<T>,T> pickAny()
      Returns an aggregate operation whose result is an arbitrary item it observed, or null if it observed no items.

      The implementation of StageWithWindow.distinct() uses this operation and, if needed, you can use it directly for the same purpose.

      This sample takes a stream of people and outputs a stream of people that have distinct last names (same as calling groupingKey(keyFn).distinct():

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<Entry<String, Person>> distinctByLastName =
               people.groupingKey(Person::getLastName)
                     .aggregate(pickAny());
       
      NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(com.hazelcast.jet.aggregate.AggregateOperation1<? super T, ?, ? extends R>) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.
      Type Parameters:
      T - type of the input item
    • sorting

      public static <T> AggregateOperation1<T,ArrayList<T>,List<T>> sorting(@Nonnull ComparatorEx<? super T> comparator)
      Returns an aggregate operation that accumulates all input items into an ArrayList and sorts it with the given comparator. If you have Comparable items that you want to sort in their natural order, use ComparatorEx.naturalOrder().

      This sample takes a stream of people and outputs a single list of people sorted by their last name:

      
       BatchStage<Person> people = pipeline.readFrom(peopleSource);
       BatchStage<List<Person>> sorted = people.aggregate(
           sorting(ComparatorEx.comparing(Person::getLastName)));
       
      Type Parameters:
      T - the type of input items
      Parameters:
      comparator - the comparator to use for sorting. It must be stateless and cooperative.
    • allOf

      @Nonnull public static <T, A0, A1, R0, R1, R> AggregateOperation1<T,Tuple2<A0,A1>,R> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
      Returns an aggregate operation that is a composite of two aggregate operations. It allows you to calculate both aggregations over the same items at once.

      This sample takes a stream of orders and outputs a single tuple containing the orders with the smallest and the largest amount:

      
       BatchStage<Order> orders = pipeline.readFrom(orderSource);
       BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
               minBy(ComparatorEx.comparing(Order::getAmount)),
               maxBy(ComparatorEx.comparing(Order::getAmount)),
               Tuple2::tuple2
       ));
       
      Type Parameters:
      T - type of input items
      A0 - 1st accumulator type
      A1 - 2nd accumulator type
      R0 - 1st result type
      R1 - 2nd result type
      R - final result type
      Parameters:
      op0 - 1st operation
      op1 - 2nd operation
      exportFinishFn - function combining the two results into a single target instance. It must be stateless and cooperative.
      Returns:
      the composite operation
    • allOf

      @Nonnull public static <T, A0, A1, R0, R1> AggregateOperation1<T,Tuple2<A0,A1>,Tuple2<R0,R1>> allOf(@Nonnull AggregateOperation1<? super T,A0,R0> op1, @Nonnull AggregateOperation1<? super T,A1,R1> op2)
      Convenience for allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx) wrapping the two results in a Tuple2.
    • allOf

      @Nonnull public static <T, A0, A1, A2, R0, R1, R2, R> AggregateOperation1<T,Tuple3<A0,A1,A2>,R> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T,A2,? extends R2> op2, @Nonnull TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)
      Returns an aggregate operation that is a composite of three aggregate operations. It allows you to calculate all three over the same items at once.

      This sample takes a stream of orders and outputs a single tuple containing the average amount ordered and the orders with the smallest and the largest amount:

      
       BatchStage<Order> orders = pipeline.readFrom(orderSource);
       BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes =
           orders.aggregate(allOf(
               averagingLong(Order::getAmount),
               minBy(ComparatorEx.comparing(Order::getAmount)),
               maxBy(ComparatorEx.comparing(Order::getAmount)),
               Tuple3::tuple3
       ));
       
      Type Parameters:
      T - type of input items
      A0 - 1st accumulator type
      A1 - 2nd accumulator type
      A2 - 3rd accumulator type
      R0 - 1st result type
      R1 - 2nd result type
      R2 - 3rd result type
      R - final result type
      Parameters:
      op0 - 1st operation
      op1 - 2nd operation
      op2 - 3rd operation
      exportFinishFn - function combining the three results into a single target instance. It must be stateless and cooperative.
      Returns:
      the composite operation
    • allOf

      @Nonnull public static <T, A0, A1, A2, R0, R1, R2> AggregateOperation1<T,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T,A2,? extends R2> op2)
    • allOfBuilder

      @Nonnull public static <T> AllOfAggregationBuilder<T> allOfBuilder()
      Returns a builder object that helps you create a composite of multiple aggregate operations. The resulting aggregate operation will perform all of the constituent operations at the same time and you can retrieve individual results from the ItemsByTag object you'll get in the output.

      The builder object is primarily intended to build a composite of four or more aggregate operations. For up to three operations, prefer the simpler and more type-safe variants allOf(op1, op2) and allOf(op1, op2, op3).

      In the following sample we'll construct a composite aggregate operation that takes a stream of orders and finds the extremes in terms of ordered amount. Here's the input stage:

      
       BatchStage<Order> orders = pipeline.readFrom(orderSource);
       
      Now we construct the aggregate operation using the builder:
      
       AllOfAggregationBuilder<Order> builder = allOfBuilder();
       Tag<Order> minTag = builder.add(minBy(ComparatorEx.comparing(Order::getAmount)));
       Tag<Order> maxTag = builder.add(maxBy(ComparatorEx.comparing(Order::getAmount)));
       AggregateOperation1<Order, ?, ItemsByTag> aggrOp = builder.build();
       
      Finally, we apply the aggregate operation and use the tags we got above to extract the components:
      
       BatchStage<ItemsByTag> extremes = orders.aggregate(aggrOp);
       BatchStage<Tuple2<Order, Order>> extremesAsTuple =
               extremes.map(ibt -> tuple2(ibt.get(minTag), ibt.get(maxTag)));
       
      Type Parameters:
      T - type of input items
    • aggregateOperation2

      public static <T0, A0, R0, T1, A1, R1, R> AggregateOperation2<T0,T1,Tuple2<A0,A1>,R> aggregateOperation2(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
      Returns an aggregate operation that is a composite of two independent aggregate operations, each one accepting its own input. You need this kind of operation in a two-way co-aggregating pipeline stage such as BatchStage.aggregate2(BatchStage, AggregateOperation2) stage0.aggregate2(stage1, compositeAggrOp)}. Before using this method, see if you can instead use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it's simpler and doesn't require you to pre-compose the aggregate operations.

      This method is suitable when you can express your computation as two independent aggregate operations where you combine their final results. If you need an operation that combines the two inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

      As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:

      
       BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
       BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
       
      We want to find out how many page clicks each user did before buying a product. We can do it like this:
      
       BatchStage<Entry<Long, Double>> visitsPerPurchase = pageVisits
           .groupingKey(PageVisit::userId)
           .aggregate2(
               payments.groupingKey(Payment::userId),
               aggregateOperation2(counting(), counting(),
                   (numPageVisits, numPayments) -> 1.0 * numPageVisits / numPayments
               ));
       
      The output stage's stream contains a Map.Entry where the key is the user ID and the value is the ratio of page visits to payments for that user.
      Type Parameters:
      T0 - type of items in the first stage
      A0 - type of the first aggregate operation's accumulator
      R0 - type of the first aggregate operation's result
      T1 - type of items in the second stage
      A1 - type of the second aggregate operation's accumulator
      R1 - type of the second aggregate operation's result
      R - type of the result
      Parameters:
      op0 - the aggregate operation that will receive the first stage's input
      op1 - the aggregate operation that will receive the second stage's input
      exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.
    • aggregateOperation2

      public static <T0, T1, A0, A1, R0, R1> AggregateOperation2<T0,T1,Tuple2<A0,A1>,Tuple2<R0,R1>> aggregateOperation2(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1)
      Convenience for aggregateOperation2(aggrOp0, aggrOp1, finishFn) that outputs a Tuple2(result0, result1).
      Type Parameters:
      T0 - type of items in the first stage
      A0 - type of the first aggregate operation's accumulator
      R0 - type of the first aggregate operation's result
      T1 - type of items in the second stage
      A1 - type of the second aggregate operation's accumulator
      R1 - type of the second aggregate operation's result
      Parameters:
      op0 - the aggregate operation that will receive the first stage's input
      op1 - the aggregate operation that will receive the second stage's input
    • aggregateOperation3

      public static <T0, T1, T2, A0, A1, A2, R0, R1, R2, R> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,R> aggregateOperation3(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T2,A2,? extends R2> op2, @Nonnull TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)
      Returns an aggregate operation that is a composite of three independent aggregate operations, each one accepting its own input. You need this kind of operation in the three-way co-aggregating pipeline stage: BatchStage.aggregate3(BatchStage, BatchStage, AggregateOperation3) stage0.aggregate3(stage1, stage2, compositeAggrOp)}. Before using this method, see if you can instead use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it's simpler and doesn't require you to pre-compose the aggregate operations.

      This method is suitable when you can express your computation as three independent aggregate operations where you combine their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

      As a quick example, let's say you have three data streams coming from an online store, consisting of user actions: page visits, add-to-cart actions and payments:

      
       BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
       BatchStage<AddToCart> addToCarts = pipeline.readFrom(addToCartSource);
       BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
       
      We want to get these metrics per each user: how many page clicks they did before buying a product, and how many products they bought per purchase. We could do it like this:
      
       BatchStage<Entry<Integer, Tuple2<Double, Double>>> userStats = pageVisits
           .groupingKey(PageVisit::userId)
           .aggregate3(
               addToCarts.groupingKey(AddToCart::userId),
               payments.groupingKey(Payment::userId),
               aggregateOperation3(counting(), counting(), counting(),
                   (numPageVisits, numAddToCarts, numPayments) ->
                       tuple2(1.0 * numPageVisits / numPayments,
                              1.0 * numAddToCarts / numPayments
                       )
               ));
       
      Type Parameters:
      T0 - type of items in the first stage
      A0 - type of the first aggregate operation's accumulator
      R0 - type of the first aggregate operation's result
      T1 - type of items in the second stage
      A1 - type of the second aggregate operation's accumulator
      R1 - type of the second aggregate operation's result
      T2 - type of items in the third stage
      A2 - type of the third aggregate operation's accumulator
      R2 - type of the third aggregate operation's result
      R - type of the result
      Parameters:
      op0 - the aggregate operation that will receive the first stage's input
      op1 - the aggregate operation that will receive the second stage's input
      op2 - the aggregate operation that will receive the third stage's input
      exportFinishFn - the function that transforms the individual aggregate results into the overall result that the co-aggregating stage emits. It must be stateless and cooperative.
    • aggregateOperation3

      public static <T0, T1, T2, A0, A1, A2, R0, R1, R2> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> aggregateOperation3(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T2,A2,? extends R2> op2)
      Convenience for aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn) that outputs a Tuple3(result0, result1, result2).
      Type Parameters:
      T0 - type of items in the first stage
      A0 - type of the first aggregate operation's accumulator
      R0 - type of the first aggregate operation's result
      T1 - type of items in the second stage
      A1 - type of the second aggregate operation's accumulator
      R1 - type of the second aggregate operation's result
      T2 - type of items in the third stage
      A2 - type of the third aggregate operation's accumulator
      R2 - type of the third aggregate operation's result
      Parameters:
      op0 - the aggregate operation that will receive the first stage's input
      op1 - the aggregate operation that will receive the second stage's input
      op2 - the aggregate operation that will receive the third stage's input
    • coAggregateOperationBuilder

      @Nonnull public static CoAggregateOperationBuilder coAggregateOperationBuilder()
      Returns a builder object that offers a step-by-step fluent API to create an aggregate operation that accepts multiple inputs. You must supply this kind of operation to a co-aggregating pipeline stage. You need this builder if you're using the stage.aggregateBuilder(). Before deciding to use it, consider using stage.aggregateBuilder(aggrOp0) because it will allow you to directly pass the aggregate operation for each joined stage, without requiring you to build a composite operation through this builder. Finally, if you're co-aggregating two or three streams, prefer the simpler and more type-safe variants: aggregateOperation2(com.hazelcast.jet.aggregate.AggregateOperation1<? super T0, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T1, A1, ? extends R1>, com.hazelcast.function.BiFunctionEx<? super R0, ? super R1, ? extends R>) and aggregateOperation3(com.hazelcast.jet.aggregate.AggregateOperation1<? super T0, A0, ? extends R0>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T1, A1, ? extends R1>, com.hazelcast.jet.aggregate.AggregateOperation1<? super T2, A2, ? extends R2>, com.hazelcast.jet.function.TriFunction<? super R0, ? super R1, ? super R2, ? extends R>).

      This builder is suitable when you can express your computation as independent aggregate operations on each input where you combine only their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.

      As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:

      
       BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
       BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
       
      We want to find out how many page clicks each user did before buying a product, and we want to do it using stage.aggregateBuilder(). Note that there will be two builders at play: the stage builder, which joins the pipeline stages, and the aggregate operation builder (obtained from this method). First we obtain the stage builder and add our pipeline stages:
      
       GroupAggregateBuilder1<PageVisit, Long> stageBuilder =
               pageVisits.groupingKey(PageVisit::userId).aggregateBuilder();
       Tag<PageVisit> visitTag_in = stageBuilder.tag0();
       Tag<Payment> payTag_in = stageBuilder.add(payments.groupingKey(Payment::userId));
       
      Now we have the tags we need to build the aggregate operation, and while building it we get new tags to get the results of the operation:
      
       CoAggregateOperationBuilder opBuilder = coAggregateOperationBuilder();
       Tag<Long> visitTag = opBuilder.add(visitTag_in, counting());
       Tag<Long> payTag = opBuilder.add(payTag_in, counting());
       
      We use these tags in the exportFinishFn we specify at the end:
      
       AggregateOperation<Object[], Double> aggrOp =
               opBuilder.build(ibt -> 1.0 * ibt.get(visitTag) / ibt.get(payTag));
       
      And now we're ready to construct the output stage:
      
       BatchStage<Entry<Long, Double>> visitsPerPurchase = stageBuilder.build(aggrOp);
       
      The output stage's stream contains Map.Entrys where the key is the user ID and the value is the ratio of page visits to payments for that user.
    • toCollector

      @Nonnull public static <T, A, R> Collector<T,A,R> toCollector(AggregateOperation1<? super T,A,? extends R> aggrOp)
      Adapts this aggregate operation to a collector which can be passed to Stream.collect(Collector).

      This can be useful when you want to combine java.util.stream with Jet aggregations. For example, the below can be used to do multiple aggregations in a single pass over the same data set:

      
         Stream<Person> personStream = people.stream();
         personStream.collect(
           AggregateOperations.toCollector(
             AggregateOperations.allOf(
               AggregateOperations.counting(),
               AggregateOperations.averagingLong(p -> p.getAge())
             )
           )
         );
       
    • toAggregator

      @Nonnull public static <T, A, R> Aggregator<T,R> toAggregator(AggregateOperation1<? super T,A,? extends R> aggrOp)
      Adapts this aggregate operation to be used for IMap.aggregate(Aggregator) calls.

      Using IMap aggregations can be desirable when you want to make use of indices when doing aggregations and want to use the Jet aggregations API instead of writing a custom Aggregator.

      For example, the following aggregation can be used to group people by their age and find the counts for each group.

      
         IMap<Integer, Person> map = jet.getMap("people");
         Map<Integer, Long> counts = map.aggregate(
           AggregateOperations.toAggregator(
             AggregateOperations.groupingBy(
               e -> e.getValue().getGender(), AggregateOperations.counting()
             )
           )
         );