T - type of the input itemK - type of the keypublic interface StageWithKeyAndWindow<T,K>
| Modifier and Type | Method and Description | 
|---|---|
| <R> StreamStage<KeyedWindowResult<K,R>> | aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp)Attaches a stage that performs the given group-and-aggregate operation. | 
| default <T1,R0,R1> StreamStage<KeyedWindowResult<K,Tuple2<R0,R1>>> | aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0,
          StreamStageWithKey<T1,? extends K> stage1,
          AggregateOperation1<? super T1,?,? extends R1> aggrOp1)Attaches a stage that performs the given cogroup-and-aggregate operation
 over the items from both this stage and  stage1you supply. | 
| <T1,R> StreamStage<KeyedWindowResult<K,R>> | aggregate2(StreamStageWithKey<T1,? extends K> stage1,
          AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)Attaches a stage that performs the given cogroup-and-aggregate operation
 over the items from both this stage and  stage1you supply. | 
| default <T1,T2,R0,R1,R2> | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0,
          StreamStageWithKey<T1,? extends K> stage1,
          AggregateOperation1<? super T1,?,? extends R1> aggrOp1,
          StreamStageWithKey<T2,? extends K> stage2,
          AggregateOperation1<? super T2,?,? extends R2> aggrOp2)Attaches a stage that performs the given cogroup-and-aggregate operation
 over the items from both this stage and  stage1you supply. | 
| <T1,T2,R> StreamStage<KeyedWindowResult<K,R>> | aggregate3(StreamStageWithKey<T1,? extends K> stage1,
          StreamStageWithKey<T2,? extends K> stage2,
          AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)Attaches a stage that performs the given cogroup-and-aggregate operation
 over the items from this stage as well as  stage1andstage2you supply. | 
| default WindowGroupAggregateBuilder1<T,K> | aggregateBuilder()Offers a step-by-step API to build a pipeline stage that co-aggregates
 the data from several input stages. | 
| default <R0> WindowGroupAggregateBuilder<K,R0> | aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)Offers a step-by-step API to build a pipeline stage that co-aggregates
 the data from several input stages. | 
| default StreamStage<KeyedWindowResult<K,T>> | distinct()Attaches a stage that passes through just the items that are distinct
 within their window according to the grouping key (no two items emitted
 for a window map to the same key). | 
| FunctionEx<? super T,? extends K> | keyFn()Returns the function that extracts the grouping key from stream items. | 
| WindowDefinition | windowDefinition()Returns the definition of the window for the windowed aggregation
 operation that you are about to construct using this object. | 
@Nonnull FunctionEx<? super T,? extends K> keyFn()
@Nonnull WindowDefinition windowDefinition()
@Nonnull default StreamStage<KeyedWindowResult<K,T>> distinct()
@Nonnull <R> StreamStage<KeyedWindowResult<K,R>> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
KeyedWindowResult) for each
 distinct key it observes in its input belonging to a given window. The
 value is the result of the aggregate operation across all the items with
 the given grouping key.
 Sample usage:
 StreamStage<KeyedWindowResult<Long, Long>> aggregated = pageVisits
     .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
     .groupingKey(PageVisit::getUserId)
     .aggregate(AggregateOperations.counting());
 R - type of the aggregation resultaggrOp - the aggregate operation to performAggregateOperations@Nonnull <T1,R> StreamStage<KeyedWindowResult<K,R>> aggregate2(@Nonnull StreamStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)
stage1 you supply. It
 emits one key-value pair (in a KeyedWindowResult) for each
 distinct key it observes in the input belonging to a given window. The
 value is the result of the aggregate operation across all the items with
 the given grouping key.
 Sample usage:
 StreamStage<KeyedWindowResult<Long, Tuple2<Long, Long>>> aggregated = pageVisits
     .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
     .groupingKey(PageVisit::getUserId)
     .aggregate2(
         addToCarts.groupingKey(AddToCart::getUserId),
         AggregateOperations.aggregateOperation2(
                 AggregateOperations.counting(),
                 AggregateOperations.counting())
     );
 stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler
 API and you can use the already defined single-input operations. Use
 this variant only when you have the need to implement an aggregate
 operation that combines the input streams into the same accumulator.T1 - type of items in stage1R - type of the aggregation resultstage1 - the other stageaggrOp - the aggregate operation to performAggregateOperations@Nonnull default <T1,R0,R1> StreamStage<KeyedWindowResult<K,Tuple2<R0,R1>>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull StreamStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
stage1 you supply. For
 each distinct grouping key it observes in the input belonging to a given
 window, it performs the supplied aggregate operation across all the
 items sharing that key. It performs the aggregation separately for each
 input stage: aggrOp0 on this stage and aggrOp1 on stage1. Once it has received all the items belonging to a window, it
 emits for each distinct key a KeyedWindowResult(key, Tuple2(result0,
 result1)).
 Sample usage:
 StreamStage<KeyedWindowResult<Long, Tuple2<Long, Long>>> aggregated = pageVisits
     .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
     .groupingKey(PageVisit::getUserId)
     .aggregate2(
             AggregateOperations.counting(),
             addToCarts.groupingKey(AddToCart::getUserId),
             AggregateOperations.counting()
     );
 T1 - type of the items in the other stageR0 - type of the aggregated result for this stageR1 - type of the aggregated result for the other stageaggrOp0 - aggregate operation to perform on this stagestage1 - the other stageaggrOp1 - aggregate operation to perform on the other stageAggregateOperations@Nonnull <T1,T2,R> StreamStage<KeyedWindowResult<K,R>> aggregate3(@Nonnull StreamStageWithKey<T1,? extends K> stage1, @Nonnull StreamStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
stage1 and stage2 you supply. For each distinct grouping key it observes in the
 input belonging to a given window, it performs the supplied aggregate
 operation across all the items sharing that key. Once it has received
 all the items belonging to a window, it emits for each distinct key a
 KeyedWindowResult(key, Tuple3(result0, result1, result2)).
 
 Sample usage:
 StreamStage
 This variant requires you to provide a three-input aggregate operation
 (refer to its Javadoc for a simple
 example). If you can express your logic in terms of three single-input
 aggregate operations, one for each input stream, then you should use
 
 stage0.aggregate2(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it
 offers a simpler API and you can use the already defined single-input
 operations. Use this variant only when you have the need to implement an
 aggregate operation that combines the input streams into the same
 accumulator.
T1 - type of items in stage1T2 - type of items in stage2R - type of the aggregation resultstage1 - the first additional stagestage2 - the second additional stageaggrOp - the aggregate operation to performAggregateOperations@Nonnull default <T1,T2,R0,R1,R2> StreamStage<KeyedWindowResult<K,Tuple3<R0,R1,R2>>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull StreamStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull StreamStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
stage1 you supply. For
 each distinct grouping key it observes in the input belonging to a given
 window, it performs the supplied aggregate operation across all the
 items sharing that key. It performs the aggregation separately for each
 input stage: aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has received all
 the items, it calls the supplied mapToOutputFn with each key and
 the associated aggregation result to create the items to emit.
 Sample usage:
 StreamStage<KeyedWindowResult<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
     .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
     .groupingKey(PageVisit::getUserId)
     .aggregate3(
         AggregateOperations.counting(),
         addToCarts.groupingKey(AddToCart::getUserId),
         AggregateOperations.counting(),
         payments.groupingKey(Payment::getUserId),
         AggregateOperations.counting()
     );
 T1 - type of the items in stage1T2 - type of the items in stage2R0 - type of the aggregated result for this stageR1 - type of the aggregated result for stage1R2 - type of the aggregated result for stage2aggrOp0 - aggregate operation to perform on this stagestage1 - the first additional stageaggrOp1 - aggregate operation to perform on stage1stage2 - the second additional stageaggrOp2 - aggregate operation to perform on stage2AggregateOperations@Nonnull default <R0> WindowGroupAggregateBuilder<K,R0> aggregateBuilder(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0)
KeyedWindowResult(key,
 itemsByTag). Use the tag you get from builder.add(stageN, aggrOpN) to retrieve the aggregated result for that
 stage. Use builder.tag0() as the tag of
 this stage.
 
 This builder is mainly intended to build a co-aggregation of four or
 more contributing stages. For up to three stages, prefer the direct
 stage.aggregateN(...) calls because they offer more static type
 safety.
 
 This example reads from three stream sources that produce Map.Entry<String, Long>. It groups by entry key, defines a 1-second
 sliding window and then counts the items in stage-0, sums those in
 stage-1 and takes the average of those in stage-2:
 
 Pipeline p = Pipeline.create();
 StreamStageWithKey<Entry<String, Long>, String> stage0 =
         p.readFrom(source0).withNativeTimestamps(0L)
          .groupingKey(Entry::getKey);
 StreamStageWithKey<Entry<String, Long>, String> stage1 =
         p.readFrom(source1).withNativeTimestamps(0L)
          .groupingKey(Entry::getKey);
 StreamStageWithKey<Entry<String, Long>, String> stage2 =
         p.readFrom(source2).withNativeTimestamps(0L)
          .groupingKey(Entry::getKey);
 WindowGroupAggregateBuilder<String, Long> b = stage0
         .window(sliding(1000, 10))
         .aggregateBuilder(AggregateOperations.counting());
 Tag<Long> tag0 = b.tag0();
 Tag<Long> tag1 = b.add(stage1,
         AggregateOperations.summingLong(Entry::getValue));
 Tag<Double> tag2 = b.add(stage2,
         AggregateOperations.averagingLong(Entry::getValue));
 StreamStage<KeyedWindowResult<String, ItemsByTag>> aggregated = b.build();
 aggregated.map(e -> String.format(
         "Key %s, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
         e.getKey(),
         e.getValue().get(tag0), e.getValue().get(tag1), e.getValue().get(tag2))
 );
@Nonnull default WindowGroupAggregateBuilder1<T,K> aggregateBuilder()
 This builder requires you to provide a multi-input aggregate operation.
 If you can express your logic in terms of single-input aggregate
 operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0)
 because it offers a simpler API. Use this builder only when you have the
 need to implement an aggregate operation that combines all the input
 streams into the same accumulator.
 
 This builder is mainly intended to build a co-aggregation of four or
 more contributing stages. For up to three stages, prefer the direct
 stage.aggregateN(...) calls because they offer more static type
 safety.
 
 To add the other stages, call builder.add(stage). Collect all the tags returned from add()
 and use them when building the aggregate operation. Retrieve the tag of
 the first stage (from which you obtained this builder) by calling builder.tag0().
 
 This example takes three streams of Map.Entry<String, Long>,
 specifies a 1-second sliding window and, for each string key, counts
 the distinct Long values across all input streams:
 
 Pipeline p = Pipeline.create();
 StreamStageWithGrouping<Entry<String, Long>, String> stage0 =
         p.readFrom(source0).groupingKey(Entry::getKey);
 StreamStageWithGrouping<Entry<String, Long>, String> stage1 =
         p.readFrom(source1).groupingKey(Entry::getKey);
 StreamStageWithGrouping<Entry<String, Long>, String> stage2 =
         p.readFrom(source2).groupingKey(Entry::getKey);
 WindowGroupAggregateBuilder1<Entry<String, Long>, String> b = stage0
         .window(sliding(1000, 10))
         .aggregateBuilder();
 Tag<Entry<String, Long>> tag0 = b.tag0();
 Tag<Entry<String, Long>> tag1 = b.add(stage1);
 Tag<Entry<String, Long>> tag2 = b.add(stage2);
 StreamStage<KeyedWindowResult<String, Integer>> aggregated = b.build(AggregateOperation
         .withCreate(HashSet<Long>::new)
         .andAccumulate(tag0, (acc, item) -> acc.add(item.getValue()))
         .andAccumulate(tag1, (acc, item) -> acc.add(item.getValue()))
         .andAccumulate(tag2, (acc, item) -> acc.add(item.getValue()))
         .andCombine(HashSet::addAll)
         .andFinish(HashSet::size));
 Copyright © 2023 Hazelcast, Inc.. All rights reserved.