public interface GeneralStageWithKey<T,K>
T - type of the stream item
K - type of the grouping key
Modifier and Type | Method and Description |
---|---|
<R> GeneralStage<R> | customTransform(String stageName, ProcessorMetaSupplier procSupplier) - Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<R> GeneralStage<R> | customTransform(String stageName, ProcessorSupplier procSupplier) - Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<R> GeneralStage<R> | customTransform(String stageName, SupplierEx<Processor> procSupplier) - Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<S> GeneralStage<T> | filterStateful(SupplierEx<? extends S> createFn, BiPredicateEx<? super S,? super T> filterFn) - Attaches a stage that performs a stateful filtering operation. |
<S> GeneralStage<T> | filterUsingService(ServiceFactory<?,S> serviceFactory, TriPredicate<? super S,? super K,? super T> filterFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
<S,R> GeneralStage<R> | flatMapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn) - Attaches a stage that performs a stateful flat-mapping operation. |
<S,R> GeneralStage<R> | flatMapUsingService(ServiceFactory<?,S> serviceFactory, TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
FunctionEx<? super T,? extends K> | keyFn() - Returns the function that extracts the key from stream items. |
<S,R> GeneralStage<R> | mapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends R> mapFn) - Attaches a stage that performs a stateful mapping operation. |
default <V,R> GeneralStage<R> | mapUsingIMap(IMap<K,V> iMap, BiFunctionEx<? super T,? super V,? extends R> mapFn) - Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
default <V,R> GeneralStage<R> | mapUsingIMap(String mapName, BiFunctionEx<? super T,? super V,? extends R> mapFn) - Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
<S,R> GeneralStage<R> | mapUsingService(ServiceFactory<?,S> serviceFactory, TriFunction<? super S,? super K,? super T,? extends R> mapFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
<S,R> GeneralStage<R> | mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
default <S,R> GeneralStage<R> | mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory, TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
<S,R> GeneralStage<R> | mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory, int maxBatchSize, BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
<S,R> GeneralStage<R> | mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory, int maxBatchSize, TriFunction<? super S,? super List<K>,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn) - Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx). |
default <A,R> GeneralStage<Map.Entry<K,R>> | rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp) - Attaches a rolling aggregation stage. |
@Nonnull FunctionEx<? super T,? extends K> keyFn()
Returns the function that extracts the key from stream items.

@Nonnull <S,R> GeneralStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to mapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
If you want to return the state variable from mapFn, then the return value must be a copy of the state variable to avoid situations in which the result of mapFn is modified after being emitted or where the state is modified by downstream processors.
This sample takes a stream of pairs (serverId, latency)
representing the latencies of serving individual requests and outputs
the cumulative latency of all handled requests so far, for each
server separately:
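import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.pipeline.GeneralStage;
import java.util.Map.Entry;
import static com.hazelcast.jet.Util.entry;

GeneralStage<Entry<String, Long>> latencies; // obtained from an upstream stage
GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
        .groupingKey(Entry::getKey)
        .mapStateful(
                LongAccumulator::new,
                (sum, key, entry) -> {
                    sum.add(entry.getValue());
                    return entry(key, sum.get());
                }
        );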
This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(AggregateOperations.summingLong(Entry::getValue)).
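To make the per-key state semantics concrete, here is a minimal plain-Java sketch (no Jet dependency) of what mapStateful does on a single processor: one state object per grouping key, created lazily by createFn and passed to mapFn together with the key and the item. The class MapStatefulSketch, its TriFn interface and the use of a long[1] in place of LongAccumulator are hypothetical; snapshotting and distribution across processors are not modelled.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.function.Supplier;

final class MapStatefulSketch {
    /** Hypothetical stand-in for Jet's TriFunction: (state, key, item) -> result. */
    interface TriFn<S, K, T, R> {
        R apply(S state, K key, T item);
    }

    /** Applies mapFn to every item, keeping one state object per grouping key. */
    static <T, K, S, R> List<R> mapStateful(List<T> items, Function<T, K> keyFn,
                                            Supplier<S> createFn, TriFn<S, K, T, R> mapFn) {
        Map<K, S> stateByKey = new HashMap<>();
        List<R> out = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            // createFn runs only the first time a key is seen
            S state = stateByKey.computeIfAbsent(key, k -> createFn.get());
            out.add(mapFn.apply(state, key, item));
        }
        return out;
    }

    /** Cumulative latency per server; a long[1] plays the role of LongAccumulator. */
    static List<Entry<String, Long>> cumulativeLatencies(List<Entry<String, Long>> latencies) {
        return mapStateful(latencies, Entry::getKey, () -> new long[1],
                (sum, key, entry) -> {
                    sum[0] += entry.getValue();
                    // emit a fresh entry, never the mutable state object itself
                    return new SimpleImmutableEntry<>(key, sum[0]);
                });
    }
}
```

Emitting a new entry instead of the long[] state follows the rule above about never handing the live state object downstream.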
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.

S - type of the state object
R - type of the result
createFn - function that returns the state object
mapFn - function that receives the state object, the grouping key and the input item and outputs the result item. It may modify the state object. It must be stateless and cooperative.

@Nonnull <S> GeneralStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):
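import com.hazelcast.jet.accumulator.LongAccumulator;

GeneralStage<String> decimated = input
        .groupingKey(String::length)
        .filterStateful(
                LongAccumulator::new,
                (counter, item) -> {
                    counter.add(1);
                    // drop the 10th, 20th, ... string of each length
                    return counter.get() % 10 != 0;
                }
        );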
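The decimation can be modelled without Jet in a few lines: one counter per string length, incremented for every item, with the item dropped whenever its counter reaches a multiple of 10. FilterStatefulSketch and the long[1] counter (standing in for LongAccumulator) are hypothetical. Matching the BiPredicateEx signature, the decision uses only the state and the item; the key merely selects which state is used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FilterStatefulSketch {
    /** Drops every 10th string of each length, keeping a separate counter per length. */
    static List<String> decimate(List<String> input) {
        Map<Integer, long[]> counterByLength = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String item : input) {
            // the grouping key (the length) selects the state object
            long[] counter = counterByLength.computeIfAbsent(item.length(), k -> new long[1]);
            counter[0]++;
            if (counter[0] % 10 != 0) {
                out.add(item);
            }
        }
        return out;
    }
}
```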
The given functions must be stateless and cooperative.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.

S - type of the state object
createFn - function that returns the state object
filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.

@Nonnull <S,R> GeneralStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to flatMapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
If you want to return the state variable from flatMapFn, then the return value must be a copy of the state variable to avoid situations in which the result of flatMapFn is modified after being emitted or where the state is modified by downstream processors.
This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:
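import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.accumulator.LongAccumulator;

GeneralStage<String> punctuated = input
        .groupingKey(String::length)
        .flatMapStateful(
                LongAccumulator::new,
                (counter, key, item) -> {
                    counter.add(1);
                    // emit the item first, then the punctuation after every 10th item
                    return counter.get() % 10 == 0
                            ? Traversers.traverseItems(item, "punctuation" + key)
                            : Traversers.singleton(item);
                }
        );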
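A plain-Java sketch of the same punctuation logic: a per-length counter decides whether the flat-mapping step produces one item or two, and the produced items are appended in order, much as the stage drains the returned Traverser. FlatMapStatefulSketch is hypothetical, and a List stands in for the Traverser.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FlatMapStatefulSketch {
    /** Emits every item; after every 10th item of a given length it also emits "punctuation" + length. */
    static List<String> punctuate(List<String> input) {
        Map<Integer, long[]> counterByLength = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String item : input) {
            int key = item.length();
            long[] counter = counterByLength.computeIfAbsent(key, k -> new long[1]);
            counter[0]++;
            // the List plays the role of the Traverser returned by flatMapFn
            List<String> produced = counter[0] % 10 == 0
                    ? List.of(item, "punctuation" + key)
                    : List.of(item);
            out.addAll(produced);
        }
        return out;
    }
}
```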
The given functions must be stateless and cooperative.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.

S - type of the state object
R - type of the result
createFn - function that returns the state object
flatMapFn - function that receives the state object, the grouping key and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.

@Nonnull default <A,R> GeneralStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
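Attaches a rolling aggregation stage that uses the given AggregateOperation. It passes each input item to the accumulator and, for each item, outputs an entry holding the grouping key and the current result of aggregation (as returned by the export primitive).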
Sample usage:
StreamStage<Entry<Color, Long>> aggregated = items
.groupingKey(Item::getColor)
.rollingAggregate(AggregateOperations.counting());
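For example, if the aggregate operation is a sum and the values received for one key are {2, 7, 8, -5}, the results emitted for that key will be {2, 9, 17, 12} (with counting(), as in the sample above, they would be {1, 2, 3, 4}).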
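The arithmetic of that example is a running sum kept separately for each grouping key. This plain-Java sketch reproduces it with a HashMap of partial sums; RollingSumSketch is hypothetical and models only the summing case, not the general AggregateOperation machinery (accumulate and export primitives).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RollingSumSketch {
    /** For each input entry, adds its value to the running sum of its key and emits (key, sum). */
    static List<Map.Entry<String, Long>> rollingSum(List<Map.Entry<String, Long>> input) {
        Map<String, Long> sumByKey = new HashMap<>();
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : input) {
            long current = sumByKey.merge(e.getKey(), e.getValue(), Long::sum);
            out.add(Map.entry(e.getKey(), current));
        }
        return out;
    }
}
```

Interleaving another key between the values of "k" does not disturb the running sum of "k", because each key has its own accumulator.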
This stage is fault-tolerant and saves its state to the snapshot.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.

A - type of the accumulator
R - type of the aggregate operation result
aggrOp - the aggregate operation to perform

@Nonnull <S,R> GeneralStage<R> mapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
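Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory. If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.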
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingService(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
(reg, key, item) -> item.setDetail(reg.fetchDetail(key))
);
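The partitioning guarantee can be illustrated with a small plain-Java model: keys are routed to a fixed number of simulated processors by hash, each simulated processor owns one service object, so every item with a given key reaches the same instance. KeyedServiceRoutingSketch, its Registry class and the hash-modulo routing are assumptions for illustration; Jet's real partitioning and the sharing behaviour of ServiceFactories.sharedService are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class KeyedServiceRoutingSketch {
    /** Hypothetical service object; it counts the lookups it has served. */
    static final class Registry {
        int lookups;

        String fetchDetail(String key) {
            lookups++;
            return "detail-of-" + key;
        }
    }

    /**
     * Routes each key to one of `parallelism` simulated processors by hash. Every simulated
     * processor owns one service instance; the result records which instance served each key.
     */
    static Map<String, List<Registry>> route(List<String> keys, int parallelism,
                                             Supplier<Registry> createService) {
        List<Registry> services = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            services.add(createService.get());
        }
        Map<String, List<Registry>> servedBy = new HashMap<>();
        for (String key : keys) {
            Registry service = services.get(Math.floorMod(key.hashCode(), parallelism));
            service.fetchDetail(key);
            servedBy.computeIfAbsent(key, k -> new ArrayList<>()).add(service);
        }
        return servedBy;
    }
}
```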
S - type of service object
R - the result type of the mapping function
serviceFactory - the service factory
mapFn - a mapping function. It must be stateless. It must be cooperative if the service is cooperative.

@Nonnull default <S,R> GeneralStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Asynchronous version of mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
Uses default values for some extra parameters, so the maximum number of concurrent async operations per processor will be limited to and whether or not the order of input items should be preserved will be .
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingServiceAsync(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
(reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
);
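The null-handling rule (a null future and a future completing with null both act as a filter) can be sketched in plain Java. AsyncNullFilterSketch is hypothetical: it starts all operations up front and collects them in input order, ignoring the concurrency limit and the service object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncNullFilterSketch {
    /**
     * Starts one async operation per item, then collects the results in input order.
     * A null future and a future that completes with null both drop the item.
     */
    static <T, R> List<R> mapAsync(List<T> items, Function<T, CompletableFuture<R>> mapAsyncFn) {
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(mapAsyncFn.apply(item)); // may add null
        }
        List<R> out = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue; // null future: nothing is emitted
            }
            R result = future.join();
            if (result != null) {
                out.add(result); // null result: nothing is emitted
            }
        }
        return out;
    }
}
```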
The latency of the async call will add to the total latency of the output.

S - type of service object
R - the future's result type of the mapping function
serviceFactory - the service factory
mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.

@Nonnull <S,R> GeneralStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Asynchronous version of mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingServiceAsync(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
16,
true,
(reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
);
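To illustrate what a bound such as maxConcurrentOps = 16 together with preserveOrder = true implies, here is one possible plain-Java way to cap in-flight operations while keeping input order: a window of pending futures that, once full, waits for the oldest before starting another. BoundedOrderedAsyncSketch is hypothetical, is not how Jet implements the stage, and covers only the preserveOrder = true case.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class BoundedOrderedAsyncSketch {
    /**
     * Emits results in input order while keeping at most maxConcurrentOps operations
     * started but not yet emitted. When the window is full, it blocks on the oldest
     * pending future before starting the next operation.
     */
    static <T, R> List<R> mapOrdered(List<T> items, int maxConcurrentOps,
                                     Function<T, CompletableFuture<R>> mapAsyncFn) {
        Deque<CompletableFuture<R>> window = new ArrayDeque<>();
        List<R> out = new ArrayList<>();
        for (T item : items) {
            if (window.size() == maxConcurrentOps) {
                emitOldest(window, out);
            }
            CompletableFuture<R> future = mapAsyncFn.apply(item);
            if (future == null) {
                future = CompletableFuture.completedFuture(null); // null future behaves like a null result
            }
            window.addLast(future);
        }
        while (!window.isEmpty()) {
            emitOldest(window, out);
        }
        return out;
    }

    private static <R> void emitOldest(Deque<CompletableFuture<R>> window, List<R> out) {
        R result = window.removeFirst().join();
        if (result != null) {
            out.add(result); // null results act as a filter
        }
    }
}
```

Waiting on the oldest future is what keeps the output ordered; an unordered variant could instead emit whichever future completes first.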
The latency of the async call will add to the total latency of the output.

S - type of service object
R - the future's result type of the mapping function
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the async responses are ordered or not
mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.

@Nonnull <S,R> GeneralStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Batched version of mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>.
The size of the list is limited by the given maxBatchSize.
This transform can perform filtering by putting null elements into the output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum number of items looked up in one batch is 100:
items.groupingKey(Item::getDetailId)
.mapUsingServiceAsyncBatched(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
100,
(reg, itemList) -> reg
.fetchDetailsAsync(itemList.stream().map(Item::getDetailId).collect(Collectors.toList()))
.thenApply(details -> {
for (int i = 0; i < itemList.size(); i++) {
itemList.get(i).setDetail(details.get(i));
}
return itemList;
})
);
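The batching contract (at most maxBatchSize items per call, one result element per input item, null elements dropped) can be modelled in plain Java as follows. BatchedAsyncSketch is hypothetical, and slicing the input into full fixed-size chunks is a simplification: the text above only guarantees that a batch never exceeds maxBatchSize.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class BatchedAsyncSketch {
    /**
     * Calls mapAsyncFn once per batch of at most maxBatchSize items and flattens the returned
     * lists in input order. Null elements in a returned list are dropped (filtering).
     */
    static <T, R> List<R> mapBatched(List<T> items, int maxBatchSize,
                                     Function<List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<CompletableFuture<List<R>>> pending = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, items.size());
            pending.add(mapAsyncFn.apply(new ArrayList<>(items.subList(from, to))));
        }
        List<R> out = new ArrayList<>();
        for (CompletableFuture<List<R>> batchResult : pending) {
            for (R r : batchResult.join()) {
                if (r != null) {
                    out.add(r);
                }
            }
        }
        return out;
    }
}
```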
S - type of service object
R - the future result type of the mapping function
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a mapping function. It must be stateless. It must be cooperative if the service is cooperative.

@Nonnull <S,R> GeneralStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull TriFunction<? super S,? super List<K>,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Batched version of mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items (and a list of their corresponding keys) and returns a CompletableFuture<List<R>>.
The sizes of the input lists are identical and are limited by the given maxBatchSize. The key at index N corresponds to the input item at index N.
The number of in-flight batches being completed asynchronously is limited to and this mapping operation always preserves the order of input elements.
This transform can perform filtering by putting null elements into the output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum number of items looked up in one batch is 100:
items.groupingKey(Item::getDetailId)
.mapUsingServiceAsyncBatched(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
100,
(reg, keyList, itemList) -> reg.fetchDetailsAsync(keyList).thenApply(details -> {
for (int i = 0; i < itemList.size(); i++) {
itemList.get(i).setDetail(details.get(i));
}
return itemList;
})
);
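The pairing of keys and items can be sketched in plain Java: for each batch, a key list is built in parallel with the item list, so the key at index N belongs to the item at index N. KeyedBatchSketch and its KeyedBatchFn interface are hypothetical; the sketch processes one batch at a time and omits the service argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class KeyedBatchSketch {
    /** Hypothetical stand-in for the batch function, without the service argument. */
    interface KeyedBatchFn<K, T, R> {
        CompletableFuture<List<R>> apply(List<K> keys, List<T> items);
    }

    /**
     * Processes one batch at a time. For each batch it builds a key list parallel to the
     * item list, so keys.get(n) is the grouping key of items.get(n).
     */
    static <T, K, R> List<R> mapBatched(List<T> items, Function<T, K> keyFn, int maxBatchSize,
                                        KeyedBatchFn<K, T, R> batchFn) {
        List<R> out = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            List<T> batch = new ArrayList<>(items.subList(from, Math.min(from + maxBatchSize, items.size())));
            List<K> keys = new ArrayList<>();
            for (T item : batch) {
                keys.add(keyFn.apply(item));
            }
            for (R r : batchFn.apply(keys, batch).join()) {
                if (r != null) {
                    out.add(r); // null elements act as a filter
                }
            }
        }
        return out;
    }
}
```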
S - type of service object
R - the future result type of the mapping function
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a mapping function. It must be stateless and cooperative.

@Nonnull <S> GeneralStage<T> filterUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriPredicate<? super S,? super K,? super T> filterFn)
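Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.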
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
items.groupingKey(Item::getDetailId)
.filterUsingService(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
(reg, key, item) -> reg.fetchDetail(key).contains("blade")
);
S - type of service object
serviceFactory - the service factory
filterFn - a filter predicate function. It must be stateless. It must be cooperative if the service is cooperative.

@Nonnull <S,R> GeneralStage<R> flatMapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
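Deprecated. Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.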
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
StreamStage<Part> parts = products
.groupingKey(Product::getId)
.flatMapUsingService(
ServiceFactories.sharedService(ctx -> new PartRegistry()),
(registry, productId, product) -> Traversers.traverseIterable(
registry.fetchParts(productId))
);
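The flat-mapping behaviour reduces to "emit every element the function produces, in order; an empty result emits nothing". This plain-Java sketch shows that with a map-backed registry; FlatMapServiceSketch and PartRegistry are hypothetical, and a List stands in for the Traverser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FlatMapServiceSketch {
    /** Hypothetical part registry playing the role of the service object. */
    static final class PartRegistry {
        private final Map<String, List<String>> partsByProduct;

        PartRegistry(Map<String, List<String>> partsByProduct) {
            this.partsByProduct = partsByProduct;
        }

        List<String> fetchParts(String productId) {
            return partsByProduct.getOrDefault(productId, List.of());
        }
    }

    /** Emits all parts of each product in input order; an unknown product emits nothing. */
    static List<String> parts(List<String> productIds, PartRegistry registry) {
        List<String> out = new ArrayList<>();
        for (String productId : productIds) {
            // the (possibly empty) list stands in for the Traverser returned by flatMapFn
            out.addAll(registry.fetchParts(productId));
        }
        return out;
    }
}
```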
S - type of service object
R - type of the output items
serviceFactory - the service factory
flatMapFn - a flat-mapping function. It must not return a null traverser, but can return an empty traverser. It must be stateless. It must be cooperative if the service is cooperative.

@Nonnull default <V,R> GeneralStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
V value = map.get(groupingKey);
return mapFn.apply(item, value);
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingIMap(
"enriching-map",
(Item item, ItemDetail detail) -> item.setDetail(detail)
);
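The equivalence stated above (look up by grouping key, apply mapFn, drop null results) can be checked with a plain-Java sketch in which a HashMap stands in for the IMap. MapUsingMapSketch is hypothetical; it ignores everything that makes a real IMap lookup distributed and partition-aware.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

final class MapUsingMapSketch {
    /**
     * For each item: value = map.get(groupingKey), then result = mapFn.apply(item, value).
     * A null result is not emitted, which is what gives the stage filtering semantics.
     */
    static <T, K, V, R> List<R> mapUsingMap(List<T> items, Function<T, K> keyFn,
                                            Map<K, V> map, BiFunction<T, V, R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            V value = map.get(keyFn.apply(item)); // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }
}
```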
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.

V - type of the value in the IMap
R - type of the output item
mapName - name of the IMap
mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull default <V,R> GeneralStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
V value = map.get(groupingKey);
return mapFn.apply(item, value);
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.

V - type of the value in the IMap
R - type of the output item
iMap - the IMap to use as the service
mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

R - the type of the output items
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

R - the type of the output items
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

R - the type of the output items
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

Copyright © 2022 Hazelcast, Inc. All rights reserved.