public interface BatchStage<T> extends GeneralStage<T>

Type Parameters: T - the type of items coming out of this stage

A stage in a distributed computation pipeline that will
 observe a finite amount of data (a batch). It accepts input from its
 upstream stages (if any) and passes its output to its downstream stages.

Fields inherited from interface GeneralStage: DEFAULT_MAX_CONCURRENT_OPS, DEFAULT_PRESERVE_ORDER

| Modifier and Type | Method and Description | 
|---|---|
| <R> BatchStage<R> | aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp)Attaches a stage that performs the given aggregate operation over all
 the items it receives. | 
| default <T1,R0,R1> BatchStage<Tuple2<R0,R1>> | aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0,
          BatchStage<T1> stage1,
          AggregateOperation1<? super T1,?,? extends R1> aggrOp1)Attaches a stage that co-aggregates the data from this and the supplied
 stage by performing a separate aggregate operation on each and emits a
 single Tuple2 with their results. | 
| <T1,R> BatchStage<R> | aggregate2(BatchStage<T1> stage1,
          AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)Attaches a stage that performs the given aggregate operation over all
 the items it receives from both this stage and the stage1 you supply. | 
| default <T1,T2,R0,R1,R2> BatchStage<Tuple3<R0,R1,R2>> | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0,
          BatchStage<T1> stage1,
          AggregateOperation1<? super T1,?,? extends R1> aggrOp1,
          BatchStage<T2> stage2,
          AggregateOperation1<? super T2,?,? extends R2> aggrOp2)Attaches a stage that co-aggregates the data from this and the two
 supplied stages by performing a separate aggregate operation on each and
 emits a single Tuple3 with their results. | 
| <T1,T2,R> BatchStage<R> | aggregate3(BatchStage<T1> stage1,
          BatchStage<T2> stage2,
          AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)Attaches a stage that performs the given aggregate operation over all
 the items it receives from this stage as well as stage1 and stage2 you supply. | 
| default AggregateBuilder1<T> | aggregateBuilder()Offers a step-by-step API to build a pipeline stage that co-aggregates
 the data from several input stages. | 
| default <R0> AggregateBuilder<R0> | aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)Offers a step-by-step API to build a pipeline stage that co-aggregates
 the data from several input stages. | 
| default <R> BatchStage<R> | apply(FunctionEx<? super BatchStage<T>,? extends BatchStage<R>> transformFn)Transforms this stage using the provided transformFn and
 returns the transformed stage. | 
| <R> BatchStage<R> | customTransform(String stageName,
               ProcessorMetaSupplier procSupplier)Attaches a stage with a custom transform based on the provided supplier
 of Core API  Processors. | 
| default <R> BatchStage<R> | customTransform(String stageName,
               ProcessorSupplier procSupplier)Attaches a stage with a custom transform based on the provided supplier
 of Core API  Processors. | 
| default <R> BatchStage<R> | customTransform(String stageName,
               SupplierEx<Processor> procSupplier)Attaches a stage with a custom transform based on the provided supplier
 of Core API  Processors. | 
| default BatchStage<T> | distinct()Attaches a stage that emits just the items that are distinct according
 to their definition of equality (equals and hashCode). | 
| BatchStage<T> | filter(PredicateEx<T> filterFn)Attaches a filtering stage which applies the provided predicate function
 to each input item to decide whether to pass the item to the output or
 to discard it. | 
| <S> BatchStage<T> | filterStateful(SupplierEx<? extends S> createFn,
              BiPredicateEx<? super S,? super T> filterFn)Attaches a stage that performs a stateful filtering operation. | 
| <S> BatchStage<T> | filterUsingService(ServiceFactory<?,S> serviceFactory,
                  BiPredicateEx<? super S,? super T> filterFn)Attaches a filtering stage which applies the provided predicate function
 to each input item to decide whether to pass the item to the output or
 to discard it. | 
| <R> BatchStage<R> | flatMap(FunctionEx<? super T,? extends Traverser<R>> flatMapFn)Attaches a flat-mapping stage which applies the supplied function to
 each input item independently and emits all the items from the Traverser it returns. | 
| <S,R> BatchStage<R> | flatMapStateful(SupplierEx<? extends S> createFn,
               BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)Attaches a stage that performs a stateful flat-mapping operation. | 
| <S,R> BatchStage<R> | flatMapUsingService(ServiceFactory<?,S> serviceFactory,
                   BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)Attaches a flat-mapping stage which applies the supplied function to
 each input item independently and emits all items from the Traverser it returns as the output items. | 
| <K> BatchStageWithKey<T,K> | groupingKey(FunctionEx<? super T,? extends K> keyFn)Specifies the function that will extract a key from the items in the
 associated pipeline stage. | 
| <K,T1_IN,T1,R> BatchStage<R> | hashJoin(BatchStage<T1_IN> stage1,
        JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1,
        BiFunctionEx<T,T1,R> mapToOutputFn)Attaches to both this and the supplied stage a hash-joining stage and
 returns it. | 
| <K1,K2,T1_IN,T2_IN,T1,T2,R> BatchStage<R> | hashJoin2(BatchStage<T1_IN> stage1,
         JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1,
         BatchStage<T2_IN> stage2,
         JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2,
         TriFunction<T,T1,T2,R> mapToOutputFn)Attaches to this and the two supplied stages a hash-joining stage and
 returns it. | 
| default HashJoinBuilder<T> | hashJoinBuilder()Returns a fluent API builder object to construct a hash join operation
 with any number of contributing stages. | 
| <K,T1_IN,T1,R> BatchStage<R> | innerHashJoin(BatchStage<T1_IN> stage1,
             JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1,
             BiFunctionEx<T,T1,R> mapToOutputFn)Attaches to both this and the supplied stage an inner hash-joining stage
 and returns it. | 
| <K1,K2,T1_IN,T2_IN,T1,T2,R> BatchStage<R> | innerHashJoin2(BatchStage<T1_IN> stage1,
              JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1,
              BatchStage<T2_IN> stage2,
              JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2,
               TriFunction<T,T1,T2,R> mapToOutputFn)Attaches to this and the two supplied stages an inner hash-joining stage
 and returns it. | 
| <R> BatchStage<R> | map(FunctionEx<? super T,? extends R> mapFn)Attaches a mapping stage which applies the given function to each input
 item independently and emits the function's result as the output item. | 
| <S,R> BatchStage<R> | mapStateful(SupplierEx<? extends S> createFn,
           BiFunctionEx<? super S,? super T,? extends R> mapFn)Attaches a stage that performs a stateful mapping operation. | 
| default <K,V,R> BatchStage<R> | mapUsingIMap(IMap<K,V> iMap,
            FunctionEx<? super T,? extends K> lookupKeyFn,
            BiFunctionEx<? super T,? super V,? extends R> mapFn)Attaches a mapping stage where for each item a lookup in the supplied
  IMap is performed and the result of the lookup is merged with
 the item and emitted. | 
| default <K,V,R> BatchStage<R> | mapUsingIMap(String mapName,
            FunctionEx<? super T,? extends K> lookupKeyFn,
             BiFunctionEx<? super T,? super V,? extends R> mapFn)Attaches a mapping stage where for each item a lookup in the IMap with the supplied name is performed and the result of the lookup is
 merged with the item and emitted. | 
| default <K,V,R> BatchStage<R> | mapUsingReplicatedMap(ReplicatedMap<K,V> replicatedMap,
                     FunctionEx<? super T,? extends K> lookupKeyFn,
                     BiFunctionEx<? super T,? super V,? extends R> mapFn)Attaches a mapping stage where for each item a lookup in the supplied
  ReplicatedMap is performed and the result of the lookup is
 merged with the item and emitted. | 
| default <K,V,R> BatchStage<R> | mapUsingReplicatedMap(String mapName,
                     FunctionEx<? super T,? extends K> lookupKeyFn,
                      BiFunctionEx<? super T,? super V,? extends R> mapFn)Attaches a mapping stage where for each item a lookup in the ReplicatedMap with the supplied name is performed and the result of the
 lookup is merged with the item and emitted. | 
| <S,R> BatchStage<R> | mapUsingService(ServiceFactory<?,S> serviceFactory,
               BiFunctionEx<? super S,? super T,? extends R> mapFn)Attaches a mapping stage which applies the supplied function to each
 input item independently and emits the function's result as the output
 item. | 
| default <S,R> BatchStage<R> | mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory,
                     BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)Asynchronous version of GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R. | 
| <S,R> BatchStage<R> | mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory,
                    int maxConcurrentOps,
                    boolean preserveOrder,
                     BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)Asynchronous version of GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R. | 
| <S,R> BatchStage<R> | mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory,
                           int maxBatchSize,
                            BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)Batched version of GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes
 a list of input items and returns a CompletableFuture<List<R>>. | 
| BatchStage<T> | merge(BatchStage<? extends T> other)Attaches a stage that emits all the items from this stage as well as all
 the items from the supplied stage. | 
| default BatchStage<T> | peek()Adds a peeking layer to this compute stage which logs its output. | 
| default BatchStage<T> | peek(FunctionEx<? super T,? extends CharSequence> toStringFn)Adds a peeking layer to this compute stage which logs its output. | 
| BatchStage<T> | peek(PredicateEx<? super T> shouldLogFn,
    FunctionEx<? super T,? extends CharSequence> toStringFn)Attaches a peeking stage which logs this stage's output and passes it
 through without transformation. | 
| BatchStage<T> | rebalance()Returns a new stage that applies data rebalancing to the output of this
 stage. | 
| <K> BatchStage<T> | rebalance(FunctionEx<? super T,? extends K> keyFn)Returns a new stage that applies data rebalancing to the output of this
 stage. | 
| default <A,R> BatchStage<R> | rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp)Attaches a rolling aggregation stage. | 
| BatchStage<T> | setLocalParallelism(int localParallelism)Sets the preferred local parallelism (number of processors per Jet
 cluster member) this stage will configure its DAG vertices with. | 
| BatchStage<T> | setName(String name)Overrides the default name of the stage with the name you choose and
 returns the stage. | 
| BatchStage<T> | sort()Attaches a stage that sorts the input items according to their natural order. | 
| BatchStage<T> | sort(ComparatorEx<? super T> comparator)Attaches a stage that sorts the input items according to the supplied
 comparator. | 
Methods inherited from interface GeneralStage: addTimestamps, writeTo
Methods inherited from interface Stage: getPipeline, name

@Nonnull <K> BatchStageWithKey<T,K> groupingKey(@Nonnull FunctionEx<? super T,? extends K> keyFn)

Specifies the function that will extract a key from the items in the
 associated pipeline stage. This enables the operations that need the
 key, such as grouped aggregation.
Sample usage:
 users.groupingKey(User::getId)
 
 Note: make sure the extracted key is not null; otherwise the job
 will fail. Also make sure that the key implements equals() and
 hashCode().
Specified by: groupingKey in interface GeneralStage<T>
Type Parameters: K - type of the key
Parameters: keyFn - function that extracts the grouping key. It must be
     stateless and cooperative.

@Nonnull <K> BatchStage<T> rebalance(@Nonnull FunctionEx<? super T,? extends K> keyFn)
Description copied from interface: GeneralStage. Returns a new stage that applies data rebalancing to the output of this stage. With partitioned rebalancing, you supply your own function that decides (indirectly) where to send each data item. Jet first applies your partition key function to the data item and then its own partitioning function to the key. The result is that all items with the same key go to the same Jet processor and different keys are distributed pseudo-randomly across the processors.
Compared to non-partitioned balancing, partitioned balancing enforces the same data distribution across members regardless of any bottlenecks. If a given member is overloaded and applies backpressure, Jet doesn't reroute the data to other members, but propagates the backpressure to the upstream. If you choose a partitioning key that has a skewed distribution (some keys being much more frequent), this will result in an imbalanced data flow.
These are some basic invariants:
- stage.rebalance(rebalanceKeyFn).groupingKey(groupKeyFn).aggregate(...):
  here Jet removes the first (local) aggregation vertex and goes straight
  to distributed aggregation without combining. Grouped aggregation
  requires the data to be partitioned by the grouping key and therefore
  Jet must ignore the rebalancing key you supplied. We recommend that you
  remove it and use the parameterless stage.rebalance()
  because the end result is identical.
- stage.rebalance(rebalanceKeyFn).aggregate(...): in this case the second
  vertex is non-parallelizable and must execute on a single member.
  Therefore Jet keeps both vertices and applies partitioned rebalancing
  before the first one.
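Sample usage (a sketch, not part of the original Javadoc; the Trade class,
its getTicker() getter, and the enrich() helper are assumed for
illustration):

 BatchStage<Trade> trades = pipeline.readFrom(tradeSource);
 // Partitioned rebalancing: all trades with the same ticker go to the
 // same processor; different tickers spread across the cluster.
 BatchStage<Trade> enriched = trades
         .rebalance(Trade::getTicker)
         .map(trade -> enrich(trade));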
Specified by: rebalance in interface GeneralStage<T>
Type Parameters: K - type of the key
Parameters: keyFn - the partitioning key function. It must be stateless and
     cooperative.

@Nonnull BatchStage<T> rebalance()
Description copied from interface: GeneralStage. Returns a new stage that applies data rebalancing to the output of this stage. To implement rebalancing, Jet uses a distributed unicast data routing pattern on the DAG edge from this stage's vertex to the next one. It routes the data in a round-robin fashion, sending each item to the next member (the member list includes the local member as well). If a given member's queue is overloaded and applying backpressure, it skips it and retries in the next round. With this scheme you get perfectly balanced item counts on each member under light load, but under heavier load it favors throughput: if the network becomes a bottleneck, most data may stay local.
These are some basic invariants:
- stage.rebalance().groupingKey(keyFn).aggregate(...): here Jet
  removes the first (local) aggregation vertex and goes straight to
  distributed aggregation without combining. The data is rebalanced
  through partitioning.
- stage.rebalance().aggregate(...): in this case the second vertex
  is non-parallelizable and must execute on a single member. Therefore Jet
  keeps both vertices and applies rebalancing before the first one.
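Sample usage (a sketch, not part of the original Javadoc; the source name
is assumed):

 BatchStage<String> lines = pipeline.readFrom(textSource);
 // Round-robin rebalancing before a parallelizable mapping step.
 BatchStage<String> upper = lines
         .rebalance()
         .map(String::toUpperCase);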
Specified by: rebalance in interface GeneralStage<T>

@Nonnull BatchStage<T> sort()

Attaches a stage that sorts the input items according to their natural order.
Sample usage:
 items.sort()
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
See Also: ComparatorEx.naturalOrder()

@Nonnull BatchStage<T> sort(@Nonnull ComparatorEx<? super T> comparator)

Attaches a stage that sorts the input items according to the supplied comparator.
For example, you can sort a stream of trade events based on their stock ticker:
 BatchStage<Trade> trades = pipeline.readFrom(tradeSource);
 BatchStage<Trade> sortedTrades = trades.sort(ComparatorEx.comparing(Trade::ticker));
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Parameters: comparator - the user-provided comparator that will be used for
     sorting. It must be stateless and cooperative.

@Nonnull <R> BatchStage<R> map(@Nonnull FunctionEx<? super T,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a mapping stage
 which applies the given function to each input item independently and
 emits the function's result as the output item. If the result is null,
 it emits nothing. Therefore this stage can be used to implement filtering
 semantics as well.
 This sample takes a stream of names and outputs the names in lowercase:
 stage.map(name -> name.toLowerCase(Locale.ROOT))
Specified by: map in interface GeneralStage<T>
Type Parameters: R - the result type of the mapping function
Parameters: mapFn - a mapping function. It must be stateless and cooperative.

@Nonnull BatchStage<T> filter(@Nonnull PredicateEx<T> filterFn)
Description copied from interface: GeneralStage. Attaches a filtering stage
 which applies the provided predicate function to each input item to decide
 whether to pass the item to the output or to discard it.
 This sample removes empty strings from the stream:
 stage.filter(name -> !name.isEmpty())
Specified by: filter in interface GeneralStage<T>
Parameters: filterFn - a filter predicate function. It must be stateless and
     cooperative.

@Nonnull <R> BatchStage<R> flatMap(@Nonnull FunctionEx<? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStage. Attaches a flat-mapping
 stage which applies the supplied function to each input item independently
 and emits all the items from the Traverser it returns. The traverser must
 be null-terminated.
 This sample takes a stream of sentences and outputs a stream of individual words in them:
 stage.flatMap(sentence -> traverseArray(sentence.split("\\W+")))
Specified by: flatMap in interface GeneralStage<T>
Type Parameters: R - the type of items in the result's traversers
Parameters: flatMapFn - a flatmapping function, whose result type is
     Jet's Traverser. It must not return a null traverser, but can
     return an empty traverser. It must be stateless and cooperative.

@Nonnull <S,R> BatchStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this
 object along with each input item to mapFn, which can update
 the object's state. The state object will be included in the state
 snapshot, so it survives job restarts. For this reason it must be
 serializable.
 
 This sample takes a stream of long numbers representing request
 latencies, computes the cumulative latency of all requests so far, and
 starts emitting alarm messages when the cumulative latency crosses a
 "bad behavior" threshold:
 
 StreamStage<Long> latencyAlarms = latencies.mapStateful(
         LongAccumulator::new,
         (sum, latency) -> {
             sum.add(latency);
             long cumulativeLatency = sum.get();
             return (cumulativeLatency <= LATENCY_THRESHOLD)
                     ? null
                     : cumulativeLatency;
         }
 );
 A similar effect can be achieved with latencies.rollingAggregate(summing()).

Specified by: mapStateful in interface GeneralStage<T>
Type Parameters: S - type of the state object; R - type of the result
Parameters: createFn - function that returns the state object. It must be
     stateless and cooperative.
 mapFn - function that receives the state object and the input item and
     outputs the result item. It may modify the state object. It must be
     stateless and cooperative.

@Nonnull <S> BatchStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Description copied from interface: GeneralStage. Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this
 object along with each input item to filterFn, which can update
 the object's state. The state object will be included in the state
 snapshot, so it survives job restarts. For this reason it must be
 serializable.
 This sample decimates the input (throws out every 10th item):
 GeneralStage<String> decimated = input.filterStateful(
         LongAccumulator::new,
         (counter, item) -> {
             counter.add(1);
             return counter.get() % 10 != 0;
         }
 );
Specified by: filterStateful in interface GeneralStage<T>
Type Parameters: S - type of the state object
Parameters: createFn - function that returns the state object. It must be
     stateless and cooperative.
 filterFn - function that receives the state object and the input item and
     produces the boolean result. It may modify the state object. It must be
     stateless and cooperative.

@Nonnull <S,R> BatchStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStage. Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this
 object along with each input item to flatMapFn, which can update
 the object's state. The state object will be included in the state
 snapshot, so it survives job restarts. For this reason it must be
 serializable.
 This sample inserts a punctuation mark (a special string) after every 10th input string:
 GeneralStage<String> punctuated = input.flatMapStateful(
         LongAccumulator::new,
         (counter, item) -> {
             counter.add(1);
             return counter.get() % 10 == 0
                     ? Traversers.traverseItems("punctuation", item)
                     : Traversers.singleton(item);
         }
 );
Specified by: flatMapStateful in interface GeneralStage<T>
Type Parameters: S - type of the state object; R - type of the result
Parameters: createFn - function that returns the state object. It must be
     stateless and cooperative.
 flatMapFn - function that receives the state object and the input item and
     outputs the result items. It may modify the state object. It must not
     return a null traverser, but can return an empty traverser. It must be
     stateless and cooperative.

@Nonnull default <A,R> BatchStage<R> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
Description copied from interface: GeneralStage. Attaches a rolling
 aggregation stage. This is a special case of stateful mapping that uses an
 AggregateOperation. It passes each input item to the accumulator and
 outputs the current result of aggregation (as returned by the export
 primitive).
 Sample usage:
 stage.rollingAggregate(AggregateOperations.summing())
 For example, if the input is {2, 7, 8, -5}, the output will be
 {2, 9, 17, 12}.
 This stage is fault-tolerant and saves its state to the snapshot.
 NOTE: since the output for each item depends on all
 the previous items, this operation cannot be parallelized. Jet will
 perform it on a single member, single-threaded. Jet also supports
 keyed rolling aggregation
 which it can parallelize by partitioning.
Specified by: rollingAggregate in interface GeneralStage<T>
Type Parameters: R - result type of the aggregate operation
Parameters: aggrOp - the aggregate operation to do the aggregation

@Nonnull <S,R> BatchStage<R> mapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
 
 If the mapping result is null, it emits nothing. Therefore this
 stage can be used to implement filtering semantics as well.
 
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 stage.mapUsingService(
     ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
     (reg, item) -> item.setDetail(reg.fetchDetail(item))
 )
Specified by: mapUsingService in interface GeneralStage<T>
Type Parameters: S - type of service object; R - the result type of the mapping function
Parameters: serviceFactory - the service factory
 mapFn - a mapping function. It must be stateless. It must be
     cooperative, if the service is cooperative.

@Nonnull default <S,R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStage. Asynchronous version of
 GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn
 returns a CompletableFuture<R> instead of just R.
 Uses default values for some extra parameters: the maximum number of
 concurrent async operations per processor is limited to 4
 (DEFAULT_MAX_CONCURRENT_OPS) and the order of input items is preserved
 (DEFAULT_PRESERVE_ORDER is true).
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 stage.mapUsingServiceAsync(
     ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
     (reg, item) -> reg.fetchDetailAsync(item)
                       .thenApply(detail -> item.setDetail(detail))
 )
Specified by: mapUsingServiceAsync in interface GeneralStage<T>
Type Parameters: S - type of service object; R - the future result type of the mapping function
Parameters: serviceFactory - the service factory
 mapAsyncFn - a mapping function. Can map to null (return a null
     future). It must be stateless and cooperative.

@Nonnull <S,R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStage. Asynchronous version of
 GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn
 returns a CompletableFuture<R> instead of just R.
 The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 stage.mapUsingServiceAsync(
     ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
     (reg, item) -> reg.fetchDetailAsync(item)
                       .thenApply(detail -> item.setDetail(detail))
 )
Specified by: mapUsingServiceAsync in interface GeneralStage<T>
Type Parameters: S - type of service object; R - the future result type of the mapping function
Parameters: serviceFactory - the service factory
 maxConcurrentOps - maximum number of concurrent async operations per processor
 preserveOrder - whether the ordering of the input items should be preserved
 mapAsyncFn - a mapping function. Can map to null (return a null
     future). It must be stateless and cooperative.

@Nonnull <S> BatchStage<T> filterUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Description copied from interface: GeneralStage. Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
 This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:
 photos.filterUsingService(
     ServiceFactories.sharedService(ctx -> new ImageClassifier(ctx.hazelcastInstance())),
     (classifier, photo) -> classifier.classify(photo).equals("cat")
 )
Specified by: filterUsingService in interface GeneralStage<T>
Type Parameters: S - type of service object
Parameters: serviceFactory - the service factory
 filterFn - a filter predicate function. It must be stateless and
     cooperative.

@Nonnull <S,R> BatchStage<R> flatMapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStage. Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all items from the Traverser it returns as the output items. The traverser must be
 null-terminated. The mapping function receives another
 parameter, the service object, which Jet will create using the supplied
 serviceFactory.
 This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:
 StreamStage<Part> parts = products.flatMapUsingService(
     ServiceFactories.sharedService(ctx -> new PartRegistryCtx()),
     (registry, product) -> Traversers.traverseIterable(
                                registry.fetchParts(product))
 );
Specified by: flatMapUsingService in interface GeneralStage<T>
Type Parameters: S - type of service object; R - the type of items in the result's traversers
Parameters: serviceFactory - the service factory
 flatMapFn - a flatmapping function, whose result type is Jet's
     Traverser. It must not return a null traverser, but can return an
     empty traverser. It must be stateless and cooperative.

@Nonnull <S,R> BatchStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStage. Batched version of
 GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes
 a list of input items and returns a CompletableFuture<List<R>>.
 The size of the input list is limited by the given maxBatchSize.
 The number of in-flight batches being completed asynchronously is
 limited, and this mapping operation always preserves the order of
 input elements.
 This transform can perform filtering by putting null elements into
 the output list.
 
The latency of the async call will add to the total latency of the output.
 This sample takes a stream of stock items and sets the detail
 field on them by performing batched lookups from a registry. The max
 size of the items to look up is specified as 100:
 
 stage.mapUsingServiceAsyncBatched(
     ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.hazelcastInstance())),
     100,
     (reg, itemList) -> reg
             .fetchDetailsAsync(itemList)
             .thenApply(detailList -> {
                 for (int i = 0; i < itemList.size(); i++) {
                      itemList.get(i).setDetail(detailList.get(i));
                  }
                  return itemList;
              })
 )
Specified by: mapUsingServiceAsyncBatched in interface GeneralStage<T>
Type Parameters: S - type of service object; R - the future result type of the mapping function
Parameters: serviceFactory - the service factory
 maxBatchSize - max size of the input list
 mapAsyncFn - a mapping function. It must be stateless and
     cooperative.

@Nonnull default <K,V,R> BatchStage<R> mapUsingReplicatedMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a mapping stage where for each item a lookup in the ReplicatedMap with the supplied name is performed and the result of the
 lookup is merged with the item and emitted.
 
 If the result of the mapping is null, it emits nothing.
 Therefore this stage can be used to implement filtering semantics as
 well.
 
The mapping logic is equivalent to:
 K key = lookupKeyFn.apply(item);
 V value = replicatedMap.get(key);
 return mapFn.apply(item, value);
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 items.mapUsingReplicatedMap(
     "enriching-map",
     item -> item.getDetailId(),
     (Item item, ItemDetail detail) -> item.setDetail(detail)
 )
Specified by: mapUsingReplicatedMap in interface GeneralStage<T>
Type Parameters: K - type of the key in the ReplicatedMap; V - type of the value in the ReplicatedMap; R - type of the output item
Parameters: mapName - name of the ReplicatedMap
 lookupKeyFn - a function which returns the key to look up in the map. Must not return
     null. It must be stateless and cooperative.
 mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull default <K,V,R> BatchStage<R> mapUsingReplicatedMap(@Nonnull ReplicatedMap<K,V> replicatedMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a mapping stage where for each item a lookup in the supplied ReplicatedMap is performed and the result of the lookup is
 merged with the item and emitted.
 
 If the result of the mapping is null, it emits nothing.
 Therefore this stage can be used to implement filtering semantics as
 well.
 
The mapping logic is equivalent to:
 K key = lookupKeyFn.apply(item);
 V value = replicatedMap.get(key);
 return mapFn.apply(item, value);
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 items.mapUsingReplicatedMap(
     enrichingMap,
     item -> item.getDetailId(),
     (item, detail) -> item.setDetail(detail)
 )
Specified by: mapUsingReplicatedMap in interface GeneralStage<T>
Type Parameters: K - type of the key in the ReplicatedMap; V - type of the value in the ReplicatedMap; R - type of the output item
Parameters: replicatedMap - the ReplicatedMap to look up from
 lookupKeyFn - a function which returns the key to look up in the map. Must not return
     null. It must be stateless and cooperative.
 mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull default <K,V,R> BatchStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a mapping stage
 where for each item a lookup in the IMap with the supplied name is
 performed and the result of the lookup is merged with the item and
 emitted.
 
 If the result of the mapping is null, it emits nothing.
 Therefore this stage can be used to implement filtering semantics as
 well.
 
The mapping logic is equivalent to:
 K key = lookupKeyFn.apply(item);
 V value = map.get(key);
 return mapFn.apply(item, value);
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 items.mapUsingIMap(
     "enriching-map",
     item -> item.getDetailId(),
     (Item item, ItemDetail detail) -> item.setDetail(detail)
 )
 See GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>) for a partitioned
 version of this operation.

Specified by: mapUsingIMap in interface GeneralStage<T>
Type Parameters: K - type of the key in the IMap; V - type of the value in the IMap; R - type of the output item
Parameters: mapName - name of the IMap
 lookupKeyFn - a function which returns the key to look up in the map. Must not return
     null. It must be stateless and cooperative.
 mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull default <K,V,R> BatchStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage. Attaches a mapping stage where for each item a lookup in the supplied IMap is performed and the result of the lookup is merged with
 the item and emitted.
 
 If the result of the mapping is null, it emits nothing.
 Therefore this stage can be used to implement filtering semantics as
 well.
 
The mapping logic is equivalent to:
 K key = lookupKeyFn.apply(item);
 V value = map.get(key);
 return mapFn.apply(item, value);
 This sample takes a stream of stock items and sets the detail
 field on them by looking up from a registry:
 
 items.mapUsingIMap(
     enrichingMap,
     item -> item.getDetailId(),
     (item, detail) -> item.setDetail(detail)
 )
 See GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>) for a partitioned
 version of this operation.

Specified by: mapUsingIMap in interface GeneralStage<T>
Type Parameters: K - type of the key in the IMap; V - type of the value in the IMap; R - type of the output item
Parameters: iMap - the IMap to look up from
 lookupKeyFn - a function which returns the key to look up in the map. Must not return
     null. It must be stateless and cooperative.
 mapFn - the mapping function. It must be stateless and cooperative.

@Nonnull default BatchStage<T> distinct()

Attaches a stage that emits just the items that are distinct according
 to their definition of equality (equals and hashCode).
 There is no guarantee which one of equal items it will emit.
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
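Sample usage (a sketch, not part of the original Javadoc; the source is
assumed):

 BatchStage<String> words = pipeline.readFrom(wordSource);
 // Emits each distinct word once; which of the equal items survives
 // is unspecified.
 BatchStage<String> distinctWords = words.distinct();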
@Nonnull BatchStage<T> merge(@Nonnull BatchStage<? extends T> other)

Attaches a stage that emits all the items from this stage as well as all
 the items from the supplied stage.
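Sample usage (a sketch, not part of the original Javadoc; the Trade type
and the two sources are assumed):

 BatchStage<Trade> trades = pipeline.readFrom(tradeSource);
 BatchStage<Trade> backfill = pipeline.readFrom(archiveSource);
 // Emits every item of both stages, in no particular order.
 BatchStage<Trade> merged = trades.merge(backfill);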
Parameters: other - the other stage whose data to merge into this one

@Nonnull <K,T1_IN,T1,R> BatchStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
Description copied from interface: GeneralStage. Attaches to both this and the supplied stage a hash-joining stage and returns it. Please refer to the package javadoc for a detailed description of the hash-join transform.
 
 This sample joins a stream of users to a stream of countries and outputs
 a stream of users with the country field set:
 
 // Types of the input stages:
 BatchStage<User> users;
 BatchStage<Map.Entry<Long, Country>> idAndCountry;
 users.hashJoin(
     idAndCountry,
     JoinClause.joinMapEntries(User::getCountryId),
     (user, country) -> user.setCountry(country)
 )
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Specified by: hashJoin in interface GeneralStage<T>
Type Parameters: K - the type of the join key; T1_IN - the type of stage1 items; T1 - the result type of projection on stage1 items; R - the resulting output type
Parameters: stage1 - the stage to hash-join with this one
 joinClause1 - specifies how to join the two streams
 mapToOutputFn - function to map the joined items to the output
     value. It must be stateless and cooperative.

@Nonnull <K,T1_IN,T1,R> BatchStage<R> innerHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
Description copied from interface: GeneralStage. Attaches to both this and the supplied stage an inner hash-joining stage and returns it. Please refer to the package javadoc for a detailed description of the hash-join transform.
 
 This sample joins a stream of users to a stream of countries and outputs
 a stream of users with the country field set:
 
 // Types of the input stages:
 BatchStage<User> users;
 BatchStage<Map.Entry<Long, Country>> idAndCountry;
 users.innerHashJoin(
     idAndCountry,
     JoinClause.joinMapEntries(User::getCountryId),
     (user, country) -> user.setCountry(country)
 )
 
 This method is similar to GeneralStage.hashJoin(com.hazelcast.jet.pipeline.BatchStage<T1_IN>, com.hazelcast.jet.pipeline.JoinClause<K, ? super T, ? super T1_IN, ? extends T1>, com.hazelcast.function.BiFunctionEx<T, T1, R>), but it guarantees
 that both input items will be non-null. Nulls will be filtered out
 before reaching mapToOutputFn.
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Specified by: innerHashJoin in interface GeneralStage<T>
Type Parameters: K - the type of the join key; T1_IN - the type of stage1 items; T1 - the result type of projection on stage1 items; R - the resulting output type
Parameters: stage1 - the stage to hash-join with this one
 joinClause1 - specifies how to join the two streams
 mapToOutputFn - function to map the joined items to the output
     value. It must be stateless and cooperative.

@Nonnull <K1,K2,T1_IN,T2_IN,T1,T2,R> BatchStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
Description copied from interface: GeneralStage. Attaches to this and the two supplied stages a hash-joining stage and returns it. Please refer to the package javadoc for a detailed description of the hash-join transform.
 
 This sample joins a stream of users to streams of countries and
 companies, and outputs a stream of users with the country and
 company fields set:
 
 // Types of the input stages:
 BatchStage<User> users;
 BatchStage<Map.Entry<Long, Country>> idAndCountry;
 BatchStage<Map.Entry<Long, Company>> idAndCompany;
 users.hashJoin2(
     idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
     idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
     (user, country, company) -> user.setCountry(country).setCompany(company)
 )
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Specified by: hashJoin2 in interface GeneralStage<T>
Type Parameters: K1 - the type of key for stage1; K2 - the type of key for stage2; T1_IN - the type of stage1 items; T2_IN - the type of stage2 items; T1 - the result type of projection of stage1 items; T2 - the result type of projection of stage2 items; R - the resulting output type
Parameters: stage1 - the first stage to join
 joinClause1 - specifies how to join with stage1
 stage2 - the second stage to join
 joinClause2 - specifies how to join with stage2
 mapToOutputFn - function to map the joined items to the output
     value. It must be stateless and cooperative.

@Nonnull <K1,K2,T1_IN,T2_IN,T1,T2,R> BatchStage<R> innerHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
Description copied from interface: GeneralStage. Attaches to this and the two supplied stages an inner hash-joining stage and returns it. Please refer to the package javadoc for a detailed description of the hash-join transform.
 
 This sample joins a stream of users to streams of countries and
 companies, and outputs a stream of users with the country and
 company fields set:
 
 // Types of the input stages:
 BatchStage<User> users;
 BatchStage<Map.Entry<Long, Country>> idAndCountry;
 BatchStage<Map.Entry<Long, Company>> idAndCompany;
 users.innerHashJoin2(
     idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
     idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
     (user, country, company) -> user.setCountry(country).setCompany(company)
 )
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
 
 This method is similar to GeneralStage.hashJoin2(com.hazelcast.jet.pipeline.BatchStage<T1_IN>, com.hazelcast.jet.pipeline.JoinClause<K1, ? super T, ? super T1_IN, ? extends T1>, com.hazelcast.jet.pipeline.BatchStage<T2_IN>, com.hazelcast.jet.pipeline.JoinClause<K2, ? super T, ? super T2_IN, ? extends T2>, com.hazelcast.jet.function.TriFunction<T, T1, T2, R>), but it guarantees
 that the input items will be non-null. Nulls will be filtered out
 before reaching mapToOutputFn.
Specified by: innerHashJoin2 in interface GeneralStage<T>
Type Parameters: K1 - the type of key for stage1; K2 - the type of key for stage2; T1_IN - the type of stage1 items; T2_IN - the type of stage2 items; T1 - the result type of projection of stage1 items; T2 - the result type of projection of stage2 items; R - the resulting output type
Parameters: stage1 - the first stage to join
 joinClause1 - specifies how to join with stage1
 stage2 - the second stage to join
 joinClause2 - specifies how to join with stage2
 mapToOutputFn - function to map the joined items to the output
     value. It must be stateless and cooperative.

@Nonnull default HashJoinBuilder<T> hashJoinBuilder()
Description copied from interface: GeneralStage. Returns a fluent API
 builder object to construct a hash join operation with any number of
 contributing stages. It is mainly intended for hash-joins with three or
 more enriching stages. For one or two stages, prefer the direct
 stage.hashJoinN(...) calls because they offer more static type safety.
 
 This sample joins a stream of users to streams of countries and
 companies, and outputs a stream of users with the country and
 company fields set:
 
 // Types of the input stages:
 StreamStage<User> users;
 BatchStage<Map.Entry<Long, Country>> idAndCountry;
 BatchStage<Map.Entry<Long, Company>> idAndCompany;
 StreamHashJoinBuilder<User> builder = users.hashJoinBuilder();
 Tag<Country> tCountry = builder.add(idAndCountry,
         JoinClause.joinMapEntries(User::getCountryId));
 Tag<Company> tCompany = builder.add(idAndCompany,
         JoinClause.joinMapEntries(User::getCompanyId));
 StreamStage<User> joined = builder.build((user, itemsByTag) ->
         user.setCountry(itemsByTag.get(tCountry)).setCompany(itemsByTag.get(tCompany)));
 
 This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more
 information.
Specified by: hashJoinBuilder in interface GeneralStage<T>

@Nonnull <R> BatchStage<R> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the
 items it receives. The stage emits a single item, the result of the
 aggregate operation's finish primitive. The result may be null (e.g.,
 AggregateOperations.maxBy(com.hazelcast.function.ComparatorEx<? super T>) with no input); in that case the stage
 does not produce any output.
 Sample usage:
 BatchStage<Integer> stage = pipeline.readFrom(TestSources.items(1, 2));
 // Emits a single item, the number 2:
 BatchStage<Long> count = stage.aggregate(AggregateOperations.counting());
Type Parameters: R - the type of the result
Parameters: aggrOp - the aggregate operation to perform
See Also: AggregateOperations

@Nonnull <T1,R> BatchStage<R> aggregate2(@Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives from both this stage and the stage1 you supply.
 This variant requires you to provide a two-input aggregate operation
 (refer to its Javadoc for a simple
 example). If you can express your logic in terms of two single-input
 aggregate operations, one for each input stream, then you should use
 stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler
 API and you can use the already defined single-input operations. Use
 this variant only when you have the need to implement an aggregate
 operation that combines the input streams into the same accumulator.
 
 The stage emits a single item, the result of the aggregate operation's
 finish primitive. The result may
 be null, in that case the stage does not produce any output.
 
Sample usage:
 BatchStage<Tuple2<Long, Long>> counts = pageVisits.aggregate2(addToCarts,
         AggregateOperations.aggregateOperation2(
                 AggregateOperations.counting(),
                 AggregateOperations.counting())
         );
Type Parameters: T1 - type of items in stage1; R - type of the result
Parameters: aggrOp - the aggregate operation to perform
See Also: AggregateOperations

@Nonnull default <T1,R0,R1> BatchStage<Tuple2<R0,R1>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that co-aggregates the data from this and the supplied stage by performing a separate aggregate operation on each and emits a single Tuple2 with their results.
 Sample usage:
 BatchStage<Tuple2<Long, Long>> counts = pageVisits.aggregate2(
         AggregateOperations.counting(),
         addToCarts, AggregateOperations.counting()
 );
Type Parameters: T1 - type of the items in the other stage; R0 - type of the aggregated result for this stage; R1 - type of the aggregated result for the other stage
Parameters: aggrOp0 - aggregate operation to perform on this stage
 stage1 - the other stage
 aggrOp1 - aggregate operation to perform on the other stage

@Nonnull <T1,T2,R> BatchStage<R> aggregate3(@Nonnull BatchStage<T1> stage1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the
 items it receives from this stage as well as stage1 and
 stage2 you supply. This variant requires you to provide a
 three-input aggregate operation (refer to its Javadoc for a simple example). If you can express
 your logic in terms of two single-input aggregate operations, one for
 each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because
 it offers a simpler API and you can use the already defined single-input
 operations. Use this variant only when you have the need to implement an
 aggregate operation that combines the input streams into the same
 accumulator.
 
 The stage emits a single item, the result of the aggregate operation's
 finish primitive. The result may
 be null, in that case the stage does not produce any output.
 
Sample usage:
 BatchStage<Tuple3<Long, Long, Long>> counts = pageVisits.aggregate3(
         addToCarts,
         payments,
         AggregateOperations.aggregateOperation3(
                 AggregateOperations.counting(),
                 AggregateOperations.counting(),
                 AggregateOperations.counting()));
Type Parameters: T1 - type of items in stage1; T2 - type of items in stage2; R - type of the result
Parameters: aggrOp - the aggregate operation to perform
See Also: AggregateOperations

@Nonnull default <T1,T2,R0,R1,R2> BatchStage<Tuple3<R0,R1,R2>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that co-aggregates the data from this and the two supplied stages by performing a separate aggregate operation on each and emits a single Tuple3 with their results.
 Sample usage:
 BatchStage<Tuple3<Long, Long, Long>> counts = pageVisits.aggregate3(
         AggregateOperations.counting(),
         addToCarts, AggregateOperations.counting(),
         payments, AggregateOperations.counting()
 );
Type Parameters: T1 - type of the items in stage1; T2 - type of the items in stage2; R0 - type of the aggregated result for this stage; R1 - type of the aggregated result for stage1; R2 - type of the aggregated result for stage2
Parameters: aggrOp0 - aggregate operation to perform on this stage
 stage1 - the first additional stage
 aggrOp1 - aggregate operation to perform on stage1
 stage2 - the second additional stage
 aggrOp2 - aggregate operation to perform on stage2

@Nonnull default <R0> AggregateBuilder<R0> aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates
 the data from several input stages. The stage emits the results as an
 ItemsByTag. Use the tag you get from builder.add(stageN, aggrOpN)
 to retrieve the aggregated result for that stage. Use builder.tag0() as
 the tag of this stage. You will also be able to supply a function to the
 builder that immediately transforms the ItemsByTag to the desired output
 type. Your function may return null and in that case the stage will not
 emit anything.
 This example counts the items in stage-0, sums those in stage-1 and takes the average of those in stage-2:
 BatchStage<Long> stage0 = p.readFrom(source0);
 BatchStage<Long> stage1 = p.readFrom(source1);
 BatchStage<Long> stage2 = p.readFrom(source2);
 AggregateBuilder<Long> b = stage0.aggregateBuilder(
         AggregateOperations.counting());
 Tag<Long> tag0 = b.tag0();
 Tag<Long> tag1 = b.add(stage1,
         AggregateOperations.summingLong(Number::longValue));
 Tag<Double> tag2 = b.add(stage2,
         AggregateOperations.averagingLong(Number::longValue));
 BatchStage<ItemsByTag> aggregated = b.build();
 aggregated.map(ibt -> String.format(
         "Count of stage0: %d, sum of stage1: %d, average of stage2: %f",
         ibt.get(tag0), ibt.get(tag1), ibt.get(tag2))
 );
@Nonnull default AggregateBuilder1<T> aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates
 the data from several input stages. The stage emits a single item, the
 result that the aggregate operation's finish primitive returns. If it
 returns null, the stage will not emit any output.
 
 This builder requires you to provide a multi-input aggregate operation.
 If you can express your logic in terms of single-input aggregate
 operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0)
 because it offers a simpler API. Use this builder only when you have the
 need to implement an aggregate operation that combines all the input
 streams into the same accumulator.
 
 This builder is mainly intended to build a co-aggregation of four or
 more contributing stages. For up to three stages, prefer the direct
 stage.aggregateN(...) calls because they offer more static type
 safety.
 
 To add the other stages, call add(stage).
 Collect all the tags returned from add() and use them when
 building the aggregate operation. Retrieve the tag of the first stage
 (from which you obtained this builder) by calling AggregateBuilder1.tag0().
 
This example takes three streams of strings and counts the distinct strings across all of them:
 Pipeline p = Pipeline.create();
 BatchStage<String> stage0 = p.readFrom(source0);
 BatchStage<String> stage1 = p.readFrom(source1);
 BatchStage<String> stage2 = p.readFrom(source2);
 AggregateBuilder1<String> b = stage0.aggregateBuilder();
 Tag<String> tag0 = b.tag0();
 Tag<String> tag1 = b.add(stage1);
 Tag<String> tag2 = b.add(stage2);
 BatchStage<Integer> aggregated = b.build(AggregateOperation
         .withCreate(HashSet<String>::new)
         .andAccumulate(tag0, (acc, item) -> acc.add(item))
         .andAccumulate(tag1, (acc, item) -> acc.add(item))
         .andAccumulate(tag2, (acc, item) -> acc.add(item))
         .andCombine(HashSet::addAll)
         .andFinish(HashSet::size));
 @Nonnull default BatchStage<T> peek()
Description copied from interface: GeneralStage. Adds a peeking layer to
 this compute stage which logs its output. For each item the stage emits,
 it logs the result of its toString()
 method at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>.
 The stage logs each item on whichever cluster member it happens to
 receive it. Its primary purpose is for development use, when running Jet
 on a local machine.
 
 Note that peek after GeneralStage.rebalance(FunctionEx) is not supported.
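Sample usage (a sketch, not part of the original Javadoc; the upstream
stage and sink are assumed):

 users.map(User::getName)
      .peek()               // logs each name at the INFO level
      .writeTo(someSink);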
Specified by: peek in interface GeneralStage<T>
See Also: GeneralStage.peek(PredicateEx, FunctionEx), GeneralStage.peek(FunctionEx)

@Nonnull BatchStage<T> peek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
Description copied from interface: GeneralStage. Attaches a peeking stage
 which logs this stage's output and passes it through without
 transformation. For each item the stage emits, it:
 1. uses the shouldLogFn predicate to see whether to log the item
 2. if yes, uses toStringFn to get the item's string representation
 3. logs the string at the INFO level to the log category
    com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
 
 Note that peek after GeneralStage.rebalance(FunctionEx) operation is not supported.
 
Sample usage:
 users.peek(
     user -> user.getName().length() > 100,
     User::getName
 )
Specified by: peek in interface GeneralStage<T>
Parameters: shouldLogFn - a function to filter the logged items. You can use alwaysTrue() as a pass-through filter when you
     don't need any filtering. It must be stateless and cooperative.
 toStringFn - a function that returns a string representation of
     the item. It must be stateless and cooperative.
See Also: GeneralStage.peek(FunctionEx), GeneralStage.peek()

@Nonnull default BatchStage<T> peek(@Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
Description copied from interface: GeneralStage. Adds a peeking layer to
 this compute stage which logs its output. For each item the stage emits,
 it uses toStringFn to get a string representation of the item and logs
 it at the INFO level to the log category
 com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
 
 Note that peek after GeneralStage.rebalance(FunctionEx) operation is not supported.
 
Sample usage:
 users.peek(User::getName)
Specified by: peek in interface GeneralStage<T>
Parameters: toStringFn - a function that returns a string representation of
     the item. It must be stateless and cooperative.
See Also: GeneralStage.peek(PredicateEx, FunctionEx), GeneralStage.peek()

@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Description copied from interface: GeneralStage. Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
 Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
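Sample usage (a sketch, not part of the original Javadoc; it assumes the
Core API factory Processors.mapP and is equivalent to a plain stage.map):

 BatchStage<String> upper = stage.customTransform(
         "to-upper-case",
         Processors.<String, String>mapP(s -> s.toUpperCase(Locale.ROOT))
 );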
Specified by: customTransform in interface GeneralStage<T>
Type Parameters: R - the type of the output items
Parameters: stageName - a human-readable name for the custom stage
 procSupplier - the supplier of processors

@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Description copied from interface: GeneralStage. Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
 Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by: customTransform in interface GeneralStage<T>
Type Parameters: R - the type of the output items
Parameters: stageName - a human-readable name for the custom stage
 procSupplier - the supplier of processors

@Nonnull <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Description copied from interface: GeneralStage. Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
 Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by: customTransform in interface GeneralStage<T>
Type Parameters: R - the type of the output items
Parameters: stageName - a human-readable name for the custom stage
 procSupplier - the supplier of processors

@Nonnull default <R> BatchStage<R> apply(@Nonnull FunctionEx<? super BatchStage<T>,? extends BatchStage<R>> transformFn)
Transforms this stage using the provided transformFn and
 returns the transformed stage. It allows you to extract common pipeline
 transformations into a method and then call that method without
 interrupting the chained pipeline expression.
 For example, say you have this code:
 BatchStage<String> input = pipeline.readFrom(textSource);
 BatchStage<String> cleanedUp = input
         .map(String::toLowerCase)
         .filter(s -> s.startsWith("success"));
 You can extract the map and filter steps into a common
 "cleanup" transformation:
 
 BatchStage<String> cleanUp(BatchStage<String> input) {
      return input.map(String::toLowerCase)
                  .filter(s -> s.startsWith("success"));
 }
 
 BatchStage<String> tokens = pipeline
     .readFrom(textSource)
     .apply(this::cleanUp)
     .flatMap(line -> traverseArray(line.split("\\W+")));
Type Parameters: R - type of the returned stage
Parameters: transformFn - function to transform this stage into another stage

@Nonnull BatchStage<T> setLocalParallelism(int localParallelism)
Description copied from interface: Stage. Sets the preferred local parallelism (number of processors per Jet cluster member) this stage will configure its DAG vertices with. While most stages are backed by 1 vertex, there are exceptions. If a stage uses two vertices, each of them will have the given local parallelism, so in total there will be twice as many processors per member.
The default value is -1. It signals Jet to figure out a suitable value itself: Jet will determine the vertex's local parallelism during job initialization from the global default and the processor meta-supplier's preferred value.
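Sample usage (a sketch, not part of the original Javadoc; the upstream
stage and the value 2 are arbitrary assumptions):

 BatchStage<String> tokens = lines
         .flatMap(line -> traverseArray(line.split("\\W+")))
         .setLocalParallelism(2); // two processors per member for this vertex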
Specified by: setLocalParallelism in interface GeneralStage<T>, setLocalParallelism in interface Stage

@Nonnull BatchStage<T> setName(@Nonnull String name)

Description copied from interface: Stage. Overrides the default name of
 the stage with the name you choose and returns the stage.

Specified by: setName in interface GeneralStage<T>, setName in interface Stage
Parameters: name - the stage name

Copyright © 2022 Hazelcast, Inc. All rights reserved.