T0 - the type of the stream-0 item
public class WindowAggregateBuilder<T0> extends Object
To obtain this builder, call StageWithWindow.aggregateBuilder() on
one of the stages to co-aggregate, then add the other stages by calling
add(stage) on the builder. Collect all the tags returned from add()
and use them when building the aggregate operation.
Retrieve the tag of the first stage (from which you obtained the
builder) by calling tag0().
This object is mainly intended to build a co-aggregation of four or more
contributing stages. For up to three stages, prefer the direct stage.aggregateN(...)
calls because they offer more static type safety.
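To make the role of the tags concrete, the following library-free sketch models the pattern described above. MiniTag and MiniBuilder are hypothetical stand-ins invented for this illustration, not Jet classes, and plain lists stand in for pipeline stages. The sketch only shows how each registered input receives its own typed tag and how that tag later identifies the input; it makes no claim about how Jet implements this internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TagSketch {

    /** Hypothetical stand-in for a tag: a typed handle that remembers which input it belongs to. */
    public static final class MiniTag<E> {
        private final int index;
        MiniTag(int index) { this.index = index; }
        public int index() { return index; }
    }

    /** Hypothetical stand-in for the builder: starts from input 0 and issues one tag per added input. */
    public static final class MiniBuilder<T0> {
        private final List<List<?>> inputs = new ArrayList<>();

        public MiniBuilder(List<T0> input0) {
            inputs.add(input0);
        }

        /** Tag of the input the builder was created from. */
        public MiniTag<T0> tag0() {
            return new MiniTag<>(0);
        }

        /** Registers another input and returns the tag that identifies it. */
        public <E> MiniTag<E> add(List<E> input) {
            inputs.add(input);
            return new MiniTag<>(inputs.size() - 1);
        }

        /** Looks up the items of the input a tag refers to; the tag's type parameter types the result. */
        @SuppressWarnings("unchecked")
        public <E> List<E> itemsOf(MiniTag<E> tag) {
            return (List<E>) inputs.get(tag.index());
        }
    }

    public static void main(String[] args) {
        MiniBuilder<String> builder = new MiniBuilder<>(Arrays.asList("a1", "a2"));
        MiniTag<String> tagA = builder.tag0();
        MiniTag<Integer> tagB = builder.add(Arrays.asList(1, 2, 3));
        MiniTag<Double> tagC = builder.add(Arrays.asList(0.5));

        // Each tag carries the ordinal of its input and the item type of that input.
        System.out.println(tagA.index() + " " + tagB.index() + " " + tagC.index());
        List<Integer> itemsOfB = builder.itemsOf(tagB);
        System.out.println(itemsOfB);
    }
}
```

Because every tag is parameterized with its input's item type, the compiler can check that the accumulate function registered for a tag accepts items of that type, even though the builder itself handles an arbitrary number of inputs.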
Modifier and Type | Method and Description |
---|---|
<E> Tag<E> | add(StreamStage<E> stage): Adds another stage that will contribute its data to the aggregate operation to be performed. |
<A,R> StreamStage<TimestampedItem<R>> | build(AggregateOperation<A,R> aggrOp): Convenience for build(aggrOp, mapToOutputFn) which emits TimestampedItems as output. |
<A,R,OUT> StreamStage<OUT> | build(AggregateOperation<A,R> aggrOp, WindowResultFunction<? super R,? extends OUT> mapToOutputFn): Creates and returns a pipeline stage that performs a windowed co-aggregation of the pipeline stages registered with this builder object. |
Tag<T0> | tag0(): Returns the tag corresponding to the pipeline stage this builder was obtained from. |
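public Tag<T0> tag0()
Returns the tag corresponding to the pipeline stage this builder was
obtained from. Use it to refer to that stage when building the
AggregateOperation that you'll pass to build(aggrOp).
public <E> Tag<E> add(StreamStage<E> stage)
Adds another stage that will contribute its data to the aggregate
operation to be performed. Use the returned tag to refer to that stage
when building the AggregateOperation that you'll pass to build().
public <A,R,OUT> StreamStage<OUT> build(@Nonnull AggregateOperation<A,R> aggrOp, @Nonnull WindowResultFunction<? super R,? extends OUT> mapToOutputFn)
Creates and returns a pipeline stage that performs a windowed
co-aggregation of the pipeline stages registered with this builder
object. For example: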
StageWithWindow<A> stage0 = streamStage0.window(...);
StreamStage<B> stage1 = p.drawFrom(Sources.mapJournal("b", ...));
StreamStage<C> stage2 = p.drawFrom(Sources.mapJournal("c", ...));
StreamStage<D> stage3 = p.drawFrom(Sources.mapJournal("d", ...));
WindowAggregateBuilder<A> builder = stage0.aggregateBuilder();
Tag<A> tagA = builder.tag0();
Tag<B> tagB = builder.add(stage1);
Tag<C> tagC = builder.add(stage2);
Tag<D> tagD = builder.add(stage3);
StreamStage<TimestampedItem<Result>> aggregated = builder.build(AggregateOperation
        .withCreate(MyAccumulator::new)
        .andAccumulate(tagA, MyAccumulator::put)
        .andAccumulate(tagB, MyAccumulator::put)
        .andAccumulate(tagC, MyAccumulator::put)
        .andAccumulate(tagD, MyAccumulator::put)
        .andCombine(MyAccumulator::combine)
        .andFinish(MyAccumulator::finish));
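The example above refers to MyAccumulator and Result without defining them. The sketch below shows one hypothetical shape they could take so that the four method references resolve: a no-argument constructor, a put method that accepts an item of any stage, a combine method that merges another accumulator, and a finish method that produces the result. The class bodies (a simple item count) are invented for illustration, and the java.util.function interfaces in main are an assumption standing in for the functional interfaces the aggregate operation builder accepts. Any further requirements Jet places on accumulators are not modeled here.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class AccumulatorSketch {

    /** Hypothetical result type: the total number of items seen in the window. */
    public static final class Result {
        private final long itemCount;
        Result(long itemCount) { this.itemCount = itemCount; }
        public long itemCount() { return itemCount; }
    }

    /** Hypothetical accumulator; a single put(Object) lets one method reference serve every tag. */
    public static final class MyAccumulator {
        private long itemCount;

        /** Accumulates one item from any contributing stage. */
        public void put(Object item) { itemCount++; }

        /** Merges the partial state of another accumulator into this one. */
        public void combine(MyAccumulator other) { itemCount += other.itemCount; }

        /** Turns the accumulated state into the final result. */
        public Result finish() { return new Result(itemCount); }
    }

    public static void main(String[] args) {
        // Standard functional interfaces stand in for the create, accumulate,
        // combine and finish primitives of the aggregate operation.
        Supplier<MyAccumulator> create = MyAccumulator::new;
        BiConsumer<MyAccumulator, Object> accumulate = MyAccumulator::put;
        BiConsumer<MyAccumulator, MyAccumulator> combine = MyAccumulator::combine;
        Function<MyAccumulator, Result> finish = MyAccumulator::finish;

        MyAccumulator left = create.get();
        accumulate.accept(left, "item from stage A");
        accumulate.accept(left, 42);
        MyAccumulator right = create.get();
        accumulate.accept(right, 3.14);

        combine.accept(left, right);
        System.out.println(finish.apply(left).itemCount()); // prints 3
    }
}
```

Accepting Object in put trades away some of the type safety the tags provide; overloading put once per item type would restore it while still allowing the same method reference for every tag.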
A - the type of the accumulator used by the aggregate operation
R - the type of the aggregation result
OUT - the type of the output item
aggrOp - the aggregate operation to perform
mapToOutputFn - a function that creates the output item from the aggregation result
public <A,R> StreamStage<TimestampedItem<R>> build(@Nonnull AggregateOperation<A,R> aggrOp)
Convenience for build(aggrOp, mapToOutputFn) which emits
TimestampedItems as output. The timestamp corresponds to the window's end.
A - the type of the accumulator used by the aggregate operation
R - the type of the aggregation result
aggrOp - the aggregate operation to perform
Copyright © 2018 Hazelcast, Inc. All rights reserved.