Interface StageWithKeyAndWindow<T,K>
- Type Parameters:
T
- type of the input item
K
- type of the key
- Since:
- Jet 3.0
-
Method Summary
Modifier and Type, Method, Description

<R> StreamStage<KeyedWindowResult<K, R>> aggregate(AggregateOperation1<? super T, ?, ? extends R> aggrOp)
Attaches a stage that performs the given group-and-aggregate operation.

default <T1, R0, R1> StreamStage<KeyedWindowResult<K, Tuple2<R0, R1>>> aggregate2(AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, StreamStageWithKey<T1, ? extends K> stage1, AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply.

<T1, R> StreamStage<KeyedWindowResult<K, R>> aggregate2(StreamStageWithKey<T1, ? extends K> stage1, AggregateOperation2<? super T, ? super T1, ?, ? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply.

default <T1, T2, R0, R1, R2> StreamStage<KeyedWindowResult<K, Tuple3<R0, R1, R2>>> aggregate3(AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, StreamStageWithKey<T1, ? extends K> stage1, AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1, StreamStageWithKey<T2, ? extends K> stage2, AggregateOperation1<? super T2, ?, ? extends R2> aggrOp2)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply.

<T1, T2, R> StreamStage<KeyedWindowResult<K, R>> aggregate3(StreamStageWithKey<T1, ? extends K> stage1, StreamStageWithKey<T2, ? extends K> stage2, AggregateOperation3<? super T, ? super T1, ? super T2, ?, ? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply.

default WindowGroupAggregateBuilder1<T, K> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.

default <R0> WindowGroupAggregateBuilder<K, R0> aggregateBuilder(AggregateOperation1<? super T, ?, ? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.

default StreamStage<KeyedWindowResult<K, T>> distinct()
Attaches a stage that passes through just the items that are distinct within their window according to the grouping key (no two items emitted for a window map to the same key).

FunctionEx<? super T, ? extends K> keyFn()
Returns the function that extracts the grouping key from stream items.

WindowDefinition windowDefinition()
Returns the definition of the window for the windowed aggregation operation that you are about to construct using this object.
-
Method Details
-
keyFn
Returns the function that extracts the grouping key from stream items. This function will be used in the aggregating stage you are about to construct using this object.
-
windowDefinition
Returns the definition of the window for the windowed aggregation operation that you are about to construct using this object.
-
distinct
Attaches a stage that passes through just the items that are distinct within their window according to the grouping key (no two items emitted for a window map to the same key). There is no guarantee which one of the items with the same key will pass through.
- Returns:
- the newly attached stage
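The per-window, per-key behavior described for distinct can be modelled in plain Java. The sketch below does not use the Jet API. The `Event` class, the `distinctPerWindow` helper and the use of fixed-size (tumbling) windows are assumptions made for illustration. It keeps the first item seen for each key, which is only this sketch's choice, since the stage itself gives no guarantee about which item passes through.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowDistinctSketch {
    /** Hypothetical event type: a timestamp, a grouping key and a payload. */
    static final class Event {
        final long timestamp;
        final String key;
        final String payload;

        Event(long timestamp, String key, String payload) {
            this.timestamp = timestamp;
            this.key = key;
            this.payload = payload;
        }
    }

    /**
     * Returns windowStart -> (key -> the one item kept for that key).
     * Keeping the first item seen is this sketch's choice only.
     */
    static Map<Long, Map<String, Event>> distinctPerWindow(List<Event> events, long windowSize) {
        Map<Long, Map<String, Event>> kept = new TreeMap<>();
        for (Event e : events) {
            // Assign the event to the fixed-size window that contains its timestamp.
            long windowStart = Math.floorDiv(e.timestamp, windowSize) * windowSize;
            kept.computeIfAbsent(windowStart, w -> new LinkedHashMap<>())
                .putIfAbsent(e.key, e);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(0, "a", "x"), new Event(5, "a", "y"),
                new Event(7, "b", "z"), new Event(12, "a", "w"));
        distinctPerWindow(events, 10).forEach((start, byKey) ->
                System.out.println("window " + start + " -> keys " + byKey.keySet()));
    }
}
```

Note that key "a" appears in both windows of the sample data: distinctness is enforced within each window, not across windows.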
-
aggregate
@Nonnull <R> StreamStage<KeyedWindowResult<K,R>> aggregate(@Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp)
Attaches a stage that performs the given group-and-aggregate operation. It emits one key-value pair (in a KeyedWindowResult) for each distinct key it observes in its input belonging to a given window. The value is the result of the aggregate operation across all the items with the given grouping key.
Sample usage:
    StreamStage<KeyedWindowResult<Long, Long>> aggregated = pageVisits
        .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
        .groupingKey(PageVisit::getUserId)
        .aggregate(AggregateOperations.counting());
- Type Parameters:
R
- type of the aggregation result
- Parameters:
aggrOp
- the aggregate operation to perform
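To make "one result per distinct key per window" concrete, here is a minimal plain-Java model of counting over sliding windows. It is not the Jet implementation. The `countPerWindowAndKey` helper, the `{timestamp, userId}` array encoding and the assumption that window ends fall on multiples of the slide step are all illustrative choices.

```java
import java.util.Map;
import java.util.TreeMap;

final class SlidingWindowCountSketch {
    /**
     * visits: rows of {timestamp, userId}.
     * Returns windowEnd -> (userId -> number of visits in [windowEnd - size, windowEnd)).
     * Assumes window ends are aligned to multiples of step.
     */
    static Map<Long, Map<Long, Long>> countPerWindowAndKey(long[][] visits, long size, long step) {
        Map<Long, Map<Long, Long>> counts = new TreeMap<>();
        for (long[] visit : visits) {
            long timestamp = visit[0];
            long userId = visit[1];
            // First window end strictly after the timestamp.
            long firstEnd = (Math.floorDiv(timestamp, step) + 1) * step;
            // The item belongs to every window whose start is not after the timestamp.
            for (long end = firstEnd; end - size <= timestamp; end += step) {
                counts.computeIfAbsent(end, e -> new TreeMap<>())
                      .merge(userId, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[][] visits = { {5, 1}, {15, 1}, {15, 2} };
        countPerWindowAndKey(visits, 20, 10).forEach((end, byUser) ->
                System.out.println("window ending at " + end + " -> " + byUser));
    }
}
```

Because the windows overlap, a single visit contributes to `size / step` windows, which is why the same user can appear in several consecutive results.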
-
aggregate2
@Nonnull <T1,R> StreamStage<KeyedWindowResult<K,R>> aggregate2(@Nonnull StreamStageWithKey<T1, ? extends K> stage1, @Nonnull AggregateOperation2<? super T, ? super T1, ?, ? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. It emits one key-value pair (in a KeyedWindowResult) for each distinct key it observes in the input belonging to a given window. The value is the result of the aggregate operation across all the items with the given grouping key.
Sample usage:
    StreamStage<KeyedWindowResult<Long, Tuple2<Long, Long>>> aggregated = pageVisits
        .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
        .groupingKey(PageVisit::getUserId)
        .aggregate2(
            addToCarts.groupingKey(AddToCart::getUserId),
            AggregateOperations.aggregateOperation2(
                AggregateOperations.counting(),
                AggregateOperations.counting())
        );
This variant requires you to provide a two-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of two single-input aggregate operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.
- Type Parameters:
T1
- type of items in stage1
R
- type of the aggregation result
- Parameters:
stage1
- the other stage
aggrOp
- the aggregate operation to perform
-
aggregate2
@Nonnull default <T1,R0,R1> StreamStage<KeyedWindowResult<K,Tuple2<R0,R1>>> aggregate2(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, @Nonnull StreamStageWithKey<T1, ? extends K> stage1, @Nonnull AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. For each distinct grouping key it observes in the input belonging to a given window, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage and aggrOp1 on stage1. Once it has received all the items belonging to a window, it emits for each distinct key a KeyedWindowResult(key, Tuple2(result0, result1)).
Sample usage:
    StreamStage<KeyedWindowResult<Long, Tuple2<Long, Long>>> aggregated = pageVisits
        .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
        .groupingKey(PageVisit::getUserId)
        .aggregate2(
            AggregateOperations.counting(),
            addToCarts.groupingKey(AddToCart::getUserId),
            AggregateOperations.counting()
        );
- Type Parameters:
T1
- type of the items in the other stage
R0
- type of the aggregated result for this stage
R1
- type of the aggregated result for the other stage
- Parameters:
aggrOp0
- aggregate operation to perform on this stage
stage1
- the other stage
aggrOp1
- aggregate operation to perform on the other stage
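The "separate aggregation per input stage" behavior can be sketched in plain Java as two independent counters that share one key and one window. This is a simplified model rather than Jet code. The `coCount` helper, the `{timestamp, userId}` array encoding and the use of fixed-size (tumbling) windows instead of the sliding window from the sample are assumptions. A `long[2]` stands in for `Tuple2(result0, result1)`.

```java
import java.util.Map;
import java.util.TreeMap;

final class CoGroupCountSketch {
    /**
     * Both inputs hold rows of {timestamp, userId}.
     * Returns windowStart -> (userId -> {pageVisitCount, addToCartCount}).
     */
    static Map<Long, Map<Long, long[]>> coCount(long[][] pageVisits, long[][] addToCarts,
                                                long windowSize) {
        Map<Long, Map<Long, long[]>> byWindow = new TreeMap<>();
        accumulate(byWindow, pageVisits, windowSize, 0);  // plays the role of aggrOp0
        accumulate(byWindow, addToCarts, windowSize, 1);  // plays the role of aggrOp1
        return byWindow;
    }

    private static void accumulate(Map<Long, Map<Long, long[]>> byWindow, long[][] events,
                                   long windowSize, int slot) {
        for (long[] e : events) {
            long windowStart = Math.floorDiv(e[0], windowSize) * windowSize;
            // Each input only ever touches its own slot of the per-key pair.
            byWindow.computeIfAbsent(windowStart, w -> new TreeMap<>())
                    .computeIfAbsent(e[1], k -> new long[2])[slot]++;
        }
    }

    public static void main(String[] args) {
        long[][] pageVisits = { {1, 1}, {2, 1}, {11, 2} };
        long[][] addToCarts = { {3, 1}, {4, 3} };
        coCount(pageVisits, addToCarts, 10).forEach((start, byUser) ->
                byUser.forEach((user, pair) -> System.out.println(
                        "window " + start + ", user " + user
                                + ": visits=" + pair[0] + ", addToCarts=" + pair[1])));
    }
}
```

In this model a key that occurs in only one input still produces a result, with a zero count in the other slot.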
-
aggregate3
@Nonnull <T1,T2,R> StreamStage<KeyedWindowResult<K,R>> aggregate3(@Nonnull StreamStageWithKey<T1, ? extends K> stage1, @Nonnull StreamStageWithKey<T2, ? extends K> stage2, @Nonnull AggregateOperation3<? super T, ? super T1, ? super T2, ?, ? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. For each distinct grouping key it observes in the input belonging to a given window, it performs the supplied aggregate operation across all the items sharing that key. Once it has received all the items belonging to a window, it emits for each distinct key a KeyedWindowResult(key, result), where result is the value produced by aggrOp (a Tuple3(result0, result1, result2) in the sample usage).
Sample usage:
    StreamStage<KeyedWindowResult<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
        .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
        .groupingKey(PageVisit::getUserId)
        .aggregate3(
            addToCarts.groupingKey(AddToCart::getUserId),
            payments.groupingKey(Payment::getUserId),
            AggregateOperations.aggregateOperation3(
                AggregateOperations.counting(),
                AggregateOperations.counting(),
                AggregateOperations.counting())
        );
This variant requires you to provide a three-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of three single-input aggregate operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.
- Type Parameters:
T1
- type of items in stage1
T2
- type of items in stage2
R
- type of the aggregation result
- Parameters:
stage1
- the first additional stage
stage2
- the second additional stage
aggrOp
- the aggregate operation to perform
-
aggregate3
@Nonnull default <T1,T2,R0,R1,R2> StreamStage<KeyedWindowResult<K,Tuple3<R0,R1,R2>>> aggregate3(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, @Nonnull StreamStageWithKey<T1, ? extends K> stage1, @Nonnull AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1, @Nonnull StreamStageWithKey<T2, ? extends K> stage2, @Nonnull AggregateOperation1<? super T2, ?, ? extends R2> aggrOp2)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. For each distinct grouping key it observes in the input belonging to a given window, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has received all the items belonging to a window, it emits for each distinct key a KeyedWindowResult(key, Tuple3(result0, result1, result2)).
Sample usage:
    StreamStage<KeyedWindowResult<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
        .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
        .groupingKey(PageVisit::getUserId)
        .aggregate3(
            AggregateOperations.counting(),
            addToCarts.groupingKey(AddToCart::getUserId),
            AggregateOperations.counting(),
            payments.groupingKey(Payment::getUserId),
            AggregateOperations.counting()
        );
- Type Parameters:
T1
- type of the items in stage1
T2
- type of the items in stage2
R0
- type of the aggregated result for this stage
R1
- type of the aggregated result for stage1
R2
- type of the aggregated result for stage2
- Parameters:
aggrOp0
- aggregate operation to perform on this stage
stage1
- the first additional stage
aggrOp1
- aggregate operation to perform on stage1
stage2
- the second additional stage
aggrOp2
- aggregate operation to perform on stage2
-
aggregateBuilder
@Nonnull default <R0> WindowGroupAggregateBuilder<K,R0> aggregateBuilder(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will be already registered with the builder you get. You supply an aggregate operation for each input stage and in the output you get the individual aggregation results as KeyedWindowResult(key, itemsByTag). Use the tag you get from builder.add(stageN, aggrOpN) to retrieve the aggregated result for that stage. Use builder.tag0() as the tag of this stage.
This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.
This example reads from three stream sources that produce Map.Entry<String, Long>. It groups by entry key, defines a 1-second sliding window and then counts the items in stage-0, sums those in stage-1 and takes the average of those in stage-2:
    Pipeline p = Pipeline.create();
    StreamStageWithKey<Entry<String, Long>, String> stage0 =
        p.readFrom(source0).withNativeTimestamps(0L)
         .groupingKey(Entry::getKey);
    StreamStageWithKey<Entry<String, Long>, String> stage1 =
        p.readFrom(source1).withNativeTimestamps(0L)
         .groupingKey(Entry::getKey);
    StreamStageWithKey<Entry<String, Long>, String> stage2 =
        p.readFrom(source2).withNativeTimestamps(0L)
         .groupingKey(Entry::getKey);
    WindowGroupAggregateBuilder<String, Long> b = stage0
        .window(sliding(1000, 10))
        .aggregateBuilder(AggregateOperations.counting());
    Tag<Long> tag0 = b.tag0();
    Tag<Long> tag1 = b.add(stage1,
        AggregateOperations.summingLong(Entry::getValue));
    Tag<Double> tag2 = b.add(stage2,
        AggregateOperations.averagingLong(Entry::getValue));
    StreamStage<KeyedWindowResult<String, ItemsByTag>> aggregated = b.build();
    aggregated.map(e -> String.format(
        "Key %s, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
        e.getKey(),
        e.getValue().get(tag0), e.getValue().get(tag1), e.getValue().get(tag2))
    );
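What the example computes for a single window can be modelled in plain Java without Jet: one independent aggregation per contributing stage, all collected under the same key. The `Acc` class and `coAggregate` helper below are hypothetical stand-ins for `ItemsByTag` and the builder. The sketch ignores windowing entirely and returns NaN as the average for a key that has no stage-2 items, which is a choice made here rather than a statement about Jet's behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TaggedCoAggregationSketch {
    /** Per-key results, one slot per contributing stage (stand-in for ItemsByTag). */
    static final class Acc {
        long count0;        // stage-0: number of items
        long sum1;          // stage-1: sum of values
        long sum2;          // stage-2: running sum for the average
        long count2;        // stage-2: running count for the average

        double average2() {
            // NaN for "no stage-2 items" is this sketch's choice only.
            return count2 == 0 ? Double.NaN : (double) sum2 / count2;
        }
    }

    static Map<String, Acc> coAggregate(List<Map.Entry<String, Long>> stage0,
                                        List<Map.Entry<String, Long>> stage1,
                                        List<Map.Entry<String, Long>> stage2) {
        Map<String, Acc> byKey = new TreeMap<>();
        for (Map.Entry<String, Long> e : stage0) {
            byKey.computeIfAbsent(e.getKey(), k -> new Acc()).count0++;
        }
        for (Map.Entry<String, Long> e : stage1) {
            byKey.computeIfAbsent(e.getKey(), k -> new Acc()).sum1 += e.getValue();
        }
        for (Map.Entry<String, Long> e : stage2) {
            Acc acc = byKey.computeIfAbsent(e.getKey(), k -> new Acc());
            acc.sum2 += e.getValue();
            acc.count2++;
        }
        return byKey;
    }

    public static void main(String[] args) {
        Map<String, Acc> result = coAggregate(
                List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 3L)),
                List.of(Map.entry("a", 10L), Map.entry("a", 5L)),
                List.of(Map.entry("a", 4L), Map.entry("a", 6L), Map.entry("b", 8L)));
        result.forEach((key, acc) -> System.out.println(String.format(
                "Key %s, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
                key, acc.count0, acc.sum1, acc.average2())));
    }
}
```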
-
aggregateBuilder
@Nonnull default WindowGroupAggregateBuilder1<T,K> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. This stage will be already registered with the builder you get.
This builder requires you to provide a multi-input aggregate operation. If you can express your logic in terms of single-input aggregate operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use this builder only when you need to implement an aggregate operation that combines all the input streams into the same accumulator.
This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.
To add the other stages, call builder.add(stage). Collect all the tags returned from add() and use them when building the aggregate operation. Retrieve the tag of the first stage (from which you obtained this builder) by calling builder.tag0().
This example takes three streams of Map.Entry<String, Long>, specifies a 1-second sliding window and, for each string key, counts the distinct Long values across all input streams:
    Pipeline p = Pipeline.create();
    StreamStageWithKey<Entry<String, Long>, String> stage0 =
        p.readFrom(source0).withNativeTimestamps(0L)
         .groupingKey(Entry::getKey);
    StreamStageWithKey<Entry<String, Long>, String> stage1 =
        p.readFrom(source1).withNativeTimestamps(0L)
         .groupingKey(Entry::getKey);
    StreamStageWithKey<Entry<String, Long>, String> stage2 =
        p.readFrom(source2).withNativeTimestamps(0L)
         .groupingKey(Entry::getKey);
    WindowGroupAggregateBuilder1<Entry<String, Long>, String> b = stage0
        .window(sliding(1000, 10))
        .aggregateBuilder();
    Tag<Entry<String, Long>> tag0 = b.tag0();
    Tag<Entry<String, Long>> tag1 = b.add(stage1);
    Tag<Entry<String, Long>> tag2 = b.add(stage2);
    StreamStage<KeyedWindowResult<String, Integer>> aggregated = b.build(AggregateOperation
        .withCreate(HashSet<Long>::new)
        .andAccumulate(tag0, (acc, item) -> acc.add(item.getValue()))
        .andAccumulate(tag1, (acc, item) -> acc.add(item.getValue()))
        .andAccumulate(tag2, (acc, item) -> acc.add(item.getValue()))
        .andCombine(HashSet::addAll)
        .andFinish(HashSet::size));
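The reason a shared accumulator is needed here is that a value seen in two different streams must be counted once, which three independent per-stream counts cannot express. The plain-Java sketch below models the create, accumulate, combine and finish steps of such an operation for a single window. The helper names are hypothetical and none of this is Jet API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class SharedAccumulatorSketch {
    // create: one empty accumulator per key
    static Set<Long> create() {
        return new HashSet<>();
    }

    // accumulate: every input stream feeds the same accumulator
    static void accumulate(Set<Long> acc, Map.Entry<String, Long> item) {
        acc.add(item.getValue());
    }

    // combine: merge a partial accumulator into another one
    static void combine(Set<Long> left, Set<Long> right) {
        left.addAll(right);
    }

    // finish: turn the accumulator into the emitted result
    static int finish(Set<Long> acc) {
        return acc.size();
    }

    /** Returns key -> number of distinct values seen for that key across all three inputs. */
    static Map<String, Integer> distinctValueCounts(List<Map.Entry<String, Long>> stage0,
                                                    List<Map.Entry<String, Long>> stage1,
                                                    List<Map.Entry<String, Long>> stage2) {
        Map<String, Set<Long>> accByKey = new TreeMap<>();
        for (List<Map.Entry<String, Long>> stage : List.of(stage0, stage1, stage2)) {
            for (Map.Entry<String, Long> item : stage) {
                accumulate(accByKey.computeIfAbsent(item.getKey(), k -> create()), item);
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        accByKey.forEach((key, acc) -> result.put(key, finish(acc)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(distinctValueCounts(
                List.of(Map.entry("a", 1L), Map.entry("a", 2L)),
                List.of(Map.entry("a", 2L), Map.entry("b", 7L)),
                List.of(Map.entry("b", 7L))));
    }
}
```

The value 2 for key "a" occurs in two streams but ends up in the set once, which is exactly what separate per-stream aggregations would miss.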
-