public final class AggregateOperations extends Object
Utility class with factory methods for several useful aggregate operations. See the Javadoc on AggregateOperation. You can also create your own aggregate operation using the builder object.
Modifier and Type | Method and Description |
---|---|
static <T0,T1,A0,A1,R0,R1> |
aggregateOperation2(AggregateOperation1<? super T0,A0,? extends R0> op0,
AggregateOperation1<? super T1,A1,? extends R1> op1)
Convenience for
aggregateOperation2(aggrOp0, aggrOp1, finishFn) that outputs a
Tuple2(result0, result1) . |
static <T0,A0,R0,T1,A1,R1,R> |
aggregateOperation2(AggregateOperation1<? super T0,A0,? extends R0> op0,
AggregateOperation1<? super T1,A1,? extends R1> op1,
BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
Returns an aggregate operation that is a composite of two independent
aggregate operations, each one accepting its own input.
|
static <T0,T1,T2,A0,A1,A2,R0,R1,R2> |
aggregateOperation3(AggregateOperation1<? super T0,A0,? extends R0> op0,
AggregateOperation1<? super T1,A1,? extends R1> op1,
AggregateOperation1<? super T2,A2,? extends R2> op2)
Convenience for
aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn) that outputs a
Tuple3(result0, result1, result2) . |
static <T0,T1,T2,A0,A1,A2,R0,R1,R2,R> |
aggregateOperation3(AggregateOperation1<? super T0,A0,? extends R0> op0,
AggregateOperation1<? super T1,A1,? extends R1> op1,
AggregateOperation1<? super T2,A2,? extends R2> op2,
TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)
Returns an aggregate operation that is a composite of three independent
aggregate operations, each one accepting its own input.
|
static <T,A0,A1,A2,R0,R1,R2> |
allOf(AggregateOperation1<? super T,A0,? extends R0> op0,
AggregateOperation1<? super T,A1,? extends R1> op1,
AggregateOperation1<? super T,A2,? extends R2> op2)
Convenience for
allOf(AggregateOperation1, AggregateOperation1,
AggregateOperation1, TriFunction) wrapping the three results in a
Tuple3 . |
static <T,A0,A1,A2,R0,R1,R2,R> |
allOf(AggregateOperation1<? super T,A0,? extends R0> op0,
AggregateOperation1<? super T,A1,? extends R1> op1,
AggregateOperation1<? super T,A2,? extends R2> op2,
TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)
Returns an aggregate operation that is a composite of three aggregate
operations.
|
static <T,A0,A1,R0,R1,R> |
allOf(AggregateOperation1<? super T,A0,? extends R0> op0,
AggregateOperation1<? super T,A1,? extends R1> op1,
BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
Returns an aggregate operation that is a composite of two aggregate
operations.
|
static <T,A0,A1,R0,R1> |
allOf(AggregateOperation1<? super T,A0,R0> op1,
AggregateOperation1<? super T,A1,R1> op2)
Convenience for
allOf(AggregateOperation1, AggregateOperation1,
BiFunctionEx) wrapping the two results in a Tuple2 . |
static <T> AllOfAggregationBuilder<T> |
allOfBuilder()
Returns a builder object that helps you create a composite of multiple
aggregate operations.
|
static <T> AggregateOperation1<T,LongDoubleAccumulator,Double> |
averagingDouble(ToDoubleFunctionEx<? super T> getDoubleValueFn)
Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item.
|
static <T> AggregateOperation1<T,LongLongAccumulator,Double> |
averagingLong(ToLongFunctionEx<? super T> getLongValueFn)
Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item.
|
static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> |
bottomN(int n,
ComparatorEx<? super T> comparator)
Returns an aggregate operation that finds the bottom
n items
according to the given comparator . |
static CoAggregateOperationBuilder |
coAggregateOperationBuilder()
Returns a builder object that offers a step-by-step fluent API to create
an aggregate operation that accepts multiple inputs.
|
static AggregateOperation1<CharSequence,StringBuilder,String> |
concatenating()
Returns an aggregate operation that takes string items and concatenates
them into a single string.
|
static AggregateOperation1<CharSequence,StringBuilder,String> |
concatenating(CharSequence delimiter)
Returns an aggregate operation that takes string items and concatenates
them, separated by the given
delimiter , into a single string. |
static AggregateOperation1<CharSequence,StringBuilder,String> |
concatenating(CharSequence delimiter,
CharSequence prefix,
CharSequence suffix)
Returns an aggregate operation that takes string items and concatenates
them, separated by the given
delimiter , into a single string. |
static <T> AggregateOperation1<T,LongAccumulator,Long> |
counting()
Returns an aggregate operation that counts the items it observes.
|
static <T,A,R> AggregateOperation1<T,A,R> |
filtering(PredicateEx<? super T> filterFn,
AggregateOperation1<? super T,A,? extends R> downstream)
Adapts an aggregate operation so that it accumulates only the items
passing the
filterFn and ignores others. |
static <T,U,A,R> AggregateOperation1<T,A,R> |
flatMapping(FunctionEx<? super T,? extends Traverser<? extends U>> flatMapFn,
AggregateOperation1<? super U,A,? extends R> downstream)
Adapts an aggregate operation that takes items of type
U to one
that takes items of type T , by exploding each T into a
sequence of U s and then accumulating all of them. |
static <T,K> AggregateOperation1<T,Map<K,List<T>>,Map<K,List<T>>> |
groupingBy(FunctionEx<? super T,? extends K> keyFn)
Returns an aggregate operation that accumulates the items into a
HashMap where the key is the result of applying keyFn
and the value is a list of the items with that key. |
static <T,K,A,R> AggregateOperation1<T,Map<K,A>,Map<K,R>> |
groupingBy(FunctionEx<? super T,? extends K> keyFn,
AggregateOperation1<? super T,A,R> downstream)
Returns an aggregate operation that accumulates the items into a
HashMap where the key is the result of applying keyFn
and the value is the result of applying the downstream aggregate
operation to the items with that key. |
static <T,K,R,A,M extends Map<K,R>> |
groupingBy(FunctionEx<? super T,? extends K> keyFn,
SupplierEx<M> createMapFn,
AggregateOperation1<? super T,A,R> downstream)
Returns an
AggregateOperation1 that accumulates the items into a
Map (as obtained from createMapFn ) where the key is the
result of applying keyFn and the value is the result of
applying the downstream aggregate operation to the items with that key. |
static <T> AggregateOperation1<T,LinTrendAccumulator,Double> |
linearTrend(ToLongFunctionEx<T> getXFn,
ToLongFunctionEx<T> getYFn)
Returns an aggregate operation that computes a linear trend over the
items.
|
static <T,U,A,R> AggregateOperation1<T,A,R> |
mapping(FunctionEx<? super T,? extends U> mapFn,
AggregateOperation1<? super U,A,? extends R> downstream)
Adapts an aggregate operation that takes items of type
U to one
that takes items of type T , by applying the given mapping
function to each item. |
static <T> AggregateOperation1<T,MutableReference<T>,T> |
maxBy(ComparatorEx<? super T> comparator)
Returns an aggregate operation that computes the greatest item according
to the given
comparator . |
static <T> AggregateOperation1<T,MutableReference<T>,T> |
minBy(ComparatorEx<? super T> comparator)
Returns an aggregate operation that computes the least item according to
the given
comparator . |
static <T> AggregateOperation1<T,PickAnyAccumulator<T>,T> |
pickAny()
Returns an aggregate operation whose result is an arbitrary item it
observed, or
null if it observed no items. |
static <T,A> AggregateOperation1<T,MutableReference<A>,A> |
reducing(A emptyAccValue,
FunctionEx<? super T,? extends A> toAccValueFn,
BinaryOperatorEx<A> combineAccValuesFn,
BinaryOperatorEx<A> deductAccValueFn)
Returns an aggregate operation that constructs the result through the
process of immutable reduction:
The initial accumulated value is
emptyAccValue . |
static <T> AggregateOperation1<T,ArrayList<T>,List<T>> |
sorting(ComparatorEx<? super T> comparator)
Returns an aggregate operation that accumulates all input items into an
ArrayList and sorts it with the given comparator. |
static <T> AggregateOperation1<T,DoubleAccumulator,Double> |
summingDouble(ToDoubleFunctionEx<? super T> getDoubleValueFn)
Returns an aggregate operation that computes the sum of the
double values it obtains by applying getDoubleValueFn to each
item. |
static <T> AggregateOperation1<T,LongAccumulator,Long> |
summingLong(ToLongFunctionEx<? super T> getLongValueFn)
Returns an aggregate operation that computes the sum of the
long
values it obtains by applying getLongValueFn to each item. |
static <T,A,R> Aggregator<T,R> |
toAggregator(AggregateOperation1<? super T,A,? extends R> aggrOp)
Adapts this aggregate operation to be used for
IMap.aggregate(Aggregator)
calls. |
static <T,C extends Collection<T>> |
toCollection(SupplierEx<C> createCollectionFn)
Returns an aggregate operation that accumulates the items into a
Collection . |
static <T,A,R> Collector<T,A,R> |
toCollector(AggregateOperation1<? super T,A,? extends R> aggrOp)
Adapts this aggregate operation to a collector which can be passed to
Stream.collect(Collector) . |
static <T> AggregateOperation1<T,List<T>,List<T>> |
toList()
Returns an aggregate operation that accumulates the items into an
ArrayList . |
static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> |
toMap(FunctionEx<? super T,? extends K> keyFn,
FunctionEx<? super T,? extends U> valueFn)
Returns an aggregate operation that accumulates the items into a
HashMap whose keys and values are the result of applying the provided
mapping functions. |
static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> |
toMap(FunctionEx<? super T,? extends K> keyFn,
FunctionEx<? super T,? extends U> valueFn,
BinaryOperatorEx<U> mergeFn)
Returns an aggregate operation that accumulates the items into a
HashMap whose keys and values are the result of applying
the provided mapping functions. |
static <T,K,U,M extends Map<K,U>> |
toMap(FunctionEx<? super T,? extends K> keyFn,
FunctionEx<? super T,? extends U> valueFn,
BinaryOperatorEx<U> mergeFn,
SupplierEx<M> createMapFn)
Returns an aggregate operation that accumulates elements into a
user-supplied
Map instance. |
static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> |
topN(int n,
ComparatorEx<? super T> comparator)
Returns an aggregate operation that finds the top
n items
according to the given comparator . |
static <T> AggregateOperation1<T,Set<T>,Set<T>> |
toSet()
Returns an aggregate operation that accumulates the items into a
HashSet . |
@Nonnull public static <T> AggregateOperation1<T,LongAccumulator,Long> counting()
Returns an aggregate operation that counts the items it observes. The result is of type long.
This sample takes a stream of words and finds the number of occurrences of each word in it:
BatchStage<String> words = pipeline.readFrom(wordSource);
BatchStage<Entry<String, Long>> wordFrequencies =
words.groupingKey(wholeItem()).aggregate(counting());
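For intuition about the result shape, here is a minimal plain-JDK analogue of the sample above, run on a local list instead of a Jet pipeline. It is only a sketch: the class and method names are hypothetical, and it uses java.util.stream collectors rather than any Jet API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-JDK analogue of groupingKey(wholeItem()).aggregate(counting()).
// This is NOT Jet code; it only mirrors the result shape on a local list.
class WordCountSketch {

    // Groups equal words together and counts each group as a long.
    // A TreeMap is used only to get a stable, sorted print order.
    static Map<String, Long> wordFrequencies(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println(wordFrequencies(words));
    }
}
```

In Jet the same per-key counts arrive as a stream of map entries, one per distinct word, rather than as a single map.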
@Nonnull public static <T> AggregateOperation1<T,LongAccumulator,Long> summingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)
Returns an aggregate operation that computes the sum of the long values it obtains by applying getLongValueFn to each item.
This sample takes a stream of lines of text and outputs a single long
number telling how many words there were in the stream:
BatchStage<String> linesOfText = pipeline.readFrom(textSource);
BatchStage<Long> numberOfWordsInText =
linesOfText
.map(line -> line.split("\\W+"))
.aggregate(summingLong(wordsInLine -> wordsInLine.length));
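This operation fails on overflow instead of silently wrapping around. The JDK's Math.addExact has the same failure mode, so the sketch below uses it to show what that looks like. It is an illustration under that assumption, with hypothetical names, and not Jet's accumulator code.

```java
// Sketch of overflow-checked summation using the JDK's Math.addExact.
// Assumption: this mirrors the documented failure mode only; it is not
// Jet's LongAccumulator.
class CheckedSumSketch {

    // Adds the values one by one; throws ArithmeticException on long overflow.
    static long checkedSum(long... values) {
        long sum = 0;
        for (long v : values) {
            sum = Math.addExact(sum, v);
        }
        return sum;
    }

    // Returns true if summing the values would overflow a long.
    static boolean overflows(long... values) {
        try {
            checkedSum(values);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(checkedSum(1, 2, 3));
        System.out.println(overflows(Long.MAX_VALUE, 1));
    }
}
```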
Note: if the sum exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.
T - type of the input item
getLongValueFn - function that extracts the long values you want to sum. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,DoubleAccumulator,Double> summingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
Returns an aggregate operation that computes the sum of the double values it obtains by applying getDoubleValueFn to each item.
This sample takes a stream of purchase events and outputs a single double value that tells the total sum of money spent in them:
BatchStage<Purchase> purchases = pipeline.readFrom(purchaseSource);
BatchStage<Double> purchaseVolume =
purchases.aggregate(summingDouble(Purchase::amount));
T - type of the input item
getDoubleValueFn - function that extracts the double values you want to sum. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,MutableReference<T>,T> minBy(@Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that computes the least item according to the given comparator.
This sample takes a stream of people and finds the youngest person in it:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Person> youngestPerson =
people.aggregate(minBy(ComparatorEx.comparing(Person::age)));
NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(AggregateOperation1) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.
If several items tie for the least one, this aggregate operation will choose any one to return and may choose a different one each time.
Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
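One way to see why a missing deduct primitive matters for sliding windows is to compare a sum, which can subtract the item leaving the window, with a minimum, which cannot. The sketch below is purely illustrative: the names are hypothetical, it assumes the input has at least windowSize items, and it does not reproduce how Jet actually evaluates windows.

```java
// Illustrative only: contrasts an aggregate that can "deduct" a departing
// item (sum) with one that cannot (min). Not Jet's windowing code.
// Both methods assume items.length >= windowSize >= 1.
class DeductSketch {

    // Sliding sum: each step adds the new item and deducts the evicted one,
    // so the cost per window is constant.
    static long[] slidingSum(long[] items, int windowSize) {
        long[] out = new long[items.length - windowSize + 1];
        long sum = 0;
        for (int i = 0; i < items.length; i++) {
            sum += items[i];
            if (i >= windowSize) {
                sum -= items[i - windowSize]; // the "deduct" step
            }
            if (i >= windowSize - 1) {
                out[i - windowSize + 1] = sum;
            }
        }
        return out;
    }

    // Sliding min: the evicted item cannot be removed from the current
    // minimum, so this naive version rescans the whole window each step.
    static long[] slidingMin(long[] items, int windowSize) {
        long[] out = new long[items.length - windowSize + 1];
        for (int start = 0; start + windowSize <= items.length; start++) {
            long min = items[start];
            for (int j = start + 1; j < start + windowSize; j++) {
                min = Math.min(min, items[j]);
            }
            out[start] = min;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(slidingSum(new long[] {1, 2, 3, 4}, 2)));
        System.out.println(java.util.Arrays.toString(slidingMin(new long[] {3, 1, 4, 2}, 2)));
    }
}
```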
T - type of the input item
comparator - comparator to compare the items. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,MutableReference<T>,T> maxBy(@Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that computes the greatest item according to the given comparator.
This sample takes a stream of people and finds the oldest person in it:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Person> oldestPerson =
people.aggregate(maxBy(ComparatorEx.comparing(Person::age)));
NOTE: if this aggregate operation doesn't observe any items, its result will be null. Since the non-keyed BatchStage.aggregate(AggregateOperation1) emits just the naked aggregation result, and since a null cannot travel through a Jet pipeline, you will not get any output in that case.
If several items tie for the greatest one, this aggregate operation will choose any one to return and may choose a different one each time.
Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
T - type of the input item
comparator - comparator to compare the items. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> topN(int n, @Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that finds the top n items according to the given comparator. It outputs a sorted list with the top item in the first position.
This sample takes a stream of people and finds the ten oldest people in it:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<List<Person>> oldestDudes =
people.aggregate(topN(10, ComparatorEx.comparing(Person::age)));
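The accumulator type in the signature is a PriorityQueue. A common way to find the top n items with such a queue is to keep a bounded min-heap and evict its smallest element whenever a larger item arrives. The sketch below shows that technique on a local collection. It is an assumption about the general approach, with hypothetical names, and not Jet's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Bounded-heap sketch of a "top n" aggregation. Hypothetical helper,
// not Jet's implementation.
class TopNSketch {

    static <T> List<T> topN(Iterable<T> items, int n, Comparator<? super T> comparator) {
        // Min-heap by the comparator: the head is the smallest retained item.
        PriorityQueue<T> heap = new PriorityQueue<>(Math.max(1, n), comparator);
        for (T item : items) {
            if (heap.size() < n) {
                heap.add(item);
            } else if (n > 0 && comparator.compare(item, heap.peek()) > 0) {
                heap.poll();    // evict the smallest retained item
                heap.add(item); // keep the larger newcomer
            }
        }
        // Output is sorted with the top (greatest) item in the first position.
        List<T> result = new ArrayList<>(heap);
        result.sort(comparator.reversed());
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(5, 1, 9, 3, 7);
        System.out.println(topN(ages, 3, Comparator.<Integer>naturalOrder()));
    }
}
```

The heap never holds more than n items, so memory use is bounded by n regardless of the input size.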
Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
T - type of the input item
n - number of top items to find
comparator - compares the items. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,PriorityQueue<T>,List<T>> bottomN(int n, @Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that finds the bottom n items according to the given comparator. It outputs a sorted list with the bottom item in the first position.
This sample takes a stream of people and finds the ten youngest people in it:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<List<Person>> youngestDudes =
people.aggregate(bottomN(10, ComparatorEx.comparing(Person::age)));
Implementation note: this aggregate operation does not implement the deduct primitive. This has performance implications for sliding window aggregation.
T - type of the input item
n - number of bottom items to find
comparator - compares the items. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,LongLongAccumulator,Double> averagingLong(@Nonnull ToLongFunctionEx<? super T> getLongValueFn)
Returns an aggregate operation that finds the arithmetic mean (aka. average) of the long values it obtains by applying getLongValueFn to each item. It outputs the result as a double.
This sample takes a stream of people and finds their mean age:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Double> meanAge = people.aggregate(averagingLong(Person::age));
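The mean of long values can be computed by keeping a running sum and a running count and dividing them at the end. The sketch below shows that scheme in plain Java, including why an empty input yields NaN. The names are hypothetical, and the overflow checks via Math.addExact are an assumption chosen to mirror the documented ArithmeticException, not a copy of Jet's accumulator.

```java
// Sum-and-count sketch of an arithmetic mean over long values.
// Hypothetical helper; not Jet's LongLongAccumulator.
class MeanSketch {

    static double mean(long... values) {
        long sum = 0;
        long count = 0;
        for (long v : values) {
            sum = Math.addExact(sum, v);     // fails on overflow instead of wrapping
            count = Math.addExact(count, 1);
        }
        // Floating-point division: with no input this is 0.0 / 0, which is NaN.
        return (double) sum / count;
    }

    public static void main(String[] args) {
        System.out.println(mean(20, 30, 40));
        System.out.println(mean());
    }
}
```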
If the aggregate operation does not observe any input, its result is NaN.
NOTE: this operation accumulates the sum and the count as separate long variables and combines them at the end into the mean value. If either of these variables exceeds Long.MAX_VALUE, the job will fail with an ArithmeticException.
T - type of the input item
getLongValueFn - function that extracts the long value from the item. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,LongDoubleAccumulator,Double> averagingDouble(@Nonnull ToDoubleFunctionEx<? super T> getDoubleValueFn)
Returns an aggregate operation that finds the arithmetic mean (aka. average) of the double values it obtains by applying getDoubleValueFn to each item. It outputs the result as a double.
This sample takes a stream of people and finds their mean age:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Double> meanAge = people.aggregate(averagingDouble(Person::age));
If the aggregate operation does not observe any input, its result is NaN.
T - type of the input item
getDoubleValueFn - function that extracts the double value from the item. It must be stateless and cooperative.
@Nonnull public static <T> AggregateOperation1<T,LinTrendAccumulator,Double> linearTrend(@Nonnull ToLongFunctionEx<T> getXFn, @Nonnull ToLongFunctionEx<T> getYFn)
Returns an aggregate operation that computes a linear trend over the items. It produces a double-valued coefficient that approximates the rate of change of y as a function of x, where x and y are long quantities obtained by applying the two provided functions to each item.
This sample takes an infinite stream of trade events and outputs the current rate of price change using a sliding window:
StreamStage<Trade> trades = pipeline
.readFrom(tradeSource)
.withTimestamps(Trade::getTimestamp, SECONDS.toMillis(1));
StreamStage<WindowResult<Double>> priceTrend = trades
.window(WindowDefinition.sliding(MINUTES.toMillis(5), SECONDS.toMillis(1)))
.aggregate(linearTrend(Trade::getTimestamp, Trade::getPrice));
With the trade price given in cents and the timestamp in milliseconds,
the output will be in cents per millisecond. Make sure you apply a
scaling factor if you want another, more natural unit of measure.
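A standard way to estimate the rate of change of y as a function of x is the slope of the least-squares regression line. The sketch below computes that slope from running sums. Whether Jet uses exactly this formula is an assumption here, and the class and method names are hypothetical.

```java
// Least-squares slope sketch: slope = (n*Sxy - Sx*Sy) / (n*Sxx - Sx*Sx).
// Assumption: ordinary least squares; not taken from Jet's LinTrendAccumulator.
class LinearTrendSketch {

    // xs and ys must have the same length.
    static double slope(long[] xs, long[] ys) {
        double n = xs.length;
        double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
        for (int i = 0; i < xs.length; i++) {
            sumX += xs[i];
            sumY += ys[i];
            sumXY += (double) xs[i] * ys[i];
            sumX2 += (double) xs[i] * xs[i];
        }
        // With no input this evaluates 0.0 / 0.0, which is NaN.
        return (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
    }

    public static void main(String[] args) {
        // Price rises by 2 cents per millisecond.
        double centsPerMilli = slope(new long[] {0, 1, 2}, new long[] {0, 2, 4});
        System.out.println(centsPerMilli);
        // Scaling factor: 1000 ms per second turns it into cents per second.
        System.out.println(centsPerMilli * 1000);
    }
}
```

The last line of main shows the kind of scaling factor the text mentions: multiplying a per-millisecond rate by 1000 gives a per-second rate.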
If this aggregate operation does not observe any input, its result is NaN.
T - type of the input item
getXFn - a function to extract x from the input. It must be stateless and cooperative.
getYFn - a function to extract y from the input. It must be stateless and cooperative.
public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating()
Returns an aggregate operation that takes string items and concatenates them into a single string.
This sample outputs a string that you get by reading down the first column of the input text:
BatchStage<String> linesOfText = pipeline.readFrom(textSource);
BatchStage<String> lineStarters = linesOfText
.map(line -> line.charAt(0))
.map(Object::toString)
.aggregate(concatenating());
public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating(CharSequence delimiter)
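Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string.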
This sample outputs a single line of text that contains all the upper-cased and title-cased words of the input text:
BatchStage<String> linesOfText = pipeline.readFrom(textSource);
BatchStage<String> upcaseWords = linesOfText
.map(line -> line.split("\\W+"))
.flatMap(Traversers::traverseArray)
.filter(word -> word.matches("\\p{Lu}.*"))
.aggregate(concatenating(" "));
public static AggregateOperation1<CharSequence,StringBuilder,String> concatenating(CharSequence delimiter, CharSequence prefix, CharSequence suffix)
Returns an aggregate operation that takes string items and concatenates them, separated by the given delimiter, into a single string. The resulting string will start with the given prefix and end with the given suffix.
This sample outputs a single item, a JSON array of all the upper-cased and title-cased words of the input text:
BatchStage<String> linesOfText = pipeline.readFrom(textSource);
BatchStage<String> upcaseWords = linesOfText
.map(line -> line.split("\\W+"))
.flatMap(Traversers::traverseArray)
.filter(word -> word.matches("\\p{Lu}.*"))
.aggregate(concatenating("', '", "['", "']"));
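The parameter order of this overload is delimiter, prefix, suffix, the same order the JDK's java.util.StringJoiner constructor uses. The sketch below uses StringJoiner as a local stand-in to show what each argument contributes. The class and method names are hypothetical and this is not Jet code.

```java
import java.util.Arrays;
import java.util.StringJoiner;

// Local stand-in for concatenating(delimiter, prefix, suffix), built on
// java.util.StringJoiner. Hypothetical helper, not Jet's implementation.
class ConcatSketch {

    static String concat(Iterable<? extends CharSequence> items,
                         CharSequence delimiter,
                         CharSequence prefix,
                         CharSequence suffix) {
        // StringJoiner takes its arguments in the same order: delimiter, prefix, suffix.
        StringJoiner joiner = new StringJoiner(delimiter, prefix, suffix);
        for (CharSequence item : items) {
            joiner.add(item);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(concat(Arrays.asList("Alpha", "Beta"), "', '", "['", "']"));
    }
}
```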
public static <T,U,A,R> AggregateOperation1<T,A,R> mapping(@Nonnull FunctionEx<? super T,? extends U> mapFn, @Nonnull AggregateOperation1<? super U,A,? extends R> downstream)
Adapts an aggregate operation that takes items of type U to one that takes items of type T, by applying the given mapping function to each item. Normally you should just apply the mapping in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx).
In addition to mapping, you can apply filtering as well by returning null for an item you want filtered out.
This sample takes a stream of people and builds two sorted lists from it, one with all the names and one with all the surnames:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Tuple2<List<String>, List<String>>> sortedNames =
people.aggregate(allOf(
mapping(Person::getFirstName, sorting(ComparatorEx.naturalOrder())),
mapping(Person::getLastName, sorting(ComparatorEx.naturalOrder()))));
T - type of the input item
U - input type of the downstream aggregate operation
A - downstream operation's accumulator type
R - downstream operation's result type
mapFn - the function to apply to the input items. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See also: filtering(PredicateEx, AggregateOperation1), flatMapping(FunctionEx, AggregateOperation1)
public static <T,A,R> AggregateOperation1<T,A,R> filtering(@Nonnull PredicateEx<? super T> filterFn, @Nonnull AggregateOperation1<? super T,A,? extends R> downstream)
Adapts an aggregate operation so that it accumulates only the items passing the filterFn and ignores others. Normally you should just apply the filter in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx).
This sample takes a stream of people and outputs two numbers, the average height of kids and grown-ups:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Tuple2<Double, Double>> avgHeightByAge = people.aggregate(allOf(
filtering((Person p) -> p.getAge() < 18, averagingLong(Person::getHeight)),
filtering((Person p) -> p.getAge() >= 18, averagingLong(Person::getHeight))
));
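The point of combining filtering with allOf is that one pass over the input feeds several accumulators, each guarded by its own predicate. The sketch below imitates that on a local list. The Person class, its fields, and the method name are hypothetical stand-ins, and the loop is not how Jet executes the pipeline.

```java
import java.util.Arrays;
import java.util.List;

// Single-pass sketch of two filtered averages. Hypothetical types and names;
// not Jet code.
class FilteredAveragesSketch {

    static final class Person {
        final int age;
        final long heightCm;

        Person(int age, long heightCm) {
            this.age = age;
            this.heightCm = heightCm;
        }
    }

    // Returns {average height of kids, average height of grown-ups}.
    static double[] avgHeightByAge(List<Person> people) {
        long kidSum = 0, kidCount = 0, adultSum = 0, adultCount = 0;
        for (Person p : people) {
            // Each accumulator applies its own filter to every item.
            if (p.age < 18) {
                kidSum += p.heightCm;
                kidCount++;
            }
            if (p.age >= 18) {
                adultSum += p.heightCm;
                adultCount++;
            }
        }
        // A group with no members yields NaN (0.0 / 0).
        return new double[] {(double) kidSum / kidCount, (double) adultSum / adultCount};
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person(10, 140), new Person(12, 150), new Person(30, 180));
        System.out.println(Arrays.toString(avgHeightByAge(people)));
    }
}
```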
T - type of the input item
A - downstream operation's accumulator type
R - downstream operation's result type
filterFn - the filtering function. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See also: mapping(FunctionEx, AggregateOperation1), flatMapping(FunctionEx, AggregateOperation1)
public static <T,U,A,R> AggregateOperation1<T,A,R> flatMapping(@Nonnull FunctionEx<? super T,? extends Traverser<? extends U>> flatMapFn, @Nonnull AggregateOperation1<? super U,A,? extends R> downstream)
Adapts an aggregate operation that takes items of type U to one that takes items of type T, by exploding each T into a sequence of Us and then accumulating all of them. Normally you should just apply the flat-mapping in a stage before the aggregation, but this adapter is useful when simultaneously performing several aggregate operations using allOf(AggregateOperation1, AggregateOperation1, BiFunctionEx).
The traverser your function returns must be non-null and null-terminated.
This sample takes a stream of people and outputs two numbers, the mean age of all the people and the mean age of people listed as someone's kid:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
people.aggregate(allOf(
averagingLong(Person::getAge),
flatMapping((Person p) -> traverseIterable(p.getChildren()),
averagingLong(Person::getAge))
));
T - type of the input item
U - input type of the downstream aggregate operation
A - downstream operation's accumulator type
R - downstream operation's result type
flatMapFn - the flat-mapping function to apply. It must be stateless and cooperative.
downstream - the downstream aggregate operation
See also: mapping(FunctionEx, AggregateOperation1), filtering(PredicateEx, AggregateOperation1)
public static <T,C extends Collection<T>> AggregateOperation1<T,C,C> toCollection(@Nonnull SupplierEx<C> createCollectionFn)
Returns an aggregate operation that accumulates the items into a Collection. It creates empty, mutable collections as needed by calling the provided createCollectionFn.
This sample takes a stream of words and outputs a single sorted set of all the long words (above 5 letters):
BatchStage<String> words = pipeline.readFrom(wordSource);
BatchStage<SortedSet<String>> sortedLongWords = words
.filter(w -> w.length() > 5)
.aggregate(toCollection(TreeSet::new));
Note: if you use a collection that preserves the insertion order, keep in mind that Jet doesn't aggregate the items in any specified order.
T - type of the input item
C - the type of the collection
createCollectionFn - a Supplier of empty, mutable Collections. It must be stateless and cooperative.
public static <T> AggregateOperation1<T,List<T>,List<T>> toList()
Returns an aggregate operation that accumulates the items into an ArrayList.
This sample takes a stream of words and outputs a single list of all the long words (above 5 letters):
BatchStage<String> words = pipeline.readFrom(wordSource);
BatchStage<List<String>> longWords = words
.filter(w -> w.length() > 5)
.aggregate(toList());
Note: accumulating all the data into an in-memory list shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.
T - type of the input item
public static <T> AggregateOperation1<T,Set<T>,Set<T>> toSet()
Returns an aggregate operation that accumulates the items into a HashSet.
This sample takes a stream of people and outputs a single set of all the distinct cities they live in:
pipeline.readFrom(personSource)
.map(Person::getCity)
.aggregate(toSet());
Note: accumulating all the data into an in-memory set shouldn't be your first choice in designing a pipeline. Consider draining the result stream to a sink.
T - type of the input item
public static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn)
Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.
This aggregate operation does not tolerate duplicate keys and will throw an IllegalStateException if it detects them. If your data contains duplicates, use toMap(keyFn, valueFn, mergeFn).
The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}:
BatchStage<Map<String, Double>> readings = pipeline
.readFrom(sensorData)
.aggregate(toMap(
SensorReading::getSensorId,
SensorReading::getValue));
Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.
T - type of the input item
K - type of the key
U - type of the value
keyFn - a function to extract the key from the input item. It must be stateless and cooperative.
valueFn - a function to extract the value from the input item. It must be stateless and cooperative.
See also: toMap(FunctionEx, FunctionEx, BinaryOperatorEx), toMap(FunctionEx, FunctionEx, BinaryOperatorEx, SupplierEx), groupingBy(FunctionEx)
public static <T,K,U> AggregateOperation1<T,Map<K,U>,Map<K,U>> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn)
Returns an aggregate operation that accumulates the items into a HashMap whose keys and values are the result of applying the provided mapping functions.
This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.
The following sample takes a stream of sensor readings and outputs a single map {sensor ID -> reading}. Multiple readings from the same sensor get summed up:
BatchStage<Map<String, Double>> readings = pipeline
.readFrom(sensorData)
.aggregate(toMap(
SensorReading::getSensorId,
SensorReading::getValue,
Double::sum));
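The duplicate-key behavior of the two- and three-argument variants matches that of the JDK's Collectors.toMap, which makes it easy to try locally. The sketch below uses java.util.stream as a stand-in. The class, the reading helper, and the sample values are hypothetical, and none of it is Jet code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Duplicate-key handling shown with java.util.stream.Collectors.toMap as a
// local stand-in. Hypothetical names; not Jet code.
class ToMapSketch {

    // Hypothetical helper that builds a (sensor ID, value) pair.
    static Map.Entry<String, Double> reading(String sensorId, double value) {
        return new SimpleEntry<>(sensorId, value);
    }

    // No merge function: a duplicate key raises IllegalStateException.
    static Map<String, Double> strict(List<Map.Entry<String, Double>> readings) {
        return readings.stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // With a merge function: values for the same key are summed.
    static Map<String, Double> summed(List<Map.Entry<String, Double>> readings) {
        return readings.stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Double::sum));
    }

    static boolean rejectsDuplicates(List<Map.Entry<String, Double>> readings) {
        try {
            strict(readings);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> readings =
                Arrays.asList(reading("s1", 1.5), reading("s1", 2.5), reading("s2", 4.0));
        System.out.println(rejectsDuplicates(readings));
        System.out.println(summed(readings));
    }
}
```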
Note: accumulating all the data into an in-memory map shouldn't be your first choice in designing a pipeline. Consider draining the stream to a sink.
The given functions must be stateless and cooperative.
T - type of the input item
K - the type of the key
U - the output type of the value mapping function
keyFn - a function to extract the key from the input item
valueFn - a function to extract the value from the input item
mergeFn - the function used to resolve collisions between values associated with the same key. It is passed to Map.merge(Object, Object, java.util.function.BiFunction).
See also: toMap(FunctionEx, FunctionEx), toMap(FunctionEx, FunctionEx, BinaryOperatorEx, SupplierEx)
public static <T,K,U,M extends Map<K,U>> AggregateOperation1<T,M,M> toMap(FunctionEx<? super T,? extends K> keyFn, FunctionEx<? super T,? extends U> valueFn, BinaryOperatorEx<U> mergeFn, SupplierEx<M> createMapFn)
Returns an aggregate operation that accumulates elements into a user-supplied Map instance. The keys and values are the result of applying the provided mapping functions to the input elements.
This aggregate operation resolves duplicate keys by applying mergeFn to the conflicting values. mergeFn will act upon the values after valueFn has already been applied.
The following sample takes a stream of sensor readings and outputs a single ObjectToLongHashMap of {sensor ID -> reading}. Multiple readings from the same sensor get summed up:
BatchStage<Map<String, Long>> readings = pipeline
.readFrom(sensorData)
.aggregate(toMap(
SensorReading::getSensorId,
SensorReading::getValue,
Long::sum,
ObjectToLongHashMap::new));
The given functions must be stateless and cooperative.
T - type of the input item
K - the output type of the key mapping function
U - the output type of the value mapping function
M - the type of the resulting Map
keyFn - a function to extract the key from the input item
valueFn - a function to extract the value from the input item
mergeFn - a merge function, used to resolve collisions between values associated with the same key, as supplied to Map.merge(Object, Object, java.util.function.BiFunction)
createMapFn - a function which returns a new, empty Map into which the results will be inserted
See also: toMap(FunctionEx, FunctionEx), toMap(FunctionEx, FunctionEx, BinaryOperatorEx)
public static <T,K> AggregateOperation1<T,Map<K,List<T>>,Map<K,List<T>>> groupingBy(FunctionEx<? super T,? extends K> keyFn)
Returns an aggregate operation that accumulates the items into a HashMap where the key is the result of applying keyFn and the value is a list of the items with that key.
This operation is primarily useful when you need a cascaded group-by where you further classify the members of each group by a secondary key.
This sample takes a stream of persons and classifies them first by country and then by gender. It outputs a stream of map entries where the key is the country and the value is a map from gender to the list of people of that gender from that country:
BatchStage<Person> people = pipeline.readFrom(personSource);
BatchStage<Entry<String, Map<String, List<Person>>>> byCountryAndGender =
        people.groupingKey(Person::getCountry)
              .aggregate(groupingBy(Person::getGender));
This aggregate operation has a similar effect to the dedicated groupingKey()
pipeline transform, so you may wonder why you shouldn't use it in all cases,
not just for cascaded grouping. To see the difference, check out these two
snippets:
BatchStage<Person> people = pipeline.readFrom(personSource);
// Snippet 1
BatchStage<Entry<String, List<Person>>> byCountry1 =
people.groupingKey(Person::getCountry)
.aggregate(toList());
// Snippet 2
BatchStage<Map<String, List<Person>>> byCountry2 =
people.aggregate(groupingBy(Person::getCountry));
Notice that snippet 1 outputs a stream of map entries whereas
snippet 2 outputs a single map. To produce the single map,
Jet must do all the work on a single thread and hold all the data on a
single cluster member, so you lose the advantage of distributed
computation. By contrast, snippet 1 allows Jet to partition the input by
the grouping key and split the work across the cluster. This is why you
should prefer a groupingKey stage if you have just one level of
grouping.
T
- type of the input item
K
- the output type of the key mapping function
keyFn
- a function to extract the key from input item. It must be
stateless and cooperative.
See also: groupingBy(FunctionEx, AggregateOperation1),
groupingBy(FunctionEx, SupplierEx, AggregateOperation1),
toMap(FunctionEx, FunctionEx)
public static <T,K,A,R> AggregateOperation1<T,Map<K,A>,Map<K,R>> groupingBy(FunctionEx<? super T,? extends K> keyFn, AggregateOperation1<? super T,A,R> downstream)
Returns an aggregate operation that accumulates the items into a
HashMap where the key is the result of applying keyFn
and the value is the result of applying the downstream aggregate
operation to the items with that key.
This operation is primarily useful when you need a cascaded group-by
where you further classify the members of each group by a secondary key.
For the difference between this operation and the groupingKey()
pipeline transform, see the documentation on groupingBy(keyFn).
This sample takes a stream of people, classifies them by country and gender, and reports the number of people in each category:
BatchStage<Person> people = pipeline.readFrom(personSource);
BatchStage<Entry<String, Map<String, Long>>> countByCountryAndGender =
people.groupingKey(Person::getCountry)
.aggregate(groupingBy(Person::getGender, counting()));
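For readers who know java.util.stream, the cascaded group-by above is analogous to nesting Collectors.groupingBy with a counting downstream. The sketch below runs on an in-memory list rather than a Jet pipeline; the Person class and its fields are hypothetical stand-ins for the type used in the sample.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CascadedGroupingSketch {
    /** Hypothetical stand-in for the Person type in the sample above. */
    static final class Person {
        final String country;
        final String gender;

        Person(String country, String gender) {
            this.country = country;
            this.gender = gender;
        }
    }

    /** Groups by country first, then by gender, counting the members of each subgroup. */
    static Map<String, Map<String, Long>> countByCountryAndGender(List<Person> people) {
        return people.stream().collect(Collectors.groupingBy(
                (Person p) -> p.country,
                Collectors.groupingBy((Person p) -> p.gender, Collectors.counting())));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("FR", "F"), new Person("FR", "M"),
                new Person("FR", "F"), new Person("DE", "M"));
        System.out.println(countByCountryAndGender(people));
    }
}
```

In the Jet sample the outer level is handled by the groupingKey() stage, which lets the work be partitioned; only the inner level corresponds to the groupingBy aggregate operation.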
T
- type of the input item
K
- the output type of the key mapping function
R
- the type of the downstream aggregation result
A
- downstream aggregation's accumulator type
keyFn
- a function to extract the key from input item. It must be
stateless and cooperative.
downstream
- the downstream aggregate operation
See also: groupingBy(FunctionEx),
groupingBy(FunctionEx, SupplierEx, AggregateOperation1),
toMap(FunctionEx, FunctionEx)
public static <T,K,R,A,M extends Map<K,R>> AggregateOperation1<T,Map<K,A>,M> groupingBy(FunctionEx<? super T,? extends K> keyFn, SupplierEx<M> createMapFn, AggregateOperation1<? super T,A,R> downstream)
Returns an AggregateOperation1 that accumulates the items into a
Map (as obtained from createMapFn) where the key is the
result of applying keyFn and the value is the result of
applying the downstream aggregate operation to the items with that key.
This operation is primarily useful when you need a cascaded group-by
where you further classify the members of each group by a secondary key.
For the difference between this operation and the groupingKey()
pipeline transform, see the documentation on groupingBy(keyFn).
The following sample takes a stream of people, classifies them by country
and gender, and reports the number of people in each category. It uses
an EnumMap to optimize memory usage:
BatchStage<Person> people = pipeline.readFrom(personSource);
BatchStage<Entry<String, Map<Gender, Long>>> countByCountryAndGender =
people.groupingKey(Person::getCountry)
.aggregate(groupingBy(
Person::getGender,
() -> new EnumMap<>(Gender.class),
counting()));
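The role of createMapFn can be seen in isolation with the three-argument Collectors.groupingBy from java.util.stream, which also accepts a map factory. This is a sketch under the assumption that the key type is an enum; the Gender enum and the countByGender method are hypothetical names for the example.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.stream.Collectors;

final class EnumMapGroupingSketch {
    /** Hypothetical enum standing in for the Gender type in the sample above. */
    enum Gender { FEMALE, MALE, OTHER }

    /** Counts occurrences of each gender, collecting the result into an EnumMap. */
    static EnumMap<Gender, Long> countByGender(List<Gender> genders) {
        return genders.stream().collect(Collectors.groupingBy(
                (Gender g) -> g,                               // key function
                () -> new EnumMap<Gender, Long>(Gender.class), // map factory
                Collectors.counting()));                       // downstream aggregation
    }

    public static void main(String[] args) {
        System.out.println(countByGender(
                Arrays.asList(Gender.FEMALE, Gender.MALE, Gender.FEMALE)));
    }
}
```

Only keys that actually occur end up in the map, so OTHER is absent from the result in this run.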
T
- type of the input item
K
- the output type of the key mapping function
R
- the type of the downstream aggregation result
A
- downstream aggregation's accumulator type
M
- output type of the resulting Map
keyFn
- a function to extract the key from input item. It must be
stateless and cooperative.
createMapFn
- a function which returns a new, empty Map into
which the results will be inserted. It must be stateless and cooperative.
downstream
- the downstream aggregate operation
See also: groupingBy(FunctionEx),
groupingBy(FunctionEx, AggregateOperation1),
toMap(FunctionEx, FunctionEx)
@Nonnull public static <T,A> AggregateOperation1<T,MutableReference<A>,A> reducing(@Nonnull A emptyAccValue, @Nonnull FunctionEx<? super T,? extends A> toAccValueFn, @Nonnull BinaryOperatorEx<A> combineAccValuesFn, @Nullable BinaryOperatorEx<A> deductAccValueFn)
emptyAccValue
.
newVal = combineAccValues(currVal, toAccValue(item))
combineAccValuesFn
must be associative because
it will also be used to combine partial results, as well as
commutative because the encounter order of items is
unspecified.
The optional deductAccValueFn allows Jet to compute the sliding
window in O(1) time. It must undo the effects of a previous
combineAccValuesFn call:
A accVal; // has some pre-existing value
A itemAccVal = toAccValueFn.apply(item);
A combined = combineAccValuesFn.apply(accVal, itemAccVal);
A deducted = deductAccValueFn.apply(combined, itemAccVal);
assert deducted.equals(accVal);
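To see why a deduct function makes a sliding window cheap, consider the following plain-Java sketch. It is not Jet's implementation; slidingReduce is a hypothetical helper that keeps one running accumulated value and updates it with one combine and at most one deduct per item, instead of recombining the whole window each time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.BinaryOperator;

final class DeductSketch {
    /** Emits the reduced value of every full window of the given size. */
    static <A> List<A> slidingReduce(List<A> items, int size, A empty,
                                     BinaryOperator<A> combine, BinaryOperator<A> deduct) {
        List<A> out = new ArrayList<>();
        Deque<A> window = new ArrayDeque<>();
        A acc = empty;
        for (A item : items) {
            acc = combine.apply(acc, item);          // add the newest item
            window.addLast(item);
            if (window.size() > size) {
                // Undo the contribution of the item that just left the window.
                acc = deduct.apply(acc, window.removeFirst());
            }
            if (window.size() == size) {
                out.add(acc);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Windows of size 3 over 1..5, summed: prints [6, 9, 12]
        System.out.println(slidingReduce(
                Arrays.asList(1L, 2L, 3L, 4L, 5L), 3, 0L,
                Math::addExact, Math::subtractExact));
    }
}
```

The sketch relies on exactly the invariant stated above: deducting an item's value from the combined value must give back the value from before that item was combined in.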
This sample takes a stream of product orders and outputs a single long
number which is the sum total of all the ordered amounts. The
aggregate operation it implements is equivalent to summingLong(ToLongFunctionEx):
BatchStage<Order> orders = pipeline.readFrom(orderSource);
BatchStage<Long> totalAmount = orders.aggregate(reducing(
0L,
Order::getAmount,
Math::addExact,
Math::subtractExact
));
The given functions must be stateless and cooperative.
T
- type of the input item
A
- type of the accumulated value
emptyAccValue
- the reducing operation's emptyAccValue element
toAccValueFn
- transforms the stream item into its accumulated value
combineAccValuesFn
- combines two accumulated values into one
deductAccValueFn
- deducts the right-hand accumulated value from the left-hand one
(optional)
@Nonnull public static <T> AggregateOperation1<T,PickAnyAccumulator<T>,T> pickAny()
Returns an aggregate operation whose result is an arbitrary item it
observed, or null if it observed no items.
The implementation of StageWithWindow.distinct()
uses this
operation and, if needed, you can use it directly for the same purpose.
This sample takes a stream of people and outputs a stream of people that
have distinct last names (same as calling groupingKey(keyFn).distinct()):
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<Entry<String, Person>> distinctByLastName =
people.groupingKey(Person::getLastName)
.aggregate(pickAny());
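The effect of picking one item per key can be sketched in plain Java as follows. The distinctByKey helper is hypothetical and keeps the first item seen for each key, which is just one valid way of choosing an arbitrary item; it makes no claim about which item Jet would pick.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class PickAnySketch {
    /** Keeps a single item per key; here, the first one encountered. */
    static <T, K> Map<K, T> distinctByKey(List<T> items, Function<? super T, ? extends K> keyFn) {
        Map<K, T> picked = new LinkedHashMap<>();
        for (T item : items) {
            picked.putIfAbsent(keyFn.apply(item), item);
        }
        return picked;
    }

    public static void main(String[] args) {
        // Key each word by its first letter and keep one word per letter.
        Map<Character, String> byInitial = distinctByKey(
                Arrays.asList("apple", "avocado", "banana"), s -> s.charAt(0));
        System.out.println(byInitial);
    }
}
```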
NOTE: if this aggregate operation doesn't observe any
items, its result will be null. Since the non-keyed
BatchStage.aggregate(AggregateOperation1) emits just the naked
aggregation result, and since a null cannot travel through a Jet
pipeline, you will not get any output in that case.
T
- type of the input item
public static <T> AggregateOperation1<T,ArrayList<T>,List<T>> sorting(@Nonnull ComparatorEx<? super T> comparator)
Returns an aggregate operation that accumulates all input items into an
ArrayList and sorts it with the given comparator. If you have
Comparable items that you want to sort in their natural order, use
ComparatorEx.naturalOrder().
This sample takes a stream of people and outputs a single list of people sorted by their last name:
BatchStage<Person> people = pipeline.readFrom(peopleSource);
BatchStage<List<Person>> sorted = people.aggregate(
sorting(ComparatorEx.comparing(Person::getLastName)));
T
- the type of input items
comparator
- the comparator to use for sorting. It must be
stateless and cooperative.
@Nonnull public static <T,A0,A1,R0,R1,R> AggregateOperation1<T,Tuple2<A0,A1>,R> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
This sample takes a stream of orders and outputs a single tuple containing the orders with the smallest and the largest amount:
BatchStage<Order> orders = pipeline.readFrom(orderSource);
BatchStage<Tuple2<Order, Order>> extremes = orders.aggregate(allOf(
minBy(ComparatorEx.comparing(Order::getAmount)),
maxBy(ComparatorEx.comparing(Order::getAmount)),
Tuple2::tuple2
));
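A comparable composition exists in java.util.stream as Collectors.teeing, which feeds every item to two collectors and merges their results at the end. The sketch below assumes Java 12 or later (for teeing) and works on plain Long amounts rather than Order objects; the extremes method is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class AllOfSketch {
    /**
     * Returns {min, max} of the amounts, computed in a single pass.
     * Throws NoSuchElementException if the list is empty.
     */
    static long[] extremes(List<Long> amounts) {
        return amounts.stream().collect(Collectors.teeing(
                Collectors.minBy(Comparator.<Long>naturalOrder()),
                Collectors.maxBy(Comparator.<Long>naturalOrder()),
                // Plays the role of exportFinishFn: merge the two results.
                (min, max) -> new long[] {min.orElseThrow(), max.orElseThrow()}));
    }

    public static void main(String[] args) {
        // Prints [2, 9]
        System.out.println(Arrays.toString(extremes(Arrays.asList(5L, 2L, 9L))));
    }
}
```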
T
- type of input items
A0
- 1st accumulator type
A1
- 2nd accumulator type
R0
- 1st result type
R1
- 2nd result type
R
- final result type
op0
- 1st operation
op1
- 2nd operation
exportFinishFn
- function combining the two results into a single
target instance. It must be stateless and cooperative.
@Nonnull public static <T,A0,A1,R0,R1> AggregateOperation1<T,Tuple2<A0,A1>,Tuple2<R0,R1>> allOf(@Nonnull AggregateOperation1<? super T,A0,R0> op1, @Nonnull AggregateOperation1<? super T,A1,R1> op2)
Convenience for allOf(AggregateOperation1, AggregateOperation1,
BiFunctionEx) wrapping the two results in a Tuple2.
@Nonnull public static <T,A0,A1,A2,R0,R1,R2,R> AggregateOperation1<T,Tuple3<A0,A1,A2>,R> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T,A2,? extends R2> op2, @Nonnull TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)
This sample takes a stream of orders and outputs a single tuple containing the average amount ordered and the orders with the smallest and the largest amount:
BatchStage<Order> orders = pipeline.readFrom(orderSource);
BatchStage<Tuple3<Double, Order, Order>> averageAndExtremes =
orders.aggregate(allOf(
averagingLong(Order::getAmount),
minBy(ComparatorEx.comparing(Order::getAmount)),
maxBy(ComparatorEx.comparing(Order::getAmount)),
Tuple3::tuple3
));
T
- type of input items
A0
- 1st accumulator type
A1
- 2nd accumulator type
A2
- 3rd accumulator type
R0
- 1st result type
R1
- 2nd result type
R2
- 3rd result type
R
- final result type
op0
- 1st operation
op1
- 2nd operation
op2
- 3rd operation
exportFinishFn
- function combining the three results into a single
target instance. It must be stateless and cooperative.
@Nonnull public static <T,A0,A1,A2,R0,R1,R2> AggregateOperation1<T,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> allOf(@Nonnull AggregateOperation1<? super T,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T,A2,? extends R2> op2)
Convenience for allOf(AggregateOperation1, AggregateOperation1,
AggregateOperation1, TriFunction) wrapping the three results in a
Tuple3.
@Nonnull public static <T> AllOfAggregationBuilder<T> allOfBuilder()
ItemsByTag
object you'll get in the
output.
The builder object is primarily intended to build a composite of four or
more aggregate operations. For up to three operations, prefer the simpler
and more type-safe variants allOf(op1, op2) and allOf(op1, op2, op3).
In the following sample we'll construct a composite aggregate operation that takes a stream of orders and finds the extremes in terms of ordered amount. Here's the input stage:
BatchStage<Order> orders = pipeline.readFrom(orderSource);
Now we construct the aggregate operation using the builder:
AllOfAggregationBuilder<Order> builder = allOfBuilder();
Tag<Order> minTag = builder.add(minBy(ComparatorEx.comparing(Order::getAmount)));
Tag<Order> maxTag = builder.add(maxBy(ComparatorEx.comparing(Order::getAmount)));
AggregateOperation1<Order, ?, ItemsByTag> aggrOp = builder.build();
Finally, we apply the aggregate operation and use the tags we got
above to extract the components:
BatchStage<ItemsByTag> extremes = orders.aggregate(aggrOp);
BatchStage<Tuple2<Order, Order>> extremesAsTuple =
extremes.map(ibt -> tuple2(ibt.get(minTag), ibt.get(maxTag)));
T
- type of input items
public static <T0,A0,R0,T1,A1,R1,R> AggregateOperation2<T0,T1,Tuple2<A0,A1>,R> aggregateOperation2(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull BiFunctionEx<? super R0,? super R1,? extends R> exportFinishFn)
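Returns an aggregate operation that is a composite of two independent
aggregate operations, each one accepting its own input. You use it with
BatchStage.aggregate2(BatchStage, AggregateOperation2), as in
stage0.aggregate2(stage1, compositeAggrOp). Before using this method,
see if you can instead use stage0.aggregate2(aggrOp0, stage1, aggrOp1)
because it's simpler and doesn't require you to pre-compose the
aggregate operations.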
This method is suitable when you can express your computation as two independent aggregate operations where you combine their final results. If you need an operation that combines the two inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.
As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:
BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
We want to find out how many page clicks each user made before buying a
product. We can do it like this:
BatchStage<Entry<Long, Double>> visitsPerPurchase = pageVisits
.groupingKey(PageVisit::userId)
.aggregate2(
payments.groupingKey(Payment::userId),
aggregateOperation2(counting(), counting(),
(numPageVisits, numPayments) -> 1.0 * numPageVisits / numPayments
));
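As a rough, Jet-free illustration of what this co-aggregation computes, the sketch below counts the two inputs independently per user and combines the two counts only at the end, mirroring the role of exportFinishFn. The visitsPerPurchase method and the representation of each event by its user ID are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CoAggregateSketch {
    /** Ratio of page visits to payments per user, given the user ID of each event. */
    static Map<Long, Double> visitsPerPurchase(List<Long> visitUserIds, List<Long> paymentUserIds) {
        // Per-user accumulator: {number of visits, number of payments}
        Map<Long, long[]> counts = new HashMap<>();
        for (Long id : visitUserIds) {
            counts.computeIfAbsent(id, k -> new long[2])[0]++;
        }
        for (Long id : paymentUserIds) {
            counts.computeIfAbsent(id, k -> new long[2])[1]++;
        }
        Map<Long, Double> result = new HashMap<>();
        // Finishing step: combine the two independent counts into one value.
        // A user with visits but no payments gets Infinity (floating-point division).
        counts.forEach((id, c) -> result.put(id, 1.0 * c[0] / c[1]));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(visitsPerPurchase(
                Arrays.asList(1L, 1L, 1L, 2L), Arrays.asList(1L, 2L, 2L)));
    }
}
```

Neither count depends on the other input during accumulation, which is the property that makes this kind of computation expressible as a composite of two independent aggregate operations.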
The output stage's stream contains a Map.Entry where the key is
the user ID and the value is the ratio of page visits to payments for
that user.
T0
- type of items in the first stage
A0
- type of the first aggregate operation's accumulator
R0
- type of the first aggregate operation's result
T1
- type of items in the second stage
A1
- type of the second aggregate operation's accumulator
R1
- type of the second aggregate operation's result
R
- type of the result
op0
- the aggregate operation that will receive the first stage's input
op1
- the aggregate operation that will receive the second stage's input
exportFinishFn
- the function that transforms the individual aggregate results into the
overall result that the co-aggregating stage emits. It must be stateless
and cooperative.
public static <T0,T1,A0,A1,R0,R1> AggregateOperation2<T0,T1,Tuple2<A0,A1>,Tuple2<R0,R1>> aggregateOperation2(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1)
Convenience for aggregateOperation2(aggrOp0, aggrOp1, finishFn)
that outputs a Tuple2(result0, result1).
T0
- type of items in the first stage
A0
- type of the first aggregate operation's accumulator
R0
- type of the first aggregate operation's result
T1
- type of items in the second stage
A1
- type of the second aggregate operation's accumulator
R1
- type of the second aggregate operation's result
op0
- the aggregate operation that will receive the first stage's input
op1
- the aggregate operation that will receive the second stage's input
public static <T0,T1,T2,A0,A1,A2,R0,R1,R2,R> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,R> aggregateOperation3(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T2,A2,? extends R2> op2, @Nonnull TriFunction<? super R0,? super R1,? super R2,? extends R> exportFinishFn)
Returns an aggregate operation that is a composite of three independent
aggregate operations, each one accepting its own input. You use it with
BatchStage.aggregate3(BatchStage, BatchStage, AggregateOperation3), as in
stage0.aggregate3(stage1, stage2, compositeAggrOp). Before using this
method, see if you can instead use stage0.aggregate3(aggrOp0, stage1,
aggrOp1, stage2, aggrOp2) because it's simpler and doesn't require you
to pre-compose the aggregate operations.
This method is suitable when you can express your computation as three independent aggregate operations where you combine their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.
As a quick example, let's say you have three data streams coming from an online store, consisting of user actions: page visits, add-to-cart actions and payments:
BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
BatchStage<AddToCart> addToCarts = pipeline.readFrom(addToCartSource);
BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
We want to get these metrics for each user: how many page clicks they
made before buying a product, and how many products they bought per
purchase. We could do it like this:
BatchStage<Entry<Long, Tuple2<Double, Double>>> userStats = pageVisits
        .groupingKey(PageVisit::userId)
        .aggregate3(
                addToCarts.groupingKey(AddToCart::userId),
                payments.groupingKey(Payment::userId),
                aggregateOperation3(counting(), counting(), counting(),
                        (numPageVisits, numAddToCarts, numPayments) ->
                                tuple2(1.0 * numPageVisits / numPayments,
                                       1.0 * numAddToCarts / numPayments
                                )
                ));
T0
- type of items in the first stage
A0
- type of the first aggregate operation's accumulator
R0
- type of the first aggregate operation's result
T1
- type of items in the second stage
A1
- type of the second aggregate operation's accumulator
R1
- type of the second aggregate operation's result
T2
- type of items in the third stage
A2
- type of the third aggregate operation's accumulator
R2
- type of the third aggregate operation's result
R
- type of the result
op0
- the aggregate operation that will receive the first stage's input
op1
- the aggregate operation that will receive the second stage's input
op2
- the aggregate operation that will receive the third stage's input
exportFinishFn
- the function that transforms the individual aggregate results into the
overall result that the co-aggregating stage emits. It must be stateless
and cooperative.
public static <T0,T1,T2,A0,A1,A2,R0,R1,R2> AggregateOperation3<T0,T1,T2,Tuple3<A0,A1,A2>,Tuple3<R0,R1,R2>> aggregateOperation3(@Nonnull AggregateOperation1<? super T0,A0,? extends R0> op0, @Nonnull AggregateOperation1<? super T1,A1,? extends R1> op1, @Nonnull AggregateOperation1<? super T2,A2,? extends R2> op2)
Convenience for aggregateOperation3(aggrOp0, aggrOp1, aggrOp2, finishFn)
that outputs a Tuple3(result0, result1, result2).
T0
- type of items in the first stage
A0
- type of the first aggregate operation's accumulator
R0
- type of the first aggregate operation's result
T1
- type of items in the second stage
A1
- type of the second aggregate operation's accumulator
R1
- type of the second aggregate operation's result
T2
- type of items in the third stage
A2
- type of the third aggregate operation's accumulator
R2
- type of the third aggregate operation's result
op0
- the aggregate operation that will receive the first stage's input
op1
- the aggregate operation that will receive the second stage's input
op2
- the aggregate operation that will receive the third stage's input
@Nonnull public static CoAggregateOperationBuilder coAggregateOperationBuilder()
stage.aggregateBuilder(). Before deciding to use it, consider using
stage.aggregateBuilder(aggrOp0) because it will allow you to directly
pass the aggregate operation for each joined stage, without requiring
you to build a composite operation through this builder. Finally, if
you're co-aggregating two or three streams, prefer the simpler and more
type-safe variants:
aggregateOperation2(AggregateOperation1, AggregateOperation1, BiFunctionEx)
and
aggregateOperation3(AggregateOperation1, AggregateOperation1, AggregateOperation1, TriFunction).
This builder is suitable when you can express your computation as independent aggregate operations on each input where you combine only their final results. If you need an operation that combines the inputs in the accumulation phase, you can create an aggregate operation by specifying each primitive using the aggregate operation builder.
As a quick example, let's say you have two data streams coming from an online store, consisting of user actions: page visits and payments:
BatchStage<PageVisit> pageVisits = pipeline.readFrom(pageVisitSource);
BatchStage<Payment> payments = pipeline.readFrom(paymentSource);
We want to find out how many page clicks each user made before buying a
product, and we want to do it using stage.aggregateBuilder(). Note that
there will be two builders at play: the stage builder, which joins the
pipeline stages, and the aggregate operation builder (obtained from this
method). First we obtain the stage builder and add our pipeline stages:
GroupAggregateBuilder1<PageVisit, Long> stageBuilder =
pageVisits.groupingKey(PageVisit::userId).aggregateBuilder();
Tag<PageVisit> visitTag_in = stageBuilder.tag0();
Tag<Payment> payTag_in = stageBuilder.add(payments.groupingKey(Payment::userId));
Now we have the tags we need to build the aggregate operation, and while
building it we get new tags to get the results of the operation:
CoAggregateOperationBuilder opBuilder = coAggregateOperationBuilder();
Tag<Long> visitTag = opBuilder.add(visitTag_in, counting());
Tag<Long> payTag = opBuilder.add(payTag_in, counting());
We use these tags in the exportFinishFn
we specify at the end:
AggregateOperation<Object[], Double> aggrOp =
opBuilder.build(ibt -> 1.0 * ibt.get(visitTag) / ibt.get(payTag));
And now we're ready to construct the output stage:
BatchStage<Entry<Long, Double>> visitsPerPurchase = stageBuilder.build(aggrOp);
The output stage's stream contains Map.Entry instances where the key is
the user ID and the value is the ratio of page visits to payments for
that user.
@Nonnull public static <T,A,R> Collector<T,A,R> toCollector(AggregateOperation1<? super T,A,? extends R> aggrOp)
Adapts the given aggregate operation to a Collector that can be passed to
Stream.collect(Collector).
This can be useful when you want to combine java.util.stream with Jet aggregations. For example, the below can be used to do multiple aggregations in a single pass over the same data set:
Stream<Person> personStream = people.stream();
personStream.collect(
AggregateOperations.toCollector(
AggregateOperations.allOf(
AggregateOperations.counting(),
AggregateOperations.averagingLong(p -> p.getAge())
)
)
);
@Nonnull public static <T,A,R> Aggregator<T,R> toAggregator(AggregateOperation1<? super T,A,? extends R> aggrOp)
Adapts the given aggregate operation for use in
IMap.aggregate(Aggregator) calls.
Using IMap aggregations can be desirable when you want to make
use of indices when doing aggregations and want to use the Jet
aggregations API instead of writing a custom Aggregator.
For example, the following aggregation can be used to group people by their age and find the counts for each group.
IMap<Integer, Person> map = jet.getMap("people");
Map<Integer, Long> counts = map.aggregate(
        AggregateOperations.toAggregator(
                AggregateOperations.groupingBy(
                        e -> e.getValue().getAge(), AggregateOperations.counting()
                )
        )
);
Copyright © 2023 Hazelcast, Inc.. All rights reserved.