T0
- type of the stream-0 itemK
- type of the grouping keypublic class GroupAggregateBuilder<T0,K> extends Object
StageWithGrouping.aggregateBuilder()
on one of the
stages to co-group, then add the other stages by calling add(stage)
on the builder. Collect all the tags returned from add()
and use them when building the aggregate operation. Retrieve the
tag of the first stage (from which you obtained the builder) by calling
tag0()
.
This object is mainly intended to build a co-grouping of four or more
contributing stages. For up to three stages, prefer the direct stage.aggregateN(...)
calls because they offer more static type safety.
Modifier and Type | Method and Description |
---|---|
<T> Tag<T> |
add(StageWithGrouping<T,K> stage)
Adds another stage that will contribute its data to the aggregate
operation to be performed.
|
<A,R> BatchStage<Map.Entry<K,R>> |
build(AggregateOperation<A,R> aggrOp)
Convenience for
build(aggrOp, mapToOutputFn) which emits Map.Entry s as output. |
<A,R,OUT> BatchStage<OUT> |
build(AggregateOperation<A,R> aggrOp,
DistributedBiFunction<? super K,? super R,OUT> mapToOutputFn)
Creates and returns a pipeline stage that performs the
co-grouping and aggregation of pipeline stages registered with this
builder object.
|
Tag<T0> |
tag0()
Returns the tag corresponding to the pipeline stage this builder
was obtained from.
|
public Tag<T0> tag0()
AggregateOperation
that you'll pass to build(aggrOp)
.public <T> Tag<T> add(StageWithGrouping<T,K> stage)
AggregateOperation
that you'll pass to
build()
.public <A,R,OUT> BatchStage<OUT> build(@Nonnull AggregateOperation<A,R> aggrOp, @Nonnull DistributedBiFunction<? super K,? super R,OUT> mapToOutputFn)
StageWithGrouping<A, String> stage0 = batchStage0.groupingKey(A::key);
StageWithGrouping<B, String> stage1 = batchStage1.groupingKey(B::key);
StageWithGrouping<C, String> stage2 = batchStage2.groupingKey(C::key);
StageWithGrouping<D, String> stage3 = batchStage3.groupingKey(D::key);
AggregateBuilder<A> builder = stage0.aggregateBuilder();
Tag<A> tagA = builder.tag0();
Tag<B> tagB = builder.add(stage1, B::key);
Tag<C> tagC = builder.add(stage2, C::key);
Tag<D> tagD = builder.add(stage3, D::key);
BatchStage<Result> resultStage = builder.build(AggregateOperation
.withCreate(MyAccumulator::new)
.andAccumulate(tagA, MyAccumulator::put)
.andAccumulate(tagB, MyAccumulator::put)
.andAccumulate(tagC, MyAccumulator::put)
.andAccumulate(tagD, MyAccumulator::put)
.andCombine(MyAccumulator::combine)
.andFinish(MyAccumulator::finish),
(String key, String result) -> Util.entry(key, result)
);
A
- the type of items on the stage this builder was obtained fromR
- the type of the output itemaggrOp
- the aggregate operation to performpublic <A,R> BatchStage<Map.Entry<K,R>> build(@Nonnull AggregateOperation<A,R> aggrOp)
build(aggrOp, mapToOutputFn)
which emits Map.Entry
s as output.A
- the type of items in the pipeline stage this builder was obtained fromR
- the type of the aggregation resultaggrOp
- the aggregate operation to perform.Copyright © 2018 Hazelcast, Inc.. All rights reserved.