Interface BatchStageWithKey<T,K> 
- Type Parameters:
- T - type of the input item
- K - type of the key
- All Superinterfaces:
- GeneralStageWithKey<T,K>
- Since:
- Jet 3.0
- 
Method Summary
Each entry lists the modifier and type, the method, and its description.
- <R> BatchStage<Map.Entry<K,R>> aggregate(AggregateOperation1<? super T, ?, ? extends R> aggrOp)
  Attaches a stage that performs the given group-and-aggregate operation.
- default <T1,R0,R1> BatchStage<Map.Entry<K,Tuple2<R0,R1>>> aggregate2(AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, BatchStageWithKey<? extends T1, ? extends K> stage1, AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1)
  Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from both this stage and stage1 you supply.
- <T1,R> BatchStage<Map.Entry<K,R>> aggregate2(BatchStageWithKey<T1, ? extends K> stage1, AggregateOperation2<? super T, ? super T1, ?, R> aggrOp)
  Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply.
- default <T1,T2,R0,R1,R2> BatchStage<Map.Entry<K,Tuple3<R0,R1,R2>>> aggregate3(AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, BatchStageWithKey<T1, ? extends K> stage1, AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1, BatchStageWithKey<T2, ? extends K> stage2, AggregateOperation1<? super T2, ?, ? extends R2> aggrOp2)
  Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from this stage as well as stage1 and stage2 you supply.
- <T1,T2,R> BatchStage<Map.Entry<K,R>> aggregate3(BatchStageWithKey<T1, ? extends K> stage1, BatchStageWithKey<T2, ? extends K> stage2, AggregateOperation3<? super T, ? super T1, ? super T2, ?, ? extends R> aggrOp)
  Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply.
- default GroupAggregateBuilder1<T,K> aggregateBuilder()
  Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.
- default <R0> GroupAggregateBuilder<K,R0> aggregateBuilder(AggregateOperation1<? super T, ?, ? extends R0> aggrOp0)
  Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.
- default <R> BatchStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier)
  Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
- <R> BatchStage<R> customTransform(String stageName, ProcessorMetaSupplier procSupplier)
  Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
- default <R> BatchStage<R> customTransform(String stageName, ProcessorSupplier procSupplier)
  Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
- BatchStage<T> distinct()
  Attaches a stage that emits just the items that are distinct according to the grouping key (no two items which map to the same key will be on the output).
- <S> BatchStage<T> filterStateful(SupplierEx<? extends S> createFn, BiPredicateEx<? super S, ? super T> filterFn)
  Attaches a stage that performs a stateful filtering operation.
- <S> BatchStage<T> filterUsingService(ServiceFactory<?, S> serviceFactory, TriPredicate<? super S, ? super K, ? super T> filterFn)
  Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.
- <S,R> BatchStage<R> flatMapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
  Attaches a stage that performs a stateful flat-mapping operation.
- <S,R> BatchStage<R> flatMapUsingService(ServiceFactory<?, S> serviceFactory, TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
  Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items.
- <S,R> BatchStage<R> mapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
  Attaches a stage that performs a stateful mapping operation.
- default <V,R> BatchStage<R> mapUsingIMap(IMap<K, V> iMap, BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
  Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.
- default <V,R> BatchStage<R> mapUsingIMap(String mapName, BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
  Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.
- <S,R> BatchStage<R> mapUsingService(ServiceFactory<?, S> serviceFactory, TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
  Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item.
- <S,R> BatchStage<R> mapUsingServiceAsync(ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
  Asynchronous version of GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
- default <S,R> BatchStage<R> mapUsingServiceAsync(ServiceFactory<?, S> serviceFactory, TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
  Asynchronous version of GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
- <S,R> BatchStage<R> mapUsingServiceAsyncBatched(ServiceFactory<?, S> serviceFactory, int maxBatchSize, BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
  Batched version of GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>.
- <S,R> BatchStage<R> mapUsingServiceAsyncBatched(ServiceFactory<?, S> serviceFactory, int maxBatchSize, TriFunction<? super S, ? super List<K>, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
  Batched version of GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items (and a list of their corresponding keys) and returns a CompletableFuture<List<R>>.
- default <A,R> BatchStage<Map.Entry<K, R>> rollingAggregate(AggregateOperation1<? super T, A, ? extends R> aggrOp)
  Attaches a rolling aggregation stage.
Methods inherited from interface com.hazelcast.jet.pipeline.GeneralStageWithKey: keyFn
- 
Method Details- 
distinct
Attaches a stage that emits just the items that are distinct according to the grouping key (no two items which map to the same key will be on the output). There is no guarantee which one of the items with the same key it will emit.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Returns:
- the newly attached stage
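To make the semantics concrete, here is a minimal plain-Java model of "one item per grouping key". It is only a sketch, not how Jet implements the stage: `DistinctByKeySketch` and `distinctByKey` are hypothetical names, and this model happens to keep the first item seen per key, whereas Jet gives no guarantee about which item it emits. The set of seen keys grows with the number of distinct keys, which hints at why the operation is subject to memory limits.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class DistinctByKeySketch {
    // Plain-Java model only: emits one item per grouping key.
    // Keeping the FIRST item per key is an assumption of this model, not a Jet guarantee.
    static <T, K> List<T> distinctByKey(List<T> input, Function<T, K> keyFn) {
        Set<K> seenKeys = new HashSet<>();
        List<T> output = new ArrayList<>();
        for (T item : input) {
            // Set.add returns false when the key was already present.
            if (seenKeys.add(keyFn.apply(item))) {
                output.add(item);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Function<String, Integer> byLength = String::length;
        // Two distinct lengths (1 and 2), so two items survive: prints [a, bb]
        System.out.println(distinctByKey(List.of("a", "bb", "cc", "d"), byLength));
    }
}
```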
 
- 
mapStateful
@Nonnull <S,R> BatchStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to mapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
If you want to return the state variable from mapFn, the return value must be a copy of the state variable. This avoids situations in which the result of mapFn is modified after being emitted or the state is modified by downstream processors.
This sample takes a stream of pairs (serverId, latency) representing the latencies of serving individual requests and outputs the cumulative latency of all handled requests so far, for each server separately:
    GeneralStage<Entry<String, Long>> latencies;
    GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
        .groupingKey(Entry::getKey)
        .mapStateful(
            LongAccumulator::new,
            (sum, key, entry) -> {
                sum.add(entry.getValue());
                return entry(key, sum.get());
            }
        );
This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(summing()).
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
- mapStateful in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of the state object
- R - type of the result
- Parameters:
- createFn - function that returns the state object
- mapFn - function that receives the state object, the grouping key and the input item and outputs the result item. It may modify the state object. It must be stateless and cooperative.
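The per-key state handling described above can be modelled in plain Java. This is a sketch under stated assumptions, not Jet code: `MapStatefulSketch` and `cumulativeLatencies` are hypothetical names, a `long[1]` stands in for the state object, and a `HashMap` stands in for Jet's per-key state store (no snapshotting or fault tolerance is modelled).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class MapStatefulSketch {
    // Plain-Java model: one mutable state object (a long[1] running sum) per grouping key.
    static List<Entry<String, Long>> cumulativeLatencies(List<Entry<String, Long>> latencies) {
        Map<String, long[]> stateByKey = new HashMap<>();
        List<Entry<String, Long>> output = new ArrayList<>();
        for (Entry<String, Long> e : latencies) {
            // Plays the role of createFn: state is created on the first item of each key.
            long[] sum = stateByKey.computeIfAbsent(e.getKey(), k -> new long[1]);
            // Plays the role of mapFn: update the state, then emit a value copied from it.
            sum[0] += e.getValue();
            output.add(Map.entry(e.getKey(), sum[0]));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Entry<String, Long>> input = List.of(
                Map.entry("s1", 10L), Map.entry("s2", 5L), Map.entry("s1", 7L));
        // prints [s1=10, s2=5, s1=17]
        System.out.println(cumulativeLatencies(input));
    }
}
```

Note that the emitted entry carries the boxed value of the sum rather than the state array itself, which mirrors the advice to return a copy of the state.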
 
- 
filterStateful
@Nonnull <S> BatchStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S, ? super T> filterFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):
    GeneralStage<String> decimated = input
        .groupingKey(String::length)
        .filterStateful(
            LongAccumulator::new,
            (counter, item) -> {
                counter.add(1);
                return counter.get() % 10 != 0;
            }
        );
The given functions must be stateless and cooperative.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
- filterStateful in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of the state object
- Parameters:
- createFn - function that returns the state object
- filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.
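The decimation sample can be checked against a plain-Java model of the same logic. This is an illustrative sketch only: `FilterStatefulSketch` and `decimate` are hypothetical names, and a `long[1]` counter per string length stands in for the per-key state object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FilterStatefulSketch {
    // Plain-Java model: drops every 10th string of each length.
    static List<String> decimate(List<String> input) {
        Map<Integer, long[]> counterByLength = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String item : input) {
            // One counter per grouping key (the string length).
            long[] counter = counterByLength.computeIfAbsent(item.length(), len -> new long[1]);
            counter[0]++;
            if (counter[0] % 10 != 0) {
                output.add(item);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // 20 strings of the same length: the 10th and 20th are dropped, so this prints 18
        System.out.println(decimate(Collections.nCopies(20, "a")).size());
    }
}
```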
 
- 
flatMapStateful
@Nonnull <S,R> BatchStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to flatMapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
If you want to return the state variable from flatMapFn, the return value must be a copy of the state variable. This avoids situations in which the result of flatMapFn is modified after being emitted or the state is modified by downstream processors.
This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:
    GeneralStage<String> punctuated = input
        .groupingKey(String::length)
        .flatMapStateful(
            LongAccumulator::new,
            (counter, key, item) -> {
                counter.add(1);
                return counter.get() % 10 == 0
                    ? Traversers.traverseItems("punctuation" + key, item)
                    : Traversers.singleton(item);
            }
        );
The given functions must be stateless and cooperative.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
- flatMapStateful in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of the state object
- R - type of the result
- Parameters:
- createFn - function that returns the state object
- flatMapFn - function that receives the state object, the grouping key and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.
 
- 
rollingAggregate
@Nonnull default <A,R> BatchStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp)
Description copied from interface: GeneralStageWithKey
Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).
Sample usage:
    StreamStage<Entry<Color, Long>> aggregated = items
        .groupingKey(Item::getColor)
        .rollingAggregate(AggregateOperations.counting());
For example, if your input is {2, 7, 8, -5} and the aggregate operation computes a sum (rather than the counting() used in the sample above), the output will be {2, 9, 17, 12}.
This stage is fault-tolerant and saves its state to the snapshot.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
- rollingAggregate in interface GeneralStageWithKey<T,K>
- Type Parameters:
- R - type of the aggregate operation result
- Parameters:
- aggrOp - the aggregate operation to perform
- Returns:
- the newly attached stage
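The accumulate-then-export behaviour can be illustrated with a small plain-Java model. This is a sketch, not Jet's implementation: `RollingAggregateSketch`, `rollingAggregate` and `rollingSums` are hypothetical names, and the three function parameters only loosely mirror the create, accumulate and export primitives of an aggregate operation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class RollingAggregateSketch {
    // Plain-Java model: for every input item, accumulate into the key's accumulator
    // and emit the exported (current) result for that key.
    static <T, K, A, R> List<Map.Entry<K, R>> rollingAggregate(
            List<T> input, Function<T, K> keyFn,
            Supplier<A> createFn, BiConsumer<A, T> accumulateFn, Function<A, R> exportFn) {
        Map<K, A> accByKey = new HashMap<>();
        List<Map.Entry<K, R>> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            A acc = accByKey.computeIfAbsent(key, k -> createFn.get());
            accumulateFn.accept(acc, item);
            output.add(Map.entry(key, exportFn.apply(acc)));
        }
        return output;
    }

    // Running sum over a single (hypothetical) key "all".
    static List<Long> rollingSums(List<Long> input) {
        List<Map.Entry<String, Long>> entries = rollingAggregate(
                input, x -> "all", () -> new long[1], (acc, x) -> acc[0] += x, acc -> acc[0]);
        List<Long> sums = new ArrayList<>();
        for (Map.Entry<String, Long> e : entries) {
            sums.add(e.getValue());
        }
        return sums;
    }

    public static void main(String[] args) {
        // prints [2, 9, 17, 12]
        System.out.println(rollingSums(List.of(2L, 7L, 8L, -5L)));
    }
}
```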
 
- 
mapUsingIMap
@Nonnull default <V,R> BatchStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    V value = map.get(groupingKey);
    return mapFn.apply(item, value);
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(
             "enriching-map",
             (Item item, ItemDetail detail) -> item.setDetail(detail)
         );
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
- Specified by:
- mapUsingIMap in interface GeneralStageWithKey<T,K>
- Type Parameters:
- V - type of the value in the IMap
- R - type of the output item
- Parameters:
- mapName - name of the IMap
- mapFn - the mapping function. It must be stateless and cooperative.
- Returns:
- the newly attached stage
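The lookup-and-merge logic, including the "null result emits nothing" rule, can be tried out with an ordinary `java.util.Map` standing in for the IMap. This is a minimal sketch with hypothetical names (`MapUsingMapSketch`, `mapUsingMap`, `demo`); it says nothing about partitioning or data locality.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class MapUsingMapSketch {
    // Plain-Java model: look up the value by grouping key, apply mapFn, skip null results.
    static <T, K, V, R> List<R> mapUsingMap(List<T> items, Function<T, K> keyFn,
                                            Map<K, V> map, BiFunction<T, V, R> mapFn) {
        List<R> output = new ArrayList<>();
        for (T item : items) {
            V value = map.get(keyFn.apply(item)); // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {
                output.add(result);               // a null result emits nothing
            }
        }
        return output;
    }

    // Hypothetical data: enrich each word with the name of its length.
    static List<String> demo() {
        Map<Integer, String> names = Map.of(3, "three", 5, "five");
        Function<String, Integer> keyFn = String::length;
        BiFunction<String, String, String> mapFn =
                (word, name) -> name == null ? null : word + ":" + name;
        return mapUsingMap(List.of("apple", "kiwi", "fig"), keyFn, names, mapFn);
    }

    public static void main(String[] args) {
        // "kiwi" has length 4, which is not in the map, so it is filtered out.
        // prints [apple:five, fig:three]
        System.out.println(demo());
    }
}
```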
 
- 
mapUsingIMap
@Nonnull default <V,R> BatchStage<R> mapUsingIMap(@Nonnull IMap<K, V> iMap, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    V value = map.get(groupingKey);
    return mapFn.apply(item, value);
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
- Specified by:
- mapUsingIMap in interface GeneralStageWithKey<T,K>
- Type Parameters:
- V - type of the value in the IMap
- R - type of the output item
- Parameters:
- iMap - the IMap to use as the service
- mapFn - the mapping function. It must be stateless and cooperative.
- Returns:
- the newly attached stage
 
- 
mapUsingService
@Nonnull <S,R> BatchStage<R> mapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingService(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
         );
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
- mapUsingService in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- R - the result type of the mapping function
- Parameters:
- serviceFactory - the service factory
- mapFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.
- Returns:
- the newly attached stage
 
- 
mapUsingServiceAsync
@Nonnull default <S,R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
Uses default values for the extra parameters: the maximum number of concurrent async operations per processor is limited to 4, and the order of input items is preserved.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsync(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
         );
The latency of the async call will add to the total latency of the output.
- Specified by:
- mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- R - the future's result type of the mapping function
- Parameters:
- serviceFactory - the service factory
- mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
- Returns:
- the newly attached stage
 
- 
mapUsingServiceAsync
@Nonnull <S,R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsync(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             16,
             true,
             (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
         );
The latency of the async call will add to the total latency of the output.
- Specified by:
- mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- R - the future's result type of the mapping function
- Parameters:
- serviceFactory - the service factory
- maxConcurrentOps - maximum number of concurrent async operations per processor
- preserveOrder - whether the async responses are ordered or not
- mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
- Returns:
- the newly attached stage
 
- 
mapUsingServiceAsyncBatched
@Nonnull <S,R> BatchStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Batched version of GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>. The size of the list is limited by the given maxBatchSize.
This transform can perform filtering by putting null elements into the output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum number of items per lookup is specified as 100:
    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsyncBatched(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             100,
             (reg, itemList) -> reg
                 .fetchDetailsAsync(itemList.stream().map(Item::getDetailId).collect(Collectors.toList()))
                 .thenApply(details -> {
                     for (int i = 0; i < itemList.size(); i++) {
                         itemList.get(i).setDetail(details.get(i));
                     }
                     return itemList;
                 })
         );
- Specified by:
- mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- R - the future result type of the mapping function
- Parameters:
- serviceFactory - the service factory
- maxBatchSize - max size of the input list
- mapAsyncFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.
- Returns:
- the newly attached stage
 
- 
mapUsingServiceAsyncBatched
@Nonnull <S,R> BatchStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull TriFunction<? super S, ? super List<K>, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Batched version of GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes a list of input items (and a list of their corresponding keys) and returns a CompletableFuture<List<R>>. The sizes of the input lists are identical and are limited by the given maxBatchSize. The key at index N corresponds to the input item at index N.
The number of in-flight batches being completed asynchronously is limited to 2 and this mapping operation always preserves the order of input elements.
This transform can perform filtering by putting null elements into the output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum number of items per lookup is specified as 100:
    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsyncBatched(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             100,
             (reg, keyList, itemList) -> reg.fetchDetailsAsync(keyList).thenApply(details -> {
                 for (int i = 0; i < itemList.size(); i++) {
                     itemList.get(i).setDetail(details.get(i));
                 }
                 return itemList;
             })
         );
- Specified by:
- mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- R - the future result type of the mapping function
- Parameters:
- serviceFactory - the service factory
- maxBatchSize - max size of the input list
- mapAsyncFn - a mapping function. It must be stateless and cooperative.
- Returns:
- the newly attached stage
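The batching contract (parallel key and item lists, a size cap, order preservation, and null elements acting as a filter) can be modelled in plain Java with `CompletableFuture`. This is a simplified sketch with hypothetical names (`AsyncBatchedSketch`, `mapBatched`, `demo`): it waits for each batch in turn, whereas Jet keeps batches in flight asynchronously, and it omits the service object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

public class AsyncBatchedSketch {
    // Plain-Java model: split the input into batches of at most maxBatchSize,
    // pass (keys, items) to mapAsyncFn, and emit the non-null results in input order.
    static <T, K, R> List<R> mapBatched(
            List<T> items, Function<T, K> keyFn, int maxBatchSize,
            BiFunction<List<K>, List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> output = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            List<T> batch = items.subList(from, Math.min(from + maxBatchSize, items.size()));
            List<K> keys = new ArrayList<>();
            for (T item : batch) {
                keys.add(keyFn.apply(item)); // the key at index N belongs to the item at index N
            }
            // Simplification: block on each batch before starting the next one.
            for (R result : mapAsyncFn.apply(keys, batch).join()) {
                if (result != null) {
                    output.add(result);      // null elements are filtered out
                }
            }
        }
        return output;
    }

    // Hypothetical data: tag words with their length, dropping words of even length.
    static List<String> demo() {
        Function<String, Integer> keyFn = String::length;
        BiFunction<List<Integer>, List<String>, CompletableFuture<List<String>>> fn =
                (keys, batch) -> CompletableFuture.supplyAsync(() -> {
                    List<String> results = new ArrayList<>();
                    for (int i = 0; i < batch.size(); i++) {
                        results.add(keys.get(i) % 2 == 0 ? null : batch.get(i) + "#" + keys.get(i));
                    }
                    return results;
                });
        return mapBatched(List.of("a", "bb", "ccc", "dddd", "eeeee"), keyFn, 2, fn);
    }

    public static void main(String[] args) {
        // prints [a#1, ccc#3, eeeee#5]
        System.out.println(demo());
    }
}
```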
 
- 
filterUsingService
@Nonnull <S> BatchStage<T> filterUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriPredicate<? super S, ? super K, ? super T> filterFn)
Description copied from interface: GeneralStageWithKey
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .filterUsingService(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetail(key).contains("blade")
         );
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
- filterUsingService in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- Parameters:
- serviceFactory - the service factory
- filterFn - a filter predicate function. It must be stateless. It must be cooperative, if the service is cooperative.
- Returns:
- the newly attached stage
 
- 
flatMapUsingService
@Nonnull <S,R> BatchStage<R> flatMapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStageWithKey
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
    StreamStage<Part> parts = products
        .groupingKey(Product::getId)
        .flatMapUsingService(
            ServiceFactories.sharedService(ctx -> new PartRegistry()),
            (registry, productId, product) -> Traversers.traverseIterable(
                registry.fetchParts(productId))
        );
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
- flatMapUsingService in interface GeneralStageWithKey<T,K>
- Type Parameters:
- S - type of service object
- R - type of the output items
- Parameters:
- serviceFactory - the service factory
- flatMapFn - a flat-mapping function. It must not return a null traverser, but can return an empty traverser. It must be stateless. It must be cooperative, if the service is cooperative.
- Returns:
- the newly attached stage
 
- 
aggregate
@Nonnull <R> BatchStage<Map.Entry<K,R>> aggregate(@Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp)
Attaches a stage that performs the given group-and-aggregate operation. It emits one key-value pair (in a Map.Entry) for each distinct key it observes in its input. The value is the result of the aggregate operation across all the items with the given grouping key.
Sample usage:
    BatchStage<Entry<String, Long>> aggregated = people
        .groupingKey(Person::getLastName)
        .aggregate(AggregateOperations.counting());
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Type Parameters:
- R - type of the aggregation result
- Parameters:
- aggrOp - the aggregate operation to perform
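For readers more familiar with `java.util.stream`, group-and-aggregate with a counting operation is loosely analogous to `Collectors.groupingBy` with `Collectors.counting()`. The sketch below uses only the JDK and hypothetical names (`GroupAndCountSketch`, `countByKey`); it is an analogy for the result shape (one entry per distinct key), not a description of how Jet executes the stage.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GroupAndCountSketch {
    // JDK analogy: one map entry per distinct key, the value is the count of items with that key.
    static <T, K> Map<K, Long> countByKey(List<T> items, Function<T, K> keyFn) {
        return items.stream().collect(Collectors.groupingBy(keyFn, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical data: full names, keyed by the part after the first space.
        Function<String, String> lastName = full -> full.substring(full.indexOf(' ') + 1);
        Map<String, Long> counts = countByKey(List.of("Ann Lee", "Bob Ray", "Cy Lee"), lastName);
        System.out.println(counts.get("Lee")); // prints 2
        System.out.println(counts.get("Ray")); // prints 1
    }
}
```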
- See Also:
 
- 
aggregate2
@Nonnull <T1, R> BatchStage<Map.Entry<K, R>> aggregate2(@Nonnull BatchStageWithKey<T1, ? extends K> stage1, @Nonnull AggregateOperation2<? super T, ? super T1, ?, R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from both this stage and stage1 you supply. It emits one key-value pair (in a Map.Entry) for each distinct key it observes in its input. The value is the result of the aggregate operation across all the items with the given grouping key.
Sample usage:
    BatchStage<Entry<Long, Tuple2<Long, Long>>> aggregated = pageVisits
            .groupingKey(PageVisit::getUserId)
            .aggregate2(
                    addToCarts.groupingKey(AddToCart::getUserId),
                    aggregateOperation2(
                            AggregateOperations.counting(),
                            AggregateOperations.counting()));
This variant requires you to provide a two-input aggregate operation. If you can express your logic in terms of two single-input aggregate operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler API. Use this variant only when your aggregate operation must combine the input streams into the same accumulator.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Type Parameters:
- T1 - type of items in stage1
- R - type of the aggregation result
- Parameters:
- aggrOp - the aggregate operation to perform
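The choice between the two aggregate2 variants turns on whether both inputs must feed one accumulator. As an illustration, the plain-JDK sketch below (not Jet code; Event, SharedAccumulatorSketch and distinctProductsPerUser are hypothetical names) counts, per user, the distinct products seen in either input. Two independent per-input counts could not produce this result, because a product that appears in both inputs must be counted once.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-JDK model of a two-input aggregation with one shared accumulator; not Jet code.
final class SharedAccumulatorSketch {

    // Hypothetical event type: which user touched which product.
    static final class Event {
        final long userId;
        final String productId;

        Event(long userId, String productId) {
            this.userId = userId;
            this.productId = productId;
        }
    }

    // For each user ID, counts the distinct products seen across both inputs.
    static Map<Long, Integer> distinctProductsPerUser(List<Event> pageVisits, List<Event> addToCarts) {
        // One accumulator (a set of product IDs) per grouping key, shared by both inputs.
        Map<Long, Set<String>> accumulators = new TreeMap<>();
        for (Event e : pageVisits) {
            accumulators.computeIfAbsent(e.userId, k -> new HashSet<>()).add(e.productId);
        }
        for (Event e : addToCarts) {
            accumulators.computeIfAbsent(e.userId, k -> new HashSet<>()).add(e.productId);
        }
        // Finishing step: turn each accumulator into the emitted value.
        Map<Long, Integer> result = new TreeMap<>();
        accumulators.forEach((userId, products) -> result.put(userId, products.size()));
        return result;
    }

    public static void main(String[] args) {
        List<Event> pageVisits = Arrays.asList(new Event(1L, "book"), new Event(1L, "lamp"));
        List<Event> addToCarts = Arrays.asList(new Event(1L, "book"), new Event(2L, "desk"));
        // "book" is counted once for user 1: {1=2, 2=1}
        System.out.println(distinctProductsPerUser(pageVisits, addToCarts));
    }
}
```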
- 
aggregate2
@Nonnull default <T1, R0, R1> BatchStage<Map.Entry<K, Tuple2<R0, R1>>> aggregate2(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, @Nonnull BatchStageWithKey<? extends T1, ? extends K> stage1, @Nonnull AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1)
Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from both this stage and stage1 you supply. For each distinct grouping key it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage and aggrOp1 on stage1. Once it has received all the items, it emits for each distinct key a Map.Entry(key, Tuple2(result0, result1)).
Sample usage:
    BatchStage<Entry<Long, Tuple2<Long, Long>>> aggregated = pageVisits
            .groupingKey(PageVisit::getUserId)
            .aggregate2(
                    AggregateOperations.counting(),
                    addToCarts.groupingKey(AddToCart::getUserId),
                    AggregateOperations.counting());
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Type Parameters:
- R0 - type of the aggregation result for stream-0
- T1 - type of items in stage1
- R1 - type of the aggregation result for stream-1
- Parameters:
- aggrOp0 - aggregate operation to perform on this stage
- stage1 - the other stage
- aggrOp1 - aggregate operation to perform on the other stage
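The per-key pairing of two independent results can be pictured with a small plain-JDK model. The sketch below is not Jet code: CoGroupCountSketch and coCount are hypothetical names, and a long[2] stands in for Tuple2(result0, result1). It also assumes that a key seen in only one input still yields an entry, with the other count left at zero.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-JDK model of co-grouping two inputs and counting each one separately; not Jet code.
final class CoGroupCountSketch {

    // Counts, per user ID, the occurrences in each of the two inputs.
    // Slot 0 of the long[] models result0, slot 1 models result1.
    static Map<Long, long[]> coCount(List<Long> pageVisitUserIds, List<Long> addToCartUserIds) {
        Map<Long, long[]> byUser = new TreeMap<>();
        for (Long id : pageVisitUserIds) {
            byUser.computeIfAbsent(id, k -> new long[2])[0]++;
        }
        for (Long id : addToCartUserIds) {
            byUser.computeIfAbsent(id, k -> new long[2])[1]++;
        }
        return byUser;
    }

    public static void main(String[] args) {
        Map<Long, long[]> result = coCount(Arrays.asList(1L, 1L, 2L), Arrays.asList(2L, 3L));
        // Prints:
        // 1 -> (2, 0)
        // 2 -> (1, 1)
        // 3 -> (0, 1)
        result.forEach((user, counts) ->
                System.out.println(user + " -> (" + counts[0] + ", " + counts[1] + ")"));
    }
}
```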
- 
aggregate3
@Nonnull <T1, T2, R> BatchStage<Map.Entry<K, R>> aggregate3(@Nonnull BatchStageWithKey<T1, ? extends K> stage1, @Nonnull BatchStageWithKey<T2, ? extends K> stage2, @Nonnull AggregateOperation3<? super T, ? super T1, ? super T2, ?, ? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation over the items from this stage as well as stage1 and stage2 you supply. It emits one key-value pair (in a Map.Entry) for each distinct key it observes in its input. The value is the result of the aggregate operation across all the items with the given grouping key.
Sample usage:
    BatchStage<Entry<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
            .groupingKey(PageVisit::getUserId)
            .aggregate3(
                    addToCarts.groupingKey(AddToCart::getUserId),
                    payments.groupingKey(Payment::getUserId),
                    aggregateOperation3(
                            AggregateOperations.counting(),
                            AggregateOperations.counting(),
                            AggregateOperations.counting()));
This variant requires you to provide a three-input aggregate operation. If you can express your logic in terms of three single-input aggregate operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it offers a simpler API. Use this variant only when your aggregate operation must combine the input streams into the same accumulator.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Type Parameters:
- T1 - type of items in stage1
- T2 - type of items in stage2
- R - type of the aggregation result
- Parameters:
- aggrOp - the aggregate operation to perform
- 
aggregate3
@Nonnull default <T1, T2, R0, R1, R2> BatchStage<Map.Entry<K, Tuple3<R0, R1, R2>>> aggregate3(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, @Nonnull BatchStageWithKey<T1, ? extends K> stage1, @Nonnull AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1, @Nonnull BatchStageWithKey<T2, ? extends K> stage2, @Nonnull AggregateOperation1<? super T2, ?, ? extends R2> aggrOp2)
Attaches a stage that performs the given cogroup-and-aggregate transformation of the items from this stage as well as stage1 and stage2 you supply. For each distinct grouping key it observes in the input, it performs the supplied aggregate operation across all the items sharing that key. It performs the aggregation separately for each input stage: aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has received all the items, it emits for each distinct key a Map.Entry(key, Tuple3(result0, result1, result2)).
Sample usage:
    BatchStage<Entry<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
            .groupingKey(PageVisit::getUserId)
            .aggregate3(
                    AggregateOperations.counting(),
                    addToCarts.groupingKey(AddToCart::getUserId),
                    AggregateOperations.counting(),
                    payments.groupingKey(Payment::getUserId),
                    AggregateOperations.counting());
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Type Parameters:
- T1 - type of items in stage1
- T2 - type of items in stage2
- R0 - type of the aggregation result for stream-0
- R1 - type of the aggregation result for stream-1
- R2 - type of the aggregation result for stream-2
- Parameters:
- aggrOp0 - aggregate operation to perform on this stage
- stage1 - the first additional stage
- aggrOp1 - aggregate operation to perform on stage1
- stage2 - the second additional stage
- aggrOp2 - aggregate operation to perform on stage2
- 
aggregateBuilder
@Nonnull default <R0> GroupAggregateBuilder<K, R0> aggregateBuilder(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will already be registered with the builder you get. You supply an aggregate operation for each input stage and in the output you get the individual aggregation results in a Map.Entry(key, itemsByTag). Use the tag you get from builder.add(stageN) to retrieve the aggregated result for that stage. Use builder.tag0() as the tag of this stage. You will also be able to supply a function to the builder that immediately transforms the ItemsByTag to the desired output type.
This example reads from three sources that produce Map.Entry<String, Long>. It groups by entry key and then counts the items in stage-0, sums those in stage-1 and takes the average of those in stage-2:
    Pipeline p = Pipeline.create();
    BatchStageWithKey<Entry<String, Long>, String> stage0 =
            p.readFrom(source0).groupingKey(Entry::getKey);
    BatchStageWithKey<Entry<String, Long>, String> stage1 =
            p.readFrom(source1).groupingKey(Entry::getKey);
    BatchStageWithKey<Entry<String, Long>, String> stage2 =
            p.readFrom(source2).groupingKey(Entry::getKey);

    GroupAggregateBuilder<String, Long> b = stage0.aggregateBuilder(
            AggregateOperations.counting());
    Tag<Long> tag0 = b.tag0();
    Tag<Long> tag1 = b.add(stage1,
            AggregateOperations.summingLong(Entry::getValue));
    Tag<Double> tag2 = b.add(stage2,
            AggregateOperations.averagingLong(Entry::getValue));
    BatchStage<Entry<String, ItemsByTag>> aggregated = b.build();
    aggregated.map(e -> String.format(
            "Key %s, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
            e.getKey(),
            e.getValue().get(tag0), e.getValue().get(tag1), e.getValue().get(tag2)));
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
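The example retrieves a Long with tag0 and a Double with tag2 from the same container without a cast. One way such typed lookups can work is sketched below in plain Java. TypedTag and ResultsByTag are hypothetical classes written for this illustration; they are not Jet's Tag and ItemsByTag and make no claim about how those are implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-JDK model of tag-based, typed retrieval from a mixed-type container; not Jet code.
final class TypedTagSketch {

    // Hypothetical tag: the type parameter records the type of the result it points at.
    static final class TypedTag<T> {
        final int index;

        TypedTag(int index) {
            this.index = index;
        }
    }

    // Hypothetical per-key container holding one result per contributing stage.
    static final class ResultsByTag {
        private final Map<Integer, Object> results = new HashMap<>();

        <T> void put(TypedTag<T> tag, T value) {
            results.put(tag.index, value);
        }

        // The cast is unchecked, but put() only ever stores a T under a TypedTag<T>.
        @SuppressWarnings("unchecked")
        <T> T get(TypedTag<T> tag) {
            return (T) results.get(tag.index);
        }
    }

    public static void main(String[] args) {
        TypedTag<Long> countTag = new TypedTag<>(0);
        TypedTag<Double> averageTag = new TypedTag<>(1);

        ResultsByTag results = new ResultsByTag();
        results.put(countTag, 3L);
        results.put(averageTag, 4.5);

        // The tag's type parameter decides the static type of each lookup.
        long count = results.get(countTag);
        double average = results.get(averageTag);
        System.out.println(count + ", " + average);
    }
}
```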
- 
aggregateBuilder
default GroupAggregateBuilder1<T, K> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will already be registered with the builder you get.
This builder requires you to provide a multi-input aggregate operation. If you can express your logic in terms of single-input aggregate operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use this builder only when you need to implement an aggregate operation that combines all the input streams into the same accumulator.
This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.
To add the other stages, call add(stage). Collect all the tags returned from add() and use them when building the aggregate operation. Retrieve the tag of the first stage (from which you obtained this builder) by calling GroupAggregateBuilder1.tag0().
This example takes three streams of Map.Entry<String, Long> and, for each string key, counts the distinct Long values across all input streams:
    Pipeline p = Pipeline.create();
    BatchStageWithKey<Entry<String, Long>, String> stage0 =
            p.readFrom(source0).groupingKey(Entry::getKey);
    BatchStageWithKey<Entry<String, Long>, String> stage1 =
            p.readFrom(source1).groupingKey(Entry::getKey);
    BatchStageWithKey<Entry<String, Long>, String> stage2 =
            p.readFrom(source2).groupingKey(Entry::getKey);

    GroupAggregateBuilder1<Entry<String, Long>, String> b = stage0.aggregateBuilder();
    Tag<Entry<String, Long>> tag0 = b.tag0();
    Tag<Entry<String, Long>> tag1 = b.add(stage1);
    Tag<Entry<String, Long>> tag2 = b.add(stage2);
    BatchStage<Entry<String, Integer>> aggregated = b.build(AggregateOperation
            .withCreate(HashSet<Long>::new)
            .andAccumulate(tag0, (acc, item) -> acc.add(item.getValue()))
            .andAccumulate(tag1, (acc, item) -> acc.add(item.getValue()))
            .andAccumulate(tag2, (acc, item) -> acc.add(item.getValue()))
            .andCombine(HashSet::addAll)
            .andFinish(HashSet::size));
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- 
customTransform
@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
- customTransform in interface GeneralStageWithKey<T, K>
- Type Parameters:
- R - the type of the output items
- Parameters:
- stageName - a human-readable name for the custom stage
- procSupplier - the supplier of processors
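The type-safety note applies to any Java method whose result type is chosen by the caller while the items come from an untyped source. The plain-JDK sketch below illustrates the general effect and is not Jet code: CallSiteInferenceSketch, collect and failsWhenReadAsString are hypothetical names. The declaration compiles for any R, and a mismatch only surfaces as a ClassCastException when an item is read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-JDK illustration of a result type inferred from the call site; not Jet code.
final class CallSiteInferenceSketch {

    // Hypothetical method: R is picked by the caller, but nothing ties it
    // to what the untyped supplier actually produces.
    @SuppressWarnings("unchecked")
    static <R> List<R> collect(Supplier<Object> untypedSource, int count) {
        List<Object> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(untypedSource.get());
        }
        return (List<R>) (List<?>) out; // unchecked: the compiler cannot verify R
    }

    // Returns true if reading the first item as a String fails at run time.
    static boolean failsWhenReadAsString(Supplier<Object> source) {
        List<String> items = collect(source, 1); // compiles: R is inferred as String
        try {
            items.get(0).length(); // the implicit cast to String happens here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWhenReadAsString(() -> "text")); // false
        System.out.println(failsWhenReadAsString(() -> 42));     // true
    }
}
```

With customTransform the practical consequence is the same: declare the result type that the processors really emit, since the compiler will not check it.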
 
- 
customTransform
@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
- customTransform in interface GeneralStageWithKey<T, K>
- Type Parameters:
- R - the type of the output items
- Parameters:
- stageName - a human-readable name for the custom stage
- procSupplier - the supplier of processors
 
- 
customTransform
@Nonnull <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
- customTransform in interface GeneralStageWithKey<T, K>
- Type Parameters:
- R - the type of the output items
- Parameters:
- stageName - a human-readable name for the custom stage
- procSupplier - the supplier of processors
 
 
-