T
- the type of items coming out of this stagepublic interface StreamStage<T> extends GeneralStage<T>
pipeline
that will
observe an unbounded amount of data (i.e., an event stream). It accepts
input from its upstream stages (if any) and passes its output to its
downstream stages.DEFAULT_MAX_CONCURRENT_OPS, DEFAULT_PRESERVE_ORDER
Modifier and Type | Method and Description |
---|---|
default <R> StreamStage<R> |
apply(FunctionEx<? super StreamStage<T>,? extends StreamStage<R>> transformFn)
Transforms
this stage using the provided transformFn and
returns the transformed stage. |
<R> StreamStage<R> |
customTransform(String stageName,
ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API
Processor s. |
default <R> StreamStage<R> |
customTransform(String stageName,
ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API
Processor s. |
default <R> StreamStage<R> |
customTransform(String stageName,
SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API
Processor s. |
StreamStage<T> |
filter(PredicateEx<T> filterFn)
Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it.
|
<S> StreamStage<T> |
filterStateful(SupplierEx<? extends S> createFn,
BiPredicateEx<? super S,? super T> filterFn)
Attaches a stage that performs a stateful filtering operation.
|
<S> StreamStage<T> |
filterUsingService(ServiceFactory<?,S> serviceFactory,
BiPredicateEx<? super S,? super T> filterFn)
Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it.
|
<R> StreamStage<R> |
flatMap(FunctionEx<? super T,? extends Traverser<R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all the items from the
Traverser it returns. |
<S,R> StreamStage<R> |
flatMapStateful(SupplierEx<? extends S> createFn,
BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Attaches a stage that performs a stateful flat-mapping operation.
|
<S,R> StreamStage<R> |
flatMapUsingService(ServiceFactory<?,S> serviceFactory,
BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all items from the
Traverser it returns as the output items. |
<K> StreamStageWithKey<T,K> |
groupingKey(FunctionEx<? super T,? extends K> keyFn)
Specifies the function that will extract a key from the items in the
associated pipeline stage.
|
<K,T1_IN,T1,R> |
hashJoin(BatchStage<T1_IN> stage1,
JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1,
BiFunctionEx<T,T1,R> mapToOutputFn)
Attaches to both this and the supplied stage a hash-joining stage and
returns it.
|
<K1,K2,T1_IN,T2_IN,T1,T2,R> |
hashJoin2(BatchStage<T1_IN> stage1,
JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1,
BatchStage<T2_IN> stage2,
JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2,
TriFunction<T,T1,T2,R> mapToOutputFn)
Attaches to this and the two supplied stages a hash-joining stage and
returns it.
|
default StreamHashJoinBuilder<T> |
hashJoinBuilder()
Returns a fluent API builder object to construct a hash join operation
with any number of contributing stages.
|
<K,T1_IN,T1,R> |
innerHashJoin(BatchStage<T1_IN> stage1,
JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1,
BiFunctionEx<T,T1,R> mapToOutputFn)
Attaches to both this and the supplied stage an inner hash-joining stage
and returns it.
|
<K1,K2,T1_IN,T2_IN,T1,T2,R> |
innerHashJoin2(BatchStage<T1_IN> stage1,
JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1,
BatchStage<T2_IN> stage2,
JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2,
TriFunction<T,T1,T2,R> mapToOutputFn)
Attaches to this and the two supplied stages an inner hash-joining stage
and returns it.
|
<R> StreamStage<R> |
map(FunctionEx<? super T,? extends R> mapFn)
Attaches a mapping stage which applies the given function to each input
item independently and emits the function's result as the output item.
|
<S,R> StreamStage<R> |
mapStateful(SupplierEx<? extends S> createFn,
BiFunctionEx<? super S,? super T,? extends R> mapFn)
Attaches a stage that performs a stateful mapping operation.
|
default <K,V,R> StreamStage<R> |
mapUsingIMap(IMap<K,V> iMap,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the supplied
IMap is performed and the result of the lookup is merged with
the item and emitted. |
default <K,V,R> StreamStage<R> |
mapUsingIMap(String mapName,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
IMap
with the supplied name is performed and the result of the lookup is
merged with the item and emitted. |
default <K,V,R> StreamStage<R> |
mapUsingReplicatedMap(ReplicatedMap<K,V> replicatedMap,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the supplied
ReplicatedMap is performed and the result of the lookup is
merged with the item and emitted. |
default <K,V,R> StreamStage<R> |
mapUsingReplicatedMap(String mapName,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
ReplicatedMap with the supplied name is performed and the result of the
lookup is merged with the item and emitted. |
<S,R> StreamStage<R> |
mapUsingService(ServiceFactory<?,S> serviceFactory,
BiFunctionEx<? super S,? super T,? extends R> mapFn)
Attaches a mapping stage which applies the supplied function to each
input item independently and emits the function's result as the output
item.
|
default <S,R> StreamStage<R> |
mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory,
BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Asynchronous version of
GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>) : the mapAsyncFn
returns a CompletableFuture<R> instead of just R . |
<S,R> StreamStage<R> |
mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory,
int maxConcurrentOps,
boolean preserveOrder,
BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Asynchronous version of
GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>) : the mapAsyncFn
returns a CompletableFuture<R> instead of just R . |
<S,R> StreamStage<R> |
mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory,
int maxBatchSize,
BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Batched version of
GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>) : mapAsyncFn takes
a list of input items and returns a CompletableFuture<List<R>> . |
StreamStage<T> |
merge(StreamStage<? extends T> other)
Attaches a stage that emits all the items from this stage as well as all
the items from the supplied stage.
|
default StreamStage<T> |
peek()
Adds a peeking layer to this compute stage which logs its output.
|
default StreamStage<T> |
peek(FunctionEx<? super T,? extends CharSequence> toStringFn)
Adds a peeking layer to this compute stage which logs its output.
|
StreamStage<T> |
peek(PredicateEx<? super T> shouldLogFn,
FunctionEx<? super T,? extends CharSequence> toStringFn)
Attaches a peeking stage which logs this stage's output and passes it
through without transformation.
|
StreamStage<T> |
rebalance()
Returns a new stage that applies data rebalancing to the output of this
stage.
|
<K> StreamStage<T> |
rebalance(FunctionEx<? super T,? extends K> keyFn)
Returns a new stage that applies data rebalancing to the output of this
stage.
|
default <A,R> StreamStage<R> |
rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp)
Attaches a rolling aggregation stage.
|
StreamStage<T> |
setLocalParallelism(int localParallelism)
Sets the preferred local parallelism (number of processors per Jet
cluster member) this stage will configure its DAG vertices with.
|
StreamStage<T> |
setName(String name)
Overrides the default name of the stage with the name you choose and
returns the stage.
|
StageWithWindow<T> |
window(WindowDefinition wDef)
Adds the given window definition to this stage, as the first step in the
construction of a pipeline stage that performs windowed aggregation.
|
addTimestamps, writeTo
getPipeline, name
@Nonnull StageWithWindow<T> window(WindowDefinition wDef)
factory methods in WindowDefiniton
@Nonnull StreamStage<T> merge(@Nonnull StreamStage<? extends T> other)
other
- the other stage whose data to merge into this one@Nonnull <K> StreamStageWithKey<T,K> groupingKey(@Nonnull FunctionEx<? super T,? extends K> keyFn)
GeneralStage
Sample usage:
users.groupingKey(User::getId)
Note: make sure the extracted key is not-null, it would fail the
job otherwise. Also make sure that it implements equals()
and
hashCode()
.
groupingKey
in interface GeneralStage<T>
K
- type of the keykeyFn
- function that extracts the grouping key. It must be
stateless and cooperative.@Nonnull <K> StreamStage<T> rebalance(@Nonnull FunctionEx<? super T,? extends K> keyFn)
GeneralStage
With partitioned rebalancing, you supply your own function that decides (indirectly) where to send each data item. Jet first applies your partition key function to the data item and then its own partitioning function to the key. The result is that all items with the same key go to the same Jet processor and different keys are distributed pseudo-randomly across the processors.
Compared to non-partitioned balancing, partitioned balancing enforces the same data distribution across members regardless of any bottlenecks. If a given member is overloaded and applies backpressure, Jet doesn't reroute the data to other members, but propagates the backpressure to the upstream. If you choose a partitioning key that has a skewed distribution (some keys being much more frequent), this will result in an imbalanced data flow.
These are some basic invariants:
stage.rebalance(rebalanceKeyFn).groupingKey(groupKeyFn).aggregate(...)
:
here Jet removes the first (local) aggregation vertex and goes straight
to distributed aggregation without combining. Grouped aggregation
requires the data to be partitioned by the grouping key and therefore
Jet must ignore the rebalancing key you supplied. We recommend that you
remove it and use the parameterless stage.rebalance()
because the end result is identical.
stage.rebalance().aggregate(...)
: in this case the second vertex
is non-parallelizable and must execute on a single member. Therefore Jet
keeps both vertices and applies partitioned rebalancing before the first
one.
rebalance
in interface GeneralStage<T>
K
- type of the keykeyFn
- the partitioning key function. It must be stateless and
cooperative.@Nonnull StreamStage<T> rebalance()
GeneralStage
To implement rebalancing, Jet uses a distributed unicast data routing pattern on the DAG edge from this stage's vertex to the next one. It routes the data in a round-robin fashion, sending each item to the next member (member list includes the local one as well). If a given member's queue is overloaded and applying backpressure, it skips it and retries in the next round. With this scheme you get perfectly balanced item counts on each member under light load, but under heavier load it favors throughput: if the network becomes a bottleneck, most data may stay local.
These are some basic invariants:
stage.rebalance().groupingKey(keyFn).aggregate(...)
: here Jet
removes the first (local) aggregation vertex and goes straight to
distributed aggregation without combining. The data is rebalanced
through partitioning.
stage.rebalance().aggregate(...)
: in this case the second vertex
is non-parallelizable and must execute on a single member. Therefore Jet
keeps both vertices and applies rebalancing before the first one.
rebalance
in interface GeneralStage<T>
@Nonnull <R> StreamStage<R> map(@Nonnull FunctionEx<? super T,? extends R> mapFn)
GeneralStage
null
, it emits nothing. Therefore, this stage
can be used to implement filtering semantics as well.
This sample takes a stream of names and outputs the names in lowercase:
stage.map(name -> name.toLowerCase(Locale.ROOT))
map
in interface GeneralStage<T>
R
- the result type of the mapping functionmapFn
- a mapping function. It must be stateless and cooperative.@Nonnull StreamStage<T> filter(@Nonnull PredicateEx<T> filterFn)
GeneralStage
This sample removes empty strings from the stream:
stage.filter(name -> !name.isEmpty())
filter
in interface GeneralStage<T>
filterFn
- a filter predicate function. It must be stateless and
cooperative.@Nonnull <R> StreamStage<R> flatMap(@Nonnull FunctionEx<? super T,? extends Traverser<R>> flatMapFn)
GeneralStage
Traverser
it returns. The traverser must be null-terminated.
This sample takes a stream of sentences and outputs a stream of individual words in them:
stage.flatMap(sentence -> traverseArray(sentence.split("\\W+")))
flatMap
in interface GeneralStage<T>
R
- the type of items in the result's traversersflatMapFn
- a flatmapping function, whose result type is
Jet's Traverser
. It must not return a null traverser, but can
return an empty traverser. It must be
stateless and cooperative.@Nonnull <S,R> StreamStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S,? super T,? extends R> mapFn)
GeneralStage
createFn
returns the object that holds the state. Jet passes this
object along with each input item to mapFn
, which can update
the object's state. The state object will be included in the state
snapshot, so it survives job restarts. For this reason it must be
serializable.
This sample takes a stream of long
numbers representing request
latencies, computes the cumulative latency of all requests so far, and
starts emitting alarm messages when the cumulative latency crosses a
"bad behavior" threshold:
StreamStage<Long> latencyAlarms = latencies.mapStateful(
LongAccumulator::new,
(sum, latency) -> {
sum.add(latency);
long cumulativeLatency = sum.get();
return (cumulativeLatency <= LATENCY_THRESHOLD)
? null
: cumulativeLatency;
}
);
This code has the same result as latencies.rollingAggregate(summing())
.mapStateful
in interface GeneralStage<T>
S
- type of the state objectR
- type of the resultcreateFn
- function that returns the state object. It must be
stateless and cooperative.mapFn
- function that receives the state object and the input item and
outputs the result item. It may modify the state object. It must be
stateless and cooperative.@Nonnull <S> StreamStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
GeneralStage
createFn
returns the object that holds the state. Jet passes this
object along with each input item to filterFn
, which can update
the object's state. The state object will be included in the state
snapshot, so it survives job restarts. For this reason it must be
serializable.
This sample decimates the input (throws out every 10th item):
GeneralStage<String> decimated = input.filterStateful(
LongAccumulator::new,
(counter, item) -> {
counter.add(1);
return counter.get() % 10 != 0;
}
);
filterStateful
in interface GeneralStage<T>
S
- type of the state objectcreateFn
- function that returns the state object. It must be
stateless and cooperative.filterFn
- function that receives the state object and the input item and
produces the boolean result. It may modify the state object. It must be
stateless and cooperative.@Nonnull <S,R> StreamStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
GeneralStage
createFn
returns the object that holds the state. Jet passes this
object along with each input item to flatMapFn
, which can update
the object's state. The state object will be included in the state
snapshot, so it survives job restarts. For this reason it must be
serializable.
This sample inserts a punctuation mark (a special string) after every 10th input string:
GeneralStage<String> punctuated = input.flatMapStateful(
LongAccumulator::new,
(counter, item) -> {
counter.add(1);
return counter.get() % 10 == 0
? Traversers.traverseItems("punctuation", item)
: Traversers.singleton(item);
}
);
flatMapStateful
in interface GeneralStage<T>
S
- type of the state objectR
- type of the resultcreateFn
- function that returns the state object. It must be
stateless and cooperative.flatMapFn
- function that receives the state object and the input item and
outputs the result items. It may modify the state
object. It must not return null traverser, but can
return an empty traverser. It must be
stateless and cooperative.@Nonnull default <A,R> StreamStage<R> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
GeneralStage
AggregateOperation
. It passes each input item to
the accumulator and outputs the current result of aggregation (as
returned by the export
primitive).
Sample usage:
stage.rollingAggregate(AggregateOperations.summing())
For example, if your input is {2, 7, 8, -5}
, the output will be
{2, 9, 17, 12}
.
This stage is fault-tolerant and saves its state to the snapshot.
NOTE: since the output for each item depends on all
the previous items, this operation cannot be parallelized. Jet will
perform it on a single member, single-threaded. Jet also supports
keyed rolling aggregation
which it can parallelize by partitioning.
rollingAggregate
in interface GeneralStage<T>
R
- result type of the aggregate operationaggrOp
- the aggregate operation to do the aggregation@Nonnull <S,R> StreamStage<R> mapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends R> mapFn)
GeneralStage
serviceFactory
.
If the mapping result is null
, it emits nothing. Therefore, this
stage can be used to implement filtering semantics as well.
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
stage.mapUsingService(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
(reg, item) -> item.setDetail(reg.fetchDetail(item))
)
mapUsingService
in interface GeneralStage<T>
S
- type of service objectR
- the result type of the mapping functionserviceFactory
- the service factorymapFn
- a mapping function. It must be stateless. It must be
cooperative, if the service
is cooperative.@Nonnull default <S,R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
GeneralStage
GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>)
: the mapAsyncFn
returns a CompletableFuture<R>
instead of just R
.
Uses default values for some extra parameters, so the maximum number of concurrent async operations per processor will be limited to 4 and whether or not the order of input items should be preserved will be true.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
stage.mapUsingServiceAsync(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
(reg, item) -> reg.fetchDetailAsync(item)
.thenApply(detail -> item.setDetail(detail))
)
mapUsingServiceAsync
in interface GeneralStage<T>
S
- type of service objectR
- the future result type of the mapping functionserviceFactory
- the service factorymapAsyncFn
- a mapping function. Can map to null (return a null
future). It must be stateless and cooperative.@Nonnull <S,R> StreamStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
GeneralStage
GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>)
: the mapAsyncFn
returns a CompletableFuture<R>
instead of just R
.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
stage.mapUsingServiceAsync(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
(reg, item) -> reg.fetchDetailAsync(item)
.thenApply(detail -> item.setDetail(detail))
)
mapUsingServiceAsync
in interface GeneralStage<T>
S
- type of service objectR
- the future result type of the mapping functionserviceFactory
- the service factorymaxConcurrentOps
- maximum number of concurrent async operations per processorpreserveOrder
- whether the ordering of the input items should be preservedmapAsyncFn
- a mapping function. Can map to null (return a null
future). It must be stateless and cooperative.@Nonnull <S,R> StreamStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
GeneralStage
GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>)
: mapAsyncFn
takes
a list of input items and returns a CompletableFuture<List<R>>
.
The size of the input list is limited by the given maxBatchSize
.
The number of in-flight batches being completed asynchronously is limited to and this mapping operation always preserves the order of input elements.
This transform can perform filtering by putting null
elements into
the output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail
field on them by performing batched lookups from a registry. The max
size of the items to lookup is specified as 100
:
stage.mapUsingServiceAsyncBatched(
ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
100,
(reg, itemList) -> reg
.fetchDetailsAsync(itemList)
.thenApply(detailList -> {
for (int i = 0; i < itemList.size(); i++) {
itemList.get(i).setDetail(detailList.get(i))
}
})
)
mapUsingServiceAsyncBatched
in interface GeneralStage<T>
S
- type of service objectR
- the future result type of the mapping functionserviceFactory
- the service factorymaxBatchSize
- max size of the input listmapAsyncFn
- a mapping function. It must be stateless and
cooperative.@Nonnull <S> StreamStage<T> filterUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
GeneralStage
serviceFactory
.
This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:
photos.filterUsingService(
ServiceFactories.sharedService(ctx -> new ImageClassifier(ctx.hazelcastInstance())),
(classifier, photo) -> classifier.classify(photo).equals("cat")
)
filterUsingService
in interface GeneralStage<T>
S
- type of service objectserviceFactory
- the service factoryfilterFn
- a filter predicate function. It must be stateless and
cooperative.@Nonnull <S,R> StreamStage<R> flatMapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
GeneralStage
Traverser
it returns as the output items. The traverser must be
null-terminated. The mapping function receives another
parameter, the service object, which Jet will create using the supplied
serviceFactory
.
This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:
StreamStage<Part> parts = products.flatMapUsingService(
ServiceFactories.sharedService(ctx -> new PartRegistryCtx()),
(registry, product) -> Traversers.traverseIterable(
registry.fetchParts(product))
);
flatMapUsingService
in interface GeneralStage<T>
S
- type of service objectR
- the type of items in the result's traversersserviceFactory
- the service factoryflatMapFn
- a flatmapping function, whose result type is Jet's Traverser
. It must not return null traverser, but can return an
empty traverser. It must be stateless
and cooperative.@Nonnull default <K,V,R> StreamStage<R> mapUsingReplicatedMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
GeneralStage
ReplicatedMap
with the supplied name is performed and the result of the
lookup is merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore, this stage can be used to implement filtering semantics as
well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = replicatedMap.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingReplicatedMap(
"enriching-map",
item -> item.getDetailId(),
(Item item, ItemDetail detail) -> item.setDetail(detail)
)
mapUsingReplicatedMap
in interface GeneralStage<T>
K
- type of the key in the ReplicatedMap
V
- type of the value in the ReplicatedMap
R
- type of the output itemmapName
- name of the ReplicatedMap
lookupKeyFn
- a function which returns the key to look up in the map. Must not return
null. It must be stateless and cooperative.mapFn
- the mapping function. It must be stateless and cooperative@Nonnull default <K,V,R> StreamStage<R> mapUsingReplicatedMap(@Nonnull ReplicatedMap<K,V> replicatedMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
GeneralStage
ReplicatedMap
is performed and the result of the lookup is
merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore, this stage can be used to implement filtering semantics as
well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = replicatedMap.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingReplicatedMap(
enrichingMap,
item -> item.getDetailId(),
(item, detail) -> item.setDetail(detail)
)
mapUsingReplicatedMap
in interface GeneralStage<T>
K
- type of the key in the ReplicatedMap
V
- type of the value in the ReplicatedMap
R
- type of the output itemreplicatedMap
- the ReplicatedMap
to lookup fromlookupKeyFn
- a function which returns the key to look up in the map. Must not return
null. It must be stateless and cooperative.mapFn
- the mapping function. It must be stateless and cooperative@Nonnull default <K,V,R> StreamStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
GeneralStage
IMap
with the supplied name is performed and the result of the lookup is
merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore, this stage can be used to implement filtering semantics as
well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = map.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingIMap(
"enriching-map",
item -> item.getDetailId(),
(Item item, ItemDetail detail) -> item.setDetail(detail)
)
See also GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>)
for a partitioned
version of this operation.mapUsingIMap
in interface GeneralStage<T>
K
- type of the key in the IMap
V
- type of the value in the IMap
R
- type of the output itemmapName
- name of the IMap
lookupKeyFn
- a function which returns the key to look up in the map. Must not return
null. It must be stateless and cooperative.mapFn
- the mapping function. It must be stateless and cooperative.@Nonnull default <K,V,R> StreamStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
GeneralStage
IMap
is performed and the result of the lookup is merged with
the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore, this stage can be used to implement filtering semantics as
well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = map.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingIMap(
enrichingMap,
item -> item.getDetailId(),
(item, detail) -> item.setDetail(detail)
)
See also GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>)
for a partitioned
version of this operation.mapUsingIMap
in interface GeneralStage<T>
K
- type of the key in the IMap
V
- type of the value in the IMap
R
- type of the output itemiMap
- the IMap
to lookup fromlookupKeyFn
- a function which returns the key to look up in the map. Must not return
null. It must be stateless and cooperative.mapFn
- the mapping function. It must be stateless and cooperative.@Nonnull <K,T1_IN,T1,R> StreamStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
GeneralStage
package javadoc
for a detailed description of the hash-join transform.
This sample joins a stream of users to a stream of countries and outputs
a stream of users with the country
field set:
// Types of the input stages:
BatchStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
users.hashJoin(
idAndCountry,
JoinClause.joinMapEntries(User::getCountryId),
(user, country) -> user.setCountry(country)
)
This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
hashJoin
in interface GeneralStage<T>
K
- the type of the join keyT1_IN
- the type of stage1
itemsT1
- the result type of projection on stage1
itemsR
- the resulting output typestage1
- the stage to hash-join with this onejoinClause1
- specifies how to join the two streamsmapToOutputFn
- function to map the joined items to the output
value. It must be stateless and cooperative.@Nonnull <K,T1_IN,T1,R> StreamStage<R> innerHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
GeneralStage
package javadoc
for a detailed description of the hash-join transform.
This sample joins a stream of users to a stream of countries and outputs
a stream of users with the country
field set:
// Types of the input stages:
BatchStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
users.innerHashJoin(
idAndCountry,
JoinClause.joinMapEntries(User::getCountryId),
(user, country) -> user.setCountry(country)
)
This method is similar to GeneralStage.hashJoin(com.hazelcast.jet.pipeline.BatchStage<T1_IN>, com.hazelcast.jet.pipeline.JoinClause<K, ? super T, ? super T1_IN, ? extends T1>, com.hazelcast.function.BiFunctionEx<T, T1, R>)
method, but it guarantees
that both input items will be not-null. Nulls will be filtered out
before reaching #mapToOutputFn
.
This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
innerHashJoin
in interface GeneralStage<T>
K
- the type of the join keyT1_IN
- the type of stage1
itemsT1
- the result type of projection on stage1
itemsR
- the resulting output typestage1
- the stage to hash-join with this onejoinClause1
- specifies how to join the two streamsmapToOutputFn
- function to map the joined items to the output
value. It must be stateless and cooperative.@Nonnull <K1,K2,T1_IN,T2_IN,T1,T2,R> StreamStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
GeneralStage
package javadoc
for a detailed description of the hash-join transform.
This sample joins a stream of users to streams of countries and
companies, and outputs a stream of users with the country
and
company
fields set:
// Types of the input stages:
BatchStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
BatchStage<Map.Entry<Long, Company>> idAndCompany;
users.hashJoin2(
idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
(user, country, company) -> user.setCountry(country).setCompany(company)
)
This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
hashJoin2
in interface GeneralStage<T>
K1
- the type of key for stage1
K2
- the type of key for stage2
T1_IN
- the type of stage1
itemsT2_IN
- the type of stage2
itemsT1
- the result type of projection of stage1
itemsT2
- the result type of projection of stage2
itemsR
- the resulting output typestage1
- the first stage to joinjoinClause1
- specifies how to join with stage1
stage2
- the second stage to joinjoinClause2
- specifies how to join with stage2
mapToOutputFn
- function to map the joined items to the output
value. It must be stateless and cooperative.@Nonnull <K1,K2,T1_IN,T2_IN,T1,T2,R> StreamStage<R> innerHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
GeneralStage
package javadoc
for a detailed description of the hash-join transform.
This sample joins a stream of users to streams of countries and
companies, and outputs a stream of users with the country
and
company
fields set:
// Types of the input stages:
BatchStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
BatchStage<Map.Entry<Long, Company>> idAndCompany;
users.innerHashJoin2(
idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
(user, country, company) -> user.setCountry(country).setCompany(company)
)
This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
This method is similar to GeneralStage.hashJoin2(com.hazelcast.jet.pipeline.BatchStage<T1_IN>, com.hazelcast.jet.pipeline.JoinClause<K1, ? super T, ? super T1_IN, ? extends T1>, com.hazelcast.jet.pipeline.BatchStage<T2_IN>, com.hazelcast.jet.pipeline.JoinClause<K2, ? super T, ? super T2_IN, ? extends T2>, com.hazelcast.jet.function.TriFunction<T, T1, T2, R>)
method, but it guarantees
that both input items will be not-null. Nulls will be filtered out
before reaching #mapToOutputFn
.
innerHashJoin2
in interface GeneralStage<T>
K1
- the type of key for stage1
K2
- the type of key for stage2
T1_IN
- the type of stage1
itemsT2_IN
- the type of stage2
itemsT1
- the result type of projection of stage1
itemsT2
- the result type of projection of stage2
itemsR
- the resulting output typestage1
- the first stage to joinjoinClause1
- specifies how to join with stage1
stage2
- the second stage to joinjoinClause2
- specifies how to join with stage2
mapToOutputFn
- function to map the joined items to the output
value. It must be stateless and cooperative.@Nonnull default StreamHashJoinBuilder<T> hashJoinBuilder()
GeneralStage
stage.hashJoinN(...)
calls because they offer
more static type safety.
This sample joins a stream of users to streams of countries and
companies, and outputs a stream of users with the country
and
company
fields set:
// Types of the input stages:
StreamStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
BatchStage<Map.Entry<Long, Company>> idAndCompany;
StreamHashJoinBuilder<User> builder = users.hashJoinBuilder();
Tag<Country> tCountry = builder.add(idAndCountry,
JoinClause.joinMapEntries(User::getCountryId));
Tag<Company> tCompany = builder.add(idAndCompany,
JoinClause.joinMapEntries(User::getCompanyId));
StreamStage<User> joined = builder.build((user, itemsByTag) ->
user.setCountry(itemsByTag.get(tCountry)).setCompany(itemsByTag.get(tCompany)));
This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long)
for more
information.
hashJoinBuilder
in interface GeneralStage<T>
@Nonnull default StreamStage<T> peek()
GeneralStage
toString()
method at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
.
The stage logs each item on the cluster member that outputs it. Its
primary purpose is for development use, when running Jet on a local
machine.
Note that peek after GeneralStage.rebalance(FunctionEx)
is not supported.
peek
in interface GeneralStage<T>
GeneralStage.peek(PredicateEx, FunctionEx)
,
GeneralStage.peek(FunctionEx)
@Nonnull StreamStage<T> peek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
GeneralStage
shouldLogFn
predicate to see whether to log the item
toStringFn
to get the item's string
representation
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
Note that peek after GeneralStage.rebalance(FunctionEx)
operation is not supported.
Sample usage:
users.peek(
user -> user.getName().size() > 100,
User::getName
)
peek
in interface GeneralStage<T>
shouldLogFn
- a function to filter the logged items. You can use alwaysTrue()
as a pass-through filter when you
don't need any filtering. It must be stateless and cooperative.toStringFn
- a function that returns a string representation of
the item. It must be stateless and cooperative.GeneralStage.peek(FunctionEx)
,
GeneralStage.peek()
@Nonnull default StreamStage<T> peek(@Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
GeneralStage
toStringFn
to get a string representation of the item
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
Note that peek after GeneralStage.rebalance(FunctionEx)
operation is not supported.
Sample usage:
users.peek(User::getName)
peek
in interface GeneralStage<T>
toStringFn
- a function that returns a string representation of
the item. It must be stateless and cooperative.GeneralStage.peek(PredicateEx, FunctionEx)
,
GeneralStage.peek()
@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
GeneralStage
Processor
s.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
customTransform
in interface GeneralStage<T>
R
- the type of the output itemsstageName
- a human-readable name for the custom stageprocSupplier
- the supplier of processors@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
GeneralStage
Processor
s.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
customTransform
in interface GeneralStage<T>
R
- the type of the output itemsstageName
- a human-readable name for the custom stageprocSupplier
- the supplier of processors@Nonnull <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
GeneralStage
Processor
s.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
customTransform
in interface GeneralStage<T>
R
- the type of the output itemsstageName
- a human-readable name for the custom stageprocSupplier
- the supplier of processors@Nonnull default <R> StreamStage<R> apply(@Nonnull FunctionEx<? super StreamStage<T>,? extends StreamStage<R>> transformFn)
this
stage using the provided transformFn
and
returns the transformed stage. It allows you to extract common pipeline
transformations into a method and then call that method without
interrupting the chained pipeline expression.
For example, say you have this code:
StreamStage<String> input = pipeline.readFrom(textSource);
StreamStage<String> cleanedUp = input
.map(String::toLowerCase)
.filter(s -> s.startsWith("success"));
You can capture the map
and filter
steps into a common
"cleanup" transformation:
StreamStage<String> cleanUp(StreamStage<String> input) {
return input.map(String::toLowerCase)
.filter(s -> s.startsWith("success"));
}
Now you can insert this transformation as just another step in your
pipeline:
StreamStage<String> tokens = pipeline
.readFrom(textSource)
.apply(this::cleanUp)
.flatMap(line -> traverseArray(line.split("\\W+")));
R
- type of the returned stagetransformFn
- function to transform this stage into another stage@Nonnull StreamStage<T> setLocalParallelism(int localParallelism)
Stage
While most stages are backed by 1 vertex, there are exceptions. If a stage uses two vertices, each of them will have the given local parallelism, so in total there will be twice as many processors per member.
The default value is and it signals to Jet to figure out a default value. Jet will determine the vertex's local parallelism during job initialization from the global default and the processor meta-supplier's preferred value.
setLocalParallelism
in interface GeneralStage<T>
setLocalParallelism
in interface Stage
@Nonnull StreamStage<T> setName(@Nonnull String name)
Stage
setName
in interface GeneralStage<T>
setName
in interface Stage
name
- the stage nameCopyright © 2023 Hazelcast, Inc.. All rights reserved.