Interface GeneralStageWithKey<T,​K>

  • Type Parameters:
    T - type of the stream item
    K - type of the grouping key
    All Known Subinterfaces:
    BatchStageWithKey<T,​K>, StreamStageWithKey<T,​K>

    public interface GeneralStageWithKey<T,​K>
    An intermediate step when constructing a group-and-aggregate pipeline stage. This is the base type for the batch and stream variants.
    Since:
    Jet 3.0
    • Method Detail

      • keyFn

        @Nonnull
        FunctionEx<? super T,​? extends K> keyFn()
        Returns the function that extracts the key from stream items. The purpose of the key varies with the operation you apply.
      • mapStateful

        @Nonnull
        <S,​R> GeneralStage<R> mapStateful​(@Nonnull
                                                SupplierEx<? extends S> createFn,
                                                @Nonnull
                                                TriFunction<? super S,​? super K,​? super T,​? extends R> mapFn)
        Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to mapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

        If you want to return the state object from mapFn, return a copy of it. Otherwise the result of mapFn could be modified after it has been emitted, or the state could be modified by downstream processors.

        This sample takes a stream of pairs (serverId, latency) representing the latencies of serving individual requests and outputs the cumulative latency of all handled requests so far, for each server separately:

        
         GeneralStage<Entry<String, Long>> latencies;
         GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
                 .groupingKey(Entry::getKey)
                 .mapStateful(
                         LongAccumulator::new,
                         (sum, key, entry) -> {
                             sum.add(entry.getValue());
                             return entry(key, sum.get());
                         }
                 );
         
        This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(AggregateOperations.summingLong(Entry::getValue)).

        This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
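
        The per-key state handling described above can be modelled in a few lines of plain Java. This is only a sketch of the semantics, not Jet's implementation: the class KeyedMapStatefulSketch, its TriFn interface and its mapStateful helper are hypothetical names, a HashMap stands in for the snapshotted state, and a long[] cell stands in for LongAccumulator.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical plain-Java model of keyed stateful mapping; not part of Jet.
final class KeyedMapStatefulSketch {

    /** Stand-in for a three-argument function (state, key, item) -> result. */
    interface TriFn<S, K, T, R> {
        R apply(S state, K key, T item);
    }

    static <T, K, S, R> List<R> mapStateful(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Supplier<? extends S> createFn,
            TriFn<? super S, ? super K, ? super T, ? extends R> mapFn) {
        Map<K, S> stateByKey = new HashMap<>();   // one state object per grouping key
        List<R> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            // createFn runs only the first time a key is seen
            S state = stateByKey.computeIfAbsent(key, k -> createFn.get());
            output.add(mapFn.apply(state, key, item));
        }
        return output;
    }

    static List<Entry<String, Long>> cumulativeLatencies(List<Entry<String, Long>> latencies) {
        return mapStateful(
                latencies,
                Entry::getKey,
                () -> new long[1],                // mutable running sum, starts at 0
                (sum, key, e) -> {
                    sum[0] += e.getValue();
                    // Emit a fresh immutable entry, never the state object itself.
                    return new SimpleImmutableEntry<>(key, sum[0]);
                });
    }

    public static void main(String[] args) {
        List<Entry<String, Long>> input = List.of(
                Map.entry("a", 5L), Map.entry("b", 7L), Map.entry("a", 3L));
        System.out.println(cumulativeLatencies(input)); // prints [a=5, b=7, a=8]
    }
}
```

        Returning a new entry rather than the long[] cell mirrors the copy requirement: whoever receives the output cannot reach the state, and later updates to the state do not change items that were already emitted.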

        Type Parameters:
        S - type of the state object
        R - type of the result
        Parameters:
        createFn - function that returns the state object
        mapFn - function that receives the state object, the grouping key and the input item, and outputs the result item. It may modify the state object. It must be stateless and cooperative.
      • filterStateful

        @Nonnull
        <S> GeneralStage<T> filterStateful​(@Nonnull
                                           SupplierEx<? extends S> createFn,
                                           @Nonnull
                                           BiPredicateEx<? super S,​? super T> filterFn)
        Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

        This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):

        
         GeneralStage<String> decimated = input
                 .groupingKey(String::length)
                 .filterStateful(
                         LongAccumulator::new,
                         (counter, item) -> {
                             counter.add(1);
                             return counter.get() % 10 != 0;
                         }
                 );
         

        The given functions must be stateless and cooperative.

        This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
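
        The decimation sample can be traced by hand with a plain-Java model. The sketch below is an illustration only, not Jet code: KeyedDecimationSketch and decimate are hypothetical names, and a HashMap of long[] counters stands in for the per-key state objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of keyed stateful filtering; not part of Jet.
final class KeyedDecimationSketch {

    /** Keeps an item unless it is the 10th, 20th, ... item seen for its length. */
    static List<String> decimate(List<String> input) {
        Map<Integer, long[]> counterByLength = new HashMap<>(); // state per grouping key
        List<String> kept = new ArrayList<>();
        for (String item : input) {
            long[] counter = counterByLength.computeIfAbsent(item.length(), k -> new long[1]);
            counter[0]++;
            if (counter[0] % 10 != 0) {
                kept.add(item);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add("a" + i);     // ten strings of length 2
        }
        input.add("xyz");           // length 3, counted by a separate state object
        // "a9" is the 10th string of length 2 and is dropped; "xyz" is kept.
        System.out.println(decimate(input));
    }
}
```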

        Type Parameters:
        S - type of the state object
        Parameters:
        createFn - function that returns the state object
        filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.
      • flatMapStateful

        @Nonnull
        <S,​R> GeneralStage<R> flatMapStateful​(@Nonnull
                                                    SupplierEx<? extends S> createFn,
                                                    @Nonnull
                                                    TriFunction<? super S,​? super K,​? super T,​? extends Traverser<R>> flatMapFn)
        Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to flatMapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

        If you want to return the state object from flatMapFn, return a copy of it. Otherwise the result of flatMapFn could be modified after it has been emitted, or the state could be modified by downstream processors.

        This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:

        
         GeneralStage<String> punctuated = input
                 .groupingKey(String::length)
                 .flatMapStateful(
                         LongAccumulator::new,
                         (counter, key, item) -> {
                             counter.add(1);
                             return counter.get() % 10 == 0
                                     ? Traversers.traverseItems(item, "punctuation" + key)
                                     : Traversers.singleton(item);
                         }
                 );
         

        The given functions must be stateless and cooperative.

        This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
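
        To make the one-input, several-outputs behaviour concrete, here is a plain-Java model of the punctuation example as the text describes it (punctuation after every 10th string of a given length). It is a sketch under assumptions, not Jet code: KeyedPunctuationSketch and punctuate are hypothetical names, and a plain list takes the place of the Traverser.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of keyed stateful flat-mapping; not part of Jet.
final class KeyedPunctuationSketch {

    static List<String> punctuate(List<String> input) {
        Map<Integer, long[]> counterByLength = new HashMap<>(); // state per grouping key
        List<String> output = new ArrayList<>();
        for (String item : input) {
            int key = item.length();
            long[] counter = counterByLength.computeIfAbsent(key, k -> new long[1]);
            counter[0]++;
            output.add(item);                       // every input item is passed through
            if (counter[0] % 10 == 0) {
                output.add("punctuation" + key);    // extra item after each 10th one
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add("a" + i);                     // ten strings of length 2
        }
        List<String> output = punctuate(input);
        System.out.println(output.get(output.size() - 1)); // prints punctuation2
    }
}
```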

        Type Parameters:
        S - type of the state object
        R - type of the result
        Parameters:
        createFn - function that returns the state object
        flatMapFn - function that receives the state object, the grouping key and the input item, and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.
      • rollingAggregate

        @Nonnull
        default <A,​R> GeneralStage<java.util.Map.Entry<K,​R>> rollingAggregate​(@Nonnull
                                                                                          AggregateOperation1<? super T,​A,​? extends R> aggrOp)
        Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).

        Sample usage:

        
         StreamStage<Entry<Color, Long>> aggregated = items
                 .groupingKey(Item::getColor)
                 .rollingAggregate(AggregateOperations.counting());
         
        For example, if the aggregate operation is summing and the input for a given key is {2, 7, 8, -5}, the output for that key will be {2, 9, 17, 12}.

        This stage is fault-tolerant and saves its state to the snapshot.

        This operation is subject to memory limits. See InstanceConfig.setMaxProcessorAccumulatedRecords(long) for more information.
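
        The relationship between a rolling aggregation and the create, accumulate and export steps of an aggregate operation can be sketched in plain Java. Everything below is a hypothetical model (RollingAggregateSketch and its helpers are not Jet classes), and the three functional parameters only imitate the corresponding primitives of AggregateOperation1.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical plain-Java model of a keyed rolling aggregation; not part of Jet.
final class RollingAggregateSketch {

    static <T, K, A, R> List<Entry<K, R>> rollingAggregate(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Supplier<A> createFn,                   // creates an empty accumulator
            BiConsumer<A, ? super T> accumulateFn,  // folds one item into the accumulator
            Function<A, R> exportFn) {              // reads the current result
        Map<K, A> accByKey = new HashMap<>();
        List<Entry<K, R>> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            A acc = accByKey.computeIfAbsent(key, k -> createFn.get());
            accumulateFn.accept(acc, item);
            // One output entry per input item, carrying the result so far for its key.
            output.add(new SimpleImmutableEntry<>(key, exportFn.apply(acc)));
        }
        return output;
    }

    /** Rolling sum of the entry values, computed separately for each entry key. */
    static List<Entry<String, Long>> rollingSum(List<Entry<String, Long>> input) {
        return rollingAggregate(
                input,
                Entry::getKey,
                () -> new long[1],
                (acc, e) -> acc[0] += e.getValue(),
                acc -> acc[0]);
    }

    public static void main(String[] args) {
        List<Entry<String, Long>> input = List.of(
                Map.entry("k", 2L), Map.entry("k", 7L), Map.entry("k", 8L), Map.entry("k", -5L));
        System.out.println(rollingSum(input)); // prints [k=2, k=9, k=17, k=12]
    }
}
```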

        Type Parameters:
        A - type of the accumulator
        R - type of the aggregate operation result
        Parameters:
        aggrOp - the aggregate operation to perform
        Returns:
        the newly attached stage
      • mapUsingService

        @Deprecated
        @Nonnull
        <S,​R> GeneralStage<R> mapUsingService​(@Nonnull
                                                    ServiceFactory<?,​S> serviceFactory,
                                                    @Nonnull
                                                    TriFunction<? super S,​? super K,​? super T,​? extends R> mapFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory. If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

        Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
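
        Why key-based partitioning avoids duplicated cache entries can be shown with a small plain-Java model. The sketch rests on an assumption that is not taken from Jet: items are routed to a service instance by the hash of their key. PartitionedServiceSketch and CachingRegistry are hypothetical names, and the "remote fetch" is simulated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of key-partitioned service instances; not Jet's actual routing.
final class PartitionedServiceSketch {

    /** A service with a local near-cache in front of a simulated remote lookup. */
    static final class CachingRegistry {
        private final Map<String, String> nearCache = new HashMap<>();
        int remoteFetches;

        String fetchDetail(String key) {
            String cached = nearCache.get(key);
            if (cached == null) {
                remoteFetches++;                  // simulated call to the external system
                cached = "detail-of-" + key;
                nearCache.put(key, cached);
            }
            return cached;
        }
    }

    private final List<CachingRegistry> instances = new ArrayList<>();

    PartitionedServiceSketch(int instanceCount) {
        for (int i = 0; i < instanceCount; i++) {
            instances.add(new CachingRegistry());
        }
    }

    /** Assumed routing rule: equal keys hash to the same index, hence the same instance. */
    CachingRegistry instanceFor(String key) {
        return instances.get(Math.floorMod(key.hashCode(), instances.size()));
    }

    int totalRemoteFetches() {
        int total = 0;
        for (CachingRegistry registry : instances) {
            total += registry.remoteFetches;
        }
        return total;
    }

    public static void main(String[] args) {
        PartitionedServiceSketch stage = new PartitionedServiceSketch(4);
        for (String key : List.of("a", "b", "a", "c", "a", "b")) {
            stage.instanceFor(key).fetchDetail(key);
        }
        // Three distinct keys, so three remote fetches: no key is cached twice.
        System.out.println(stage.totalRemoteFetches()); // prints 3
    }
}
```

        With random routing instead, the same key could reach several instances and each of them would fetch and cache it separately.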

        Sample usage:

        
         items.groupingKey(Item::getDetailId)
              .mapUsingService(
                  ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                  (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
              );
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Type Parameters:
        S - type of service object
        R - the result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        mapFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.
        Returns:
        the newly attached stage
      • mapUsingServiceAsync

        @Deprecated
        @Nonnull
        default <S,​R> GeneralStage<R> mapUsingServiceAsync​(@Nonnull
                                                                 ServiceFactory<?,​S> serviceFactory,
                                                                 @Nonnull
                                                                 TriFunction<? super S,​? super K,​? super T,​java.util.concurrent.CompletableFuture<R>> mapAsyncFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Asynchronous version of mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

        Uses default values for the extra parameters: the maximum number of concurrent async operations per processor is limited to 4 and the order of input items is preserved.

        The function can return a null future or the future can return a null result: in both cases it will act just like a filter.

        Sample usage:

        
         items.groupingKey(Item::getDetailId)
              .mapUsingServiceAsync(
                  ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                  (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
              );
         
        The latency of the async call will add to the total latency of the output.
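
        The filtering and ordering rules above can be tried out with CompletableFuture alone. The following plain-Java sketch is not Jet code: AsyncMapSketch, mapAsyncOrdered and lookup are hypothetical names, the sketch ignores the limit on concurrent operations, and it waits with join() purely for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical plain-Java model of ordered async mapping; not part of Jet.
final class AsyncMapSketch {

    static <T, R> List<R> mapAsyncOrdered(
            List<T> input, Function<? super T, CompletableFuture<R>> mapAsyncFn) {
        // Start every call first and remember the futures in input order.
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : input) {
            futures.add(mapAsyncFn.apply(item));    // the function may return null
        }
        List<R> output = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue;                           // null future: item is dropped
            }
            R result = future.join();               // blocking wait, for brevity only
            if (result != null) {
                output.add(result);                 // null result: item is dropped too
            }
        }
        return output;
    }

    /** Hypothetical async lookup used for the demonstration. */
    static CompletableFuture<String> lookup(String s) {
        if (s.equals("skip")) {
            return null;
        }
        if (s.equals("none")) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.supplyAsync(() -> s.toUpperCase());
    }

    public static void main(String[] args) {
        List<String> out = mapAsyncOrdered(List.of("a", "skip", "none", "b"), AsyncMapSketch::lookup);
        System.out.println(out); // prints [A, B]
    }
}
```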
        Type Parameters:
        S - type of service object
        R - the future's result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingServiceAsync

        @Deprecated
        @Nonnull
        <S,​R> GeneralStage<R> mapUsingServiceAsync​(@Nonnull
                                                         ServiceFactory<?,​S> serviceFactory,
                                                         int maxConcurrentOps,
                                                         boolean preserveOrder,
                                                         @Nonnull
                                                         TriFunction<? super S,​? super K,​? super T,​java.util.concurrent.CompletableFuture<R>> mapAsyncFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Asynchronous version of mapUsingService(ServiceFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

        The function can return a null future or the future can return a null result: in both cases it will act just like a filter.

        Sample usage:

        
         items.groupingKey(Item::getDetailId)
              .mapUsingServiceAsync(
                  ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                  16,
                  true,
                  (reg, key, item) -> reg.fetchDetailAsync(key).thenApply(item::setDetail)
              );
         
        The latency of the async call will add to the total latency of the output.
        Type Parameters:
        S - type of service object
        R - the future's result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        maxConcurrentOps - maximum number of concurrent async operations per processor
        preserveOrder - whether the async responses are ordered or not
        mapAsyncFn - a mapping function. Can map to null (return a null future). It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingServiceAsyncBatched

        @Deprecated
        @Nonnull
        <S,​R> GeneralStage<R> mapUsingServiceAsyncBatched​(@Nonnull
                                                                ServiceFactory<?,​S> serviceFactory,
                                                                int maxBatchSize,
                                                                @Nonnull
                                                                BiFunctionEx<? super S,​? super java.util.List<T>,​? extends java.util.concurrent.CompletableFuture<java.util.List<R>>> mapAsyncFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Batched version of mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>. The size of the list is limited by the given maxBatchSize.

        This transform can perform filtering by putting null elements into the output list.

        The latency of the async call will add to the total latency of the output.
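
        How a batch size limit and null-based filtering interact can be sketched without Jet. In the plain-Java model below, BatchedAsyncMapSketch and mapAsyncBatched are hypothetical names, and the input is simply cut into consecutive chunks of at most maxBatchSize items, which is a simplifying assumption about how batches are formed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical plain-Java model of batched async mapping; not part of Jet.
final class BatchedAsyncMapSketch {

    static <T, R> List<R> mapAsyncBatched(
            List<T> input,
            int maxBatchSize,
            Function<? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> output = new ArrayList<>();
        for (int from = 0; from < input.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, input.size());
            List<T> batch = new ArrayList<>(input.subList(from, to)); // at most maxBatchSize items
            List<R> results = mapAsyncFn.apply(batch).join();         // blocking wait, for brevity only
            for (R result : results) {
                if (result != null) {
                    output.add(result);     // null element: the item is filtered out
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7);
        List<Integer> batchSizes = new ArrayList<>();
        List<Integer> output = mapAsyncBatched(input, 3, batch -> {
            batchSizes.add(batch.size());
            List<Integer> results = new ArrayList<>();
            for (Integer n : batch) {
                // Even numbers are multiplied by ten, odd numbers map to null.
                results.add(n % 2 == 0 ? Integer.valueOf(n * 10) : null);
            }
            return CompletableFuture.completedFuture(results);
        });
        System.out.println(batchSizes); // prints [3, 3, 1]
        System.out.println(output);     // prints [20, 40, 60]
    }
}
```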

        This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum batch size is set to 100:

        
         items.groupingKey(Item::getDetailId)
              .mapUsingServiceAsyncBatched(
                  ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                  100,
                  (reg, itemList) -> reg
                      .fetchDetailsAsync(itemList.stream().map(Item::getDetailId).collect(Collectors.toList()))
                      .thenApply(details -> {
                          for (int i = 0; i < itemList.size(); i++) {
                              itemList.get(i).setDetail(details.get(i));
                          }
                          return itemList;
                      })
              );
         
        Type Parameters:
        S - type of service object
        R - the future result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        maxBatchSize - max size of the input list
        mapAsyncFn - a mapping function. It must be stateless. It must be cooperative, if the service is cooperative.
        Returns:
        the newly attached stage
        Since:
        Jet 4.0
      • mapUsingServiceAsyncBatched

        @Deprecated
        @Nonnull
        <S,​R> GeneralStage<R> mapUsingServiceAsyncBatched​(@Nonnull
                                                                ServiceFactory<?,​S> serviceFactory,
                                                                int maxBatchSize,
                                                                @Nonnull
                                                                TriFunction<? super S,​? super java.util.List<K>,​? super java.util.List<T>,​? extends java.util.concurrent.CompletableFuture<java.util.List<R>>> mapAsyncFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Batched version of mapUsingServiceAsync(ServiceFactory, TriFunction): mapAsyncFn takes a list of input items (and a list of their corresponding keys) and returns a CompletableFuture<List<R>>. The sizes of the input lists are identical and are limited by the given maxBatchSize. The key at index N corresponds to the input item at index N.

        The number of in-flight batches being completed asynchronously is limited to 2 and this mapping operation always preserves the order of input elements.

        This transform can perform filtering by putting null elements into the output list.

        The latency of the async call will add to the total latency of the output.

        This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum batch size is set to 100:

        
         items.groupingKey(Item::getDetailId)
              .mapUsingServiceAsyncBatched(
                  ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                  100,
                  (reg, keyList, itemList) -> reg.fetchDetailsAsync(keyList).thenApply(details -> {
                      for (int i = 0; i < itemList.size(); i++) {
                          itemList.get(i).setDetail(details.get(i));
                      }
                      return itemList;
                  })
              );
         
        Type Parameters:
        S - type of service object
        R - the future result type of the mapping function
        Parameters:
        serviceFactory - the service factory
        maxBatchSize - max size of the input list
        mapAsyncFn - a mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
        Since:
        Jet 4.0
      • filterUsingService

        @Deprecated
        @Nonnull
        <S> GeneralStage<T> filterUsingService​(@Nonnull
                                               ServiceFactory<?,​S> serviceFactory,
                                               @Nonnull
                                               TriPredicate<? super S,​? super K,​? super T> filterFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

        Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.

        Sample usage:

        
         items.groupingKey(Item::getDetailId)
              .filterUsingService(
                  ServiceFactories.sharedService(ctx -> new ItemDetailRegistry()),
                  (reg, key, item) -> reg.fetchDetail(key).contains("blade")
              );
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Type Parameters:
        S - type of service object
        Parameters:
        serviceFactory - the service factory
        filterFn - a filter predicate function. It must be stateless. It must be cooperative, if the service is cooperative.
        Returns:
        the newly attached stage
      • flatMapUsingService

        @Deprecated
        @Nonnull
        <S,​R> GeneralStage<R> flatMapUsingService​(@Nonnull
                                                        ServiceFactory<?,​S> serviceFactory,
                                                        @Nonnull
                                                        TriFunction<? super S,​? super K,​? super T,​? extends Traverser<R>> flatMapFn)
        Deprecated.
        Jet now has first-class support for data rebalancing, see GeneralStage.rebalance() and GeneralStage.rebalance(FunctionEx).
        Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

        Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same service instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.

        Sample usage:

        
         StreamStage<Part> parts = products
             .groupingKey(Product::getId)
             .flatMapUsingService(
                 ServiceFactories.sharedService(ctx -> new PartRegistry()),
                 (registry, productId, product) -> Traversers.traverseIterable(
                         registry.fetchParts(productId))
             );
         

        Interaction with fault-tolerant unbounded jobs

        If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
        Type Parameters:
        S - type of service object
        R - type of the output items
        Parameters:
        serviceFactory - the service factory
        flatMapFn - a flatmapping function. It must not return null traverser, but can return an empty traverser. It must be stateless. It must be cooperative, if the service is cooperative.
        Returns:
        the newly attached stage
      • mapUsingIMap

        @Nonnull
        default <V,​R> GeneralStage<R> mapUsingIMap​(@Nonnull
                                                         java.lang.String mapName,
                                                         @Nonnull
                                                         BiFunctionEx<? super T,​? super V,​? extends R> mapFn)
        Attaches a mapping stage that, for each item, looks up the grouping key in the IMap with the supplied name, merges the looked-up value with the item and emits the result.

        If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

        The mapping logic is equivalent to:

        
         V value = map.get(groupingKey);
         return mapFn.apply(item, value);
         
        Sample usage:
        
         items.groupingKey(Item::getDetailId)
              .mapUsingIMap(
                      "enriching-map",
                      (Item item, ItemDetail detail) -> item.setDetail(detail)
              );
         
        This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, the data locality will be broken.
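
        The lookup-and-merge logic, including the rule that a null result emits nothing, can be modelled with an ordinary java.util.Map. This is a sketch only: MapLookupSketch and mapUsingMap are hypothetical names, and the in-memory Map is a stand-in for the IMap with none of its distribution or data-locality behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical plain-Java model of the lookup-and-merge logic; not part of Jet.
final class MapLookupSketch {

    static <T, K, V, R> List<R> mapUsingMap(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Map<K, V> map,
            BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> output = new ArrayList<>();
        for (T item : input) {
            V value = map.get(keyFn.apply(item));   // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {
                output.add(result);                 // a null result emits nothing
            }
        }
        return output;
    }

    /** Tags each word with the name of its length, dropping words without a match. */
    static List<String> describeLengths(List<String> words, Map<Integer, String> lengthNames) {
        return mapUsingMap(
                words,
                String::length,
                lengthNames,
                (word, name) -> name == null ? null : word + ":" + name);
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "kiwi", "fig");
        Map<Integer, String> lengthNames = Map.of(5, "five", 3, "three");
        System.out.println(describeLengths(words, lengthNames)); // prints [apple:five, fig:three]
    }
}
```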
        Type Parameters:
        V - type of the value in the IMap
        R - type of the output item
        Parameters:
        mapName - name of the IMap
        mapFn - the mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • mapUsingIMap

        @Nonnull
        default <V,​R> GeneralStage<R> mapUsingIMap​(@Nonnull
                                                         IMap<K,​V> iMap,
                                                         @Nonnull
                                                         BiFunctionEx<? super T,​? super V,​? extends R> mapFn)
        Attaches a mapping stage that, for each item, looks up the grouping key in the supplied IMap, merges the looked-up value with the item and emits the result.

        If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

        The mapping logic is equivalent to:

        
         V value = map.get(groupingKey);
         return mapFn.apply(item, value);
         
        Sample usage:
        
         items.groupingKey(Item::getDetailId)
              .mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
         
        This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
        Type Parameters:
        V - type of the value in the IMap
        R - type of the output item
        Parameters:
        iMap - the IMap to use as the service
        mapFn - the mapping function. It must be stateless and cooperative.
        Returns:
        the newly attached stage
      • customTransform

        @Nonnull
        <R> GeneralStage<R> customTransform​(@Nonnull
                                            java.lang.String stageName,
                                            @Nonnull
                                            SupplierEx<Processor> procSupplier)
        Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.

        Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
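
        What "inferred from the call site" means in practice can be reproduced with any generic method whose type parameter appears only in the return type. The sketch below is plain Java with hypothetical names (CallSiteInferenceSketch, untypedTransform); it does not use Jet and only imitates the shape of the problem.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of call-site type inference without type safety; not part of Jet.
final class CallSiteInferenceSketch {

    /** R is chosen by the caller; nothing ties it to what the producer really emits. */
    @SuppressWarnings("unchecked")
    static <R> List<R> untypedTransform(Supplier<List<?>> producer) {
        List<?> produced = producer.get();
        return (List<R>) produced;      // unchecked cast: the compiler cannot verify R
    }

    static boolean failsAtUse() {
        // Compiles although the producer emits Strings and the caller claims Integers.
        List<Integer> wrong = untypedTransform(() -> List.of("a", "b"));
        try {
            Integer first = wrong.get(0);   // the hidden cast fails only here
            return first == null;           // not reached
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsAtUse()); // prints true
    }
}
```

        The mismatch surfaces only where an element is used, not where the transform is attached, so it is worth double-checking the declared output type of a custom stage against what its processors actually emit.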

        Type Parameters:
        R - the type of the output items
        Parameters:
        stageName - a human-readable name for the custom stage
        procSupplier - the supplier of processors
      • customTransform

        @Nonnull
        <R> GeneralStage<R> customTransform​(@Nonnull
                                            java.lang.String stageName,
                                            @Nonnull
                                            ProcessorSupplier procSupplier)
        Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.

        Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

        Type Parameters:
        R - the type of the output items
        Parameters:
        stageName - a human-readable name for the custom stage
        procSupplier - the supplier of processors
      • customTransform

        @Nonnull
        <R> GeneralStage<R> customTransform​(@Nonnull
                                            java.lang.String stageName,
                                            @Nonnull
                                            ProcessorMetaSupplier procSupplier)
        Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.

        Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

        Type Parameters:
        R - the type of the output items
        Parameters:
        stageName - a human-readable name for the custom stage
        procSupplier - the supplier of processors