Type Parameters:
T - type of the stream items
K - type of the key

public interface StreamStageWithKey<T,K> extends GeneralStageWithKey<T,K>
mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>)).

| Modifier and Type | Method and Description |
|---|---|
| `<R> StreamStage<R>` | `customTransform(String stageName, ProcessorMetaSupplier procSupplier)` Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
| `default <R> StreamStage<R>` | `customTransform(String stageName, ProcessorSupplier procSupplier)` Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
| `default <R> StreamStage<R>` | `customTransform(String stageName, SupplierEx<Processor> procSupplier)` Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
| `<S> StreamStage<T>` | `filterStateful(long ttl, SupplierEx<? extends S> createFn, BiPredicateEx<? super S,? super T> filterFn)` Attaches a stage that performs a stateful filtering operation. |
| `default <S> StreamStage<T>` | `filterStateful(SupplierEx<? extends S> createFn, BiPredicateEx<? super S,? super T> filterFn)` Attaches a stage that performs a stateful filtering operation. |
| `<S> StreamStage<T>` | `filterUsingService(ServiceFactory<?,S> serviceFactory, TriPredicate<? super S,? super K,? super T> filterFn)` Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. |
| `<S,R> StreamStage<R>` | `flatMapStateful(long ttl, SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn, TriFunction<? super S,? super K,? super Long,? extends Traverser<R>> onEvictFn)` Attaches a stage that performs a stateful flat-mapping operation. |
| `<S,R> StreamStage<R>` | `flatMapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)` Attaches a stage that performs a stateful flat-mapping operation. |
| `<S,R> StreamStage<R>` | `flatMapUsingService(ServiceFactory<?,S> serviceFactory, TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)` Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. |
| `<S,R> StreamStage<R>` | `mapStateful(long ttl, SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends R> mapFn, TriFunction<? super S,? super K,? super Long,? extends R> onEvictFn)` Attaches a stage that performs a stateful mapping operation. |
| `<S,R> StreamStage<R>` | `mapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends R> mapFn)` Attaches a stage that performs a stateful mapping operation. |
| `default <V,R> StreamStage<R>` | `mapUsingIMap(IMap<K,V> iMap, BiFunctionEx<? super T,? super V,? extends R> mapFn)` Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
| `default <V,R> StreamStage<R>` | `mapUsingIMap(String mapName, BiFunctionEx<? super T,? super V,? extends R> mapFn)` Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
| `<S,R> StreamStage<R>` | `mapUsingService(ServiceFactory<?,S> serviceFactory, TriFunction<? super S,? super K,? super T,? extends R> mapFn)` Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. |
| `<S,R> StreamStage<R>` | `mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)` Asynchronous version of `GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>)`: the mapAsyncFn returns a `CompletableFuture<R>` instead of just `R`. |
| `default <S,R> StreamStage<R>` | `mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory, TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)` Asynchronous version of `GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>)`: the mapAsyncFn returns a `CompletableFuture<R>` instead of just `R`. |
| `<S,R> StreamStage<R>` | `mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory, int maxBatchSize, BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)` Batched version of `GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>)`: mapAsyncFn takes a list of input items and returns a `CompletableFuture<List<R>>`. |
| `<S,R> StreamStage<R>` | `mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory, int maxBatchSize, TriFunction<? super S,? super List<K>,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)` Batched version of `GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>)`: mapAsyncFn takes a list of input items (and a list of their corresponding keys) and returns a `CompletableFuture<List<R>>`. |
| `default <A,R> StreamStage<Map.Entry<K,R>>` | `rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp)` Attaches a rolling aggregation stage. |
| `default <A,R> StreamStage<Map.Entry<K,R>>` | `rollingAggregate(long ttl, AggregateOperation1<? super T,A,? extends R> aggrOp)` Attaches a rolling aggregation stage. |
| `StageWithKeyAndWindow<T,K>` | `window(WindowDefinition wDef)` Adds the definition of the window to use in the group-and-aggregate pipeline stage being constructed. |

@Nonnull StageWithKeyAndWindow<T,K> window(@Nonnull WindowDefinition wDef)
Adds the definition of the window to use in the group-and-aggregate pipeline stage being constructed.

@Nonnull <S,R> StreamStage<R> mapStateful(long ttl, @Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn, @Nonnull TriFunction<? super S,? super K,? super Long,? extends R> onEvictFn)
Attaches a stage that performs a stateful mapping operation.
createFn returns the object that holds the state. Jet passes this
 object along with each input item to mapFn, which can update
 the object's state. For each grouping key there's a separate state
 object. The state object will be included in the state snapshot, so it
 survives job restarts. For this reason it must be serializable.
 
 If the given ttl is greater than zero, Jet will consider the
 state object stale if its time-to-live has expired. The state object for
 a given key has a timestamp attached to it: the top timestamp of any
 event with that key seen so far. Whenever the watermark advances, Jet
 discards all state objects with a timestamp less than wm - ttl.
 Just before discarding the state object, Jet calls onEvictFn on
 it. The function can return an output item that will be emitted, or
 null if it doesn't need to emit an item. If TTL is used, Jet
 also drops late events; otherwise, all events are processed.
 
 This sample takes a stream of pairs (serverId, latency)
 representing the latencies of serving individual requests and keeps
 track, separately for each server, of the total latency accumulated over
 individual sessions — bursts of server activity separated
 by quiet periods of one minute or more. For each input item it outputs
 the accumulated latency so far and when a session ends, it outputs a
 special entry that reports the total latency for that session.
 
 StreamStage<Entry<String, Long>> latencies = null;
 StreamStage<Entry<String, Long>> cumulativeLatencies = latencies
         .groupingKey(Entry::getKey)
         .mapStateful(
                 MINUTES.toMillis(1),
                 LongAccumulator::new,
                 (sum, key, entry) -> {
                     sum.add(entry.getValue());
                     return entry(key, sum.get());
                 },
                 (sum, key, time) -> entry(String.format(
                         "%s:totalForSession:%d", key, time), sum.get())
         );
 
 The given functions must be stateless and cooperative.
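
The TTL rule described above (evict every state object whose timestamp is less than wm - ttl, calling the eviction callback just before) can be sketched in plain Java. This is a simulation under stated assumptions, not Jet code: the class `TtlEvictionSketch`, its `Timestamped` state holder, and the `onEvent`/`onWatermark` methods are hypothetical names, and the dropping of late events is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of watermark-driven state eviction; not the Jet API. */
class TtlEvictionSketch {
    /** Hypothetical per-key state: a running sum plus the top event timestamp seen so far. */
    static final class Timestamped {
        long sum;
        long topTimestamp = Long.MIN_VALUE;
    }

    private final long ttl;
    private final Map<String, Timestamped> states = new HashMap<>();
    final List<String> output = new ArrayList<>();

    TtlEvictionSketch(long ttl) {
        this.ttl = ttl;
    }

    /** Plays the role of mapFn: updates the key's state and emits the running sum. */
    void onEvent(String key, long timestamp, long value) {
        Timestamped s = states.computeIfAbsent(key, k -> new Timestamped());
        s.topTimestamp = Math.max(s.topTimestamp, timestamp);
        s.sum += value;
        output.add(key + "=" + s.sum);
    }

    /** When the watermark advances, evicts every state with timestamp < wm - ttl. */
    void onWatermark(long wm) {
        Iterator<Map.Entry<String, Timestamped>> it = states.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Timestamped> e = it.next();
            if (e.getValue().topTimestamp < wm - ttl) {
                // Plays the role of onEvictFn: emit a final item for the evicted state.
                output.add(e.getKey() + ":totalForSession=" + e.getValue().sum);
                it.remove();
            }
        }
    }

    static List<String> demo() {
        TtlEvictionSketch s = new TtlEvictionSketch(60_000);
        s.onEvent("a", 1_000, 5);
        s.onEvent("a", 2_000, 7);
        s.onWatermark(30_000);     // 2_000 is not less than 30_000 - 60_000: state kept
        s.onWatermark(70_000);     // 2_000 is less than 70_000 - 60_000: state evicted
        s.onEvent("a", 80_000, 3); // a fresh state object is created for the key
        return s.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the sketch, the item emitted on eviction carries the total of the finished session, and the next event for the same key starts from a new state object.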
Type Parameters:
S - type of the state object
R - type of the result
Parameters:
ttl - time-to-live for each state object, disabled if zero or less
createFn - function that returns the state object
mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object.
onEvictFn - function that Jet calls when evicting a state object

@Nonnull <S,R> StreamStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this
 object along with each input item and its key to mapFn, which
 can update the object's state. For each grouping key there's a separate
 state object. The state object will be included in the state snapshot,
 so it survives job restarts. For this reason it must be serializable.
 
 If you want to return the state variable from mapFn, then the
 return value must be a copy of the state variable. This avoids situations
 in which the result of mapFn is modified after being emitted or
 in which the state is modified by downstream processors.
 
 This sample takes a stream of pairs (serverId, latency)
 representing the latencies of serving individual requests and outputs
 the cumulative latency of all handled requests so far, for each
 server separately:
 
 GeneralStage<Entry<String, Long>> latencies;
 GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
         .groupingKey(Entry::getKey)
         .mapStateful(
                 LongAccumulator::new,
                 (sum, key, entry) -> {
                     sum.add(entry.getValue());
                     return entry(key, sum.get());
                 }
         );
 
 This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(summing()).
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
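
The advice above about returning a copy of the state can be illustrated without Jet. The following is a minimal sketch in plain Java, assuming a mutable `ArrayList` as the state; the class `StateCopySketch` and its two methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Shows why an emitted result should not alias the mutable state object. */
class StateCopySketch {
    /** Emits the state object itself: every emitted reference points at the same list. */
    static List<List<String>> emitShared(List<String> items) {
        List<String> state = new ArrayList<>();
        List<List<String>> emitted = new ArrayList<>();
        for (String item : items) {
            state.add(item);
            emitted.add(state); // unsafe: later updates change already-emitted results
        }
        return emitted;
    }

    /** Emits a defensive copy: each emitted result is a stable snapshot of the state. */
    static List<List<String>> emitCopies(List<String> items) {
        List<String> state = new ArrayList<>();
        List<List<String>> emitted = new ArrayList<>();
        for (String item : items) {
            state.add(item);
            emitted.add(new ArrayList<>(state));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");
        System.out.println(emitShared(input).get(0)); // first result was changed after emission
        System.out.println(emitCopies(input).get(0)); // first result is unchanged
    }
}
```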
Specified by:
mapStateful in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of the state object
R - type of the result
Parameters:
createFn - function that returns the state object
mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object. It must be stateless and cooperative.

@Nonnull <S> StreamStage<T> filterStateful(long ttl, @Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Attaches a stage that performs a stateful filtering operation.
createFn returns the object that holds the state. Jet passes this
 object along with each input item to filterFn, which can update
 the object's state. For each grouping key there's a separate state
 object. The state object will be included in the state snapshot, so it
 survives job restarts. For this reason it must be serializable.
 
 If the given ttl is greater than zero, Jet will consider the
 state object stale if its time-to-live has expired. The state object
 has a timestamp attached to it: the top timestamp of any event with the
 same key seen so far. Upon seeing another event, Jet compares the state
 timestamp with the current watermark. If it is less than wm - ttl,
 it discards the state object and creates a new one before processing the
 event. If TTL is used, Jet also drops late events; otherwise, all events
 are processed.
 
 This sample receives a stream of pairs (serverId, requestLatency)
 that represent the latencies of individual requests served by a cluster
 of servers. It emits the record-breaking (worst so far) latencies for
 each server independently and resets the score after one minute of
 inactivity on a given server.
 
 StreamStage<Entry<String, Long>> latencies;
 StreamStage<Entry<String, Long>> topLatencies = latencies
         .groupingKey(Entry::getKey)
         .filterStateful(
                 MINUTES.toMillis(1),
                 LongAccumulator::new,
                 (topLatencyState, entry) -> {
                     long currLatency = entry.getValue();
                     long topLatency = topLatencyState.get();
                     topLatencyState.set(Math.max(currLatency, topLatency));
                     return currLatency > topLatency;
                 }
         );
 
 The given functions must be stateless and cooperative.
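
The reset-on-next-event rule described above (compare the state timestamp with the current watermark when another event arrives, and start over with a new state object if it is less than wm - ttl) can be sketched in plain Java. This is a simulation, not Jet code: `TopLatencySketch`, `State`, and `accept` are hypothetical names, the watermark is passed in explicitly with each event, and late-event dropping is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of a stateful filter with TTL-based reset; not the Jet API. */
class TopLatencySketch {
    /** Hypothetical per-key state: worst latency so far and the top event timestamp. */
    static final class State {
        long top;                          // starts at 0, like a fresh accumulator
        long topTimestamp = Long.MIN_VALUE;
    }

    private final long ttl;
    private final Map<String, State> states = new HashMap<>();

    TopLatencySketch(long ttl) {
        this.ttl = ttl;
    }

    /** Returns true if the latency is a new record for the server; resets stale state first. */
    boolean accept(String serverId, long timestamp, long latency, long wm) {
        State s = states.get(serverId);
        if (s == null || s.topTimestamp < wm - ttl) {
            s = new State();               // missing or stale: start from a new state object
            states.put(serverId, s);
        }
        s.topTimestamp = Math.max(s.topTimestamp, timestamp);
        boolean isRecord = latency > s.top;
        s.top = Math.max(s.top, latency);
        return isRecord;
    }

    static List<Boolean> demo() {
        TopLatencySketch f = new TopLatencySketch(60_000);
        List<Boolean> passed = new ArrayList<>();
        passed.add(f.accept("s1", 1_000, 50, 1_000));     // first item is a record
        passed.add(f.accept("s1", 2_000, 40, 2_000));     // 40 does not beat 50
        passed.add(f.accept("s1", 100_000, 40, 100_000)); // state is stale, so 40 is a record again
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```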
Type Parameters:
S - type of the state object
Parameters:
ttl - time-to-live for each state object, disabled if zero or less
createFn - function that returns the state object
filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.

@Nonnull default <S> StreamStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this
 object along with each input item and its key to filterFn, which
 can update the object's state. For each grouping key there's a separate
 state object. The state object will be included in the state snapshot,
 so it survives job restarts. For this reason it must be serializable.
 This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):
 GeneralStage<String> decimated = input
         .groupingKey(String::length)
         .filterStateful(
                 LongAccumulator::new,
                 (counter, item) -> {
                     counter.add(1);
                     return counter.get() % 10 != 0;
                 }
         );
 
 The given functions must be stateless and cooperative.
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Specified by:
filterStateful in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of the state object
Parameters:
createFn - function that returns the state object
filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.

@Nonnull <S,R> StreamStage<R> flatMapStateful(long ttl, @Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn, @Nonnull TriFunction<? super S,? super K,? super Long,? extends Traverser<R>> onEvictFn)
Attaches a stage that performs a stateful flat-mapping operation.
createFn returns the object that holds the state. Jet passes this
 object along with each input item to flatMapFn, which can update
 the object's state. For each grouping key there's a separate state
 object. The state object will be included in the state snapshot, so it
 survives job restarts. For this reason it must be serializable.
 
 If the given ttl is greater than zero, Jet will consider the
 state object stale if its time-to-live has expired. The state object for
 a given key has a timestamp attached to it: the top timestamp of any
 event with that key seen so far. Whenever the watermark advances, Jet
 discards all state objects with a timestamp less than wm - ttl.
 Just before discarding the state object, Jet calls onEvictFn on
 it. The function returns a traverser over the items it wants to emit, or
 it can return an empty traverser. If TTL
 is used, Jet also drops late events; otherwise, all events are
 processed.
 
This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group, or after one minute elapses without further input for a given key:
 StreamStage<String> punctuated = input
         .groupingKey(String::length)
         .flatMapStateful(
                 MINUTES.toMillis(1),
                 LongAccumulator::new,
                 (counter, key, item) -> {
                     counter.add(1);
                     return counter.get() % 10 == 0
                             ? Traversers.traverseItems("punctuation" + key, item)
                             : Traversers.singleton(item);
                 },
                 (counter, key, wm) -> Traversers.singleton("punctuation" + key)
         );
 
 The given functions must be stateless and cooperative.
Type Parameters:
S - type of the state object
R - type of the result
Parameters:
ttl - time-to-live for each state object, disabled if zero or less
createFn - function that returns the state object
flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object.
onEvictFn - function that Jet calls when evicting a state object

@Nonnull <S,R> StreamStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this
 object along with each input item and its key to flatMapFn,
 which can update the object's state. For each grouping key there's a
 separate state object. The state object will be included in the state
 snapshot, so it survives job restarts. For this reason it must be
 serializable.
 
 If you want to return the state variable from flatMapFn, then the
 return value must be a copy of the state variable. This avoids situations
 in which the result of flatMapFn is modified after being emitted or
 in which the state is modified by downstream processors.
 
This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:
 GeneralStage<String> punctuated = input
         .groupingKey(String::length)
         .flatMapStateful(
                 LongAccumulator::new,
                 (counter, key, item) -> {
                     counter.add(1);
                     return counter.get() % 10 == 0
                             ? Traversers.traverseItems("punctuation" + key, item)
                             : Traversers.singleton(item);
                 }
         );
 
 The given functions must be stateless and cooperative.
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Specified by:
flatMapStateful in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of the state object
R - type of the result
Parameters:
createFn - function that returns the state object
flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.

@Nonnull default <A,R> StreamStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
Description copied from interface: GeneralStageWithKey
Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to
 the accumulator and outputs the current result of aggregation (as
 returned by the export primitive).
 Sample usage:
 StreamStage<Entry<Color, Long>> aggregated = items
         .groupingKey(Item::getColor)
         .rollingAggregate(AggregateOperations.counting());
 
 For example, if the aggregate operation is summing (rather than the
 counting shown above) and the input for a given key is {2, 7, 8, -5},
 the output will be {2, 9, 17, 12}.
 This stage is fault-tolerant and saves its state to the snapshot.
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
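
The rolling behaviour described above (each input item updates the accumulator for its key and the current result is emitted immediately) can be sketched in plain Java with a summing accumulator. This is a simulation, not Jet code; `RollingSumSketch` and `rollingSum` are hypothetical names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of a keyed rolling sum; not the Jet API. */
class RollingSumSketch {
    /** For every input entry, emits (key, running sum of that key's values so far). */
    static List<Map.Entry<String, Long>> rollingSum(List<Map.Entry<String, Long>> input) {
        Map<String, Long> accumulators = new HashMap<>();
        List<Map.Entry<String, Long>> output = new ArrayList<>();
        for (Map.Entry<String, Long> e : input) {
            // merge() stores and returns the updated sum for this key
            Long current = accumulators.merge(e.getKey(), e.getValue(), Long::sum);
            output.add(new SimpleImmutableEntry<>(e.getKey(), current));
        }
        return output;
    }

    static List<String> demo() {
        List<Map.Entry<String, Long>> input = Arrays.asList(
                new SimpleImmutableEntry<>("k", 2L),
                new SimpleImmutableEntry<>("j", 100L),
                new SimpleImmutableEntry<>("k", 7L),
                new SimpleImmutableEntry<>("k", 8L),
                new SimpleImmutableEntry<>("k", -5L));
        List<String> formatted = new ArrayList<>();
        for (Map.Entry<String, Long> e : rollingSum(input)) {
            formatted.add(e.getKey() + "=" + e.getValue());
        }
        return formatted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [k=2, j=100, k=9, k=17, k=12]
    }
}
```

Each key keeps its own accumulator, so the item for key "j" does not affect the running sum of key "k".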
Specified by:
rollingAggregate in interface GeneralStageWithKey<T,K>
Type Parameters:
R - type of the aggregate operation result
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull default <A,R> StreamStage<Map.Entry<K,R>> rollingAggregate(long ttl, @Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to
 the accumulator and outputs the current result of aggregation (as
 returned by the export primitive).
 This sample takes a stream of items and gives rolling counts of items of each color:
 StreamStage<Entry<Color, Long>> aggregated = items
         .groupingKey(Item::getColor)
         .rollingAggregate(AggregateOperations.counting());
 
 
 If the given ttl is greater than zero, Jet will consider the
 accumulator object stale if its time-to-live has expired. The
 accumulator object has a timestamp attached to it: the top timestamp of
 any event with the same key seen so far. Upon seeing another event, Jet
 compares the accumulator timestamp with the current watermark. If it is
 less than wm - ttl, it discards the accumulator object and
 creates a new one before processing the event.
 
This stage is fault-tolerant and saves its state to the snapshot.
Type Parameters:
R - type of the aggregate operation result
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull default <V,R> StreamStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed
 and the result of the lookup is merged with the item and emitted.
 
 If the result of the mapping is null, it emits nothing.
 Therefore this stage can be used to implement filtering semantics as well.
 
The mapping logic is equivalent to:
 V value = map.get(groupingKey);
 return mapFn.apply(item, value);
 
 Sample usage:
 
 items.groupingKey(Item::getDetailId)
      .mapUsingIMap(
              "enriching-map",
              (Item item, ItemDetail detail) -> item.setDetail(detail)
      );
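
The lookup-and-merge logic above, including the rule that a null mapping result emits nothing, can be sketched with an ordinary `java.util.Map` standing in for the IMap. This is a plain-Java illustration, not Jet code; `KeyedLookupSketch` and `mapUsingLookup` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java simulation of "look up by grouping key, merge, drop null results". */
class KeyedLookupSketch {
    static <T, K, V, R> List<R> mapUsingLookup(
            List<T> items,
            Function<? super T, ? extends K> keyFn,
            Map<K, V> map,
            BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> output = new ArrayList<>();
        for (T item : items) {
            V value = map.get(keyFn.apply(item)); // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {                 // a null result emits nothing (filtering)
                output.add(result);
            }
        }
        return output;
    }

    static List<String> demo() {
        Map<Integer, String> names = new HashMap<>();
        names.put(5, "five");
        names.put(3, "three");
        List<String> items = Arrays.asList("apple", "kiwi", "fig");
        Function<String, Integer> keyFn = String::length;
        BiFunction<String, String, String> mapFn =
                (item, name) -> name == null ? null : item + ":" + name;
        return mapUsingLookup(items, keyFn, names, mapFn);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [apple:five, fig:three]
    }
}
```

The item "kiwi" has key 4, which is absent from the map, so the mapping function returns null and the item is dropped.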
 
 This stage is similar to stageWithoutKey.mapUsingIMap(),
 but here Jet knows the key and uses it to partition and distribute the input in order
 to achieve data locality. The value it fetches from the IMap is
 stored on the cluster member where the processing takes place. However,
 if the map doesn't use the default partitioning strategy, the data
 locality will be broken.
Specified by:
mapUsingIMap in interface GeneralStageWithKey<T,K>
Type Parameters:
V - type of the value in the IMap
R - type of the output item
Parameters:
mapName - name of the IMap
mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull default <V,R> StreamStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed
 and the result of the lookup is merged with the item and emitted.
 
 If the result of the mapping is null, it emits nothing.
 Therefore this stage can be used to implement filtering semantics as well.
 
The mapping logic is equivalent to:
 V value = map.get(groupingKey);
 return mapFn.apply(item, value);
 
 Sample usage:
 
 items.groupingKey(Item::getDetailId)
      .mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
 
 This stage is similar to stageWithoutKey.mapUsingIMap(),
 but here Jet knows the key and uses it to partition and distribute the
 input in order to achieve data locality. The value it fetches from the
 IMap is stored on the cluster member where the processing takes
 place. However, if the map doesn't use the default partitioning strategy,
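 data locality will be broken.
Specified by:
mapUsingIMap in interface GeneralStageWithKey<T,K>
Type Parameters:
V - type of the value in the IMap
R - type of the output item
Parameters:
iMap - the IMap to use as the service
mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull <S,R> StreamStage<R> mapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory. If the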
 mapping result is null, it emits nothing. Therefore this stage
 can be used to implement filtering semantics as well.
 Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
 items.groupingKey(Item::getDetailId)
      .mapUsingService(
          ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
          (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
      );
 
Specified by:
mapUsingService in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of service object
R - the result type of the mapping function
Parameters:
serviceFactory - the service factory
mapFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.

@Nonnull default <S,R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>): the mapAsyncFn
 returns a CompletableFuture<R> instead of just R.
 Uses default values for two extra parameters: the maximum number of concurrent async operations per processor, and whether or not the order of input items is preserved.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
 items.groupingKey(Item::getDetailId)
      .mapUsingServiceAsync(
          ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
          (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
      );
 
 The latency of the async call will add to the total latency of the
 output.
Specified by:
mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of service object
R - the future's result type of the mapping function
Parameters:
serviceFactory - the service factory
mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.

@Nonnull <S,R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>): the mapAsyncFn
 returns a CompletableFuture<R> instead of just R.
 The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
 items.groupingKey(Item::getDetailId)
      .mapUsingServiceAsync(
          ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
          16,
          true,
          (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
      );
 
 The latency of the async call will add to the total latency of the
 output.
Specified by:
mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of service object
R - the future's result type of the mapping function
Parameters:
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the async responses are ordered or not
mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.

@Nonnull <S,R> StreamStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Batched version of GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes
 a list of input items and returns a CompletableFuture<List<R>>.
 The size of list is limited by the given maxBatchSize.
 
 This transform can perform filtering by putting null elements into
 the output list.
 
The latency of the async call will add to the total latency of the output.
 This sample takes a stream of stock items and sets the detail
 field on them by performing batched lookups from a registry. The max
 size of the items to lookup is specified as 100:
 
 items.groupingKey(Item::getDetailId)
      .mapUsingServiceAsyncBatched(
          ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
          100,
          (reg, itemList) -> reg
              .fetchDetailsAsync(itemList.stream().map(Item::getDetailId).collect(Collectors.toList()))
              .thenApply(details -> {
                  for (int i = 0; i < itemList.size(); i++) {
                      itemList.get(i).setDetail(details.get(i));
                  }
                  return itemList;
              })
      );
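
The batching contract described above (the function receives at most maxBatchSize items, returns a future of a result list, and null elements in that list are filtered out) can be sketched with the standard `CompletableFuture` API. This is a plain-Java simulation, not Jet code: `BatchedAsyncSketch` and `mapBatched` are hypothetical names, and the sketch blocks on each batch for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java simulation of batched async mapping with null-based filtering. */
class BatchedAsyncSketch {
    static <T, R> List<R> mapBatched(
            List<T> items,
            int maxBatchSize,
            Function<List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> output = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, items.size());
            List<T> batch = new ArrayList<>(items.subList(from, to)); // at most maxBatchSize items
            // The sketch waits for each batch in turn, which keeps the input order.
            List<R> results = mapAsyncFn.apply(batch).join();
            for (R result : results) {
                if (result != null) { // null elements are dropped, acting as a filter
                    output.add(result);
                }
            }
        }
        return output;
    }

    static List<String> demo() {
        return BatchedAsyncSketch.<Integer, String>mapBatched(
                Arrays.asList(1, 2, 3, 4, 5),
                2,
                batch -> CompletableFuture.supplyAsync(() -> {
                    List<String> results = new ArrayList<>();
                    for (Integer i : batch) {
                        results.add(i % 2 == 0 ? null : "item-" + i); // filter out even numbers
                    }
                    return results;
                }));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [item-1, item-3, item-5]
    }
}
```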
Specified by:
mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
Type Parameters:
S - type of service object
R - the future result type of the mapping function
Parameters:
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.

@Nonnull <S,R> StreamStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull TriFunction<? super S,? super List<K>,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Batched version of GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes
 a list of input items (and a list of their corresponding keys) and
 returns a CompletableFuture<List<R>>.
 The sizes of the input lists are identical and are limited by the given
 maxBatchSize. The key at index N corresponds to the input item
 at index N.
 The number of in-flight batches being completed asynchronously is limited, and this mapping operation always preserves the order of input elements.
 This transform can perform filtering by putting null elements into
 the output list.
 
The latency of the async call will add to the total latency of the output.
 This sample takes a stream of stock items and sets the detail
 field on them by performing batched lookups from a registry. The max
 size of the items to lookup is specified as 100:
 
 items.groupingKey(Item::getDetailId)
      .mapUsingServiceAsyncBatched(
          ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
          100,
          (reg, keyList, itemList) -> reg.fetchDetailsAsync(keyList).thenApply(details -> {
              for (int i = 0; i < itemList.size(); i++) {
                  itemList.get(i).setDetail(details.get(i));
              }
              return itemList;
          })
      );
Specified by:
    mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
Type Parameters:
    S - type of service object
    R - the future result type of the mapping function
Parameters:
    serviceFactory - the service factory
    maxBatchSize - max size of the input list
    mapAsyncFn - a mapping function. It must be stateless and cooperative.

@Nonnull <S> StreamStage<T> filterUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriPredicate<? super S,? super K,? super T> filterFn)
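Description copied from interface: GeneralStageWithKey
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
 The number of in-flight batches being completed asynchronously is limited, and this mapping operation always preserves the order of input elements.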
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
 items.groupingKey(Item::getDetailId)
      .filterUsingService(
          ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
          (reg, key, item) -> reg.fetchDetail(key).contains("blade")
      );
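
To show why routing every item with a given key to the same service instance helps a near-cache, here is a plain-Java sketch that does not use Jet. `CachingRegistry`, `keepBlades` and the simulated lookup rule are assumptions made for illustration only.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class NearCacheFilterSketch {
    // Hypothetical service object: remote lookups fronted by a local near-cache.
    static class CachingRegistry {
        final Map<Integer, String> cache = new ConcurrentHashMap<>();
        final AtomicInteger remoteCalls = new AtomicInteger();

        String fetchDetail(int key) {
            return cache.computeIfAbsent(key, k -> {
                remoteCalls.incrementAndGet();            // simulated remote round trip
                return k % 2 == 0 ? "steel blade" : "wooden handle";
            });
        }
    }

    // Plays the role of filterFn applied to every item; here the key is the item itself.
    static List<Integer> keepBlades(CachingRegistry reg, List<Integer> detailIds) {
        return detailIds.stream()
                .filter(id -> reg.fetchDetail(id).contains("blade"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CachingRegistry reg = new CachingRegistry();
        List<Integer> kept = keepBlades(reg, List.of(2, 3, 2, 4, 3));
        System.out.println(kept);                         // prints [2, 2, 4]
        System.out.println(reg.remoteCalls.get());        // prints 3
    }
}
```

Five items trigger only three simulated remote calls because repeated keys hit the cache. If the same key could reach several service instances, each instance would need its own cached copy.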
 
Specified by:
    filterUsingService in interface GeneralStageWithKey<T,K>
Type Parameters:
    S - type of service object
Parameters:
    serviceFactory - the service factory
    filterFn - a filter predicate function. It must be stateless. It must be cooperative, if the service is cooperative.

@Nonnull <S,R> StreamStage<R> flatMapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
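Description copied from interface: GeneralStageWithKey
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must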
 be null-terminated. The mapping function receives another
 parameter, the service object, which Jet will create using the supplied
 serviceFactory.
 Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
 StreamStage<Part> parts = products
     .groupingKey(Product::getId)
     .flatMapUsingService(
         ServiceFactories.sharedService(ctx -> new PartRegistry()),
         (registry, productId, product) -> Traversers.traverseIterable(
                 registry.fetchParts(productId))
     );
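
The "null-terminated" requirement means the traverser signals exhaustion by returning null from next(). The plain-Java sketch below illustrates that convention. `MiniTraverser` is a simplified, hypothetical stand-in for Jet's Traverser, and `traverseIterable` and `drain` are illustrative helpers rather than library calls.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class NullTerminatedTraverserSketch {
    // Simplified stand-in for a traverser: next() returns null once exhausted.
    interface MiniTraverser<T> {
        T next();
    }

    // Wraps an Iterable so that the end of the sequence is reported as null.
    static <T> MiniTraverser<T> traverseIterable(Iterable<T> iterable) {
        Iterator<T> it = iterable.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Consumes a traverser the way a downstream stage would: stop at the first null.
    static <T> List<T> drain(MiniTraverser<T> traverser) {
        List<T> out = new ArrayList<>();
        for (T item; (item = traverser.next()) != null; ) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(traverseIterable(List.of("bolt", "nut")))); // prints [bolt, nut]
        System.out.println(drain(traverseIterable(List.<String>of())));      // prints []
    }
}
```

An empty traverser returns null on the first call, which is why returning an empty traverser is allowed while returning a null traverser is not.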
 
Specified by:
    flatMapUsingService in interface GeneralStageWithKey<T,K>
Type Parameters:
    S - type of service object
    R - type of the output items
Parameters:
    serviceFactory - the service factory
    flatMapFn - a flatmapping function. It must not return null traverser, but can return an empty traverser. It must be stateless. It must be cooperative, if the service is cooperative.

@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
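Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and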
 partitioned using the key function assigned to this stage.
 Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
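
The type-safety caveat can be reproduced in plain Java. In this sketch, `untypedTransform` is a hypothetical stand-in for a method whose result type R is chosen by the caller rather than derived from what is actually produced. It is not Jet code.

```java
import java.util.ArrayList;
import java.util.List;

class CallSiteInferenceSketch {
    // R is inferred from the call site; nothing ties it to the produced items.
    @SuppressWarnings("unchecked")
    static <R> List<R> untypedTransform(List<Object> producedByProcessor) {
        return (List<R>) (List<?>) producedByProcessor;   // unchecked: compiles for any R
    }

    public static void main(String[] args) {
        List<Object> produced = new ArrayList<>();
        produced.add("not a number");

        // Compiles without complaint although the list holds a String.
        List<Integer> wrong = untypedTransform(produced);
        try {
            Integer first = wrong.get(0);                 // the mismatch surfaces only here
            System.out.println(first);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at the point of use");
        }
    }
}
```

The compiler accepts the assignment to `List<Integer>`, and the error appears only when an element is read. A mismatch between a custom processor's output and the declared stage type would surface the same way, downstream of where it was introduced.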
Specified by:
    customTransform in interface GeneralStageWithKey<T,K>
Type Parameters:
    R - the type of the output items
Parameters:
    stageName - a human-readable name for the custom stage
    procSupplier - the supplier of processors

@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
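Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and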
 partitioned using the key function assigned to this stage.
 Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by:
    customTransform in interface GeneralStageWithKey<T,K>
Type Parameters:
    R - the type of the output items
Parameters:
    stageName - a human-readable name for the custom stage
    procSupplier - the supplier of processors

@Nonnull <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
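Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and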
 partitioned using the key function assigned to this stage.
 Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by:
    customTransform in interface GeneralStageWithKey<T,K>
Type Parameters:
    R - the type of the output items
Parameters:
    stageName - a human-readable name for the custom stage
    procSupplier - the supplier of processors

Copyright © 2024 Hazelcast, Inc. All rights reserved.