T - type of the input item
public interface StageWithWindow<T>
Modifier and Type | Method and Description |
---|---|
<R> StreamStage<WindowResult<R>> | aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp): Attaches a stage that performs the given aggregate operation over all the items that belong to a given window. |
default <T1,R0,R1> StreamStage<WindowResult<Tuple2<R0,R1>>> | aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0, StreamStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1): Attaches a stage that performs the given co-aggregate operations over the items from this stage and stage1 you supply. |
<T1,R> StreamStage<WindowResult<R>> | aggregate2(StreamStage<T1> stage1, AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp): Attaches a stage that performs the given two-input aggregate operation over the items from this stage and stage1 that belong to the same window. |
default <T1,T2,R0,R1,R2> StreamStage<WindowResult<Tuple3<R0,R1,R2>>> | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0, StreamStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, StreamStage<T2> stage2, AggregateOperation1<? super T2,?,? extends R2> aggrOp2): Attaches a stage that performs the given co-aggregate operations over the items from this stage as well as stage1 and stage2 you supply. |
<T1,T2,R> StreamStage<WindowResult<R>> | aggregate3(StreamStage<T1> stage1, StreamStage<T2> stage2, AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp): Attaches a stage that performs the given aggregate operation over the items it receives from this stage as well as stage1 and stage2 you supply. |
default WindowAggregateBuilder1<T> | aggregateBuilder(): Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages using one multi-input aggregate operation. |
default <R0> WindowAggregateBuilder<R0> | aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp): Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages, with a separate aggregate operation for each stage. |
default StreamStage<WindowResult<T>> | distinct(): Attaches a stage that passes through just the items that are distinct within their window (no two items emitted for a window are equal). |
<K> StageWithKeyAndWindow<T,K> | groupingKey(FunctionEx<? super T,? extends K> keyFn): Specifies the function that will extract the grouping key from the items in the associated pipeline stage and moves on to the step in which you'll complete the construction of a windowed group-and-aggregate stage. |
StreamStage<T> | streamStage(): Returns the pipeline stage associated with this object. |
WindowDefinition | windowDefinition(): Returns the definition of the window for the windowed aggregation operation that you are about to construct using this object. |
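To make the distinct() entry above more concrete, here is a minimal plain-Java sketch of what "distinct within their window" means. It does not use the Hazelcast API: the class and method names are hypothetical, it uses tumbling windows for simplicity, and it assumes each window covers the half-open interval [end - size, end).

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from the Hazelcast API.
final class DistinctPerWindowSketch {

    // End of the tumbling window [end - size, end) that contains the timestamp.
    static long windowEnd(long timestamp, long size) {
        return Math.floorDiv(timestamp, size) * size + size;
    }

    // Groups items by window end and keeps one copy of each distinct item.
    // timestamps[i] is the event time of items[i].
    static Map<Long, Set<String>> distinctPerWindow(long[] timestamps, String[] items, long size) {
        Map<Long, Set<String>> result = new TreeMap<>();
        for (int i = 0; i < items.length; i++) {
            result.computeIfAbsent(windowEnd(timestamps[i], size), end -> new LinkedHashSet<>())
                  .add(items[i]); // duplicate detection relies on equals() and hashCode()
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 3, 7, 12, 15};
        String[] items = {"a", "a", "b", "a", "a"};
        // With window size 10, "a" appears once under window end 10 and once under 20.
        System.out.println(distinctPerWindow(timestamps, items, 10)); // prints {10=[a, b], 20=[a]}
    }
}
```

The same item can be emitted again for a later window; uniqueness only holds among the items emitted for one window.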
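@Nonnull StreamStage<T> streamStage()
Returns the pipeline stage associated with this object.
@Nonnull WindowDefinition windowDefinition()
Returns the definition of the window for the windowed aggregation
operation that you are about to construct using this object.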
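@Nonnull <K> StageWithKeyAndWindow<T,K> groupingKey(@Nonnull FunctionEx<? super T,? extends K> keyFn)
Specifies the function that will extract the grouping key from the
items in the associated pipeline stage and moves on to the step in
which you'll complete the construction of a windowed group-and-aggregate
stage.
Note: make sure the extracted key is not null, otherwise the job will
fail. Also make sure that the key implements equals() and hashCode().
K - type of the key
keyFn - function that extracts the grouping key. It must be stateless and cooperative.
@Nonnull default StreamStage<WindowResult<T>> distinct()
Attaches a stage that passes through just the items that are distinct
within their window (no two items emitted for a window are equal). It
emits results in the form WindowResult(windowEnd, distinctItem).
@Nonnull <R> StreamStage<WindowResult<R>> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)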
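Attaches a stage that performs the given aggregate operation over all
the items that belong to a given window. Once the window is complete, it
emits a WindowResult with the result of the aggregate operation and the
timestamp denoting the window's ending time.
Sample usage: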
    StreamStage<WindowResult<Long>> aggregated = pageVisits
            .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
            .aggregate(AggregateOperations.counting());
R - the type of the result
aggrOp - the aggregate operation to perform
See Also: AggregateOperations
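The sample above counts page visits in a one-minute window that slides by one second, so each event contributes to many overlapping windows. The following plain-Java sketch shows that assignment with small numbers. It does not call the Hazelcast API, and it rests on stated assumptions: every window is the half-open interval [end - size, end), window ends are multiples of the slide step, and the size is a multiple of the slide step.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; it does not call the Hazelcast API.
final class SlidingCountSketch {

    // Counts events per sliding window, keyed by window end.
    // Assumes windows are [end - size, end) with ends at multiples of slideBy.
    static Map<Long, Long> countPerWindow(long[] timestamps, long size, long slideBy) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            // Smallest window end strictly greater than the timestamp.
            long firstEnd = Math.floorDiv(ts, slideBy) * slideBy + slideBy;
            // The event falls into every window whose end lies in (ts, ts + size].
            for (long end = firstEnd; end <= ts + size; end += slideBy) {
                counts.merge(end, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Window of 3 time units sliding by 1: each event is counted in 3 windows.
        System.out.println(countPerWindow(new long[] {0, 1, 1, 4}, 3, 1));
        // prints {1=1, 2=3, 3=3, 4=2, 5=1, 6=1, 7=1}
    }
}
```

In this sketch a window with no events produces no entry at all, and the number of windows an event lands in equals size divided by slideBy.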
@Nonnull <T1,R> StreamStage<WindowResult<R>> aggregate2(@Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)
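Attaches a stage that performs the given aggregate operation over all
the items that belong to the same window. It receives the items from both
this stage and stage1. Once a given window is complete, it emits a
WindowResult with the result of the aggregate operation and the timestamp
denoting the window's ending time.
Sample usage: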
    StreamStage<WindowResult<Tuple2<Long, Long>>> aggregated = pageVisits
            .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
            .aggregate2(
                    addToCarts,
                    AggregateOperations.aggregateOperation2(
                            AggregateOperations.counting(),
                            AggregateOperations.counting())
            );
This variant requires you to provide a two-input aggregate operation
(refer to the AggregateOperation2 Javadoc for a simple example). If you
can express your logic in terms of two single-input aggregate operations,
one for each input stream, then you should use
stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler
API and lets you reuse the already defined single-input operations. Use
this variant only when you need to implement an aggregate operation that
combines the input streams into the same accumulator.
The aggregating stage emits a single item for each completed window.
T1 - type of items in stage1
R - type of the aggregation result
aggrOp - the aggregate operation to perform
See Also: AggregateOperations
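As a rough illustration of "combines the input streams into the same accumulator", the plain-Java sketch below keeps one accumulator that is fed by two inputs and exports a single value derived from both. It is not the Hazelcast AggregateOperation2 API: ConversionAccumulator and its methods are hypothetical names, and the conversion-rate metric is an assumed example.

```java
// Plain-Java illustration only; these names are hypothetical, not Hazelcast classes.
final class SharedAccumulatorSketch {

    // One accumulator fed by both inputs, so the exported result can depend on both.
    static final class ConversionAccumulator {
        private long pageVisits;
        private long addToCarts;

        void accumulateVisit(String visit) {          // items from input 0
            pageVisits++;
        }

        void accumulateAddToCart(String addToCart) {  // items from input 1
            addToCarts++;
        }

        // Merges a partial accumulator that was built elsewhere.
        void combine(ConversionAccumulator other) {
            pageVisits += other.pageVisits;
            addToCarts += other.addToCarts;
        }

        // Add-to-cart events per page visit; 0 when there were no visits.
        double export() {
            return pageVisits == 0 ? 0.0 : (double) addToCarts / pageVisits;
        }
    }

    static double conversionRate(String[] visits, String[] addToCarts) {
        ConversionAccumulator acc = new ConversionAccumulator();
        for (String v : visits) {
            acc.accumulateVisit(v);
        }
        for (String a : addToCarts) {
            acc.accumulateAddToCart(a);
        }
        return acc.export();
    }

    public static void main(String[] args) {
        String[] visits = {"v1", "v2", "v3", "v4"};
        String[] carts = {"c1"};
        System.out.println(conversionRate(visits, carts)); // prints 0.25
    }
}
```

When each input only needs its own independent result, as with two separate counts, the variant that takes one single-input operation per stage remains the simpler choice.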
@Nonnull default <T1,R0,R1> StreamStage<WindowResult<Tuple2<R0,R1>>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that performs the given co-aggregate operations over
the items from this stage and stage1 you supply. It performs the
aggregation separately for each input stage: aggrOp0 on this stage and
aggrOp1 on stage1. Once it has received all the items belonging to a
window, it emits a WindowResult(Tuple2(result0, result1)).
The aggregating stage emits a single item for each completed window.
Sample usage:
    StreamStage<WindowResult<Tuple2<Long, Long>>> aggregated = pageVisits
            .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
            .aggregate2(
                    AggregateOperations.counting(),
                    addToCarts,
                    AggregateOperations.counting()
            );
T1 - type of the items in the other stage
R0 - type of the aggregated result for this stage
R1 - type of the aggregated result for the other stage
aggrOp0 - aggregate operation to perform on this stage
stage1 - the other stage
aggrOp1 - aggregate operation to perform on the other stage
See Also: AggregateOperations
@Nonnull <T1,T2,R> StreamStage<WindowResult<R>> aggregate3(@Nonnull StreamStage<T1> stage1, @Nonnull StreamStage<T2> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
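Attaches a stage that performs the given aggregate operation over the
items it receives from this stage as well as stage1 and stage2 you
supply. Once a given window is complete, it emits a WindowResult with the
result of the aggregate operation and the timestamp denoting the window's
ending time.
Sample usage: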
    StreamStage<WindowResult<Tuple3<Long, Long, Long>>> aggregated = pageVisits
            .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
            .aggregate3(
                    addToCarts,
                    payments,
                    AggregateOperations.aggregateOperation3(
                            AggregateOperations.counting(),
                            AggregateOperations.counting(),
                            AggregateOperations.counting())
            );
This variant requires you to provide a three-input aggregate operation
(refer to the AggregateOperation3 Javadoc for a simple example). If you
can express your logic in terms of three single-input aggregate
operations, one for each input stream, then you should use
stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it
offers a simpler API and lets you reuse the already defined single-input
operations. Use this variant only when you need to implement an
aggregate operation that combines the input streams into the same
accumulator.
T1 - type of items in stage1
T2 - type of items in stage2
R - type of the result
stage1 - the first additional stage
stage2 - the second additional stage
aggrOp - the aggregate operation to perform
See Also: AggregateOperations
@Nonnull default <T1,T2,R0,R1,R2> StreamStage<WindowResult<Tuple3<R0,R1,R2>>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull StreamStage<T2> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that performs the given co-aggregate operations over
the items from this stage as well as stage1 and stage2 you supply. It
performs the aggregation separately for each input stage: aggrOp0 on
this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has
received all the items belonging to a window, it emits a
WindowResult(Tuple3(result0, result1, result2)).
The aggregating stage emits a single item for each completed window.
Sample usage:
    StreamStage<WindowResult<Tuple3<Long, Long, Long>>> aggregated = pageVisits
            .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
            .aggregate3(
                    AggregateOperations.counting(),
                    addToCarts,
                    AggregateOperations.counting(),
                    payments,
                    AggregateOperations.counting()
            );
T1 - type of items in stage1
T2 - type of items in stage2
R0 - type of the result from stream-0
R1 - type of the result from stream-1
R2 - type of the result from stream-2
aggrOp0 - aggregate operation to perform on this stage
stage1 - the first additional stage
aggrOp1 - aggregate operation to perform on stage1
stage2 - the second additional stage
aggrOp2 - aggregate operation to perform on stage2
See Also: AggregateOperations
@Nonnull default <R0> WindowAggregateBuilder<R0> aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp)
Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages. You supply an aggregate operation
for each input stage, and the stage emits the individual aggregation
results in the form WindowResult(windowEnd, itemsByTag).
Use the tag you get from builder.add(stageN, aggrOpN) to retrieve the
aggregated result for that stage. Use builder.tag0() as the tag of this
stage. You can also supply a function to the builder that immediately
transforms the results to the desired output type.
This builder is mainly intended to build a co-aggregation of four or
more contributing stages. For up to three stages, prefer the direct
stage.aggregateN(...) calls because they offer more static type safety.
This example defines a 1-second window that slides by 10 milliseconds.
It counts the items in stage-0, sums those in stage-1 and takes the
average of those in stage-2:
    Pipeline p = Pipeline.create();
    StreamStage<Long> stage0 = p.readFrom(source0).withNativeTimestamps(0L);
    StreamStage<Long> stage1 = p.readFrom(source1).withNativeTimestamps(0L);
    StreamStage<Long> stage2 = p.readFrom(source2).withNativeTimestamps(0L);
    WindowAggregateBuilder<Long> b = stage0
            .window(sliding(1000, 10))
            .aggregateBuilder(AggregateOperations.counting());
    Tag<Long> tag0 = b.tag0();
    Tag<Long> tag1 = b.add(stage1,
            AggregateOperations.summingLong(Long::longValue));
    Tag<Double> tag2 = b.add(stage2,
            AggregateOperations.averagingLong(Long::longValue));
    StreamStage<WindowResult<ItemsByTag>> aggregated = b.build();
    // WindowResult exposes the window's end time via end() and the payload via result()
    aggregated.map(e -> String.format(
            "Timestamp %d, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
            e.end(), e.result().get(tag0), e.result().get(tag1), e.result().get(tag2))
    );
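The tags in the example above let each result be read back with its own static type (Long for the count and sum, Double for the average). The plain-Java sketch below shows one way such a typed-tag container can work. SketchTag and SketchItemsByTag are hypothetical stand-ins written for this illustration; they are not the Hazelcast Tag and ItemsByTag classes and make no claim about how those are implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; SketchTag and SketchItemsByTag are hypothetical
// stand-ins and not the Hazelcast Tag and ItemsByTag classes.
final class TaggedResultsSketch {

    // A tag carries the static type of the value stored under it.
    static final class SketchTag<T> {
        private final int index;

        SketchTag(int index) {
            this.index = index;
        }
    }

    // Heterogeneous container: each value is read back with the type of its tag.
    static final class SketchItemsByTag {
        private final Map<Integer, Object> items = new HashMap<>();

        <T> void put(SketchTag<T> tag, T value) {
            items.put(tag.index, value);
        }

        @SuppressWarnings("unchecked")
        <T> T get(SketchTag<T> tag) {
            // Sound as long as each index is used by exactly one tag.
            return (T) items.get(tag.index);
        }
    }

    public static void main(String[] args) {
        SketchTag<Long> countTag = new SketchTag<>(0);
        SketchTag<Double> averageTag = new SketchTag<>(1);

        SketchItemsByTag results = new SketchItemsByTag();
        results.put(countTag, 3L);
        results.put(averageTag, 2.5);

        long count = results.get(countTag);        // no cast at the call site
        double average = results.get(averageTag);  // no cast at the call site
        System.out.println(count + " " + average); // prints 3 2.5
    }
}
```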
@Nonnull default WindowAggregateBuilder1<T> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages.
This builder requires you to provide a multi-input aggregate operation.
If you can express your logic in terms of single-input aggregate
operations, one for each input stream, then you should use
stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use
this builder only when you need to implement an aggregate operation that
combines all the input streams into the same accumulator.
This builder is mainly intended to build a co-aggregation of four or
more contributing stages. For up to three stages, prefer the direct
stage.aggregateN(...) calls because they offer more static type safety.
To add the other stages, call add(stage). Collect all the tags returned
from add() and use them when building the aggregate operation. Retrieve
the tag of the first stage (from which you obtained this builder) by
calling WindowAggregateBuilder1.tag0().
This example takes three streams of strings, specifies a 1-second window
that slides by 10 milliseconds and counts the distinct strings across
all streams:
    Pipeline p = Pipeline.create();
    StreamStage<String> stage0 = p.readFrom(source0).withNativeTimestamps(0L);
    StreamStage<String> stage1 = p.readFrom(source1).withNativeTimestamps(0L);
    StreamStage<String> stage2 = p.readFrom(source2).withNativeTimestamps(0L);
    WindowAggregateBuilder1<String> b = stage0
            .window(sliding(1000, 10))
            .aggregateBuilder();
    Tag<String> tag0 = b.tag0();
    Tag<String> tag1 = b.add(stage1);
    Tag<String> tag2 = b.add(stage2);
    // All three inputs accumulate into the same HashSet
    StreamStage<WindowResult<Integer>> aggregated = b.build(AggregateOperation
            .withCreate(HashSet<String>::new)
            .andAccumulate(tag0, (acc, item) -> acc.add(item))
            .andAccumulate(tag1, (acc, item) -> acc.add(item))
            .andAccumulate(tag2, (acc, item) -> acc.add(item))
            .andCombine(HashSet::addAll)
            .andExportFinish(HashSet::size));
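The aggregate operation in the example above is assembled from create, accumulate, combine and export steps. The plain-Java sketch below walks through those steps by hand for the same distinct-count logic. It does not call the Hazelcast API, the method names are hypothetical, and treating each input list as one partial accumulator is a simplification chosen for the illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; it mimics the create / accumulate / combine /
// export steps by hand and does not call the Hazelcast API.
final class DistinctCountSketch {

    // Builds a partial accumulator from one batch of items.
    static Set<String> accumulate(List<String> items) {
        Set<String> acc = new HashSet<>();   // create
        for (String item : items) {
            acc.add(item);                   // accumulate
        }
        return acc;
    }

    // Merges the partial accumulators and exports the number of distinct strings.
    static int distinctCount(List<List<String>> streams) {
        Set<String> combined = new HashSet<>();
        for (List<String> stream : streams) {
            combined.addAll(accumulate(stream)); // combine
        }
        return combined.size();                  // export
    }

    public static void main(String[] args) {
        List<List<String>> streams = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("b", "c"),
                Arrays.asList("a", "d"));
        System.out.println(distinctCount(streams)); // prints 4
    }
}
```

Because set union is insensitive to order and grouping, the result is the same however the items are split into partial accumulators before the combine step.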
Copyright © 2023 Hazelcast, Inc. All rights reserved.