Interface StreamStageWithKey<T,K>
- Type Parameters:
  T - type of the stream items
  K - type of the key
- All Superinterfaces:
  GeneralStageWithKey<T,K>
mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>)).
- Since:
  Jet 3.0
-
Method Summary
- default <R> StreamStage<R> customTransform(String stageName, SupplierEx<Processor> procSupplier)
  Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
- <R> StreamStage<R> customTransform(String stageName, ProcessorMetaSupplier procSupplier)
  Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
- default <R> StreamStage<R> customTransform(String stageName, ProcessorSupplier procSupplier)
  Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
- <S> StreamStage<T> filterStateful(long ttl, SupplierEx<? extends S> createFn, BiPredicateEx<? super S, ? super T> filterFn)
  Attaches a stage that performs a stateful filtering operation.
- default <S> StreamStage<T> filterStateful(SupplierEx<? extends S> createFn, BiPredicateEx<? super S, ? super T> filterFn)
  Attaches a stage that performs a stateful filtering operation.
- <S> StreamStage<T> filterUsingService(ServiceFactory<?, S> serviceFactory, TriPredicate<? super S, ? super K, ? super T> filterFn)
  Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.
- <S, R> StreamStage<R> flatMapStateful(long ttl, SupplierEx<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn, TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn)
  Attaches a stage that performs a stateful flat-mapping operation.
- <S, R> StreamStage<R> flatMapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
  Attaches a stage that performs a stateful flat-mapping operation.
- <S, R> StreamStage<R> flatMapUsingService(ServiceFactory<?, S> serviceFactory, TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
  Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items.
- <S, R> StreamStage<R> mapStateful(long ttl, SupplierEx<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn, TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn)
  Attaches a stage that performs a stateful mapping operation.
- <S, R> StreamStage<R> mapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
  Attaches a stage that performs a stateful mapping operation.
- default <V, R> StreamStage<R> mapUsingIMap(IMap<K, V> iMap, BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
  Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.
- default <V, R> StreamStage<R> mapUsingIMap(String mapName, BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
  Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.
- <S, R> StreamStage<R> mapUsingService(ServiceFactory<?, S> serviceFactory, TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
  Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item.
- <S, R> StreamStage<R> mapUsingServiceAsync(ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
  Asynchronous version of GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
- default <S, R> StreamStage<R> mapUsingServiceAsync(ServiceFactory<?, S> serviceFactory, TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
  Asynchronous version of GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
- <S, R> StreamStage<R> mapUsingServiceAsyncBatched(ServiceFactory<?, S> serviceFactory, int maxBatchSize, BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
  Batched version of GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>.
- <S, R> StreamStage<R> mapUsingServiceAsyncBatched(ServiceFactory<?, S> serviceFactory, int maxBatchSize, TriFunction<? super S, ? super List<K>, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
  Batched version of GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items (and a list of their corresponding keys) and returns a CompletableFuture<List<R>>.
- default <A, R> StreamStage<Map.Entry<K, R>> rollingAggregate(long ttl, AggregateOperation1<? super T, A, ? extends R> aggrOp)
  Attaches a rolling aggregation stage.
- default <A, R> StreamStage<Map.Entry<K, R>> rollingAggregate(AggregateOperation1<? super T, A, ? extends R> aggrOp)
  Attaches a rolling aggregation stage.
- window(WindowDefinition wDef)
  Adds the definition of the window to use in the group-and-aggregate pipeline stage being constructed.

Methods inherited from interface com.hazelcast.jet.pipeline.GeneralStageWithKey
keyFn
-
Method Details
-
window
Adds the definition of the window to use in the group-and-aggregate pipeline stage being constructed.
-
mapStateful
@Nonnull <S,R> StreamStage<R> mapStateful(long ttl, @Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn, @Nonnull TriFunction<? super S, ? super K, ? super Long, ? extends R> onEvictFn)
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to mapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

If the given ttl is greater than zero, Jet will consider the state object stale if its time-to-live has expired. The state object for a given key has a timestamp attached to it: the top timestamp of any event with that key seen so far. Whenever the watermark advances, Jet discards all state objects with a timestamp less than wm - ttl. Just before discarding a state object, Jet calls onEvictFn on it. The function can return an output item that will be emitted, or null if it doesn't need to emit an item. If TTL is used, Jet also drops late events; otherwise, all events are processed.

This sample takes a stream of pairs (serverId, latency) representing the latencies of serving individual requests and keeps track, separately for each server, of the total latency accumulated over individual sessions: bursts of server activity separated by quiet periods of one minute or more. For each input item it outputs the accumulated latency so far, and when a session ends, it outputs a special entry that reports the total latency for that session.

    StreamStage<Entry<String, Long>> latencies; // (serverId, latency) pairs from an upstream stage
    StreamStage<Entry<String, Long>> cumulativeLatencies = latencies
            .groupingKey(Entry::getKey)
            .mapStateful(
                    MINUTES.toMillis(1),
                    LongAccumulator::new,
                    (sum, key, entry) -> {
                        sum.add(entry.getValue());
                        return entry(key, sum.get());
                    },
                    (sum, key, time) -> entry(
                            String.format("%s:totalForSession:%d", key, time),
                            sum.get())
            );

The given functions must be stateless and cooperative.
- Type Parameters:
  S - type of the state object
  R - type of the result
- Parameters:
  ttl - time-to-live for each state object, disabled if zero or less
  createFn - function that returns the state object
  mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object.
  onEvictFn - function that Jet calls when evicting a state object
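The TTL rules above (one state object per key, a per-key top timestamp, eviction once that timestamp falls below wm - ttl, and dropping of late events) can be modeled in plain Java. This is a minimal sketch, not the Jet implementation: TtlStatefulMapSketch and StateFn are hypothetical names, and two details are assumptions of the sketch: an event counts as late when its timestamp is below the current watermark, and the watermark value is what gets passed to onEvictFn.

```java
import java.util.*;
import java.util.function.Supplier;

// Hypothetical three-argument function, standing in for Jet's TriFunction.
@FunctionalInterface
interface StateFn<S, K, X, R> {
    R apply(S state, K key, X x);
}

/** Plain-Java simulation (not the Jet API) of keyed mapStateful with a TTL. */
class TtlStatefulMapSketch<K, T, S, R> {
    private final long ttl;
    private final Supplier<S> createFn;
    private final StateFn<S, K, T, R> mapFn;
    private final StateFn<S, K, Long, R> onEvictFn;
    private final Map<K, S> states = new HashMap<>();
    private final Map<K, Long> topTimestamps = new HashMap<>(); // top event timestamp per key
    private long watermark = Long.MIN_VALUE;                    // no watermark yet

    TtlStatefulMapSketch(long ttl, Supplier<S> createFn,
                         StateFn<S, K, T, R> mapFn, StateFn<S, K, Long, R> onEvictFn) {
        this.ttl = ttl;
        this.createFn = createFn;
        this.mapFn = mapFn;
        this.onEvictFn = onEvictFn;
    }

    /** Handles one event; returns the mapped item, or null if the event is dropped. */
    R onEvent(K key, long timestamp, T item) {
        // Assumption: with TTL enabled, an event below the current watermark is late.
        if (ttl > 0 && timestamp < watermark) {
            return null;
        }
        S state = states.computeIfAbsent(key, k -> createFn.get());
        topTimestamps.merge(key, timestamp, Math::max);
        return mapFn.apply(state, key, item);
    }

    /** Advances the watermark, evicts stale states and returns what onEvictFn emitted. */
    List<R> onWatermark(long wm) {
        watermark = wm;
        List<R> emitted = new ArrayList<>();
        if (ttl <= 0) {
            return emitted; // TTL disabled: states are never evicted
        }
        Iterator<Map.Entry<K, Long>> it = topTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> e = it.next();
            if (e.getValue() < wm - ttl) {
                K key = e.getKey();
                // Assumption: the watermark is the Long argument passed to onEvictFn.
                R out = onEvictFn.apply(states.remove(key), key, wm);
                if (out != null) {
                    emitted.add(out);
                }
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        TtlStatefulMapSketch<String, Long, long[], String> stage = new TtlStatefulMapSketch<>(
                60_000L,
                () -> new long[1],
                (sum, key, latency) -> { sum[0] += latency; return key + "=" + sum[0]; },
                (sum, key, wm) -> key + ":totalForSession=" + sum[0]);
        System.out.println(stage.onEvent("srv-1", 1_000L, 5L));
        System.out.println(stage.onEvent("srv-1", 2_000L, 7L));
        System.out.println(stage.onWatermark(70_000L));
    }
}
```

The timestamps are tracked separately from the states so that eviction needs only one pass over a small map per watermark.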
-
mapStateful
@Nonnull <S,R> StreamStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to mapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

If you want to return the state variable from mapFn, then the return value must be a copy of the state variable, to avoid situations in which the result of mapFn is modified after being emitted or where the state is modified by downstream processors.

This sample takes a stream of pairs (serverId, latency) representing the latencies of serving individual requests and outputs the cumulative latency of all handled requests so far, for each server separately:

    GeneralStage<Entry<String, Long>> latencies; // (serverId, latency) pairs
    GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
            .groupingKey(Entry::getKey)
            .mapStateful(
                    LongAccumulator::new,
                    (sum, key, entry) -> {
                        sum.add(entry.getValue());
                        return entry(key, sum.get());
                    }
            );

This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(summingLong(Entry::getValue)).

This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
  mapStateful in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of the state object
  R - type of the result
- Parameters:
  createFn - function that returns the state object
  mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object. It must be stateless and cooperative.
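The rule about returning a copy of the state can be illustrated without Jet. In this plain-Java sketch (KeyedHistorySketch is a hypothetical name, not part of the API), the per-key state is a mutable list, and map returns an immutable copy, so an item that was already emitted does not change when the state is updated later.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of keyed mapStateful without a TTL. */
class KeyedHistorySketch {
    // One state object per grouping key, created on first use (the createFn role).
    private final Map<String, List<Long>> states = new HashMap<>();

    /** The mapFn role: records the latency and returns the server's history so far. */
    List<Long> map(String serverId, long latency) {
        List<Long> history = states.computeIfAbsent(serverId, k -> new ArrayList<>());
        history.add(latency);
        // Return a copy: emitting `history` itself would let the next update to the
        // state change an item that has already been emitted.
        return List.copyOf(history);
    }

    public static void main(String[] args) {
        KeyedHistorySketch stage = new KeyedHistorySketch();
        List<Long> first = stage.map("srv-1", 5);
        stage.map("srv-1", 7);
        System.out.println(first); // the copy still holds only the first latency
    }
}
```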
-
filterStateful
@Nonnull <S> StreamStage<T> filterStateful(long ttl, @Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S, ? super T> filterFn)
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

If the given ttl is greater than zero, Jet will consider the state object stale if its time-to-live has expired. The state object has a timestamp attached to it: the top timestamp of any event with the same key seen so far. Upon seeing another event, Jet compares the state timestamp with the current watermark. If it is less than wm - ttl, it discards the state object and creates a new one before processing the event. If TTL is used, Jet also drops late events; otherwise, all events are processed.

This sample receives a stream of pairs (serverId, requestLatency) that represent the latencies of individual requests served by a cluster of servers. It emits the record-breaking (worst so far) latencies for each server independently and resets the score after one minute of inactivity on a given server.

    StreamStage<Entry<String, Long>> latencies; // (serverId, requestLatency) pairs
    StreamStage<Entry<String, Long>> topLatencies = latencies
            .groupingKey(Entry::getKey)
            .filterStateful(
                    MINUTES.toMillis(1),
                    LongAccumulator::new,
                    (topLatencyState, entry) -> {
                        long currLatency = entry.getValue();
                        long topLatency = topLatencyState.get();
                        topLatencyState.set(Math.max(currLatency, topLatency));
                        return currLatency > topLatency;
                    }
            );

The given functions must be stateless and cooperative.
- Type Parameters:
  S - type of the state object
- Parameters:
  ttl - time-to-live for each state object, disabled if zero or less
  createFn - function that returns the state object
  filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.
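Unlike the eviction in mapStateful, the TTL check described here happens lazily, when the next event for a key arrives. The following plain-Java sketch models the record-breaking-latency sample under that rule. TopLatencyFilterSketch is a hypothetical name, and treating an event as late when its timestamp is below the current watermark is an assumption of the sketch.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of the top-latency filter with a TTL. */
class TopLatencyFilterSketch {
    private final long ttl;
    private final Map<String, long[]> topLatency = new HashMap<>();   // state per key
    private final Map<String, Long> stateTimestamp = new HashMap<>(); // top event timestamp per key
    private long watermark = Long.MIN_VALUE;                          // no watermark yet

    TopLatencyFilterSketch(long ttl) {
        this.ttl = ttl;
    }

    void advanceWatermark(long wm) {
        watermark = wm;
    }

    /** The filterFn role: returns true if the event passes the filter. */
    boolean test(String serverId, long timestamp, long latency) {
        boolean haveWatermark = watermark != Long.MIN_VALUE;
        // Assumption: with TTL enabled, an event below the current watermark is late.
        if (ttl > 0 && haveWatermark && timestamp < watermark) {
            return false;
        }
        Long ts = stateTimestamp.get(serverId);
        if (ttl > 0 && haveWatermark && ts != null && ts < watermark - ttl) {
            topLatency.remove(serverId);      // stale state: discard it ...
            stateTimestamp.remove(serverId);
        }
        long[] state = topLatency.computeIfAbsent(serverId, k -> new long[1]); // ... and start over
        stateTimestamp.merge(serverId, timestamp, Math::max);
        long top = state[0];
        state[0] = Math.max(top, latency);
        return latency > top;
    }

    public static void main(String[] args) {
        TopLatencyFilterSketch filter = new TopLatencyFilterSketch(60_000L);
        System.out.println(filter.test("srv-1", 1_000L, 10L));
        System.out.println(filter.test("srv-1", 2_000L, 8L));
    }
}
```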
-
filterStateful
@Nonnull default <S> StreamStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S, ? super T> filterFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):

    GeneralStage<String> decimated = input
            .groupingKey(String::length)
            .filterStateful(
                    LongAccumulator::new,
                    (counter, item) -> {
                        counter.add(1);
                        return counter.get() % 10 != 0;
                    }
            );

The given functions must be stateless and cooperative.

This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
  filterStateful in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of the state object
- Parameters:
  createFn - function that returns the state object
  filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.
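The decimation sample boils down to one counter per grouping key. This plain-Java sketch (DecimateSketch is a hypothetical name, not Jet code) applies the same predicate to a list instead of a stream.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of keyed filterStateful without a TTL. */
class DecimateSketch {
    /** Groups strings by length and drops every 10th string of each length. */
    static List<String> decimate(List<String> input) {
        Map<Integer, long[]> counters = new HashMap<>(); // one counter per grouping key
        List<String> out = new ArrayList<>();
        for (String item : input) {
            long[] counter = counters.computeIfAbsent(item.length(), k -> new long[1]);
            counter[0]++;
            if (counter[0] % 10 != 0) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            input.add(Character.toString((char) ('a' + i)));
        }
        System.out.println(decimate(input));
    }
}
```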
-
flatMapStateful
@Nonnull <S,R> StreamStage<R> flatMapStateful(long ttl, @Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn, @Nonnull TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn)
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to flatMapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

If the given ttl is greater than zero, Jet will consider the state object stale if its time-to-live has expired. The state object for a given key has a timestamp attached to it: the top timestamp of any event with that key seen so far. Whenever the watermark advances, Jet discards all state objects with a timestamp less than wm - ttl. Just before discarding a state object, Jet calls onEvictFn on it. The function returns a traverser over the items it wants to emit, or an empty traverser if it has nothing to emit. If TTL is used, Jet also drops late events; otherwise, all events are processed.

This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group, or after one minute elapses without further input for a given key:

    StreamStage<String> punctuated = input
            .groupingKey(String::length)
            .flatMapStateful(
                    MINUTES.toMillis(1),
                    LongAccumulator::new,
                    (counter, key, item) -> {
                        counter.add(1);
                        // emit the item first, then the punctuation, so it follows every 10th string
                        return counter.get() % 10 == 0
                                ? Traversers.traverseItems(item, "punctuation" + key)
                                : Traversers.singleton(item);
                    },
                    (counter, key, wm) -> Traversers.singleton("punctuation" + key)
            );

The given functions must be stateless and cooperative.
- Type Parameters:
  S - type of the state object
  R - type of the result
- Parameters:
  ttl - time-to-live for each state object, disabled if zero or less
  createFn - function that returns the state object
  flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object.
  onEvictFn - function that Jet calls when evicting a state object
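A plain-Java model of the punctuation sample with a TTL follows. It is a sketch under stated assumptions, not the Jet implementation: PunctuateWithTtlSketch is a hypothetical name, lists stand in for traversers, and late-event dropping is left out to keep it short.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of keyed flatMapStateful with a TTL. */
class PunctuateWithTtlSketch {
    private final long ttl;
    private final Map<Integer, long[]> counters = new HashMap<>(); // state per key
    private final Map<Integer, Long> timestamps = new HashMap<>(); // top event timestamp per key

    PunctuateWithTtlSketch(long ttl) {
        this.ttl = ttl;
    }

    /** The flatMapFn role: the item, followed by punctuation after every 10th item of a length. */
    List<String> onItem(String item, long timestamp) {
        int key = item.length();
        long[] counter = counters.computeIfAbsent(key, k -> new long[1]);
        timestamps.merge(key, timestamp, Math::max);
        counter[0]++;
        return counter[0] % 10 == 0 ? List.of(item, "punctuation" + key) : List.of(item);
    }

    /** The onEvictFn role: evicts keys older than wm - ttl and emits punctuation for each. */
    List<String> onWatermark(long wm) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = timestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> e = it.next();
            if (ttl > 0 && e.getValue() < wm - ttl) {
                counters.remove(e.getKey());
                out.add("punctuation" + e.getKey());
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PunctuateWithTtlSketch stage = new PunctuateWithTtlSketch(60_000L);
        for (int i = 1; i <= 10; i++) {
            System.out.println(stage.onItem("ab", i));
        }
        System.out.println(stage.onWatermark(70_000L));
    }
}
```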
-
flatMapStateful
@Nonnull <S,R> StreamStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStageWithKey
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to flatMapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

If you want to return the state variable from flatMapFn, then the return value must be a copy of the state variable, to avoid situations in which the result of flatMapFn is modified after being emitted or where the state is modified by downstream processors.

This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:

    GeneralStage<String> punctuated = input
            .groupingKey(String::length)
            .flatMapStateful(
                    LongAccumulator::new,
                    (counter, key, item) -> {
                        counter.add(1);
                        // emit the item first, then the punctuation, so it follows every 10th string
                        return counter.get() % 10 == 0
                                ? Traversers.traverseItems(item, "punctuation" + key)
                                : Traversers.singleton(item);
                    }
            );

The given functions must be stateless and cooperative.

This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
  flatMapStateful in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of the state object
  R - type of the result
- Parameters:
  createFn - function that returns the state object
  flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.
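The two rules for flatMapFn (never return null, and copy any state you emit) can be shown with a per-key chunking function. In this plain-Java sketch, ChunkingFlatMapSketch is a hypothetical name and a List stands in for a Traverser: an empty list plays the role of an empty traverser, and each emitted chunk is a copy because the buffer is cleared right afterwards.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of keyed flatMapStateful without a TTL. */
class ChunkingFlatMapSketch {
    private final int chunkSize;
    private final Map<Integer, List<String>> buffers = new HashMap<>(); // state per key (string length)

    ChunkingFlatMapSketch(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    /** The flatMapFn role: zero or one chunk per input item, never null. */
    List<List<String>> flatMap(String item) {
        List<String> buffer = buffers.computeIfAbsent(item.length(), k -> new ArrayList<>());
        buffer.add(item);
        if (buffer.size() < chunkSize) {
            return List.of(); // nothing to emit yet: empty, not null
        }
        List<String> chunk = List.copyOf(buffer); // copy: the state is cleared right after
        buffer.clear();
        return List.of(chunk);
    }

    public static void main(String[] args) {
        ChunkingFlatMapSketch stage = new ChunkingFlatMapSketch(2);
        for (String s : List.of("a", "bb", "c", "dd")) {
            System.out.println(stage.flatMap(s));
        }
    }
}
```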
-
rollingAggregate
@Nonnull default <A,R> StreamStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp)
Description copied from interface: GeneralStageWithKey
Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).

Sample usage:

    StreamStage<Entry<Color, Long>> aggregated = items
            .groupingKey(Item::getColor)
            .rollingAggregate(AggregateOperations.counting());

For example, with a summing aggregate operation, if the input for a given key is {2, 7, 8, -5}, the output values will be {2, 9, 17, 12}.

This stage is fault-tolerant and saves its state to the snapshot.

This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
- Specified by:
  rollingAggregate in interface GeneralStageWithKey<T,K>
- Type Parameters:
  R - type of the aggregate operation result
- Parameters:
  aggrOp - the aggregate operation to perform
- Returns:
  the newly attached stage
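The {2, 7, 8, -5} to {2, 9, 17, 12} example can be reproduced with a tiny model of keyed rolling aggregation. RollingAggregateSketch is a hypothetical name, and its three functions only loosely mirror the create, accumulate and export primitives of an AggregateOperation; this is not Jet's AggregateOperation API.

```java
import java.util.*;
import java.util.function.*;

/** Plain-Java simulation (not the Jet API) of a keyed rolling aggregation. */
class RollingAggregateSketch<K, T, A, R> {
    private final Supplier<A> createFn;          // like the create primitive
    private final BiConsumer<A, T> accumulateFn; // like the accumulate primitive
    private final Function<A, R> exportFn;       // like the export primitive
    private final Map<K, A> accumulators = new HashMap<>(); // one accumulator per key

    RollingAggregateSketch(Supplier<A> createFn, BiConsumer<A, T> accumulateFn, Function<A, R> exportFn) {
        this.createFn = createFn;
        this.accumulateFn = accumulateFn;
        this.exportFn = exportFn;
    }

    /** Accumulates the item and emits (key, current result of the aggregation). */
    Map.Entry<K, R> apply(K key, T item) {
        A acc = accumulators.computeIfAbsent(key, k -> createFn.get());
        accumulateFn.accept(acc, item);
        return new AbstractMap.SimpleImmutableEntry<>(key, exportFn.apply(acc));
    }

    public static void main(String[] args) {
        RollingAggregateSketch<String, Long, long[], Long> summing =
                new RollingAggregateSketch<>(() -> new long[1], (acc, x) -> acc[0] += x, acc -> acc[0]);
        for (long x : new long[] {2, 7, 8, -5}) {
            System.out.println(summing.apply("k", x));
        }
    }
}
```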
-
rollingAggregate
@Nonnull default <A,R> StreamStage<Map.Entry<K,R>> rollingAggregate(long ttl, @Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp)
Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).

This sample takes a stream of items and gives rolling counts of items of each color:

    StreamStage<Entry<Color, Long>> aggregated = items
            .groupingKey(Item::getColor)
            .rollingAggregate(AggregateOperations.counting());

If the given ttl is greater than zero, Jet will consider the accumulator object stale if its time-to-live has expired. The accumulator object has a timestamp attached to it: the top timestamp of any event with the same key seen so far. Upon seeing another event, Jet compares the accumulator timestamp with the current watermark. If it is less than wm - ttl, it discards the accumulator object and creates a new one before processing the event.

This stage is fault-tolerant and saves its state to the snapshot.
- Type Parameters:
  R - type of the aggregate operation result
- Parameters:
  aggrOp - the aggregate operation to perform
- Returns:
  the newly attached stage
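The lazy TTL reset of the accumulator can be sketched with a rolling count per key, in the spirit of the color-counting sample. TtlRollingCountSketch is a hypothetical plain-Java class, not Jet code, and it models only the reset rule described above.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of a keyed rolling count with a TTL. */
class TtlRollingCountSketch {
    private final long ttl;
    private final Map<String, long[]> counts = new HashMap<>();   // accumulator per key
    private final Map<String, Long> timestamps = new HashMap<>(); // top event timestamp per key
    private long watermark = Long.MIN_VALUE;                      // no watermark yet

    TtlRollingCountSketch(long ttl) {
        this.ttl = ttl;
    }

    void advanceWatermark(long wm) {
        watermark = wm;
    }

    /** Counts the event and returns the current count for its key. */
    long onEvent(String key, long timestamp) {
        Long ts = timestamps.get(key);
        // Checked lazily, only when another event for the same key arrives.
        if (ttl > 0 && watermark != Long.MIN_VALUE && ts != null && ts < watermark - ttl) {
            counts.remove(key);     // stale accumulator: discard it ...
            timestamps.remove(key);
        }
        long[] acc = counts.computeIfAbsent(key, k -> new long[1]); // ... and create a new one
        timestamps.merge(key, timestamp, Math::max);
        return ++acc[0];
    }

    public static void main(String[] args) {
        TtlRollingCountSketch stage = new TtlRollingCountSketch(60_000L);
        System.out.println(stage.onEvent("red", 1_000L));
        System.out.println(stage.onEvent("red", 2_000L));
        stage.advanceWatermark(70_000L);
        System.out.println(stage.onEvent("red", 71_000L));
    }
}
```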
-
mapUsingIMap
@Nonnull default <V,R> StreamStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.

If the result of the mapping is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

The mapping logic is equivalent to:

    V value = map.get(groupingKey);
    return mapFn.apply(item, value);

Sample usage:

    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(
                 "enriching-map",
                 (Item item, ItemDetail detail) -> item.setDetail(detail)
         );

This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
- Specified by:
  mapUsingIMap in interface GeneralStageWithKey<T,K>
- Type Parameters:
  V - type of the value in the IMap
  R - type of the output item
- Parameters:
  mapName - name of the IMap
  mapFn - the mapping function. It must be stateless and cooperative.
- Returns:
  the newly attached stage
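The equivalence given above (look up the value by the grouping key, apply mapFn, emit nothing on a null result) can be written out in plain Java with a HashMap standing in for the IMap. LookupEnrichSketch and mapUsingMap are hypothetical names; this sketches the semantics only, not how Jet performs the lookup.

```java
import java.util.*;
import java.util.function.*;

/** Plain-Java simulation (not the Jet API) of keyed mapUsingIMap semantics. */
class LookupEnrichSketch {
    static <K, T, V, R> List<R> mapUsingMap(List<T> items, Function<? super T, ? extends K> keyFn,
                                            Map<K, V> map, BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            V value = map.get(keyFn.apply(item)); // lookup by the grouping key
            R result = mapFn.apply(item, value);
            if (result != null) {
                out.add(result);                  // a null result emits nothing
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> details = Map.of(1, "red");
        List<String> out = mapUsingMap(List.of(1, 2), k -> k, details,
                (item, d) -> d == null ? null : item + ":" + d);
        System.out.println(out);
    }
}
```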
-
mapUsingIMap
@Nonnull default <V,R> StreamStage<R> mapUsingIMap(@Nonnull IMap<K, V> iMap, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.

If the result of the mapping is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

The mapping logic is equivalent to:

    V value = map.get(groupingKey);
    return mapFn.apply(item, value);

Sample usage:

    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));

This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
- Specified by:
  mapUsingIMap in interface GeneralStageWithKey<T,K>
- Type Parameters:
  V - type of the value in the IMap
  R - type of the output item
- Parameters:
  iMap - the IMap to use as the service
  mapFn - the mapping function. It must be stateless and cooperative.
- Returns:
  the newly attached stage
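Why a non-default partitioning strategy breaks data locality can be seen in a deliberately simplified model. The sketch assumes that both the pipeline and the map send a key straight to member (hash mod memberCount); Hazelcast's real scheme goes through partitions and a partition table, so LocalitySketch and its routing functions are hypothetical stand-ins that only show the principle: when the two functions agree, every lookup is local.

```java
import java.util.*;
import java.util.function.ToIntFunction;

/** Simplified model (not Hazelcast's real partitioning) of lookup locality. */
class LocalitySketch {
    /** Counts lookups whose map entry lives on a different member than the item. */
    static int remoteLookups(List<String> itemKeys, int memberCount,
                             ToIntFunction<String> pipelineRouting,
                             ToIntFunction<String> mapPartitioning) {
        int remote = 0;
        for (String key : itemKeys) {
            int processingMember = Math.floorMod(pipelineRouting.applyAsInt(key), memberCount);
            int owningMember = Math.floorMod(mapPartitioning.applyAsInt(key), memberCount);
            if (processingMember != owningMember) {
                remote++; // the lookup would have to cross the network
            }
        }
        return remote;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("detail-1", "detail-2", "detail-3");
        ToIntFunction<String> defaultStrategy = String::hashCode;      // hypothetical stand-in
        ToIntFunction<String> customStrategy = k -> k.hashCode() + 1; // hypothetical custom strategy
        System.out.println(remoteLookups(keys, 3, defaultStrategy, defaultStrategy));
        System.out.println(remoteLookups(keys, 3, defaultStrategy, customStrategy));
    }
}
```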
-
mapUsingService
@Nonnull <S,R> StreamStage<R> mapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory. If the mapping result is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.

Sample usage:

    items.groupingKey(Item::getDetailId)
         .mapUsingService(
                 ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                 (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
         );

Interaction with fault-tolerant unbounded jobs

If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
  mapUsingService in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of service object
  R - the result type of the mapping function
- Parameters:
  serviceFactory - the service factory
  mapFn - a mapping function. It must be stateless. It must be cooperative if the service is cooperative.
- Returns:
  the newly attached stage
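The partitioning guarantee (items with the same key always reach the same service instance, while one instance serves many keys) is what lets a near-cache avoid duplicate entries. In this plain-Java sketch, KeyedServiceSketch and CachingRegistry are hypothetical; items are routed to one of several service instances by key, so each key is fetched remotely only once no matter how often it appears.

```java
import java.util.*;

/** Plain-Java simulation (not the Jet API) of keyed mapUsingService routing. */
class KeyedServiceSketch {
    /** Hypothetical service: a near-cache in front of a slow remote lookup. */
    static class CachingRegistry {
        final Map<Integer, String> nearCache = new HashMap<>();
        int remoteFetches = 0;

        String fetchDetail(int key) {
            return nearCache.computeIfAbsent(key, k -> {
                remoteFetches++; // reached only on a cache miss
                return "detail-" + k;
            });
        }
    }

    private final List<CachingRegistry> instances = new ArrayList<>();

    KeyedServiceSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new CachingRegistry()); // one service instance per parallel worker
        }
    }

    /** The mapFn role; the key decides which service instance handles the item. */
    String map(int key, String item) {
        CachingRegistry reg = instances.get(Math.floorMod(Integer.hashCode(key), instances.size()));
        return item + ":" + reg.fetchDetail(key);
    }

    int totalRemoteFetches() {
        return instances.stream().mapToInt(r -> r.remoteFetches).sum();
    }

    public static void main(String[] args) {
        KeyedServiceSketch stage = new KeyedServiceSketch(4);
        for (int round = 0; round < 3; round++) {
            for (int key = 0; key < 10; key++) {
                stage.map(key, "item");
            }
        }
        System.out.println(stage.totalRemoteFetches());
    }
}
```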
-
mapUsingServiceAsync
@Nonnull default <S,R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

This method uses default values for the extra parameters: the maximum number of concurrent async operations per processor is 4, and the order of input items is preserved.

The function can return a null future or the future can return a null result: in both cases the stage acts just like a filter.

Sample usage:

    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsync(
                 ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                 (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
         );

The latency of the async call will add to the total latency of the output.
- Specified by:
  mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of service object
  R - the future's result type of the mapping function
- Parameters:
  serviceFactory - the service factory
  mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
- Returns:
  the newly attached stage
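The defaults described above (at most 4 operations in flight, input order preserved, a null future or null result acting as a filter) can be modeled with CompletableFuture. This is a simplified sketch, not Jet's implementation: OrderedAsyncMapSketch is a hypothetical name, mapAsyncFn receives only the item rather than the service object and key, and the sketch simply blocks on the oldest operation when the cap is reached.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

/** Plain-Java simulation (not the Jet API) of an order-preserving async mapping stage. */
class OrderedAsyncMapSketch {
    static <T, R> List<R> mapOrdered(List<T> items, int maxConcurrentOps,
                                     Function<T, CompletableFuture<R>> mapAsyncFn) {
        List<R> out = new ArrayList<>();
        Deque<CompletableFuture<R>> inFlight = new ArrayDeque<>();
        for (T item : items) {
            if (inFlight.size() == maxConcurrentOps) {
                emitOldest(inFlight, out); // at the cap: wait for the oldest operation
            }
            CompletableFuture<R> f = mapAsyncFn.apply(item);
            // A null future acts as a filter; queue a completed null to keep bookkeeping simple.
            inFlight.addLast(f != null ? f : CompletableFuture.<R>completedFuture(null));
        }
        while (!inFlight.isEmpty()) {
            emitOldest(inFlight, out);
        }
        return out;
    }

    private static <R> void emitOldest(Deque<CompletableFuture<R>> inFlight, List<R> out) {
        R result = inFlight.removeFirst().join(); // always the oldest first: input order is kept
        if (result != null) {
            out.add(result); // a null result also acts as a filter
        }
    }

    public static void main(String[] args) {
        List<String> out = mapOrdered(List.of("a", "b", "c"), 4,
                s -> CompletableFuture.supplyAsync(s::toUpperCase));
        System.out.println(out);
    }
}
```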
-
mapUsingServiceAsync
@Nonnull <S,R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

The function can return a null future or the future can return a null result: in both cases it will act just like a filter.

Sample usage:

    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsync(
                 ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                 16,
                 true,
                 (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
         );

The latency of the async call will add to the total latency of the output.
- Specified by:
  mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of service object
  R - the future's result type of the mapping function
- Parameters:
  serviceFactory - the service factory
  maxConcurrentOps - maximum number of concurrent async operations per processor
  preserveOrder - whether the async responses are ordered or not
  mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
- Returns:
  the newly attached stage
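With preserveOrder set to false, results can be emitted as soon as they complete. The following plain-Java sketch (UnorderedAsyncMapSketch is a hypothetical name, not Jet code) caps the in-flight operations at maxConcurrentOps with a Semaphore and collects results in completion order; a null future or a null result is dropped.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

/** Plain-Java simulation (not the Jet API) of async mapping with preserveOrder = false. */
class UnorderedAsyncMapSketch {
    static <T, R> List<R> mapUnordered(List<T> items, int maxConcurrentOps,
                                       Function<T, CompletableFuture<R>> mapAsyncFn) {
        Semaphore permits = new Semaphore(maxConcurrentOps); // caps in-flight operations
        List<R> out = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<?>> pending = new ArrayList<>();
        for (T item : items) {
            permits.acquireUninterruptibly(); // wait for a free slot
            CompletableFuture<R> f = mapAsyncFn.apply(item);
            if (f == null) {
                permits.release(); // a null future acts as a filter
                continue;
            }
            pending.add(f.whenComplete((result, error) -> {
                if (result != null) {
                    out.add(result); // completion order, not input order
                }
                permits.release();
            }));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> out = mapUnordered(List.of(1, 2, 3, 4, 5), 2,
                x -> CompletableFuture.supplyAsync(() -> x * x, pool));
        pool.shutdown();
        System.out.println(out); // some permutation of the squares
    }
}
```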
-
mapUsingServiceAsyncBatched
@Nonnull <S,R> StreamStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Batched version of GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>. The size of the list is limited by the given maxBatchSize.

This transform can perform filtering by putting null elements into the output list.

The latency of the async call will add to the total latency of the output.

This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum batch size is 100:

    items.groupingKey(Item::getDetailId)
         .mapUsingServiceAsyncBatched(
                 ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                 100,
                 (reg, itemList) -> reg
                         .fetchDetailsAsync(itemList.stream()
                                 .map(Item::getDetailId)
                                 .collect(Collectors.toList()))
                         .thenApply(details -> {
                             for (int i = 0; i < itemList.size(); i++) {
                                 itemList.get(i).setDetail(details.get(i));
                             }
                             return itemList;
                         })
         );
- Specified by:
  mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
- Type Parameters:
  S - type of service object
  R - the future result type of the mapping function
- Parameters:
  serviceFactory - the service factory
  maxBatchSize - max size of the input list
  mapAsyncFn - a mapping function. It must be stateless. It must be cooperative if the service is cooperative.
- Returns:
  the newly attached stage
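The batching contract (lists of at most maxBatchSize items, null output elements filtered out) can be modeled as follows. BatchedAsyncSketch is a hypothetical name, and the sketch simply chunks the input list and waits for one batch at a time; that is an assumption, since how Jet forms and overlaps batches is not described here.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java simulation (not the Jet API) of batched async mapping. */
class BatchedAsyncSketch {
    static <T, R> List<R> mapBatched(List<T> items, int maxBatchSize,
                                     Function<List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> out = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            List<T> batch = items.subList(from, Math.min(from + maxBatchSize, items.size()));
            for (R r : mapAsyncFn.apply(batch).join()) { // one batch at a time in this sketch
                if (r != null) {
                    out.add(r); // null elements act as a filter
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = mapBatched(List.of(1, 2, 3, 4, 5), 2, batch -> {
            List<String> res = new ArrayList<>();
            for (int x : batch) {
                res.add(x % 2 == 0 ? null : "odd-" + x);
            }
            return CompletableFuture.completedFuture(res);
        });
        System.out.println(out);
    }
}
```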
-
mapUsingServiceAsyncBatched
@Nonnull <S,R> StreamStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull TriFunction<? super S, ? super List<K>, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn) Description copied from interface:GeneralStageWithKey
Batched version ofGeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>)
:mapAsyncFn
takes a list of input items (and a list of their corresponding keys) and returns aCompletableFuture<List<R>>
. The sizes of the input lists are identical and are limited by the givenmaxBatchSize
. The key at index N corresponds to the input item at index N.The number of in-flight batches being completed asynchronously is limited to 2 and this mapping operation always preserves the order of input elements.
This transform can perform filtering by putting null elements into the output list. The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum number of items looked up in one batch is 100:
items.groupingKey(Item::getDetailId)
     .mapUsingServiceAsyncBatched(
         ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
         100,
         (reg, keyList, itemList) -> reg.fetchDetailsAsync(keyList).thenApply(details -> {
             for (int i = 0; i < itemList.size(); i++) {
                 itemList.get(i).setDetail(details.get(i));
             }
             return itemList;
         })
     );
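To make the index correspondence concrete, here is a hypothetical helper that derives the key list from the item list of one batch, so that `keys.get(n)` is always the key of `items.get(n)`. `TriFn` is a local stand-in for Jet's `TriFunction`, and `KeyedBatchSketch` and `callBatch` are illustrative names, not part of the Jet API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for Jet's TriFunction, so the sketch has no Jet dependency.
@FunctionalInterface
interface TriFn<A, B, C, R> {
    R apply(A a, B b, C c);
}

final class KeyedBatchSketch {
    private KeyedBatchSketch() {}

    // Builds the parallel key list for one batch: keys.get(n) is the key of items.get(n),
    // and both lists have the same size.
    static <S, K, T, R> CompletableFuture<List<R>> callBatch(
            S service, List<T> batch, Function<? super T, ? extends K> keyFn,
            TriFn<S, List<K>, List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<K> keys = new ArrayList<>(batch.size());
        for (T item : batch) {
            keys.add(keyFn.apply(item));
        }
        List<T> items = new ArrayList<>(batch);
        return mapAsyncFn.apply(service, keys, items);
    }
}
```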
- Specified by:
mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
- Type Parameters:
S - type of service object
R - the future result type of the mapping function
- Parameters:
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a mapping function. It must be stateless and cooperative.
- Returns:
the newly attached stage
-
filterUsingService
@Nonnull <S> StreamStage<T> filterUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriPredicate<? super S, ? super K, ? super T> filterFn)
Description copied from interface: GeneralStageWithKey
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
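The routing guarantee can be illustrated with a simplified model. In this sketch a fixed pool of hypothetical `NearCache` service objects is indexed by the key's hash, a stand-in for Jet's actual key-to-partition-to-processor assignment. It shows the property the paragraph relies on: each key ends up cached by exactly one instance, while a single instance still serves many keys.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of key-based routing; Jet's real routing goes
// key -> partition -> processor, and the service object is created by the ServiceFactory.
final class KeyRoutingSketch {
    // Hypothetical service object: a near-cache that remembers which keys it has loaded.
    static final class NearCache {
        final Set<String> cachedKeys = new HashSet<>();

        String lookup(String key) {
            cachedKeys.add(key); // pretend the value was fetched remotely and cached
            return "detail-of-" + key;
        }
    }

    private final List<NearCache> instances = new ArrayList<>();

    KeyRoutingSketch(int instanceCount) {
        for (int i = 0; i < instanceCount; i++) {
            instances.add(new NearCache());
        }
    }

    // Same key -> same instance; floorMod keeps the index non-negative for negative hashes.
    NearCache instanceFor(String key) {
        return instances.get(Math.floorMod(key.hashCode(), instances.size()));
    }

    // Counts how many instances hold the given key in their cache.
    int copiesOf(String key) {
        int copies = 0;
        for (NearCache cache : instances) {
            if (cache.cachedKeys.contains(key)) {
                copies++;
            }
        }
        return copies;
    }
}
```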
Sample usage:
items.groupingKey(Item::getDetailId)
     .filterUsingService(
         ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
         (reg, key, item) -> reg.fetchDetail(key).contains("blade")
     );
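Stripped of the pipeline machinery, the predicate receives three arguments per item: the service object, the item's key and the item itself. The sketch below models that with a plain `Map` standing in for the `ItemDetailRegistry` of the sample; `TriPred`, `KeyedFilterSketch` and `filter` are hypothetical names, not Jet types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Jet's TriPredicate: (service, key, item) -> boolean.
@FunctionalInterface
interface TriPred<S, K, T> {
    boolean test(S service, K key, T item);
}

final class KeyedFilterSketch {
    private KeyedFilterSketch() {}

    // Keeps an item when the predicate returns true; surviving items keep their order.
    static <S, K, T> List<T> filter(S service, List<T> items,
                                    Function<? super T, ? extends K> keyFn,
                                    TriPred<? super S, ? super K, ? super T> filterFn) {
        List<T> kept = new ArrayList<>();
        for (T item : items) {
            if (filterFn.test(service, keyFn.apply(item), item)) {
                kept.add(item);
            }
        }
        return kept;
    }
}
```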
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
filterUsingService in interface GeneralStageWithKey<T,K>
- Type Parameters:
S - type of service object
- Parameters:
serviceFactory - the service factory
filterFn - a filter predicate function. It must be stateless. It must be cooperative if the service is cooperative.
- Returns:
the newly attached stage
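The fault-tolerance caveat described for filterUsingService can be illustrated with a small simulation. The sketch is hypothetical and uses no Jet API: it records the source offset at a checkpoint, "crashes", and replays from that offset. The local map is recreated on restart and loses what it held, while the map standing in for durable storage is not rewound, so the replayed items are counted twice.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a restart in a fault-tolerant job; none of this is Jet API.
final class ReplaySketch {
    // Local state held by the service object: recreated (empty) after a restart.
    final Map<String, Integer> localCounts = new HashMap<>();
    // Stand-in for durable external storage: NOT part of the job snapshot,
    // so a restart does not rewind it.
    final Map<String, Integer> externalCounts = new HashMap<>();

    private void process(String item) {
        localCounts.merge(item, 1, Integer::sum);
        externalCounts.merge(item, 1, Integer::sum);
    }

    // Processes items, saves the source offset at checkpointAt, "crashes" before
    // offset crashAt, then restarts from the saved offset and runs to the end.
    void runWithRestart(List<String> items, int checkpointAt, int crashAt) {
        if (checkpointAt < 0 || checkpointAt >= crashAt || crashAt > items.size()) {
            throw new IllegalArgumentException("need 0 <= checkpointAt < crashAt <= items.size()");
        }
        int snapshotOffset = 0;
        for (int offset = 0; offset < crashAt; offset++) {
            if (offset == checkpointAt) {
                snapshotOffset = offset; // the source position is part of the snapshot
            }
            process(items.get(offset));
        }
        // Restart: the service object is recreated and the source rewinds to the
        // snapshot, but the external storage keeps every update made before the crash.
        localCounts.clear();
        for (int offset = snapshotOffset; offset < items.size(); offset++) {
            process(items.get(offset));
        }
    }
}
```

With items `a, b, c, d`, a checkpoint at offset 1 and a crash before offset 3, the items `b` and `c` are processed twice, so the external counts for them end up at 2, and the local count for `a` is gone.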
-
flatMapUsingService
@Nonnull <S,R> StreamStage<R> flatMapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStageWithKey
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
StreamStage<Part> parts = products
        .groupingKey(Product::getId)
        .flatMapUsingService(
                ServiceFactories.sharedService(ctx -> new PartRegistry()),
                (registry, productId, product) -> Traversers.traverseIterable(
                        registry.fetchParts(productId))
        );
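The null-terminated traverser contract can be illustrated with a small stand-in. `MiniTraverser` is a hypothetical mirror of the idea behind Jet's `Traverser` (`next()` returns `null` once exhausted), and `ofIterable` plays the role that `Traversers.traverseIterable` plays in the sample. The hypothetical `FlatMapSketch` driver drains each returned traverser until it yields `null`, which is why the function may return an empty traverser but never a `null` one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical mirror of a null-terminated traverser: next() returns the next item,
// or null once there are no more items.
@FunctionalInterface
interface MiniTraverser<T> {
    T next();

    // Assumes the iterable holds no null elements: a null item would be
    // indistinguishable from the end marker.
    static <T> MiniTraverser<T> ofIterable(Iterable<? extends T> iterable) {
        Iterator<? extends T> it = iterable.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    static <T> MiniTraverser<T> empty() {
        return () -> null;
    }
}

final class FlatMapSketch {
    private FlatMapSketch() {}

    // Applies fn to each item and drains the returned traverser until it yields null.
    static <T, R> List<R> flatMap(List<T> items, Function<T, MiniTraverser<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            // The traverser itself must not be null, but it may be empty.
            MiniTraverser<R> t = Objects.requireNonNull(fn.apply(item), "fn returned a null traverser");
            for (R r = t.next(); r != null; r = t.next()) {
                out.add(r);
            }
        }
        return out;
    }
}
```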
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
flatMapUsingService in interface GeneralStageWithKey<T,K>
- Type Parameters:
S - type of service object
R - type of the output items
- Parameters:
serviceFactory - the service factory
flatMapFn - a flat-mapping function. It must not return a null traverser, but it can return an empty traverser. It must be stateless. It must be cooperative if the service is cooperative.
- Returns:
the newly attached stage
-
customTransform
@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
customTransform in interface GeneralStageWithKey<T,K>
- Type Parameters:
R - the type of the output items
- Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors
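The missing type safety comes from ordinary Java generics: the caller's assignment decides `R`, and nothing checks it against what the processor really emits. This hypothetical sketch involves no Jet types; `UncheckedStageSketch.customTransform` only reproduces the effect with an unchecked cast. The mistake compiles and surfaces as a `ClassCastException` only when an element is read as the wrong type.

```java
import java.util.List;

// Hypothetical illustration of call-site type inference without a runtime check.
final class UncheckedStageSketch {
    private UncheckedStageSketch() {}

    // Stands in for a stage whose real element type is hidden from the compiler:
    // R is whatever the caller's assignment asks for.
    @SuppressWarnings("unchecked")
    static <R> List<R> customTransform(List<?> processorOutput) {
        return (List<R>) processorOutput; // unchecked cast: nothing verifies R here
    }
}
```

For example, assigning the result to `List<String>` compiles even when the underlying list holds `Integer` values; the failure appears later, at the first read.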
-
customTransform
@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
customTransform in interface GeneralStageWithKey<T,K>
- Type Parameters:
R - the type of the output items
- Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors
-
customTransform
@Nonnull <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
customTransform in interface GeneralStageWithKey<T,K>
- Type Parameters:
R - the type of the output items
- Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors
-