Interface StreamStage<T>

  • Type Parameters:
    T - the type of items coming out of this stage
    All Superinterfaces:
    GeneralStage<T>, Stage

    public interface StreamStage<T>
    extends GeneralStage<T>
    A stage in a distributed computation pipeline that will observe an unbounded amount of data (i.e., an event stream). It accepts input from its upstream stages (if any) and passes its output to its downstream stages.
    Since:
    Jet 3.0
    • Method Detail

      • merge

        @Nonnull
        StreamStage<T> merge​(@Nonnull
                             StreamStage<? extends T> other)
        Attaches a stage that emits all the items from this stage as well as all the items from the supplied stage. The other stage's type parameter must be assignment-compatible with this stage's type parameter.
        Parameters:
        other - the other stage whose data to merge into this one
        Returns:
        the newly attached stage
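
        The wildcard in the signature can be illustrated with a plain-Java model that uses lists in place of stages. This is only a sketch under that assumption: MergeSketch is a hypothetical name, it does not use the Jet API, and the concatenation order is an artifact of the list model rather than a statement about how Jet interleaves the two streams.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeSketch {
    // Models merge(): the result carries every item of both inputs.
    // The wildcard mirrors StreamStage<? extends T> in the real signature,
    // so a List<Integer> can be merged into a List<Number>.
    static <T> List<T> merge(List<T> self, List<? extends T> other) {
        List<T> out = new ArrayList<>(self);
        out.addAll(other);
        return out;
    }
}
```

        For instance, merging a list of Integer into a list of Number compiles, while the opposite direction does not.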
      • groupingKey

        @Nonnull
        <K> StreamStageWithKey<T,​K> groupingKey​(@Nonnull
                                                      FunctionEx<? super T,​? extends K> keyFn)
        Description copied from interface: GeneralStage
        Specifies the function that will extract a key from the items in the associated pipeline stage. This enables the operations that need the key, such as grouped aggregation.

        Sample usage:

        
         users.groupingKey(User::getId)
         

        Note: make sure the extracted key is not null, because a null key fails the job. Also make sure that the key type implements equals() and hashCode().

        Specified by:
        groupingKey in interface GeneralStage<T>
        Type Parameters:
        K - type of the key
        Parameters:
        keyFn - function that extracts the grouping key. It must be stateless and cooperative.
        Returns:
        the newly attached stage
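
        The note about null keys and equals()/hashCode() can be made concrete with a small plain-Java model of grouping. It is a sketch only: GroupingKeySketch, UserId and countByKey are hypothetical names, and a HashMap stands in for whatever Jet uses internally.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

final class GroupingKeySketch {
    // Hypothetical key type with value-based equals() and hashCode(),
    // so two keys built from the same id land in the same group.
    static final class UserId {
        final long value;
        UserId(long value) { this.value = value; }
        @Override public boolean equals(Object o) {
            return o instanceof UserId && ((UserId) o).value == value;
        }
        @Override public int hashCode() { return Long.hashCode(value); }
    }

    // Counts items per extracted key and rejects null keys up front.
    static <T, K> Map<K, Long> countByKey(List<T> items,
                                          Function<? super T, ? extends K> keyFn) {
        Map<K, Long> counts = new HashMap<>();
        for (T item : items) {
            K key = Objects.requireNonNull(keyFn.apply(item), "grouping key must not be null");
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

        Without the equals() and hashCode() overrides, every UserId instance would form its own group, which is the failure mode the note warns about.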
      • rebalance

        @Nonnull
        <K> StreamStage<T> rebalance​(@Nonnull
                                     FunctionEx<? super T,​? extends K> keyFn)
        Description copied from interface: GeneralStage
        Returns a new stage that applies data rebalancing to the output of this stage. By default, Jet prefers to process the data locally, on the cluster member where it was originally received. This is generally a good option because it eliminates unneeded network traffic. However, if the data volume is highly skewed across members, for example when using a non-distributed data source, you can tell Jet to rebalance the data by sending some to the other members.

        With partitioned rebalancing, you supply your own function that decides (indirectly) where to send each data item. Jet first applies your partition key function to the data item and then its own partitioning function to the key. The result is that all items with the same key go to the same Jet processor and different keys are distributed pseudo-randomly across the processors.

        Compared to non-partitioned balancing, partitioned balancing enforces the same data distribution across members regardless of any bottlenecks. If a given member is overloaded and applies backpressure, Jet doesn't reroute the data to other members, but propagates the backpressure to the upstream. If you choose a partitioning key that has a skewed distribution (some keys being much more frequent), this will result in an imbalanced data flow.

        These are some basic invariants:

        1. The rebalancing stage does not transform data, it just changes the physical layout of computation.
        2. If rebalancing is inapplicable due to the nature of the downstream stage (for example, a non-parallelizable operation like stateful mapping), the rebalancing stage is removed from the execution plan.
        3. If the downstream stage already does rebalancing for correctness (e.g., grouping by key implies partitioning by that key), this rebalancing stage is optimized away.
        Aggregation is a special case because it is implemented with two vertices at the Core DAG level. The first vertex accumulates local partial results and the second one combines them globally. There are two cases:
        1. stage.rebalance(rebalanceKeyFn).groupingKey(groupKeyFn).aggregate(...): here Jet removes the first (local) aggregation vertex and goes straight to distributed aggregation without combining. Grouped aggregation requires the data to be partitioned by the grouping key and therefore Jet must ignore the rebalancing key you supplied. We recommend that you remove it and use the parameterless stage.rebalance() because the end result is identical.
        2. stage.rebalance().aggregate(...): in this case the second vertex is non-parallelizable and must execute on a single member. Therefore Jet keeps both vertices and applies partitioned rebalancing before the first one.
        Specified by:
        rebalance in interface GeneralStage<T>
        Type Parameters:
        K - type of the key
        Parameters:
        keyFn - the partitioning key function. It must be stateless and cooperative.
        Returns:
        a new stage using the same transform as this one, only with a rebalancing flag raised that will affect data routing into the next stage.
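
        The two-step routing described above (key function first, partitioning function second) can be sketched in plain Java. The hash-modulo function below is an assumed stand-in, not Jet's actual partitioning function, and PartitionedRebalanceSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PartitionedRebalanceSketch {
    // Stand-in for the internal partitioning function (an assumption):
    // maps a key to one of processorCount slots.
    static int processorFor(Object key, int processorCount) {
        return Math.floorMod(key.hashCode(), processorCount);
    }

    // Routes each item to the slot chosen by its key, so items with
    // the same key always end up in the same slot.
    static <T, K> List<List<T>> route(List<T> items,
                                      Function<? super T, ? extends K> keyFn,
                                      int processorCount) {
        List<List<T>> slots = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            slots.add(new ArrayList<>());
        }
        for (T item : items) {
            slots.get(processorFor(keyFn.apply(item), processorCount)).add(item);
        }
        return slots;
    }
}
```

        Because the slot depends only on the key, a key that dominates the input fills one slot regardless of load, which is the skew risk mentioned above.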
      • rebalance

        @Nonnull
        StreamStage<T> rebalance()
        Description copied from interface: GeneralStage
        Returns a new stage that applies data rebalancing to the output of this stage. By default, Jet prefers to process the data locally, on the cluster member where it was originally received. This is generally a good option because it eliminates unneeded network traffic. However, if the data volume is highly skewed across members, for example when using a non-distributed data source, you can tell Jet to rebalance the data by sending some to the other members.

        To implement rebalancing, Jet uses a distributed unicast data routing pattern on the DAG edge from this stage's vertex to the next one. It routes the data in a round-robin fashion, sending each item to the next member (the member list includes the local one as well). If a given member's queue is overloaded and applying backpressure, Jet skips that member and retries it in the next round. With this scheme you get perfectly balanced item counts on each member under light load, but under heavier load it favors throughput: if the network becomes a bottleneck, most data may stay local.

        These are some basic invariants:

        1. The rebalancing stage does not transform data, it just changes the physical layout of computation.
        2. If rebalancing is inapplicable due to the nature of the downstream stage (for example, a non-parallelizable operation like stateful mapping), the rebalancing stage is removed from the execution plan.
        3. If the downstream stage already does rebalancing for correctness (e.g., grouping by key implies partitioning by that key), this rebalancing stage is optimized away.
        Aggregation is a special case because it is implemented with two vertices at the Core DAG level. The first vertex accumulates local partial results and the second one combines them globally. There are two cases:
        1. stage.rebalance().groupingKey(keyFn).aggregate(...): here Jet removes the first (local) aggregation vertex and goes straight to distributed aggregation without combining. The data is rebalanced through partitioning.
        2. stage.rebalance().aggregate(...): in this case the second vertex is non-parallelizable and must execute on a single member. Therefore Jet keeps both vertices and applies rebalancing before the first one.
        Specified by:
        rebalance in interface GeneralStage<T>
        Returns:
        a new stage using the same transform as this one, only with a rebalancing flag raised that will affect data routing into the next stage.
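
        The round-robin scheme with skipping can be modeled in a few lines of plain Java. This is a simplified sketch, not Jet's implementation: RoundRobinSketch is a hypothetical name, each member is reduced to a bounded counter, and a full queue stands in for backpressure.

```java
final class RoundRobinSketch {
    private final int[] queued;   // items currently queued per member
    private final int capacity;   // queue capacity per member (assumed uniform)
    private int next;             // index of the member to try next

    RoundRobinSketch(int memberCount, int capacity) {
        this.queued = new int[memberCount];
        this.capacity = capacity;
    }

    // Returns the member that accepts the item, or -1 if every queue is full.
    // A full member is skipped and considered again on a later call.
    int offer() {
        for (int tried = 0; tried < queued.length; tried++) {
            int member = next;
            next = (next + 1) % queued.length;
            if (queued[member] < capacity) {
                queued[member]++;
                return member;
            }
        }
        return -1;
    }

    // Simulates a member taking one item off its queue.
    void drain(int member) {
        if (queued[member] > 0) {
            queued[member]--;
        }
    }
}
```

        Under light load (queues never full) the items are spread evenly; once some queues fill up, the items go to whichever members still have room.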
      • map

        @Nonnull
        <R> StreamStage<R> map​(@Nonnull
                               FunctionEx<? super T,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. If the result is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

        This sample takes a stream of names and outputs the names in lowercase:

        
         stage.map(name -> name.toLowerCase(Locale.ROOT))
         
        Specified by:
        map in interface GeneralStage<T>
        Type Parameters:
        R - the result type of the mapping function
        Parameters:
        mapFn - a mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
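
        The rule that a null result emits nothing can be sketched with a plain-Java model over a list. MapSketch is a hypothetical name and the code does not use the Jet API; it only illustrates how one function can both map and filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MapSketch {
    // Applies mapFn to each item independently; a null result emits nothing.
    static <T, R> List<R> map(List<T> input, Function<? super T, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : input) {
            R result = mapFn.apply(item);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }
}
```

        A function such as name -> name.isEmpty() ? null : name.toLowerCase(Locale.ROOT) therefore lowercases the names and drops the empty ones in a single step.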
      • filter

        @Nonnull
        StreamStage<T> filter​(@Nonnull
                              PredicateEx<T> filterFn)
        Description copied from interface: GeneralStage
        Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. Returns the newly attached stage.

        This sample removes empty strings from the stream:

        
         stage.filter(name -> !name.isEmpty())
         
        Specified by:
        filter in interface GeneralStage<T>
        Parameters:
        filterFn - a filter predicate function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • flatMap

        @Nonnull
        <R> StreamStage<R> flatMap​(@Nonnull
                                   FunctionEx<? super T,​? extends Traverser<R>> flatMapFn)
        Description copied from interface: GeneralStage
        Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns. The traverser must be null-terminated.

        This sample takes a stream of sentences and outputs a stream of individual words in them:

        
         stage.flatMap(sentence -> traverseArray(sentence.split("\\W+")))
         
        Specified by:
        flatMap in interface GeneralStage<T>
        Type Parameters:
        R - the type of items in the result's traversers
        Parameters:
        flatMapFn - a flatmapping function, whose result type is Jet's Traverser. It must not return a null traverser, but can return an empty traverser. It must be stateless and cooperative.
        Returns:
        the newly attached stage
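
        What "null-terminated" means for a traverser can be shown with a minimal stand-in interface. MiniTraverser and FlatMapSketch are hypothetical names used only for this sketch; Jet's real Traverser has more methods than the single next() modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FlatMapSketch {
    // Minimal stand-in for a traverser: next() returns null once exhausted.
    interface MiniTraverser<T> {
        T next();
    }

    // Traverses the array from left to right, then keeps returning null.
    static <T> MiniTraverser<T> traverseArray(T[] items) {
        int[] pos = {0};
        return () -> pos[0] < items.length ? items[pos[0]++] : null;
    }

    // Drains each returned traverser until it reports null.
    static <T, R> List<R> flatMap(List<T> input,
                                  Function<? super T, ? extends MiniTraverser<R>> flatMapFn) {
        List<R> out = new ArrayList<>();
        for (T item : input) {
            MiniTraverser<R> traverser = flatMapFn.apply(item);
            for (R r; (r = traverser.next()) != null; ) {
                out.add(r);
            }
        }
        return out;
    }
}
```

        Since null marks the end of the traversal, a traverser cannot carry null as a regular item, and an empty traverser simply returns null on the first call.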
      • mapStateful

        @Nonnull
        <S,​R> StreamStage<R> mapStateful​(@Nonnull
                                               SupplierEx<? extends S> createFn,
                                               @Nonnull
                                               BiFunctionEx<? super S,​? super T,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item to mapFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

        If you want to return the state variable from mapFn, then the return value must be a copy of the state variable to avoid situations in which the result of mapFn is modified after being emitted or where the state is modified by downstream processors.

        This sample takes a stream of long numbers representing request latencies, computes the cumulative latency of all requests so far, and starts emitting alarm messages when the cumulative latency crosses a "bad behavior" threshold:

        
         StreamStage<Long> latencyAlarms = latencies.mapStateful(
                 LongAccumulator::new,
                 (sum, latency) -> {
                     sum.add(latency);
                     long cumulativeLatency = sum.get();
                     return (cumulativeLatency <= LATENCY_THRESHOLD)
                             ? null
                             : cumulativeLatency;
                 }
         );
         
        Apart from suppressing the output until the threshold is crossed, this code computes the same running sum as latencies.rollingAggregate(summing()).
        Specified by:
        mapStateful in interface GeneralStage<T>
        Type Parameters:
        S - type of the state object
        R - type of the result
        Parameters:
        createFn - function that returns the state object. It must be stateless and cooperative.
        mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object. It must be stateless and cooperative.
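
        The interplay of createFn and mapFn can be modeled in plain Java. The sketch below is an illustration under stated assumptions: MapStatefulSketch is a hypothetical name, a long[1] array stands in for LongAccumulator, and snapshotting is not modeled at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

final class MapStatefulSketch {
    // Creates the state once, then passes it to mapFn with every item.
    // A null result emits nothing, as in the sample above.
    static <S, T, R> List<R> mapStateful(List<T> input,
                                         Supplier<? extends S> createFn,
                                         BiFunction<? super S, ? super T, ? extends R> mapFn) {
        S state = createFn.get();
        List<R> out = new ArrayList<>();
        for (T item : input) {
            R result = mapFn.apply(state, item);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    // Emits the cumulative latency once it exceeds the threshold.
    static List<Long> latencyAlarms(List<Long> latencies, long threshold) {
        return MapStatefulSketch.<long[], Long, Long>mapStateful(
                latencies,
                () -> new long[1],
                (sum, latency) -> {
                    sum[0] += latency;
                    return sum[0] <= threshold ? null : sum[0];
                });
    }
}
```

        Note that the function returns the boxed value of sum[0], not the array itself, which matches the advice to emit a copy rather than the state object.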
      • filterStateful

        @Nonnull
        <S> StreamStage<T> filterStateful​(@Nonnull
                                          SupplierEx<? extends S> createFn,
                                          @Nonnull
                                          BiPredicateEx<? super S,​? super T> filterFn)
        Description copied from interface: GeneralStage
        Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

        This sample decimates the input (throws out every 10th item):

        
         GeneralStage<String> decimated = input.filterStateful(
                 LongAccumulator::new,
                 (counter, item) -> {
                     counter.add(1);
                     return counter.get() % 10 != 0;
                 }
         );
         
        Specified by:
        filterStateful in interface GeneralStage<T>
        Type Parameters:
        S - type of the state object
        Parameters:
        createFn - function that returns the state object. It must be stateless and cooperative.
        filterFn - function that receives the state object and the input item and produces the boolean result. It may modify the state object. It must be stateless and cooperative.
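
        A plain-Java model of the decimation sample, written as a sketch: FilterStatefulSketch is a hypothetical name and a long[1] array stands in for LongAccumulator. It shows that the counter lives across items, which is what makes the predicate stateful.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

final class FilterStatefulSketch {
    // Creates the state once and consults filterFn for every item.
    static <S, T> List<T> filterStateful(List<T> input,
                                         Supplier<? extends S> createFn,
                                         BiPredicate<? super S, ? super T> filterFn) {
        S state = createFn.get();
        List<T> out = new ArrayList<>();
        for (T item : input) {
            if (filterFn.test(state, item)) {
                out.add(item);
            }
        }
        return out;
    }

    // Drops every 10th item by counting the items seen so far.
    static <T> List<T> decimate(List<T> input) {
        return FilterStatefulSketch.<long[], T>filterStateful(
                input,
                () -> new long[1],
                (counter, item) -> ++counter[0] % 10 != 0);
    }
}
```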
      • flatMapStateful

        @Nonnull
        <S,​R> StreamStage<R> flatMapStateful​(@Nonnull
                                                   SupplierEx<? extends S> createFn,
                                                   @Nonnull
                                                   BiFunctionEx<? super S,​? super T,​? extends Traverser<R>> flatMapFn)
        Description copied from interface: GeneralStage
        Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item to flatMapFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

        If you want to return the state variable from flatMapFn, then the return value must be a copy of the state variable to avoid situations in which the result of flatMapFn is modified after being emitted or where the state is modified by downstream processors.

        This sample inserts a punctuation mark (a special string) after every 10th input string:

        
         GeneralStage<String> punctuated = input.flatMapStateful(
                 LongAccumulator::new,
                 (counter, item) -> {
                     counter.add(1);
                     return counter.get() % 10 == 0
                             ? Traversers.traverseItems(item, "punctuation")
                             : Traversers.singleton(item);
                 }
         );
         
        Specified by:
        flatMapStateful in interface GeneralStage<T>
        Type Parameters:
        S - type of the state object
        R - type of the result
        Parameters:
        createFn - function that returns the state object. It must be stateless and cooperative.
        flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser. It must be stateless and cooperative.
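
        The intended output of the punctuation sample can be checked against a plain-Java model. This is a sketch with assumptions: FlatMapStatefulSketch is a hypothetical name, the period is a parameter instead of the fixed 10, and a list replaces the traverser.

```java
import java.util.ArrayList;
import java.util.List;

final class FlatMapStatefulSketch {
    // Emits each item and, after every period-th item, an extra marker string.
    static List<String> punctuate(List<String> input, int period) {
        long[] counter = new long[1];   // the state object, created once
        List<String> out = new ArrayList<>();
        for (String item : input) {
            out.add(item);
            if (++counter[0] % period == 0) {
                out.add("punctuation");
            }
        }
        return out;
    }
}
```

        The marker follows the item that completes the period, so the order of the two emitted values matters.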
      • rollingAggregate

        @Nonnull
        default <A,​R> StreamStage<R> rollingAggregate​(@Nonnull
                                                            AggregateOperation1<? super T,​A,​? extends R> aggrOp)
        Description copied from interface: GeneralStage
        Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).

        Sample usage:

        
         stage.rollingAggregate(AggregateOperations.summing())
         
        For example, if your input is {2, 7, 8, -5}, the output will be {2, 9, 17, 12}.

        This stage is fault-tolerant and saves its state to the snapshot.

        NOTE: since the output for each item depends on all the previous items, this operation cannot be parallelized. Jet will perform it on a single member, single-threaded. Jet also supports keyed rolling aggregation which it can parallelize by partitioning.

        Specified by:
        rollingAggregate in interface GeneralStage<T>
        Type Parameters:
        R - result type of the aggregate operation
        Parameters:
        aggrOp - the aggregate operation to do the aggregation
        Returns:
        the newly attached stage
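
        The accumulate-then-export cycle can be sketched in plain Java. The three function parameters below loosely mirror the create, accumulate and export primitives of an aggregate operation, but RollingAggregateSketch and its signatures are hypothetical and not the Jet API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class RollingAggregateSketch {
    // Accumulates each item and emits the exported running result.
    static <T, A, R> List<R> rollingAggregate(List<T> input,
                                              Supplier<A> createFn,
                                              BiConsumer<? super A, ? super T> accumulateFn,
                                              Function<? super A, ? extends R> exportFn) {
        A acc = createFn.get();
        List<R> out = new ArrayList<>();
        for (T item : input) {
            accumulateFn.accept(acc, item);
            out.add(exportFn.apply(acc));
        }
        return out;
    }

    // Running sum, using a long[1] array as the accumulator.
    static List<Long> rollingSum(List<Long> input) {
        return RollingAggregateSketch.<Long, long[], Long>rollingAggregate(
                input,
                () -> new long[1],
                (acc, x) -> acc[0] += x,
                acc -> acc[0]);
    }
}
```

        Each output depends on the accumulator left behind by all earlier items, which is why the operation cannot be parallelized without a key.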
      • mapUsingService

        @Nonnull
        <S,​R> StreamStage<R> mapUsingService​(@Nonnull
                                                   ServiceFactory<?,​S> serviceFactory,
                                                   @Nonnull
                                                   BiFunctionEx<? super S,​? super T,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

        If the mapping result is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:

        
         stage.mapUsingService(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
             (reg, item) -> item.setDetail(reg.fetchDetail(item))
         )
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Specified by:
        mapUsingService in interface GeneralStage<T>
        Type Parameters:
        S - type of service object
        R - the result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        mapFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.
        Returns:
        the newly attached stage
      • mapUsingServiceAsync

        @Nonnull
        default <S,​R> StreamStage<R> mapUsingServiceAsync​(@Nonnull
                                                                ServiceFactory<?,​S> serviceFactory,
                                                                @Nonnull
                                                                BiFunctionEx<? super S,​? super T,​? extends java.util.concurrent.CompletableFuture<R>> mapAsyncFn)
        Description copied from interface: GeneralStage
        Asynchronous version of GeneralStage.mapUsingService(ServiceFactory, BiFunctionEx): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

        This overload uses default values for the extra parameters: at most 4 concurrent async operations per processor, and the order of input items is preserved.

        The function can return a null future or the future can return a null result: in both cases it will act just like a filter.

        The latency of the async call will add to the total latency of the output.

        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:

        
         stage.mapUsingServiceAsync(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
             (reg, item) -> reg.fetchDetailAsync(item)
                               .thenApply(detail -> item.setDetail(detail))
         )
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Specified by:
        mapUsingServiceAsync in interface GeneralStage<T>
        Type Parameters:
        S - type of service object
        R - the future result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingServiceAsync

        @Nonnull
        <S,​R> StreamStage<R> mapUsingServiceAsync​(@Nonnull
                                                        ServiceFactory<?,​S> serviceFactory,
                                                        int maxConcurrentOps,
                                                        boolean preserveOrder,
                                                        @Nonnull
                                                        BiFunctionEx<? super S,​? super T,​? extends java.util.concurrent.CompletableFuture<R>> mapAsyncFn)
        Description copied from interface: GeneralStage
        Asynchronous version of GeneralStage.mapUsingService(ServiceFactory, BiFunctionEx): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

        The function can return a null future or the future can return a null result: in both cases it will act just like a filter.

        The latency of the async call will add to the total latency of the output.

        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:

        
         stage.mapUsingServiceAsync(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
             (reg, item) -> reg.fetchDetailAsync(item)
                               .thenApply(detail -> item.setDetail(detail))
         )
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Specified by:
        mapUsingServiceAsync in interface GeneralStage<T>
        Type Parameters:
        S - type of service object
        R - the future result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        maxConcurrentOps - maximum number of concurrent async operations per processor
        preserveOrder - whether the ordering of the input items should be preserved
        mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
        Returns:
        the newly attached stage
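
        How maxConcurrentOps, order preservation and null handling fit together can be modeled with java.util.concurrent.CompletableFuture alone. The sketch is deliberately simplified and is not how Jet implements the stage: AsyncMapSketch is a hypothetical name, and the model blocks on the oldest future where a real processor would not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncMapSketch {
    // Keeps at most maxConcurrentOps futures in flight and emits results in
    // input order. A null future or a null result emits nothing.
    static <T, R> List<R> mapAsyncOrdered(List<T> input, int maxConcurrentOps,
            Function<? super T, ? extends CompletableFuture<R>> mapAsyncFn) {
        Queue<CompletableFuture<R>> inFlight = new ArrayDeque<>();
        List<R> out = new ArrayList<>();
        for (T item : input) {
            if (inFlight.size() == maxConcurrentOps) {
                emit(inFlight.remove(), out);   // wait for the oldest call first
            }
            CompletableFuture<R> future = mapAsyncFn.apply(item);
            if (future != null) {
                inFlight.add(future);
            }
        }
        while (!inFlight.isEmpty()) {
            emit(inFlight.remove(), out);
        }
        return out;
    }

    // Waits for the future and emits its result unless it is null.
    private static <R> void emit(CompletableFuture<R> future, List<R> out) {
        R result = future.join();
        if (result != null) {
            out.add(result);
        }
    }
}
```

        Emitting from the head of the queue is what preserves the input order: a fast call that finishes early still waits behind a slower call that was issued before it.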
      • mapUsingServiceAsyncBatched

        @Nonnull
        <S,​R> StreamStage<R> mapUsingServiceAsyncBatched​(@Nonnull
                                                               ServiceFactory<?,​S> serviceFactory,
                                                               int maxBatchSize,
                                                               @Nonnull
                                                               BiFunctionEx<? super S,​? super java.util.List<T>,​? extends java.util.concurrent.CompletableFuture<java.util.List<R>>> mapAsyncFn)
        Description copied from interface: GeneralStage
        Batched version of GeneralStage.mapUsingServiceAsync(ServiceFactory, BiFunctionEx): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>. The size of the input list is limited by the given maxBatchSize.

        The number of in-flight batches being completed asynchronously is limited to 2 and this mapping operation always preserves the order of input elements.

        This transform can perform filtering by putting null elements into the output list.

        The latency of the async call will add to the total latency of the output.

        This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum batch size is set to 100:

        
         stage.mapUsingServiceAsyncBatched(
             ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
             100,
             (reg, itemList) -> reg
                     .fetchDetailsAsync(itemList)
                     .thenApply(detailList -> {
                         for (int i = 0; i < itemList.size(); i++) {
                             itemList.get(i).setDetail(detailList.get(i));
                         }
                         return itemList;
                     })
         )
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Specified by:
        mapUsingServiceAsyncBatched in interface GeneralStage<T>
        Type Parameters:
        S - type of service object
        R - the future result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        maxBatchSize - max size of the input list
        mapAsyncFn - a mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
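
        The effect of maxBatchSize and of null elements in the output list can be modeled in plain Java. This is a sketch under stated assumptions: BatchedMapSketch is a hypothetical name, batches are cut at fixed positions (the real stage batches whatever input is available), and only one batch is in flight at a time instead of two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class BatchedMapSketch {
    // Hands the input to mapAsyncFn in lists of at most maxBatchSize items
    // and emits the non-null elements of each returned list, in order.
    static <T, R> List<R> mapBatched(List<T> input, int maxBatchSize,
            Function<? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> out = new ArrayList<>();
        for (int from = 0; from < input.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, input.size());
            List<R> results = mapAsyncFn.apply(input.subList(from, to)).join();
            for (R result : results) {
                if (result != null) {   // null elements act as a filter
                    out.add(result);
                }
            }
        }
        return out;
    }
}
```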
      • filterUsingService

        @Nonnull
        <S> StreamStage<T> filterUsingService​(@Nonnull
                                              ServiceFactory<?,​S> serviceFactory,
                                              @Nonnull
                                              BiPredicateEx<? super S,​? super T> filterFn)
        Description copied from interface: GeneralStage
        Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

        This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:

        
         photos.filterUsingService(
             ServiceFactories.sharedService(ctx -> new ImageClassifier(ctx.hazelcastInstance())),
             (classifier, photo) -> classifier.classify(photo).equals("cat")
         )
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Specified by:
        filterUsingService in interface GeneralStage<T>
        Type Parameters:
        S - type of service object
        Parameters:
        serviceFactory - the service factory
        filterFn - a filter predicate function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • flatMapUsingService

        @Nonnull
        <S,​R> StreamStage<R> flatMapUsingService​(@Nonnull
                                                       ServiceFactory<?,​S> serviceFactory,
                                                       @Nonnull
                                                       BiFunctionEx<? super S,​? super T,​? extends Traverser<R>> flatMapFn)
        Description copied from interface: GeneralStage
        Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

        This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:

        
         StreamStage<Part> parts = products.flatMapUsingService(
             ServiceFactories.sharedService(ctx -> new PartRegistryCtx()),
             (registry, product) -> Traversers.traverseIterable(
                                        registry.fetchParts(product))
         );
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Specified by:
        flatMapUsingService in interface GeneralStage<T>
        Type Parameters:
        S - type of service object
        R - the type of items in the result's traversers
        Parameters:
        serviceFactory - the service factory
        flatMapFn - a flatmapping function, whose result type is Jet's Traverser. It must not return a null traverser, but can return an empty traverser. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingReplicatedMap

        @Nonnull
        default <K,​V,​R> StreamStage<R> mapUsingReplicatedMap​(@Nonnull
                                                                         java.lang.String mapName,
                                                                         @Nonnull
                                                                         FunctionEx<? super T,​? extends K> lookupKeyFn,
                                                                         @Nonnull
                                                                         BiFunctionEx<? super T,​? super V,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a mapping stage where for each item a lookup in the ReplicatedMap with the supplied name is performed and the result of the lookup is merged with the item and emitted.

        If the result of the mapping is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

        The mapping logic is equivalent to:

        
         K key = lookupKeyFn.apply(item);
         V value = replicatedMap.get(key);
         return mapFn.apply(item, value);
         
        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
        
         items.mapUsingReplicatedMap(
             "enriching-map",
             item -> item.getDetailId(),
             (Item item, ItemDetail detail) -> item.setDetail(detail)
         )
         
        Specified by:
        mapUsingReplicatedMap in interface GeneralStage<T>
        Type Parameters:
        K - type of the key in the ReplicatedMap
        V - type of the value in the ReplicatedMap
        R - type of the output item
        Parameters:
        mapName - name of the ReplicatedMap
        lookupKeyFn - a function which returns the key to look up in the map. Must not return null. It must be stateless and cooperative.
        mapFn - the mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
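
The lookup-then-map logic given above, including the rule that a null result emits nothing, can be modelled in plain Java. This is a sketch under assumptions: a `java.util.Map` stands in for the ReplicatedMap, and `mapUsingLookup` is a hypothetical helper, not part of the Jet API. The same model applies to the IMap variants of this method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of the lookup-then-map logic (not Jet code).
final class MapUsingLookupSketch {

    static <T, K, V, R> List<R> mapUsingLookup(
            List<T> items, Map<K, V> lookup,
            Function<? super T, ? extends K> lookupKeyFn,
            BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            K key = lookupKeyFn.apply(item);
            V value = lookup.get(key);            // null when the key is absent
            R result = mapFn.apply(item, value);  // mapFn sees the null value
            if (result != null) {                 // a null result emits nothing
                out.add(result);
            }
        }
        return out;
    }
}
```

Returning null from the mapping function for a missing value is what turns the stage into a filter in this model.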
      • mapUsingReplicatedMap

        @Nonnull
        default <K,​V,​R> StreamStage<R> mapUsingReplicatedMap​(@Nonnull
                                                                         ReplicatedMap<K,​V> replicatedMap,
                                                                         @Nonnull
                                                                         FunctionEx<? super T,​? extends K> lookupKeyFn,
                                                                         @Nonnull
                                                                         BiFunctionEx<? super T,​? super V,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a mapping stage where for each item a lookup in the supplied ReplicatedMap is performed and the result of the lookup is merged with the item and emitted.

        If the result of the mapping is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

        The mapping logic is equivalent to:

        
         K key = lookupKeyFn.apply(item);
         V value = replicatedMap.get(key);
         return mapFn.apply(item, value);
         
        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
        
         items.mapUsingReplicatedMap(
             enrichingMap,
             item -> item.getDetailId(),
             (item, detail) -> item.setDetail(detail)
         )
         
        Specified by:
        mapUsingReplicatedMap in interface GeneralStage<T>
        Type Parameters:
        K - type of the key in the ReplicatedMap
        V - type of the value in the ReplicatedMap
        R - type of the output item
        Parameters:
        replicatedMap - the ReplicatedMap to look up from
        lookupKeyFn - a function which returns the key to look up in the map. Must not return null. It must be stateless and cooperative.
        mapFn - the mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingIMap

        @Nonnull
        default <K,​V,​R> StreamStage<R> mapUsingIMap​(@Nonnull
                                                                java.lang.String mapName,
                                                                @Nonnull
                                                                FunctionEx<? super T,​? extends K> lookupKeyFn,
                                                                @Nonnull
                                                                BiFunctionEx<? super T,​? super V,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a mapping stage where for each item a lookup in the IMap with the supplied name is performed and the result of the lookup is merged with the item and emitted.

        If the result of the mapping is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

        The mapping logic is equivalent to:

        
         K key = lookupKeyFn.apply(item);
         V value = map.get(key);
         return mapFn.apply(item, value);
         
        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
        
         items.mapUsingIMap(
             "enriching-map",
             item -> item.getDetailId(),
             (Item item, ItemDetail detail) -> item.setDetail(detail)
         )
         
        See also GeneralStageWithKey.mapUsingIMap(String, BiFunctionEx) for a partitioned version of this operation.
        Specified by:
        mapUsingIMap in interface GeneralStage<T>
        Type Parameters:
        K - type of the key in the IMap
        V - type of the value in the IMap
        R - type of the output item
        Parameters:
        mapName - name of the IMap
        lookupKeyFn - a function which returns the key to look up in the map. Must not return null. It must be stateless and cooperative.
        mapFn - the mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingIMap

        @Nonnull
        default <K,​V,​R> StreamStage<R> mapUsingIMap​(@Nonnull
                                                                IMap<K,​V> iMap,
                                                                @Nonnull
                                                                FunctionEx<? super T,​? extends K> lookupKeyFn,
                                                                @Nonnull
                                                                BiFunctionEx<? super T,​? super V,​? extends R> mapFn)
        Description copied from interface: GeneralStage
        Attaches a mapping stage where for each item a lookup in the supplied IMap is performed and the result of the lookup is merged with the item and emitted.

        If the result of the mapping is null, it emits nothing. Therefore, this stage can be used to implement filtering semantics as well.

        The mapping logic is equivalent to:

        
         K key = lookupKeyFn.apply(item);
         V value = map.get(key);
         return mapFn.apply(item, value);
         
        This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
        
         items.mapUsingIMap(
             enrichingMap,
             item -> item.getDetailId(),
             (item, detail) -> item.setDetail(detail)
         )
         
        See also GeneralStageWithKey.mapUsingIMap(String, BiFunctionEx) for a partitioned version of this operation.
        Specified by:
        mapUsingIMap in interface GeneralStage<T>
        Type Parameters:
        K - type of the key in the IMap
        V - type of the value in the IMap
        R - type of the output item
        Parameters:
        iMap - the IMap to look up from
        lookupKeyFn - a function which returns the key to look up in the map. Must not return null. It must be stateless and cooperative.
        mapFn - the mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • hashJoin

        @Nonnull
        <K,​T1_IN,​T1,​R> StreamStage<R> hashJoin​(@Nonnull
                                                                 BatchStage<T1_IN> stage1,
                                                                 @Nonnull
                                                                 JoinClause<K,​? super T,​? super T1_IN,​? extends T1> joinClause1,
                                                                 @Nonnull
                                                                 BiFunctionEx<T,​T1,​R> mapToOutputFn)
        Description copied from interface: GeneralStage
        Attaches to both this and the supplied stage a hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

        This sample joins a stream of users to a stream of countries and outputs a stream of users with the country field set:

        
         // Types of the input stages:
         BatchStage<User> users;
         BatchStage<Map.Entry<Long, Country>> idAndCountry;
        
         users.hashJoin(
             idAndCountry,
             JoinClause.joinMapEntries(User::getCountryId),
             (user, country) -> user.setCountry(country)
         )
         

        This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long) for more information.

        Specified by:
        hashJoin in interface GeneralStage<T>
        Type Parameters:
        K - the type of the join key
        T1_IN - the type of stage1 items
        T1 - the result type of projection on stage1 items
        R - the resulting output type
        Parameters:
        stage1 - the stage to hash-join with this one
        joinClause1 - specifies how to join the two streams
        mapToOutputFn - function to map the joined items to the output value. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • innerHashJoin

        @Nonnull
        <K,​T1_IN,​T1,​R> StreamStage<R> innerHashJoin​(@Nonnull
                                                                      BatchStage<T1_IN> stage1,
                                                                      @Nonnull
                                                                      JoinClause<K,​? super T,​? super T1_IN,​? extends T1> joinClause1,
                                                                      @Nonnull
                                                                      BiFunctionEx<T,​T1,​R> mapToOutputFn)
        Description copied from interface: GeneralStage
        Attaches to both this and the supplied stage an inner hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

        This sample joins a stream of users to a stream of countries and outputs a stream of users with the country field set:

        
         // Types of the input stages:
         BatchStage<User> users;
         BatchStage<Map.Entry<Long, Country>> idAndCountry;
        
         users.innerHashJoin(
             idAndCountry,
             JoinClause.joinMapEntries(User::getCountryId),
             (user, country) -> user.setCountry(country)
         )
         

        This method is similar to GeneralStage.hashJoin(BatchStage, JoinClause, BiFunctionEx), but it guarantees that both input items will be non-null. Nulls are filtered out before they reach mapToOutputFn.

        This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long) for more information.
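
The difference between the two join flavours can be modelled in plain Java. This is a sketch, not Jet code: it assumes at most one enriching item per key, uses a `java.util.Map` as the hash table built from the enriching stage, and assumes that the plain hash-join hands a null enriching item to the output function when there is no match (which is what the non-null guarantee above is contrasted with). The method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model contrasting hashJoin and innerHashJoin (not Jet code).
final class HashJoinSketch {

    // Models hashJoin: every primary item reaches mapToOutputFn; the enriching
    // item is null when the table has no entry for the item's key.
    static <T, K, T1, R> List<R> hashJoin(
            List<T> primary, Map<K, T1> enrichingByKey,
            Function<? super T, ? extends K> keyFn,
            BiFunction<T, T1, R> mapToOutputFn) {
        List<R> out = new ArrayList<>();
        for (T item : primary) {
            T1 match = enrichingByKey.get(keyFn.apply(item));
            out.add(mapToOutputFn.apply(item, match));
        }
        return out;
    }

    // Models innerHashJoin: unmatched primary items are dropped before
    // mapToOutputFn is called, so it never sees a null.
    static <T, K, T1, R> List<R> innerHashJoin(
            List<T> primary, Map<K, T1> enrichingByKey,
            Function<? super T, ? extends K> keyFn,
            BiFunction<T, T1, R> mapToOutputFn) {
        List<R> out = new ArrayList<>();
        for (T item : primary) {
            T1 match = enrichingByKey.get(keyFn.apply(item));
            if (match != null) {
                out.add(mapToOutputFn.apply(item, match));
            }
        }
        return out;
    }
}
```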

        Specified by:
        innerHashJoin in interface GeneralStage<T>
        Type Parameters:
        K - the type of the join key
        T1_IN - the type of stage1 items
        T1 - the result type of projection on stage1 items
        R - the resulting output type
        Parameters:
        stage1 - the stage to hash-join with this one
        joinClause1 - specifies how to join the two streams
        mapToOutputFn - function to map the joined items to the output value. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • hashJoin2

        @Nonnull
        <K1,​K2,​T1_IN,​T2_IN,​T1,​T2,​R> StreamStage<R> hashJoin2​(@Nonnull
                                                                                                 BatchStage<T1_IN> stage1,
                                                                                                 @Nonnull
                                                                                                 JoinClause<K1,​? super T,​? super T1_IN,​? extends T1> joinClause1,
                                                                                                 @Nonnull
                                                                                                 BatchStage<T2_IN> stage2,
                                                                                                 @Nonnull
                                                                                                 JoinClause<K2,​? super T,​? super T2_IN,​? extends T2> joinClause2,
                                                                                                 @Nonnull
                                                                                                 TriFunction<T,​T1,​T2,​R> mapToOutputFn)
        Description copied from interface: GeneralStage
        Attaches to this and the two supplied stages a hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

        This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:

        
         // Types of the input stages:
         BatchStage<User> users;
         BatchStage<Map.Entry<Long, Country>> idAndCountry;
         BatchStage<Map.Entry<Long, Company>> idAndCompany;
        
         users.hashJoin2(
             idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
             idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
             (user, country, company) -> user.setCountry(country).setCompany(company)
         )
         

        This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long) for more information.

        Specified by:
        hashJoin2 in interface GeneralStage<T>
        Type Parameters:
        K1 - the type of key for stage1
        K2 - the type of key for stage2
        T1_IN - the type of stage1 items
        T2_IN - the type of stage2 items
        T1 - the result type of projection of stage1 items
        T2 - the result type of projection of stage2 items
        R - the resulting output type
        Parameters:
        stage1 - the first stage to join
        joinClause1 - specifies how to join with stage1
        stage2 - the second stage to join
        joinClause2 - specifies how to join with stage2
        mapToOutputFn - function to map the joined items to the output value. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • hashJoinBuilder

        @Nonnull
        default StreamHashJoinBuilder<T> hashJoinBuilder()
        Description copied from interface: GeneralStage
        Returns a fluent API builder object to construct a hash join operation with any number of contributing stages. It is mainly intended for hash-joins with three or more enriching stages. For one or two stages prefer the direct stage.hashJoinN(...) calls because they offer more static type safety.

        This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:

        
         // Types of the input stages:
         StreamStage<User> users;
         BatchStage<Map.Entry<Long, Country>> idAndCountry;
         BatchStage<Map.Entry<Long, Company>> idAndCompany;
        
         StreamHashJoinBuilder<User> builder = users.hashJoinBuilder();
         Tag<Country> tCountry = builder.add(idAndCountry,
                 JoinClause.joinMapEntries(User::getCountryId));
         Tag<Company> tCompany = builder.add(idAndCompany,
                 JoinClause.joinMapEntries(User::getCompanyId));
         StreamStage<User> joined = builder.build((user, itemsByTag) ->
                 user.setCountry(itemsByTag.get(tCountry)).setCompany(itemsByTag.get(tCompany)));
         

        This operation is subject to memory limits. See JetConfig.setMaxProcessorAccumulatedRecords(long) for more information.

        Specified by:
        hashJoinBuilder in interface GeneralStage<T>
        Returns:
        the hash-join builder object
      • peek

        @Nonnull
        default StreamStage<T> peek()
        Description copied from interface: GeneralStage
        Adds a peeking layer to this compute stage which logs its output. For each item the stage emits, it logs the result of its toString() method at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>. The stage logs each item on the cluster member that outputs it. Its primary purpose is for development use, when running Jet on a local machine.

        Note that peek after GeneralStage.rebalance(FunctionEx) is not supported.

        Specified by:
        peek in interface GeneralStage<T>
        Returns:
        the newly attached stage
        See Also:
        GeneralStage.peek(PredicateEx, FunctionEx), GeneralStage.peek(FunctionEx)
      • peek

        @Nonnull
        StreamStage<T> peek​(@Nonnull
                            PredicateEx<? super T> shouldLogFn,
                            @Nonnull
                            FunctionEx<? super T,​? extends java.lang.CharSequence> toStringFn)
        Description copied from interface: GeneralStage
        Attaches a peeking stage which logs this stage's output and passes it through without transformation. For each item the stage emits, it:
        1. uses the shouldLogFn predicate to see whether to log the item
        2. if yes, uses toStringFn to get the item's string representation
        3. logs the string at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
        The stage logs each item on the cluster member that outputs it. Its primary purpose is for development use, when running Jet on a local machine.

        Note that peek after GeneralStage.rebalance(FunctionEx) operation is not supported.
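
The three steps listed above can be modelled in plain Java. This is a sketch of the pass-through behaviour only, not Jet code: `PeekSketch.peek` is a hypothetical helper, and a `List<String>` stands in for the INFO-level logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of a filtering peek stage (not Jet code).
final class PeekSketch {

    static <T> List<T> peek(
            List<T> items,
            Predicate<? super T> shouldLogFn,
            Function<? super T, ? extends CharSequence> toStringFn,
            List<String> log) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (shouldLogFn.test(item)) {                    // step 1: should we log?
                log.add(toStringFn.apply(item).toString());  // steps 2 and 3
            }
            out.add(item);  // every item passes through unchanged
        }
        return out;
    }
}
```

The output list is always identical to the input; only the log depends on the predicate.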

        Sample usage:

        
         users.peek(
             user -> user.getName().length() > 100,
             User::getName
         )
         
        Specified by:
        peek in interface GeneralStage<T>
        Parameters:
        shouldLogFn - a function to filter the logged items. You can use alwaysTrue() as a pass-through filter when you don't need any filtering. It must be stateless and cooperative.
        toStringFn - a function that returns a string representation of the item. It must be stateless and cooperative.
        Returns:
        the newly attached stage
        See Also:
        GeneralStage.peek(FunctionEx), GeneralStage.peek()
      • peek

        @Nonnull
        default StreamStage<T> peek​(@Nonnull
                                    FunctionEx<? super T,​? extends java.lang.CharSequence> toStringFn)
        Description copied from interface: GeneralStage
        Adds a peeking layer to this compute stage which logs its output. For each item the stage emits, it:
        1. uses toStringFn to get a string representation of the item
        2. logs the string at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
        The stage logs each item on the cluster member that outputs it. Its primary purpose is for development use, when running Jet on a local machine.

        Note that peek after GeneralStage.rebalance(FunctionEx) operation is not supported.

        Sample usage:

        
         users.peek(User::getName)
         
        Specified by:
        peek in interface GeneralStage<T>
        Parameters:
        toStringFn - a function that returns a string representation of the item. It must be stateless and cooperative.
        Returns:
        the newly attached stage
        See Also:
        GeneralStage.peek(PredicateEx, FunctionEx), GeneralStage.peek()
      • customTransform

        @Nonnull
        default <R> StreamStage<R> customTransform​(@Nonnull
                                                   java.lang.String stageName,
                                                   @Nonnull
                                                   SupplierEx<Processor> procSupplier)
        Description copied from interface: GeneralStage
        Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

        Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
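
The lack of type safety mentioned above is a general property of type parameters that are inferred only from the call site. The following plain-Java sketch (hypothetical names, not Jet code) shows the effect: the mismatch compiles without complaint and only surfaces when an item is read.

```java
import java.util.List;

// Plain-Java illustration of a call-site-inferred type parameter (not Jet code).
final class CallSiteInferenceSketch {

    // R is chosen by the caller; nothing ties it to what the list really holds.
    @SuppressWarnings("unchecked")
    static <R> List<R> untypedOutput(List<?> actualItems) {
        return (List<R>) actualItems;
    }

    // Returns true if reading the mistyped list fails at the use site.
    static boolean failsAtUseSite() {
        // Compiles: R is inferred as Integer although the list holds a String.
        List<Integer> numbers = untypedOutput(List.of("not a number"));
        try {
            Integer first = numbers.get(0);  // the compiler-inserted cast fails here
            return first == null;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

By analogy, a wrong type argument on a custom transform would go unnoticed until a downstream function touches an item.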

        Specified by:
        customTransform in interface GeneralStage<T>
        Type Parameters:
        R - the type of the output items
        Parameters:
        stageName - a human-readable name for the custom stage
        procSupplier - the supplier of processors
        Returns:
        the newly attached stage
      • customTransform

        @Nonnull
        default <R> StreamStage<R> customTransform​(@Nonnull
                                                   java.lang.String stageName,
                                                   @Nonnull
                                                   ProcessorSupplier procSupplier)
        Description copied from interface: GeneralStage
        Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

        Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

        Specified by:
        customTransform in interface GeneralStage<T>
        Type Parameters:
        R - the type of the output items
        Parameters:
        stageName - a human-readable name for the custom stage
        procSupplier - the supplier of processors
        Returns:
        the newly attached stage
      • customTransform

        @Nonnull
        <R> StreamStage<R> customTransform​(@Nonnull
                                           java.lang.String stageName,
                                           @Nonnull
                                           ProcessorMetaSupplier procSupplier)
        Description copied from interface: GeneralStage
        Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

        Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

        Specified by:
        customTransform in interface GeneralStage<T>
        Type Parameters:
        R - the type of the output items
        Parameters:
        stageName - a human-readable name for the custom stage
        procSupplier - the supplier of processors
        Returns:
        the newly attached stage
      • apply

        @Nonnull
        default <R> StreamStage<R> apply​(@Nonnull
                                         FunctionEx<? super StreamStage<T>,​? extends StreamStage<R>> transformFn)
        Transforms this stage using the provided transformFn and returns the transformed stage. It allows you to extract common pipeline transformations into a method and then call that method without interrupting the chained pipeline expression.

        For example, say you have this code:

        
         StreamStage<String> input = pipeline.readFrom(textSource);
         StreamStage<String> cleanedUp = input
                 .map(String::toLowerCase)
                 .filter(s -> s.startsWith("success"));
         
        You can capture the map and filter steps into a common "cleanup" transformation:
        
         StreamStage<String> cleanUp(StreamStage<String> input) {
              return input.map(String::toLowerCase)
                          .filter(s -> s.startsWith("success"));
         }
         
        Now you can insert this transformation as just another step in your pipeline:
        
         StreamStage<String> tokens = pipeline
             .readFrom(textSource)
             .apply(this::cleanUp)
             .flatMap(line -> traverseArray(line.split("\\W+")));
         
        Type Parameters:
        R - type of the returned stage
        Parameters:
        transformFn - function to transform this stage into another stage
        Since:
        Jet 3.1
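
To show why this pattern keeps the chain intact, here is a plain-Java sketch with a hypothetical `MiniStage` class standing in for a pipeline stage. It is not Jet code, and it only assumes that `apply` hands the stage to the transform function and returns the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical miniature stage backed by a list (not Jet code).
final class MiniStage<T> {
    private final List<T> items;

    MiniStage(List<T> items) {
        this.items = items;
    }

    <R> MiniStage<R> map(Function<? super T, ? extends R> fn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(fn.apply(item));
        }
        return new MiniStage<>(out);
    }

    MiniStage<T> filter(Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (predicate.test(item)) {
                out.add(item);
            }
        }
        return new MiniStage<>(out);
    }

    // Hands this stage to the transform and returns whatever it builds.
    <R> MiniStage<R> apply(
            Function<? super MiniStage<T>, ? extends MiniStage<R>> transformFn) {
        return transformFn.apply(this);
    }

    List<T> items() {
        return items;
    }

    // The reusable "cleanup" transformation, extracted into a method.
    static MiniStage<String> cleanUp(MiniStage<String> input) {
        return input.map(String::toLowerCase)
                    .filter(s -> s.startsWith("success"));
    }
}
```

With this model, `stage.apply(MiniStage::cleanUp)` reads as one more step in the chain while the transformation itself lives in a separate method.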
      • setLocalParallelism

        @Nonnull
        StreamStage<T> setLocalParallelism​(int localParallelism)
        Description copied from interface: Stage
        Sets the preferred local parallelism (number of processors per Jet cluster member) this stage will configure its DAG vertices with. Jet always uses the same number of processors on each member, so the total parallelism automatically increases if another member joins the cluster.

        While most stages are backed by one vertex, there are exceptions. If a stage uses two vertices, each of them will have the given local parallelism, so in total there will be twice as many processors per member.
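
The arithmetic in the two paragraphs above can be written out as a small plain-Java sketch. The helper names are hypothetical and it assumes an explicit, positive local parallelism rather than the default of -1.

```java
// Worked arithmetic for processor counts (hypothetical helper, not Jet code).
final class ParallelismSketch {

    // Processors one member runs for a stage: each backing vertex gets
    // localParallelism processors.
    static int processorsPerMember(int localParallelism, int vertexCount) {
        return localParallelism * vertexCount;
    }

    // Every member runs the same number of processors, so the total scales
    // with the number of members in the cluster.
    static int totalProcessors(int localParallelism, int vertexCount, int memberCount) {
        return processorsPerMember(localParallelism, vertexCount) * memberCount;
    }
}
```

For example, a local parallelism of 4 on a two-vertex stage gives 8 processors per member, and a one-vertex stage on a cluster that grows from 3 to 4 members goes from 12 to 16 processors in total.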

        The default value is -1 and it signals to Jet to figure out a default value. Jet will determine the vertex's local parallelism during job initialization from the global default and the processor meta-supplier's preferred value.

        Specified by:
        setLocalParallelism in interface GeneralStage<T>
        Specified by:
        setLocalParallelism in interface Stage
        Returns:
        this stage
      • setName

        @Nonnull
        StreamStage<T> setName​(@Nonnull
                               java.lang.String name)
        Description copied from interface: Stage
        Overrides the default name of the stage with the name you choose and returns the stage. This can be useful for debugging purposes, to better distinguish pipeline stages in the diagnostic output.
        Specified by:
        setName in interface GeneralStage<T>
        Specified by:
        setName in interface Stage
        Parameters:
        name - the stage name
        Returns:
        this stage