Interface BatchStageWithKey<T,K>

Type Parameters:
T - type of the input item
K - type of the key

public interface BatchStageWithKey<T,K> extends GeneralStageWithKey<T,K>
Modifier and Type | Method and Description |
---|---|
default <R> BatchStage<Map.Entry<K,R>> | aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp) Attaches a stage that performs the given group-and-aggregate operation. |
<R,OUT> BatchStage<OUT> | aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp, DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn) Attaches a stage that performs the given group-and-aggregate operation. |
default <T1,R0,R1> BatchStage<Map.Entry<K,Tuple2<R0,R1>>> | aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStageWithKey<? extends T1,? extends K> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1) Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from both this stage and stage1 you supply. |
default <T1,R0,R1,OUT> BatchStage<OUT> | aggregate2(AggregateOperation1<? super T,?,R0> aggrOp0, BatchStageWithKey<T1,? extends K> stage1, AggregateOperation1<? super T1,?,R1> aggrOp1, DistributedTriFunction<? super K,? super R0,? super R1,OUT> mapToOutputFn) Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from both this stage and stage1 you supply. |
<T1,R,OUT> BatchStage<OUT> | aggregate2(BatchStageWithKey<T1,? extends K> stage1, AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp, DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn) Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. |
default <T1,R> BatchStage<Map.Entry<K,R>> | aggregate2(BatchStageWithKey<T1,? extends K> stage1, AggregateOperation2<? super T,? super T1,?,R> aggrOp) Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. |
default <T1,T2,R0,R1,R2> BatchStage<Map.Entry<K,Tuple3<R0,R1,R2>>> | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStageWithKey<T1,? extends K> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, BatchStageWithKey<T2,? extends K> stage2, AggregateOperation1<? super T2,?,? extends R2> aggrOp2) Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from this stage as well as stage1 and stage2 you supply. |
default <T1,T2,R0,R1,R2,OUT> BatchStage<OUT> | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStageWithKey<T1,? extends K> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, BatchStageWithKey<T2,? extends K> stage2, AggregateOperation1<? super T2,?,? extends R2> aggrOp2, DistributedQuadFunction<? super K,? super R0,? super R1,? super R2,? extends OUT> mapToOutputFn) Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from this stage as well as stage1 and stage2 you supply. |
default <T1,T2,R> BatchStage<Map.Entry<K,R>> | aggregate3(BatchStageWithKey<T1,? extends K> stage1, BatchStageWithKey<T2,? extends K> stage2, AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp) Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. |
<T1,T2,R,OUT> BatchStage<OUT> | aggregate3(BatchStageWithKey<T1,? extends K> stage1, BatchStageWithKey<T2,? extends K> stage2, AggregateOperation3<? super T,? super T1,? super T2,?,R> aggrOp, DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn) Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. |
default GroupAggregateBuilder1<T,K> | aggregateBuilder() Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. |
default <R0> GroupAggregateBuilder<K,R0> | aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0) Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. |
BatchStage<T> | distinct() Attaches a stage that emits just the items that are distinct according to the grouping key (no two items which map to the same key will be on the output). |
<C> BatchStage<T> | filterUsingContext(ContextFactory<C> contextFactory, DistributedTriPredicate<? super C,? super K,? super T> filterFn) Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. |
<C,R> BatchStage<R> | flatMapUsingContext(ContextFactory<C> contextFactory, DistributedTriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn) Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. |
<C,R> BatchStage<R> | mapUsingContext(ContextFactory<C> contextFactory, DistributedTriFunction<? super C,? super K,? super T,? extends R> mapFn) Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. |
default <V,R> BatchStage<R> | mapUsingIMap(IMap<K,V> iMap, DistributedBiFunction<? super T,? super V,? extends R> mapFn) Attaches a mapUsingContext stage where the context is a Hazelcast IMap. |
default <V,R> BatchStage<R> | mapUsingIMap(String mapName, DistributedBiFunction<? super T,? super V,? extends R> mapFn) Attaches a mapUsingContext stage where the context is a Hazelcast IMap with the supplied name. |
default <R> BatchStage<Map.Entry<K,R>> | rollingAggregate(AggregateOperation1<? super T,?,? extends R> aggrOp) A shortcut for: rollingAggregate(aggrOp, Util::entry). |
<R,OUT> BatchStage<OUT> | rollingAggregate(AggregateOperation1<? super T,?,? extends R> aggrOp, DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn) Attaches a rolling aggregation stage. |
Methods inherited from interface GeneralStageWithKey: customTransform, keyFn
@Nonnull BatchStage<T> distinct()

Attaches a stage that emits just the items that are distinct according to the grouping key (no two items which map to the same key will be on the output).
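For instance, a minimal sketch (the list names and the lowercasing key function are assumptions for illustration) that keeps one item per distinct lowercased string:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("words"))
 .groupingKey(String::toLowerCase)     // items with the same lowercased form share a key
 .distinct()                           // emits one (arbitrary) item per distinct key
 .drainTo(Sinks.list("distinctWords"));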
@Nonnull default <V,R> BatchStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull DistributedBiFunction<? super T,? super V,? extends R> mapFn)

Description copied from interface: GeneralStageWithKey

Attaches a mapUsingContext stage where the context is a Hazelcast IMap with the supplied name. Jet will use the specified key function to retrieve a value from the map and pass it to the mapping function you supply as the second argument.

This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.

Specified by: mapUsingIMap in interface GeneralStageWithKey<T,K>

Type Parameters:
V - type of the value in the IMap
R - type of the output item

Parameters:
mapName - name of the IMap
mapFn - the mapping function
@Nonnull default <V,R> BatchStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull DistributedBiFunction<? super T,? super V,? extends R> mapFn)

Description copied from interface: GeneralStageWithKey

Attaches a mapUsingContext stage where the context is a Hazelcast IMap. It is not necessarily the map you provide here, but a map with the same name in the Jet cluster that executes the pipeline. Jet will use the specified key function to retrieve a value from the map and pass it to the mapping function you supply as the second argument.

This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.

Specified by: mapUsingIMap in interface GeneralStageWithKey<T,K>

Type Parameters:
V - type of the value in the IMap
R - type of the output item

Parameters:
iMap - the IMap to use as the context
mapFn - the mapping function
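A minimal sketch of the by-name variant; the map and list names, and the (productId, quantity) entry layout, are assumptions for illustration:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Entry<String, Long>>list("orders"))   // entries of (productId, quantity)
 .groupingKey(Entry::getKey)                              // partitions by product ID
 .<String, String>mapUsingIMap("products",                // assumed IMap<productId, productName>
     (order, name) -> name + " x " + order.getValue())
 .drainTo(Sinks.list("orderLines"));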
@Nonnull <C,R> BatchStage<R> mapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedTriFunction<? super C,? super K,? super T,? extends R> mapFn)

Description copied from interface: GeneralStageWithKey

Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory. If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.

Specified by: mapUsingContext in interface GeneralStageWithKey<T,K>

Type Parameters:
C - type of context object
R - the result type of the mapping function

Parameters:
contextFactory - the context factory
mapFn - a stateless mapping function
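A minimal sketch; the context here is just a StringBuilder to show the lifecycle (in practice it would typically wrap a connection or cache), and the source list name and (city, reading) layout are assumptions:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Entry<String, Double>>list("temps"))  // entries of (city, reading)
 .groupingKey(Entry::getKey)
 .mapUsingContext(
     ContextFactory.withCreateFn(jet -> new StringBuilder()),
     (sb, city, e) -> {
         sb.setLength(0);              // one context instance serves many keys
         return sb.append(city).append('=').append(e.getValue()).toString();
     })
 .drainTo(Sinks.logger());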
@Nonnull <C> BatchStage<T> filterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedTriPredicate<? super C,? super K,? super T> filterFn)

Description copied from interface: GeneralStageWithKey

Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the context object, which Jet will create using the supplied contextFactory.

Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.

Specified by: filterUsingContext in interface GeneralStageWithKey<T,K>

Type Parameters:
C - type of context object

Parameters:
contextFactory - the context factory
filterFn - a stateless filter predicate function
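A minimal sketch where the context is a blocklist of user IDs; the hard-coded set and the names are assumptions (a real createFn might load the set from an external system):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Entry<String, Long>>list("clicks"))   // entries of (userId, timestamp)
 .groupingKey(Entry::getKey)
 .filterUsingContext(
     ContextFactory.withCreateFn(jet ->
         new HashSet<>(Arrays.asList("user-13", "user-42"))),  // assumed blocklist
     (blocked, userId, click) -> !blocked.contains(userId))
 .drainTo(Sinks.list("allowedClicks"));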
@Nonnull <C,R> BatchStage<R> flatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedTriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn)

Description copied from interface: GeneralStageWithKey

Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory.

Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.

Specified by: flatMapUsingContext in interface GeneralStageWithKey<T,K>

Type Parameters:
C - type of context object
R - type of the output items

Parameters:
contextFactory - the context factory
flatMapFn - a stateless flatmapping function
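A minimal sketch that tokenizes documents, keeping a compiled regex as the context; the list names and the (docId, text) entry layout are assumptions:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Entry<String, String>>list("docs"))   // entries of (docId, text)
 .groupingKey(Entry::getKey)
 .flatMapUsingContext(
     ContextFactory.withCreateFn(jet -> Pattern.compile("\\W+")),  // regex shared per processor
     (regex, docId, e) -> Traversers.traverseArray(regex.split(e.getValue()))
                                    .map(word -> Util.entry(docId, word)))
 .drainTo(Sinks.list("words"));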
@Nonnull <R,OUT> BatchStage<OUT> rollingAggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn)

Description copied from interface: GeneralStageWithKey

Attaches a rolling aggregation stage. This stage emits the current aggregation result after receiving each item; for example, if your aggregate operation is summing and the input is {2, 7, 8, -5}, the output will be {2, 9, 17, 12}.

This stage is fault-tolerant and saves its state to the snapshot.

NOTE: if you plan to use an aggregate operation whose result size grows with input size (such as toList) and your data source is unbounded, carefully consider the memory demands this implies. The result will keep growing forever.

Specified by: rollingAggregate in interface GeneralStageWithKey<T,K>

Type Parameters:
R - type of the aggregate operation result
OUT - type of the output item

Parameters:
aggrOp - the aggregate operation to perform
mapToOutputFn - function that transforms the key and the aggregation result into the output item
@Nonnull default <R> BatchStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)

Description copied from interface: GeneralStageWithKey

A shortcut for: rollingAggregate(aggrOp, Util::entry).

Specified by: rollingAggregate in interface GeneralStageWithKey<T,K>
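A minimal sketch of the running total described above, with assumed names; for the amounts {2, 7, 8, -5} under one ticker it would emit 2, 9, 17, 12:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Entry<String, Long>>list("trades"))   // entries of (ticker, amount)
 .groupingKey(Entry::getKey)
 .rollingAggregate(AggregateOperations.summingLong(Entry::getValue))
 .drainTo(Sinks.logger());                                // emits Entry(ticker, runningTotal)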
@Nonnull <R,OUT> BatchStage<OUT> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn)
Attaches a stage that performs the given group-and-aggregate operation. Once it has received all the items, it calls the supplied mapToOutputFn with each key and the associated aggregation result to create the items to emit.

Type Parameters:
R - type of the aggregation result
OUT - type of the output item

Parameters:
aggrOp - the aggregate operation to perform
mapToOutputFn - the function that creates the output item

See Also: AggregateOperations
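A minimal sketch of this variant, formatting each key-result pair directly into a report line; the names are assumptions:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Entry<String, Long>>list("visits"))   // entries of (page, userId)
 .groupingKey(Entry::getKey)
 .aggregate(AggregateOperations.counting(),
     (page, count) -> page + ": " + count)                // mapToOutputFn turns (K, R) into OUT
 .drainTo(Sinks.list("report"));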
@Nonnull default <R> BatchStage<Map.Entry<K,R>> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
Attaches a stage that performs the given group-and-aggregate operation. It emits one key-value pair (in a Map.Entry) for each distinct key it observes in its input. The value is the result of the aggregate operation across all the items with the given grouping key.

Type Parameters:
R - type of the aggregation result

Parameters:
aggrOp - the aggregate operation to perform

See Also: AggregateOperations
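For instance, a classic word-count sketch (the source and sink names are assumptions):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("lines"))
 .flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
 .groupingKey(word -> word)
 .aggregate(AggregateOperations.counting())   // emits Entry<String, Long> per distinct word
 .drainTo(Sinks.map("counts"));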
@Nonnull <T1,R,OUT> BatchStage<OUT> aggregate2(@Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. Once it has received all the items, it calls the supplied mapToOutputFn with each key and the associated aggregation result to create the items to emit.

This variant requires you to provide a two-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of two single-input aggregate operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.

Type Parameters:
T1 - type of items in stage1
R - type of the aggregation result
OUT - type of the output item

Parameters:
stage1 - the other stage
aggrOp - the aggregate operation to perform
mapToOutputFn - the function that creates the output item

See Also: AggregateOperations
@Nonnull default <T1,R> BatchStage<Map.Entry<K,R>> aggregate2(@Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. It emits one key-value pair (in a Map.Entry) for each distinct key it observes in its input. The value is the result of the aggregate operation across all the items with the given grouping key.

This variant requires you to provide a two-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of two single-input aggregate operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.

Type Parameters:
T1 - type of items in stage1
R - type of the aggregation result

Parameters:
stage1 - the other stage
aggrOp - the aggregate operation to perform

See Also: AggregateOperations
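A minimal sketch of such a combined-accumulator operation: it counts the items from both stages in a single LongAccumulator. The stage names and entry layouts are assumptions; the withCreate/andAccumulate0/andAccumulate1 builder is the pattern the AggregateOperation Javadoc refers to:

Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> views =
    p.drawFrom(Sources.<Entry<String, Long>>list("pageViews"))
     .groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> carts =
    p.drawFrom(Sources.<Entry<String, Long>>list("addToCarts"))
     .groupingKey(Entry::getKey);
BatchStage<Entry<String, Long>> interactions = views.aggregate2(carts,
    AggregateOperation
        .withCreate(LongAccumulator::new)
        .<Entry<String, Long>>andAccumulate0((acc, view) -> acc.add(1))  // items from this stage
        .<Entry<String, Long>>andAccumulate1((acc, cart) -> acc.add(1))  // items from stage1
        .andCombine(LongAccumulator::add)
        .andFinish(LongAccumulator::get));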
@Nonnull default <T1,R0,R1,OUT> BatchStage<OUT> aggregate2(@Nonnull AggregateOperation1<? super T,?,R0> aggrOp0, @Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,R1> aggrOp1, @Nonnull DistributedTriFunction<? super K,? super R0,? super R1,OUT> mapToOutputFn)
Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from both this stage and stage1 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage and aggrOp1 on stage1. Once it has received all the items, it calls the supplied mapToOutputFn with each key and the associated aggregation results to create the items to emit.

Type Parameters:
T1 - type of items in stage1
R0 - type of the aggregation result for stream-0
R1 - type of the aggregation result for stream-1
OUT - type of the output item

Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the other stage
aggrOp1 - aggregate operation to perform on the other stage
mapToOutputFn - the function that creates the output item

See Also: AggregateOperations
@Nonnull default <T1,R0,R1> BatchStage<Map.Entry<K,Tuple2<R0,R1>>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStageWithKey<? extends T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from both this stage and stage1 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage and aggrOp1 on stage1. Once it has received all the items, it emits for each distinct key a Map.Entry(key, Tuple2(result0, result1)).

Type Parameters:
T1 - type of items in stage1
R0 - type of the aggregation result for stream-0
R1 - type of the aggregation result for stream-1

Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the other stage
aggrOp1 - aggregate operation to perform on the other stage

See Also: AggregateOperations
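A minimal sketch of this two-operation variant, with assumed stage names; each key yields a Tuple2 of (view count, payment total):

Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> views =
    p.drawFrom(Sources.<Entry<String, Long>>list("pageViews"))
     .groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> payments =
    p.drawFrom(Sources.<Entry<String, Long>>list("payments"))
     .groupingKey(Entry::getKey);
BatchStage<Entry<String, Tuple2<Long, Long>>> stats = views.aggregate2(
    AggregateOperations.counting(),                       // aggrOp0: counts this stage's items
    payments,
    AggregateOperations.summingLong(Entry::getValue));    // aggrOp1: sums stage1's values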
@Nonnull <T1,T2,R,OUT> BatchStage<OUT> aggregate3(@Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull BatchStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,? extends OUT> mapToOutputFn)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. Once it has received all the items, it calls the supplied mapToOutputFn with each key and the associated aggregation result to create the items to emit.

This variant requires you to provide a three-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of three single-input aggregate operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.

Type Parameters:
T1 - type of items in stage1
T2 - type of items in stage2
R - type of the aggregation result
OUT - type of the output item

Parameters:
stage1 - the first additional stage
stage2 - the second additional stage
aggrOp - the aggregate operation to perform
mapToOutputFn - the function that creates the output item

See Also: AggregateOperations
@Nonnull default <T1,T2,R> BatchStage<Map.Entry<K,R>> aggregate3(@Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull BatchStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. It emits one key-value pair (in a Map.Entry) for each distinct key it observes in its input. The value is the result of the aggregate operation across all the items with the given grouping key.

This variant requires you to provide a three-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of three single-input aggregate operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.

Type Parameters:
T1 - type of items in stage1
T2 - type of items in stage2
R - type of the aggregation result

Parameters:
stage1 - the first additional stage
stage2 - the second additional stage
aggrOp - the aggregate operation to perform

See Also: AggregateOperations
@Nonnull default <T1,T2,R0,R1,R2,OUT> BatchStage<OUT> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2, @Nonnull DistributedQuadFunction<? super K,? super R0,? super R1,? super R2,? extends OUT> mapToOutputFn)
Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from this stage as well as stage1 and stage2 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has received all the items, it calls the supplied mapToOutputFn with each key and the associated aggregation results to create the items to emit.

Type Parameters:
T1 - type of items in stage1
T2 - type of items in stage2
R0 - type of the aggregation result for stream-0
R1 - type of the aggregation result for stream-1
R2 - type of the aggregation result for stream-2
OUT - type of the output item

Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the first additional stage
aggrOp1 - aggregate operation to perform on stage1
stage2 - the second additional stage
aggrOp2 - aggregate operation to perform on stage2
mapToOutputFn - the function that creates the output item

See Also: AggregateOperations
@Nonnull default <T1,T2,R0,R1,R2> BatchStage<Map.Entry<K,Tuple3<R0,R1,R2>>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from this stage as well as stage1 and stage2 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has received all the items, it emits for each distinct key a Map.Entry(key, Tuple3(result0, result1, result2)).

Type Parameters:
T1 - type of items in stage1
T2 - type of items in stage2
R0 - type of the aggregation result for stream-0
R1 - type of the aggregation result for stream-1
R2 - type of the aggregation result for stream-2

Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the first additional stage
aggrOp1 - aggregate operation to perform on stage1
stage2 - the second additional stage
aggrOp2 - aggregate operation to perform on stage2

See Also: AggregateOperations
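A minimal sketch of the three-operation variant, with assumed stage names; each key yields a Tuple3 of the three per-stage results:

Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> views =
    p.drawFrom(Sources.<Entry<String, Long>>list("views")).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> carts =
    p.drawFrom(Sources.<Entry<String, Long>>list("carts")).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> payments =
    p.drawFrom(Sources.<Entry<String, Long>>list("payments")).groupingKey(Entry::getKey);
BatchStage<Entry<String, Tuple3<Long, Long, Long>>> funnel = views.aggregate3(
    AggregateOperations.counting(),
    carts, AggregateOperations.counting(),
    payments, AggregateOperations.summingLong(Entry::getValue));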
@Nonnull default <R0> GroupAggregateBuilder<K,R0> aggregateBuilder(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. This stage is already registered with the builder you get. The stage emits one Map.Entry(key, itemsByTag) per distinct key. Use the tag you get from builder.add(stageN) to retrieve the aggregated result for that stage. Use builder.tag0() as the tag of this stage. You will also be able to supply a function to the builder that immediately transforms the ItemsByTag to the desired output type.

This example draws from three sources that produce Map.Entry<String, Long>. It groups by entry key and then counts the items in stage-0, sums those in stage-1 and takes the average of those in stage-2:
Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> stage0 =
    p.drawFrom(source0).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage1 =
    p.drawFrom(source1).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage2 =
    p.drawFrom(source2).groupingKey(Entry::getKey);
GroupAggregateBuilder<String, Long> b = stage0.aggregateBuilder(
AggregateOperations.counting());
Tag<Long> tag0 = b.tag0();
Tag<Long> tag1 = b.add(stage1,
AggregateOperations.summingLong(Entry::getValue));
Tag<Double> tag2 = b.add(stage2,
AggregateOperations.averagingLong(Entry::getValue));
BatchStage<Entry<String, ItemsByTag>> aggregated = b.build();
aggregated.map(e -> String.format(
"Key %s, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
e.getKey(),
e.getValue().get(tag0), e.getValue().get(tag1), e.getValue().get(tag2))
);
@Nonnull default GroupAggregateBuilder1<T,K> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.

This builder requires you to provide a multi-input aggregate operation. If you can express your logic in terms of single-input aggregate operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use this builder only when you need to implement an aggregate operation that combines all the input streams into the same accumulator.

This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.

To add the other stages, call add(stage). Collect all the tags returned from add() and use them when building the aggregate operation. Retrieve the tag of the first stage (from which you obtained this builder) by calling GroupAggregateBuilder1.tag0().
This example takes three streams of Map.Entry<String, Long>
and,
for each string key, counts the distinct Long
values across all
input streams:
Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> stage0 =
    p.drawFrom(source0).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage1 =
    p.drawFrom(source1).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage2 =
    p.drawFrom(source2).groupingKey(Entry::getKey);
GroupAggregateBuilder1<Entry<String, Long>, String> b = stage0.aggregateBuilder();
Tag<Entry<String, Long>> tag0 = b.tag0();
Tag<Entry<String, Long>> tag1 = b.add(stage1);
Tag<Entry<String, Long>> tag2 = b.add(stage2);
BatchStage<Entry<String, Integer>> aggregated = b.build(AggregateOperation
.withCreate(HashSet<Long>::new)
.andAccumulate(tag0, (acc, item) -> acc.add(item.getValue()))
.andAccumulate(tag1, (acc, item) -> acc.add(item.getValue()))
.andAccumulate(tag2, (acc, item) -> acc.add(item.getValue()))
.andCombine(HashSet::addAll)
.andFinish(HashSet::size));
Copyright © 2018 Hazelcast, Inc. All rights reserved.