T - type of the input item
K - type of the key
public interface BatchStageWithKey<T,K> extends GeneralStageWithKey<T,K>
Modifier and Type | Method and Description |
---|---|
<R> BatchStage<Map.Entry<K,R>> |
aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp)
Attaches a stage that performs the given group-and-aggregate operation.
|
default <T1,R0,R1> BatchStage<Map.Entry<K,Tuple2<R0,R1>>> |
aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0,
BatchStageWithKey<? extends T1,? extends K> stage1,
AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that performs the given cogroup-and-aggregate
transformation of the items from both this stage and stage1 you supply. |
<T1,R> BatchStage<Map.Entry<K,R>> |
aggregate2(BatchStageWithKey<T1,? extends K> stage1,
AggregateOperation2<? super T,? super T1,?,R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation
over the items from both this stage and
stage1 you supply. |
default <T1,T2,R0,R1,R2> BatchStage<Map.Entry<K,Tuple3<R0,R1,R2>>> |
aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0,
BatchStageWithKey<T1,? extends K> stage1,
AggregateOperation1<? super T1,?,? extends R1> aggrOp1,
BatchStageWithKey<T2,? extends K> stage2,
AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that performs the given cogroup-and-aggregate
transformation of the items from this stage as well as stage1 and
stage2 you supply. |
<T1,T2,R> BatchStage<Map.Entry<K,R>> |
aggregate3(BatchStageWithKey<T1,? extends K> stage1,
BatchStageWithKey<T2,? extends K> stage2,
AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
Attaches a stage that performs the given cogroup-and-aggregate operation
over the items from this stage as well as
stage1 and stage2 you supply. |
default GroupAggregateBuilder1<T,K> |
aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages.
|
default <R0> GroupAggregateBuilder<K,R0> |
aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages.
|
<R> BatchStage<R> |
customTransform(String stageName,
ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API Processors. |
default <R> BatchStage<R> |
customTransform(String stageName,
ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API Processors. |
default <R> BatchStage<R> |
customTransform(String stageName,
SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API Processors. |
BatchStage<T> |
distinct()
Attaches a stage that emits just the items that are distinct according
to the grouping key (no two items which map to the same key will be on
the output).
|
<S> BatchStage<T> |
filterStateful(SupplierEx<? extends S> createFn,
BiPredicateEx<? super S,? super T> filterFn)
Attaches a stage that performs a stateful filtering operation.
|
<S> BatchStage<T> |
filterUsingService(ServiceFactory<?,S> serviceFactory,
TriPredicate<? super S,? super K,? super T> filterFn)
Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it.
|
<S,R> BatchStage<R> |
flatMapStateful(SupplierEx<? extends S> createFn,
TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
Attaches a stage that performs a stateful flat-mapping operation.
|
<S,R> BatchStage<R> |
flatMapUsingService(ServiceFactory<?,S> serviceFactory,
TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all the items from the
Traverser it returns as the output items. |
<S,R> BatchStage<R> |
mapStateful(SupplierEx<? extends S> createFn,
TriFunction<? super S,? super K,? super T,? extends R> mapFn)
Attaches a stage that performs a stateful mapping operation.
|
default <V,R> BatchStage<R> |
mapUsingIMap(IMap<K,V> iMap,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the supplied
IMap using the grouping key is performed and the result of the lookup
is merged with the item and emitted. |
default <V,R> BatchStage<R> |
mapUsingIMap(String mapName,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
IMap with the supplied name using the grouping key is performed
and the result of the lookup is merged with the item and emitted. |
<S,R> BatchStage<R> |
mapUsingService(ServiceFactory<?,S> serviceFactory,
TriFunction<? super S,? super K,? super T,? extends R> mapFn)
Attaches a mapping stage which applies the given function to each input
item independently and emits the function's result as the output item.
|
<S,R> BatchStage<R> |
mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory,
int maxConcurrentOps,
boolean preserveOrder,
TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Asynchronous version of
GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the
mapAsyncFn returns a CompletableFuture<R> instead of just R. |
default <S,R> BatchStage<R> |
mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory,
TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Asynchronous version of
GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the
mapAsyncFn returns a CompletableFuture<R> instead of just R. |
<S,R> BatchStage<R> |
mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory,
int maxBatchSize,
BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Batched version of
GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction):
mapAsyncFn takes a list of input items and returns a
CompletableFuture<List<R>>. |
<S,R> BatchStage<R> |
mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory,
int maxBatchSize,
TriFunction<? super S,? super List<K>,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Batched version of
GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction):
mapAsyncFn takes a list of input items (and a list of their
corresponding keys) and returns a CompletableFuture<List<R>>. |
default <A,R> BatchStage<Map.Entry<K,R>> |
rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp)
Attaches a rolling aggregation stage.
|
keyFn
@Nonnull BatchStage<T> distinct()
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
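As a rough mental model of key-based deduplication, the following plain-Java sketch keeps one item per key. It is not Jet code: the class name, the list-based input and the choice to keep the first item seen for each key are assumptions made for illustration. The set of seen keys grows with the number of distinct keys, which is the kind of accumulated state the memory limit above applies to.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of key-based deduplication; not Jet code.
class DistinctByKeySketch {

    // Keeps the first item seen for each key (which item survives is an assumption).
    static <T, K> List<T> distinctByKey(List<T> items, Function<? super T, ? extends K> keyFn) {
        Set<K> seenKeys = new HashSet<>();   // grows with the number of distinct keys
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (seenKeys.add(keyFn.apply(item))) {
                out.add(item);
            }
        }
        return out;
    }

    // Groups fruit names by first letter and keeps one name per letter.
    static List<String> demo() {
        List<String> fruit = List.of("apple", "avocado", "banana", "blueberry", "cherry");
        return distinctByKey(fruit, s -> s.charAt(0));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [apple, banana, cherry]
    }
}
```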
@Nonnull <S,R> BatchStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
GeneralStageWithKey
Attaches a stage that performs a stateful mapping operation. createFn
returns the object that holds the state. Jet passes this object along
with each input item and its key to mapFn, which can update the object's
state. For each grouping key there's a separate state object. The state
object will be included in the state snapshot, so it survives job
restarts. For this reason it must be serializable.
If you want to return the state variable from mapFn, then the return
value must be a copy of the state variable, to avoid situations in which
the result of mapFn is modified after being emitted or where the state is
modified by downstream processors.
This sample takes a stream of pairs (serverId, latency)
representing the latencies of serving individual requests and outputs
the cumulative latency of all handled requests so far, for each
server separately:
GeneralStage<Entry<String, Long>> latencies;
GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
        .groupingKey(Entry::getKey)
        .mapStateful(
                LongAccumulator::new,
                (sum, key, entry) -> {
                    sum.add(entry.getValue());
                    return entry(key, sum.get());
                }
        );
This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(summing())
.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
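The per-key state handling described above can be modelled in plain Java. This is only a sketch of the semantics, not Jet code: the class name, the `HashMap` that stands in for Jet's per-key state and the `long[]` accumulator are illustrative assumptions. It mirrors the cumulative-latency sample and emits a fresh entry each time instead of the state object itself.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

// Plain-Java model of keyed stateful mapping; not Jet code.
class KeyedStatefulMapSketch {

    // Returns one output entry per input entry: (serverId, cumulative latency so far).
    static List<Entry<String, Long>> cumulativeLatencies(List<Entry<String, Long>> latencies) {
        // One state object per grouping key, created lazily the way createFn would be called.
        Map<String, long[]> stateByKey = new HashMap<>();
        List<Entry<String, Long>> out = new ArrayList<>();
        for (Entry<String, Long> e : latencies) {
            long[] sum = stateByKey.computeIfAbsent(e.getKey(), k -> new long[1]);
            sum[0] += e.getValue();
            // Emit an immutable entry holding a copy of the value, never the state object.
            out.add(new SimpleImmutableEntry<>(e.getKey(), sum[0]));
        }
        return out;
    }

    static List<Entry<String, Long>> demo() {
        return cumulativeLatencies(List.of(
                Map.entry("a", 10L), Map.entry("b", 5L), Map.entry("a", 7L)));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a=10, b=5, a=17]
    }
}
```

The state map holds one accumulator per distinct key, so its size is bounded by the number of keys rather than the number of items.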
mapStateful in interface GeneralStageWithKey<T,K>
S - type of the state object
R - type of the result
createFn - function that returns the state object
mapFn - function that receives the state object, the key and the input
item and outputs the result item. It may modify the state object. It
must be stateless and cooperative.
@Nonnull <S> BatchStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
GeneralStageWithKey
Attaches a stage that performs a stateful filtering operation. createFn
returns the object that holds the state. Jet passes this object along
with each input item to filterFn, which can update the object's state.
For each grouping key there's a separate state object. The state object
will be included in the state snapshot, so it survives job restarts. For
this reason it must be serializable.
This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):
GeneralStage<String> decimated = input
        .groupingKey(String::length)
        .filterStateful(
                LongAccumulator::new,
                (counter, item) -> {
                    counter.add(1);
                    return counter.get() % 10 != 0;
                }
        );
The given functions must be stateless and cooperative.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
filterStateful in interface GeneralStageWithKey<T,K>
S - type of the state object
createFn - function that returns the state object
filterFn - predicate that receives the state object and the input item and
outputs a boolean value. It may modify the state object.
@Nonnull <S,R> BatchStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
GeneralStageWithKey
Attaches a stage that performs a stateful flat-mapping operation.
createFn returns the object that holds the state. Jet passes this object
along with each input item and its key to flatMapFn, which can update the
object's state. For each grouping key there's a separate state object.
The state object will be included in the state snapshot, so it survives
job restarts. For this reason it must be serializable.
If you want to return the state variable from flatMapFn, then the return
value must be a copy of the state variable, to avoid situations in which
the result of flatMapFn is modified after being emitted or where the
state is modified by downstream processors.
This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:
GeneralStage<String> punctuated = input
        .groupingKey(String::length)
        .flatMapStateful(
                LongAccumulator::new,
                (counter, key, item) -> {
                    counter.add(1);
                    return counter.get() % 10 == 0
                            ? Traversers.traverseItems("punctuation" + key, item)
                            : Traversers.singleton(item);
                }
        );
The given functions must be stateless and cooperative.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
flatMapStateful in interface GeneralStageWithKey<T,K>
S - type of the state object
R - type of the result
createFn - function that returns the state object
flatMapFn - function that receives the state object, the key and the
input item and outputs the result items. It may modify the state object.
It must not return a null traverser, but can return an empty traverser.
@Nonnull default <A,R> BatchStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
GeneralStageWithKey
Attaches a rolling aggregation stage that uses the supplied
AggregateOperation. It passes each input item to the accumulator and
outputs the current result of aggregation (as returned by the export
primitive).
Sample usage:
StreamStage<Entry<Color, Long>> aggregated = items
        .groupingKey(Item::getColor)
        .rollingAggregate(AggregateOperations.counting());
For example, if the aggregate operation computes a sum and the input for
a given key is {2, 7, 8, -5}, the output for that key will be
{2, 9, 17, 12}.
This stage is fault-tolerant and saves its state to the snapshot.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
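The "accumulate, then export after every item" behaviour can be sketched in plain Java, with a `java.util.stream.Collector` standing in for the aggregate operation. This is not Jet code: the class and method names are hypothetical, and the sketch assumes the collector's finisher can be called repeatedly without damaging the accumulator, which holds for `Collectors.summingLong` but is not guaranteed for every collector.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Plain-Java model of a keyed rolling aggregation; not Jet code.
class RollingAggregateSketch {

    // After each item, emits (key, current aggregation result for that key).
    static <T, K, A, R> List<Entry<K, R>> rolling(
            List<T> items,
            Function<? super T, ? extends K> keyFn,
            Collector<? super T, A, R> op) {
        Map<K, A> accByKey = new HashMap<>();
        List<Entry<K, R>> out = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            A acc = accByKey.computeIfAbsent(key, k -> op.supplier().get());
            op.accumulator().accept(acc, item);
            // The finisher plays the role of the export primitive here (assumed non-destructive).
            out.add(new SimpleImmutableEntry<>(key, op.finisher().apply(acc)));
        }
        return out;
    }

    // All items share the single key "k", so this is a plain running sum.
    static List<Entry<String, Long>> demo() {
        Collector<Long, ?, Long> summing = Collectors.summingLong(Long::longValue);
        return rolling(List.of(2L, 7L, 8L, -5L), item -> "k", summing);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [k=2, k=9, k=17, k=12]
    }
}
```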
rollingAggregate in interface GeneralStageWithKey<T,K>
R - type of the aggregate operation result
aggrOp - the aggregate operation to perform
@Nonnull default <V,R> BatchStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the IMap with
the supplied name using the grouping key is performed and the result of
the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this
stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
V value = map.get(groupingKey);
return mapFn.apply(item, value);
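The two lines above, together with the rule that a null result emits nothing, can be modelled in plain Java. This is a sketch only: a `java.util.Map` stands in for the `IMap`, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of "look up by grouping key, merge, drop nulls"; not Jet code.
class KeyedLookupSketch {

    static <T, K, V, R> List<R> mapUsingLookup(
            List<T> items,
            Function<? super T, ? extends K> keyFn,
            Map<K, V> lookup,
            BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            V value = lookup.get(keyFn.apply(item)); // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {                    // a null result emits nothing
                out.add(result);
            }
        }
        return out;
    }

    // Enriches words with the spelled-out length; words without a match are filtered out.
    static List<String> demo() {
        Map<Integer, String> names = Map.of(5, "five");
        return mapUsingLookup(
                List.of("apple", "kiwi", "plum"),
                String::length,
                names,
                (word, name) -> name == null ? null : word + ":" + name);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [apple:five]
    }
}
```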
Sample usage:
items.groupingKey(Item::getDetailId)
     .mapUsingIMap(
             "enriching-map",
             (Item item, ItemDetail detail) -> item.setDetail(detail)
     );
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet
knows the key and uses it to partition and distribute the input in order
to achieve data locality. The value it fetches from the IMap is stored on
the cluster member where the processing takes place. However, if the map
doesn't use the default partitioning strategy, data locality will be
broken.
mapUsingIMap in interface GeneralStageWithKey<T,K>
V - type of the value in the IMap
R - type of the output item
mapName - name of the IMap
mapFn - the mapping function. It must be stateless and cooperative.
@Nonnull default <V,R> BatchStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the supplied
IMap using the grouping key is performed and the result of the lookup is
merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this
stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
V value = map.get(groupingKey);
return mapFn.apply(item, value);
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet
knows the key and uses it to partition and distribute the input in order
to achieve data locality. The value it fetches from the IMap is stored on
the cluster member where the processing takes place. However, if the map
doesn't use the default partitioning strategy, data locality will be
broken.
mapUsingIMap in interface GeneralStageWithKey<T,K>
V - type of the value in the IMap
R - type of the output item
iMap - the IMap to use as the service
mapFn - the mapping function. It must be stateless and cooperative.
@Nonnull <S,R> BatchStage<R> mapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
GeneralStageWithKey
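Attaches a mapping stage which applies the given function to each input
item independently and emits the function's result as the output item.
The mapping function receives another parameter, the service object,
which Jet will create using the supplied serviceFactory. If the mapping
result is null, it emits nothing. Therefore this stage can be used to
implement filtering semantics as well.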
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
items.groupingKey(Item::getDetailId)
     .mapUsingService(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
     );
mapUsingService in interface GeneralStageWithKey<T,K>
S - type of service object
R - the result type of the mapping function
serviceFactory - the service factory
mapFn - a mapping function. It must be stateless. It must be
cooperative, if the service is cooperative.
@Nonnull default <S,R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
GeneralStageWithKey
Asynchronous version of
GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the
mapAsyncFn returns a CompletableFuture<R> instead of just R.
Uses default values for the extra parameters of the four-argument
overload: the maximum number of concurrent async operations per processor
(maxConcurrentOps) and whether the order of input items is preserved
(preserveOrder).
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
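The null-handling rule can be illustrated with plain `CompletableFuture` code. This is a simplified model, not Jet's implementation: the class and method names are hypothetical, all calls are started up front instead of being capped by a concurrency limit, and results are collected in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java model of async mapping where null futures and null results act as a filter.
class AsyncMapSketch {

    static <T, R> List<R> mapAsync(List<T> items,
                                   Function<? super T, CompletableFuture<R>> mapAsyncFn) {
        // Start every call first so the async operations can overlap.
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(mapAsyncFn.apply(item));
        }
        // Collect in input order; skip null futures and null results.
        List<R> out = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue;
            }
            R result = future.join();
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    static List<String> demo() {
        Function<Integer, CompletableFuture<String>> lookup = id -> {
            if (id == 2) {
                return null;                                      // null future: item dropped
            }
            if (id == 3) {
                return CompletableFuture.completedFuture(null);   // null result: item dropped
            }
            return CompletableFuture.supplyAsync(() -> "v" + id);
        };
        return mapAsync(List.of(1, 2, 3, 4), lookup);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [v1, v4]
    }
}
```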
Sample usage:
items.groupingKey(Item::getDetailId)
     .mapUsingServiceAsync(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
     );
The latency of the async call will add to the total latency of the
output.
mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
S - type of service object
R - the future's result type of the mapping function
serviceFactory - the service factory
mapAsyncFn - a mapping function. Can map to null (return a null
future). It must be stateless and cooperative.
@Nonnull <S,R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull TriFunction<? super S,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
GeneralStageWithKey
Asynchronous version of
GeneralStageWithKey.mapUsingService(ServiceFactory, TriFunction): the
mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
Sample usage:
items.groupingKey(Item::getDetailId)
     .mapUsingServiceAsync(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             16,
             true,
             (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
     );
The latency of the async call will add to the total latency of the
output.
mapUsingServiceAsync in interface GeneralStageWithKey<T,K>
S - type of service object
R - the future's result type of the mapping function
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the async responses are ordered or not
mapAsyncFn - a mapping function. Can map to null (return
a null future). It must be stateless and cooperative.
@Nonnull <S,R> BatchStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
GeneralStageWithKey
Batched version of
GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction):
mapAsyncFn takes a list of input items and returns a
CompletableFuture<List<R>>. The size of the list is limited by the given
maxBatchSize.
This transform can perform filtering by putting null elements into the
output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on
them by performing batched lookups from a registry. The max size of the
items to lookup is specified as 100:
items.groupingKey(Item::getDetailId)
     .mapUsingServiceAsyncBatched(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             100,
             (reg, itemList) -> reg
                     .fetchDetailsAsync(itemList.stream().map(Item::getDetailId).collect(Collectors.toList()))
                     .thenApply(details -> {
                         for (int i = 0; i < itemList.size(); i++) {
                             itemList.get(i).setDetail(details.get(i));
                         }
                         return itemList;
                     })
     );
mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
S - type of service object
R - the future result type of the mapping function
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a mapping function. It must be stateless. It must be
cooperative, if the service is cooperative.
@Nonnull <S,R> BatchStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull TriFunction<? super S,? super List<K>,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
GeneralStageWithKey
Batched version of
GeneralStageWithKey.mapUsingServiceAsync(ServiceFactory, TriFunction):
mapAsyncFn takes a list of input items (and a list of their corresponding
keys) and returns a CompletableFuture<List<R>>. The sizes of the input
lists are identical and are limited by the given maxBatchSize. The key at
index N corresponds to the input item at index N.
The number of in-flight batches being completed asynchronously is limited,
and this mapping operation always preserves the order of input elements.
This transform can perform filtering by putting null elements into the
output list.
The latency of the async call will add to the total latency of the output.
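The batching contract (aligned key and item lists, a size cap, and null elements acting as a filter) can be modelled in plain Java. This is a sketch under simplifying assumptions, not Jet's implementation: the names are hypothetical, batches are cut as fixed-size chunks, and each batch is awaited before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Plain-Java model of batched async mapping with aligned key and item lists.
class BatchedAsyncMapSketch {

    static <K, T, R> List<R> mapBatched(
            List<K> keys, List<T> items, int maxBatchSize,
            BiFunction<List<K>, List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> out = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, items.size());
            // The key at index N of the batch belongs to the item at index N.
            List<R> results = mapAsyncFn.apply(keys.subList(from, to), items.subList(from, to)).join();
            for (R result : results) {
                if (result != null) {   // null elements in the output list are dropped
                    out.add(result);
                }
            }
        }
        return out;
    }

    static List<String> demo() {
        BiFunction<List<Integer>, List<String>, CompletableFuture<List<String>>> fn = (keyBatch, itemBatch) -> {
            List<String> results = new ArrayList<>();
            for (int i = 0; i < itemBatch.size(); i++) {
                // Key 3 stands for a failed lookup and is filtered out via null.
                results.add(keyBatch.get(i) == 3 ? null : keyBatch.get(i) + itemBatch.get(i));
            }
            return CompletableFuture.completedFuture(results);
        };
        return mapBatched(List.of(1, 2, 3, 4, 5), List.of("a", "b", "c", "d", "e"), 2, fn);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1a, 2b, 4d, 5e]
    }
}
```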
This sample takes a stream of stock items and sets the detail field on
them by performing batched lookups from a registry. The max size of the
items to lookup is specified as 100:
items.groupingKey(Item::getDetailId)
     .mapUsingServiceAsyncBatched(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             100,
             (reg, keyList, itemList) -> reg.fetchDetailsAsync(keyList).thenApply(details -> {
                 for (int i = 0; i < itemList.size(); i++) {
                     itemList.get(i).setDetail(details.get(i));
                 }
                 return itemList;
             })
     );
mapUsingServiceAsyncBatched in interface GeneralStageWithKey<T,K>
S - type of service object
R - the future result type of the mapping function
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a mapping function. It must be stateless and
cooperative.
@Nonnull <S> BatchStage<T> filterUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriPredicate<? super S,? super K,? super T> filterFn)
GeneralStageWithKey
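Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or to
discard it. The predicate function receives another parameter, the
service object, which Jet will create using the supplied serviceFactory.
The number of in-flight batches being completed asynchronously is limited,
and this operation always preserves the order of input elements.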
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
items.groupingKey(Item::getDetailId)
     .filterUsingService(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetail(key).contains("blade")
     );
filterUsingService in interface GeneralStageWithKey<T,K>
S - type of service object
serviceFactory - the service factory
filterFn - a filter predicate function. It must be stateless. It
must be cooperative, if the service is cooperative.
@Nonnull <S,R> BatchStage<R> flatMapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
GeneralStageWithKey
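Attaches a flat-mapping stage which applies the supplied function to each
input item independently and emits all the items from the Traverser it
returns as the output items. The traverser must be null-terminated. The
mapping function receives another parameter, the service object, which
Jet will create using the supplied serviceFactory.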
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
Sample usage:
StreamStage<Part> parts = products
        .groupingKey(Product::getId)
        .flatMapUsingService(
                ServiceFactories.sharedService(ctx -> new PartRegistry()),
                (registry, productId, product) -> Traversers.traverseIterable(
                        registry.fetchParts(productId))
        );
flatMapUsingService in interface GeneralStageWithKey<T,K>
S - type of service object
R - type of the output items
serviceFactory - the service factory
flatMapFn - a flatmapping function. It must not return a null traverser,
but can return an empty traverser. It must be stateless. It must be
cooperative, if the service is cooperative.
@Nonnull <R> BatchStage<Map.Entry<K,R>> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
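Attaches a stage that performs the given group-and-aggregate operation.
It emits one key-value pair (in a Map.Entry) for each distinct key it
observes in its input. The value is the result of the aggregate
operation across all the items with the given grouping key.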
Sample usage:
BatchStage<Entry<String, Long>> aggregated = people
        .groupingKey(Person::getLastName)
        .aggregate(AggregateOperations.counting());
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
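The "one entry per distinct key" result shape has a close analogue in the JDK's stream collectors. The sketch below is plain Java, not Jet code; the class name and the use of a `TreeMap` (chosen only to get a deterministic print order) are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java model of group-and-aggregate with a counting operation.
class GroupAndAggregateSketch {

    // Produces one (key, count) entry per distinct key once all input has been seen.
    static Map<String, Long> countByKey(List<String> lastNames) {
        return lastNames.stream().collect(
                Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    static Map<String, Long> demo() {
        return countByKey(List.of("Smith", "Jones", "Smith"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {Jones=1, Smith=2}
    }
}
```

Because nothing can be emitted until the whole input has been consumed, the map of per-key accumulators has to be held in memory, which is why the operation is subject to the limit mentioned above.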
R - type of the aggregation result
aggrOp - the aggregate operation to perform
See also: AggregateOperations
@Nonnull <T1,R> BatchStage<Map.Entry<K,R>> aggregate2(@Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,R> aggrOp)
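Attaches a stage that performs the given cogroup-and-aggregate operation
over the items from both this stage and stage1 you supply. It emits one
key-value pair (in a Map.Entry) for each distinct key it observes in its
input. The value is the result of the aggregate operation across all the
items with the given grouping key.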
Sample usage:
BatchStage<Entry<Long, Tuple2<Long, Long>>> aggregated = pageVisits
        .groupingKey(PageVisit::getUserId)
        .aggregate2(addToCarts.groupingKey(AddToCart::getUserId),
                aggregateOperation2(
                        AggregateOperations.counting(),
                        AggregateOperations.counting())
        );
This variant requires you to provide a two-input aggregate operation. If
you can express your logic in terms of two single-input aggregate
operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1)
because it offers a simpler
API. Use this variant only when your aggregate operation must combine
the input streams into the same accumulator.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
T1 - type of items in stage1
R - type of the aggregation result
aggrOp - the aggregate operation to perform
See also: AggregateOperations
@Nonnull default <T1,R0,R1> BatchStage<Map.Entry<K,Tuple2<R0,R1>>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStageWithKey<? extends T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that performs the given cogroup-and-aggregate
transformation of the items from both this stage and stage1 you supply.
For each distinct grouping key it performs the supplied aggregate
operation across all the items sharing that key. It performs the
aggregation separately for each input stage: aggrOp0 on this stage and
aggrOp1 on stage1. Once it has received all the items, it emits for each
distinct key a Map.Entry(key, Tuple2(result0, result1)).
Sample usage:
BatchStage<Entry<Long, Tuple2<Long, Long>>> aggregated = pageVisits
        .groupingKey(PageVisit::getUserId)
        .aggregate2(
                AggregateOperations.counting(),
                addToCarts.groupingKey(AddToCart::getUserId),
                AggregateOperations.counting()
        );
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
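Co-grouping two inputs with a separate count for each can be modelled in plain Java as follows. This is a sketch, not Jet code: the names are hypothetical, a two-element list stands in for `Tuple2`, and the sketch assumes that a key seen in only one input still produces an entry, with a count of zero for the other input.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a two-input cogroup-and-aggregate with one count per input.
class CoGroupCountSketch {

    // Returns key -> [count in input 0, count in input 1], one entry per distinct key.
    static Map<Long, List<Long>> coCount(List<Long> pageVisitUserIds, List<Long> addToCartUserIds) {
        Map<Long, long[]> accByKey = new TreeMap<>();
        for (Long userId : pageVisitUserIds) {
            accByKey.computeIfAbsent(userId, k -> new long[2])[0]++;
        }
        for (Long userId : addToCartUserIds) {
            accByKey.computeIfAbsent(userId, k -> new long[2])[1]++;
        }
        // Results are exported only after both inputs have been fully consumed.
        Map<Long, List<Long>> out = new TreeMap<>();
        accByKey.forEach((key, acc) -> out.put(key, List.of(acc[0], acc[1])));
        return out;
    }

    static Map<Long, List<Long>> demo() {
        return coCount(List.of(1L, 1L, 2L), List.of(1L, 3L));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {1=[2, 1], 2=[1, 0], 3=[0, 1]}
    }
}
```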
R0 - type of the aggregation result for stream-0
T1 - type of items in stage1
R1 - type of the aggregation result for stream-1
aggrOp0 - aggregate operation to perform on this stage
stage1 - the other stage
aggrOp1 - aggregate operation to perform on the other stage
See also: AggregateOperations
@Nonnull <T1,T2,R> BatchStage<Map.Entry<K,R>> aggregate3(@Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull BatchStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
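Attaches a stage that performs the given cogroup-and-aggregate operation
over the items from this stage as well as stage1 and stage2 you supply.
It emits one key-value pair (in a Map.Entry) for each distinct key it
observes in its input. The value is the result of the aggregate operation
across all the items with the given grouping key.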
Sample usage:
BatchStage<Entry<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
        .groupingKey(PageVisit::getUserId)
        .aggregate3(
                addToCarts.groupingKey(AddToCart::getUserId),
                payments.groupingKey(Payment::getUserId),
                aggregateOperation3(
                        AggregateOperations.counting(),
                        AggregateOperations.counting(),
                        AggregateOperations.counting())
        );
This variant requires you to provide a three-input aggregate operation.
If you can express your logic in terms of three single-input aggregate
operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2)
because it
offers a simpler API. Use this variant only when your aggregate
operation must combine the input streams into the same accumulator.
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
T1 - type of items in stage1
T2 - type of items in stage2
R - type of the aggregation result
aggrOp - the aggregate operation to perform
See also: AggregateOperations
@Nonnull default <T1,T2,R0,R1,R2> BatchStage<Map.Entry<K,Tuple3<R0,R1,R2>>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStageWithKey<T1,? extends K> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStageWithKey<T2,? extends K> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that performs the given cogroup-and-aggregate
transformation of the items from this stage as well as stage1 and stage2
you supply. For each distinct grouping key it observes in the input, it
performs the supplied aggregate operation across all the items sharing
that key. It performs the aggregation separately for each input stage:
aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it
has received all the items, it emits for each distinct key a
Map.Entry(key, Tuple3(result0, result1, result2)).
Sample usage:
BatchStage<Entry<Long, Tuple3<Long, Long, Long>>> aggregated = pageVisits
        .groupingKey(PageVisit::getUserId)
        .aggregate3(
                AggregateOperations.counting(),
                addToCarts.groupingKey(AddToCart::getUserId),
                AggregateOperations.counting(),
                payments.groupingKey(Payment::getUserId),
                AggregateOperations.counting()
        );
This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
T1 - type of items in stage1
T2 - type of items in stage2
R0 - type of the aggregation result for stream-0
R1 - type of the aggregation result for stream-1
R2 - type of the aggregation result for stream-2
aggrOp0 - aggregate operation to perform on this stage
stage1 - the first additional stage
aggrOp1 - aggregate operation to perform on stage1
stage2 - the second additional stage
aggrOp2 - aggregate operation to perform on stage2
See also: AggregateOperations
@Nonnull default <R0> GroupAggregateBuilder<K,R0> aggregateBuilder(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages. You supply an aggregate operation
for each input stage, and in the output you get the individual
aggregation results in a Map.Entry(key, itemsByTag). Use the tag you get
from builder.add(stageN) to retrieve the aggregated result for that
stage. Use builder.tag0() as the tag of this stage. You can also supply
a function to the builder that immediately transforms the ItemsByTag to
the desired output type.
This example reads from three sources that produce
Map.Entry<String, Long>. It groups by entry key and then counts the
items in stage-0, sums those in stage-1 and takes the average of those
in stage-2:
Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> stage0 =
        p.readFrom(source0).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage1 =
        p.readFrom(source1).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage2 =
        p.readFrom(source2).groupingKey(Entry::getKey);
// stage0 is registered with the builder; its result is retrieved with tag0
GroupAggregateBuilder<String, Long> b = stage0.aggregateBuilder(
        AggregateOperations.counting());
Tag<Long> tag0 = b.tag0();
Tag<Long> tag1 = b.add(stage1,
        AggregateOperations.summingLong(Entry::getValue));
Tag<Double> tag2 = b.add(stage2,
        AggregateOperations.averagingLong(Entry::getValue));
BatchStage<Entry<String, ItemsByTag>> aggregated = b.build();
aggregated.map(e -> String.format(
        "Key %s, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
        e.getKey(),
        e.getValue().get(tag0), e.getValue().get(tag1), e.getValue().get(tag2))
);
This operation is subject to memory limits. See
InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
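The tag-based retrieval described above can be pictured as a typed lookup in a heterogeneous container. The following plain-Java sketch shows that idea only. It is not Jet's Tag or ItemsByTag implementation, and the names TagLookupSketch, SketchTag and SketchItemsByTag are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of tag-based, typed lookup; not Jet's Tag/ItemsByTag.
final class TagLookupSketch {

    // Hypothetical stand-in for a tag: the type parameter records the result type.
    static final class SketchTag<T> {
        final int index;

        SketchTag(int index) {
            this.index = index;
        }
    }

    // Hypothetical stand-in for the per-key container of aggregation results.
    static final class SketchItemsByTag {
        private final Map<Integer, Object> items = new HashMap<>();

        <T> void put(SketchTag<T> tag, T value) {
            items.put(tag.index, value);
        }

        @SuppressWarnings("unchecked")
        <T> T get(SketchTag<T> tag) {
            // Sound as long as each index is only written through put() with the same tag.
            return (T) items.get(tag.index);
        }
    }

    public static void main(String[] args) {
        SketchTag<Long> countTag = new SketchTag<>(0);
        SketchTag<Long> sumTag = new SketchTag<>(1);
        SketchTag<Double> avgTag = new SketchTag<>(2);

        SketchItemsByTag results = new SketchItemsByTag();
        results.put(countTag, 3L);
        results.put(sumTag, 42L);
        results.put(avgTag, 14.0);

        long count = results.get(countTag);  // typed by the tag, no cast at the call site
        double avg = results.get(avgTag);
        System.out.println(count + " " + results.get(sumTag) + " " + avg);
    }
}
```

Because each tag carries its result type, results of different types (here Long and Double) can share one container while the call site stays free of casts.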
@Nonnull default GroupAggregateBuilder1<T,K> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages.
This builder requires you to provide a multi-input aggregate operation.
If you can express your logic in terms of single-input aggregate
operations, one for each input stream, then you should use
stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use
this builder only when you need to implement an aggregate operation that
combines all the input streams into the same accumulator.
This builder is mainly intended to build a co-aggregation of four or
more contributing stages. For up to three stages, prefer the direct
stage.aggregateN(...) calls because they offer more static type safety.
To add the other stages, call add(stage). Collect all the tags returned
from add() and use them when building the aggregate operation. Retrieve
the tag of the first stage (from which you obtained this builder) by
calling GroupAggregateBuilder1.tag0().
This example takes three streams of Map.Entry<String, Long> and, for
each string key, counts the distinct Long values across all input
streams:
Pipeline p = Pipeline.create();
BatchStageWithKey<Entry<String, Long>, String> stage0 =
        p.readFrom(source0).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage1 =
        p.readFrom(source1).groupingKey(Entry::getKey);
BatchStageWithKey<Entry<String, Long>, String> stage2 =
        p.readFrom(source2).groupingKey(Entry::getKey);
GroupAggregateBuilder1<Entry<String, Long>, String> b = stage0.aggregateBuilder();
Tag<Entry<String, Long>> tag0 = b.tag0();
Tag<Entry<String, Long>> tag1 = b.add(stage1);
Tag<Entry<String, Long>> tag2 = b.add(stage2);
// all three inputs accumulate into the same HashSet per key
BatchStage<Entry<String, Integer>> aggregated = b.build(AggregateOperation
        .withCreate(HashSet<Long>::new)
        .andAccumulate(tag0, (acc, item) -> acc.add(item.getValue()))
        .andAccumulate(tag1, (acc, item) -> acc.add(item.getValue()))
        .andAccumulate(tag2, (acc, item) -> acc.add(item.getValue()))
        .andCombine(HashSet::addAll)
        .andFinish(HashSet::size));
This operation is subject to memory limits. See
InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
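What "combines all the input streams into the same accumulator" means can be shown without Jet at all. The plain-Java sketch below keeps one HashSet per key and lets every input add to it, then reports the set size. The names SharedAccumulatorSketch, entry and distinctValuesPerKey are hypothetical, and the code only mirrors the logic of the example above, not how Jet executes it.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration of one shared accumulator per key; not the Jet API.
final class SharedAccumulatorSketch {

    // Hypothetical convenience factory for a key-value pair.
    static Map.Entry<String, Long> entry(String key, long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Counts the distinct values seen for each key across all inputs.
    @SafeVarargs
    static Map<String, Integer> distinctValuesPerKey(List<Map.Entry<String, Long>>... inputs) {
        Map<String, Set<Long>> accumulators = new TreeMap<>();
        for (List<Map.Entry<String, Long>> input : inputs) {
            for (Map.Entry<String, Long> e : input) {
                // Every input writes into the same set for a given key.
                accumulators.computeIfAbsent(e.getKey(), k -> new HashSet<>()).add(e.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        accumulators.forEach((key, acc) -> result.put(key, acc.size()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> distinct = distinctValuesPerKey(
                Arrays.asList(entry("a", 1L), entry("a", 2L)),
                Arrays.asList(entry("a", 2L), entry("b", 5L)),
                Arrays.asList(entry("b", 5L)));
        System.out.println(distinct);
    }
}
```

A value that appears in several inputs under the same key is counted once, which per-input aggregate operations could not express because each of them would see only its own stream.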
@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
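Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier
of Processors. The inbound edge will be distributed and
partitioned using the key function assigned to this stage.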
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
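The lack of type safety noted above is a general property of generic methods whose type parameter appears only in the return type. The plain-Java sketch below reproduces the effect with a list instead of a pipeline stage. The names CallSiteInferenceSketch, untypedOutput and failsAtUse are hypothetical and nothing here calls the Jet API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of call-site type inference without type safety.
final class CallSiteInferenceSketch {

    // Hypothetical method: the caller picks R, but the items were produced elsewhere.
    @SuppressWarnings("unchecked")
    static <R> List<R> untypedOutput(List<Object> produced) {
        return (List<R>) (List<?>) produced;
    }

    // Returns true if the mismatch surfaces only when an item is used.
    static boolean failsAtUse() {
        List<Object> produced = new ArrayList<>();
        produced.add("not a number");

        // Compiles cleanly: R is inferred as Integer from the assignment target.
        List<Integer> numbers = untypedOutput(produced);
        try {
            Integer first = numbers.get(0);  // the compiler-inserted cast fails here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Mismatch detected only at use: " + failsAtUse());
    }
}
```

The practical consequence is that the declared item type of the returned stage should match what the supplied processors actually emit, since the compiler cannot check it.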
Specified by: customTransform in interface GeneralStageWithKey<T,K>
R
- the type of the output items
stageName
- a human-readable name for the custom stage
procSupplier
- the supplier of processors
@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
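Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier
of Processors. The inbound edge will be distributed and
partitioned using the key function assigned to this stage.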
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by: customTransform in interface GeneralStageWithKey<T,K>
R
- the type of the output items
stageName
- a human-readable name for the custom stage
procSupplier
- the supplier of processors
@Nonnull <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
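Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier
of Processors. The inbound edge will be distributed and
partitioned using the key function assigned to this stage.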
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by: customTransform in interface GeneralStageWithKey<T,K>
R
- the type of the output items
stageName
- a human-readable name for the custom stage
procSupplier
- the supplier of processors
Copyright © 2022 Hazelcast, Inc. All rights reserved.