Class Sinks

java.lang.Object
com.hazelcast.jet.pipeline.Sinks

public final class Sinks extends Object
Contains factory methods for various types of pipeline sinks. Formally, a sink transform is one which has no output. A pipeline stage with a sink transform has the type SinkStage and accepts no downstream stages.

The default local parallelism for the sinks in this class is typically 1; check the documentation of the individual methods.

Since:
Jet 3.0
  • Method Details

    • fromProcessor

      @Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier)
      Returns a sink constructed directly from the given Core API processor meta-supplier.

      The default local parallelism for this sink is specified inside the metaSupplier.

      Parameters:
      sinkName - user-friendly sink name
      metaSupplier - the processor meta-supplier
    • fromProcessor

      @Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier, @Nullable FunctionEx<? super T,?> partitionKeyFn)
      Returns a sink constructed directly from the given Core API processor meta-supplier.

      The default local parallelism for this sink is specified inside the metaSupplier.

      Parameters:
      sinkName - user-friendly sink name
      metaSupplier - the processor meta-supplier
      partitionKeyFn - key extractor function used to partition the edge into the sink. It must be stateless and cooperative.
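      A partitioned edge delivers all items with equal keys to the same sink processor instance. The following plain-Java sketch models that routing rule; it is not Hazelcast code. It assumes N parallel sink processors and, as a simplification, picks the processor from the key's hashCode(), whereas Hazelcast uses its own partitioning scheme. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a partitioned edge into a sink. Not the
// Hazelcast implementation: hashCode() stands in for Hazelcast partitioning.
class PartitionedRoutingSketch {

    // Picks the processor index for a key; equal keys always map to the same index.
    static int processorOf(Object key, int processorCount) {
        return Math.floorMod(key.hashCode(), processorCount);
    }

    // Splits the items into one inbox per sink processor, using partitionKeyFn
    // (stateless, like the real one must be) to extract each item's key.
    static <T> List<List<T>> route(List<T> items,
                                   Function<? super T, ?> partitionKeyFn,
                                   int processorCount) {
        List<List<T>> inboxes = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (T item : items) {
            Object key = partitionKeyFn.apply(item);
            inboxes.get(processorOf(key, processorCount)).add(item);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        // Items look like "key:payload"; the key is the part before the colon.
        List<String> items = List.of("a:1", "b:2", "a:3", "c:4");
        System.out.println(route(items, s -> s.split(":")[0], 2));
    }
}
```

      Without a partitionKeyFn, items may reach any processor instance, so per-key ordering or per-key local state in the sink could not be relied on.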
    • map

      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> map(@Nonnull String mapName)
      Returns a sink that puts Map.Entrys it receives into a Hazelcast IMap with the specified name.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.

      The default local parallelism for this sink is 1.
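      To see why overwriting makes the guarantee hold, the following plain-Java sketch (not Hazelcast code; a java.util.HashMap stands in for the IMap) models the sink as one put() per entry, and models a restart as replaying every item from the snapshot position onward on top of the items already written before the failure. Because the last write for each key is the same with or without the replay, the final map is identical. The class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an idempotent map sink; a HashMap stands in for the IMap.
class MapSinkReplay {

    static final List<Map.Entry<String, Integer>> SAMPLE = List.of(
            new SimpleEntry<>("a", 1),
            new SimpleEntry<>("b", 2),
            new SimpleEntry<>("a", 3),
            new SimpleEntry<>("c", 4));

    // Writes each entry with put(): a repeated key overwrites the previous value.
    static void putAll(Map<String, Integer> target, List<Map.Entry<String, Integer>> items) {
        for (Map.Entry<String, Integer> e : items) {
            target.put(e.getKey(), e.getValue());
        }
    }

    // Items [0, failedAt) were written before the failure; after the restart,
    // items [snapshotAt, end) are delivered again. Requires snapshotAt <= failedAt.
    static Map<String, Integer> runWithRestart(List<Map.Entry<String, Integer>> items,
                                               int snapshotAt, int failedAt) {
        Map<String, Integer> target = new HashMap<>();
        putAll(target, items.subList(0, failedAt));
        putAll(target, items.subList(snapshotAt, items.size()));
        return target;
    }

    public static void main(String[] args) {
        int n = SAMPLE.size();
        Map<String, Integer> clean = runWithRestart(SAMPLE, n, n);    // no replay
        Map<String, Integer> replayed = runWithRestart(SAMPLE, 1, 3); // items 1 and 2 written twice
        System.out.println(clean.equals(replayed)); // true
    }
}
```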

    • map

      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> map(@Nonnull IMap<? super K,? super V> map)
      Returns a sink that puts Map.Entrys it receives into the given Hazelcast IMap.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.

      The default local parallelism for this sink is 1.

    • map

      @Nonnull public static <T, K, V> Sink<T> map(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
      Returns a sink that uses the supplied functions to extract the key and value to put into a Hazelcast IMap with the specified name.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Since:
      Jet 4.2
    • map

      @Nonnull public static <T, K, V> Sink<T> map(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
      Returns a sink that uses the supplied functions to extract the key and value to put into the given Hazelcast IMap.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Since:
      Jet 4.2
    • remoteMap

      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
      Returns a sink that puts Map.Entrys it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.

      The default local parallelism for this sink is 1.

    • remoteMap

      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef)
      The same as the remoteMap(String, ClientConfig) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Parameters:
      mapName - the name of the map
      dataConnectionRef - the reference to DataConnectionConfig
      Since:
      5.4
    • remoteMap

      @Nonnull public static <T, K, V> Sink<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
      Returns a sink that uses the supplied functions to extract the key and value to put into a Hazelcast IMap in a remote cluster identified by the supplied ClientConfig.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Since:
      Jet 4.2
    • remoteMap

      @Nonnull public static <T, K, V> Sink<T> remoteMap(@Nonnull String mapName, DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
      The same as the remoteMap(String, ClientConfig, FunctionEx, FunctionEx) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Parameters:
      mapName - the name of the map
      dataConnectionRef - the reference to DataConnectionConfig
      Since:
      5.4
    • mapBuilder

      @Nonnull public static <T, K, V> MapSinkBuilder<T,K,V> mapBuilder(String mapName)
      Returns a builder object that offers a step-by-step fluent API to build a custom map sink for the Pipeline API. See the javadoc of the methods in MapSinkBuilder for more details.
      Type Parameters:
      T - type of the incoming items
      K - type of the key extracted from each item
      V - type of the value extracted from each item
      Parameters:
      mapName - name of the map to sink into, must not be null
      Since:
      5.4
    • mapEntryProcessorBuilder

      @Nonnull public static <E, K, V, R> MapSinkEntryProcessorBuilder<E,K,V,R> mapEntryProcessorBuilder(String mapName)
      Returns a builder object that offers a step-by-step fluent API to build a custom map sink that applies entry processors, for the Pipeline API. See the javadoc of the methods in MapSinkEntryProcessorBuilder for more details.
      Type Parameters:
      E - type of the incoming items
      K - type of the key extracted from each item
      V - type of the value extracted from each item
      Parameters:
      mapName - name of the map to sink into, must not be null
      Since:
      5.4
    • mapWithMerging

      @Nonnull public static <T, K, V> Sink<T> mapWithMerging(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
      Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
       K key = toKeyFn.apply(item);
       V oldValue = map.get(key);
       V newValue = toValueFn.apply(item);
       V resolved = (oldValue == null)
                  ? newValue
                  : mergeFn.apply(oldValue, newValue);
       if (resolved == null)
           map.remove(key);
       else
           map.put(key, resolved);
       

      This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.

      Note: This operation is not lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - input item type
      K - key type
      V - value type
      Parameters:
      mapName - name of the map
      toKeyFn - function that extracts the key from the input item
      toValueFn - function that extracts the value from the input item
      mergeFn - function that merges the existing value with the value acquired from the received item
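      The per-item logic described above can be exercised outside Hazelcast. The following plain-Java sketch uses java.util.function types in place of FunctionEx and BinaryOperatorEx and a HashMap in place of the IMap; it models the documented behavior and is not the sink's implementation. It checks the idempotency rule with two merge functions: Integer::max satisfies it, so re-delivered items leave the map unchanged, while Integer::sum does not, so re-delivered items are counted twice. The "key:value" item format and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model of mapWithMerging's per-item logic; not Hazelcast code.
class MergingSinkModel {

    static <T, K, V> void mergeItem(Map<K, V> map, T item,
                                    Function<? super T, ? extends K> toKeyFn,
                                    Function<? super T, ? extends V> toValueFn,
                                    BinaryOperator<V> mergeFn) {
        K key = toKeyFn.apply(item);
        V oldValue = map.get(key);
        V newValue = toValueFn.apply(item);
        V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
        if (resolved == null) {
            map.remove(key);          // a null result deletes the entry
        } else {
            map.put(key, resolved);
        }
    }

    // Items are "key:value" strings, e.g. "a:5".
    static String key(String item) { return item.split(":")[0]; }
    static Integer value(String item) { return Integer.valueOf(item.split(":")[1]); }

    static final List<String> ITEMS = List.of("a:5", "a:3", "b:1");

    // Feeds ITEMS once and, if replay is true, feeds the last two again,
    // as if they were re-delivered after a restart from a snapshot.
    static Map<String, Integer> run(BinaryOperator<Integer> mergeFn, boolean replay) {
        Map<String, Integer> map = new HashMap<>();
        for (String item : ITEMS) {
            mergeItem(map, item, MergingSinkModel::key, MergingSinkModel::value, mergeFn);
        }
        if (replay) {
            for (String item : ITEMS.subList(1, ITEMS.size())) {
                mergeItem(map, item, MergingSinkModel::key, MergingSinkModel::value, mergeFn);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(run(Integer::max, false).equals(run(Integer::max, true))); // true
        System.out.println(run(Integer::sum, false).equals(run(Integer::sum, true))); // false
    }
}
```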
    • mapWithMerging

      @Nonnull public static <T, K, V> Sink<T> mapWithMerging(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
      Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
       K key = toKeyFn.apply(item);
       V oldValue = map.get(key);
       V newValue = toValueFn.apply(item);
       V resolved = (oldValue == null)
                  ? newValue
                  : mergeFn.apply(oldValue, newValue);
       if (resolved == null)
           map.remove(key);
       else
           map.put(key, resolved);
       
      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.

      Note: This operation is not lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - input item type
      K - key type
      V - value type
      Parameters:
      map - the map to drain to
      toKeyFn - function that extracts the key from the input item
      toValueFn - function that extracts the value from the input item
      mergeFn - function that merges the existing value with the value acquired from the received item
    • remoteMapWithMerging

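      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BinaryOperatorEx<V> mergeFn)
      Convenience for remoteMapWithMerging(String, ClientConfig, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.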
    • remoteMapWithMerging

      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BinaryOperatorEx<V> mergeFn)
      The same as the remoteMapWithMerging(String, ClientConfig, BinaryOperatorEx) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Since:
      5.4
    • remoteMapWithMerging

      @Nonnull public static <T, K, V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
      Returns a sink equivalent to mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

      Because of the API this sink uses, the remote cluster must run version 4.0 or later.

    • remoteMapWithMerging

      @Nonnull public static <T, K, V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
      The same as the remoteMapWithMerging(String, ClientConfig, FunctionEx, FunctionEx, BinaryOperatorEx) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Since:
      5.4
    • mapWithMerging

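      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull String mapName, @Nonnull BinaryOperatorEx<V> mergeFn)
      Convenience for mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.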
    • mapWithMerging

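      @Nonnull public static <K, V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull IMap<? super K,V> map, @Nonnull BinaryOperatorEx<V> mergeFn)
      Convenience for mapWithMerging(IMap, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.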
    • mapWithUpdating

      @Nonnull public static <T, K, V> Sink<T> mapWithUpdating(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
      Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
       K key = toKeyFn.apply(item);
       V oldValue = map.get(key);
       V newValue = updateFn.apply(oldValue, item);
       if (newValue == null)
           map.remove(key);
       else
           map.put(key, newValue);
       

      This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.

      Note: This operation is not lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - input item type
      K - key type
      V - value type
      Parameters:
      mapName - name of the map
      toKeyFn - function that extracts the key from the input item
      updateFn - function that receives the existing map value and the item and returns the new map value
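      Because updateFn sees the whole item, it can make the update idempotent by recording what it has already applied. The following plain-Java sketch models the documented per-item logic with java.util.function types and a HashMap; it is not Hazelcast code. It contrasts a counter, which violates the rule updateFn.apply(v, e).equals(v) and therefore over-counts re-delivered items, with a set of event IDs, which satisfies it. The "user:eventId" item format and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of mapWithUpdating's per-item logic; not Hazelcast code.
class UpdatingSinkModel {

    static <T, K, V> void updateItem(Map<K, V> map, T item,
                                     Function<? super T, ? extends K> toKeyFn,
                                     BiFunction<? super V, ? super T, ? extends V> updateFn) {
        K key = toKeyFn.apply(item);
        V oldValue = map.get(key);
        V newValue = updateFn.apply(oldValue, item);
        if (newValue == null) {
            map.remove(key);
        } else {
            map.put(key, newValue);
        }
    }

    static final List<String> EVENTS = List.of("alice:e1", "alice:e2", "bob:e3");

    static String user(String item) { return item.split(":")[0]; }

    // Violates the rule: every delivery, including a duplicate, adds one.
    static Integer countEvents(Integer old, String item) {
        return old == null ? 1 : old + 1;
    }

    // Satisfies the rule: re-adding a known event ID yields an equal set.
    static Set<String> collectEventIds(Set<String> old, String item) {
        Set<String> ids = (old == null) ? new TreeSet<>() : new TreeSet<>(old);
        ids.add(item.split(":")[1]);
        return ids;
    }

    // Delivers EVENTS once, or twice in full to mimic a replay after a restart.
    static <V> Map<String, V> run(BiFunction<V, String, V> updateFn, boolean replay) {
        Map<String, V> map = new HashMap<>();
        int passes = replay ? 2 : 1;
        for (int pass = 0; pass < passes; pass++) {
            for (String item : EVENTS) {
                updateItem(map, item, UpdatingSinkModel::user, updateFn);
            }
        }
        return map;
    }

    static Map<String, Integer> counts(boolean replay) {
        return run(UpdatingSinkModel::countEvents, replay);
    }

    static Map<String, Set<String>> eventIds(boolean replay) {
        return run(UpdatingSinkModel::collectEventIds, replay);
    }

    public static void main(String[] args) {
        System.out.println(counts(false) + " vs " + counts(true));
        System.out.println(eventIds(false).equals(eventIds(true))); // true
    }
}
```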
    • mapWithUpdating

      @Nonnull public static <T, K, V> Sink<T> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
      Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
       K key = toKeyFn.apply(item);
       V oldValue = map.get(key);
       V newValue = updateFn.apply(oldValue, item);
       if (newValue == null)
           map.remove(key);
       else
           map.put(key, newValue);
       
      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.

      Note: This operation is not lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - input item type
      K - key type
      V - value type
      Parameters:
      map - map to drain to
      toKeyFn - function that extracts the key from the input item
      updateFn - function that receives the existing map value and the item and returns the new map value
    • remoteMapWithUpdating

      @Nonnull public static <T, K, V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
      Returns a sink equivalent to mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

      Because of the API this sink uses, the remote cluster must run version 4.0 or later.

    • remoteMapWithUpdating

      @Nonnull public static <T, K, V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
      The same as the remoteMapWithUpdating(String, ClientConfig, FunctionEx, BiFunctionEx) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Since:
      5.4
    • mapWithUpdating

      @Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(@Nonnull String mapName, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
      Convenience for mapWithUpdating(String, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
    • mapWithUpdating

      @Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
      Convenience for mapWithUpdating(IMap, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
    • remoteMapWithUpdating

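      @Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
      Convenience for remoteMapWithUpdating(String, ClientConfig, FunctionEx, BiFunctionEx) with Map.Entry as the input item.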
    • remoteMapWithUpdating

      @Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
      The same as the remoteMapWithUpdating(String, ClientConfig, BiFunctionEx) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Since:
      5.4
    • mapWithEntryProcessor

      @Nonnull public static <E, K, V, R> Sink<E> mapWithEntryProcessor(@Nonnull String mapName, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
      Convenience for mapWithEntryProcessor(int, String, FunctionEx, FunctionEx) when the maximum number of async operations is not specified.
    • mapWithEntryProcessor

      @Nonnull public static <E, K, V, R> Sink<E> mapWithEntryProcessor(int maxParallelAsyncOps, @Nonnull String mapName, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
      Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.

      Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that those sinks solve efficiently, this one performs worse, so use it only when they are not applicable.

      If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.

      This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor were run on the same entry more than once.

      Note: Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this operation is lock-aware. If the key is locked, the EntryProcessor waits until it acquires the lock.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Type Parameters:
      E - input item type
      K - key type
      V - value type
      Parameters:
      maxParallelAsyncOps - maximum number of simultaneous entry processors affecting the map
      mapName - name of the map
      toKeyFn - function that extracts the key from the input item
      toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
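      One way to meet the idempotency requirement is to have each update carry a sequence number and skip updates the entry has already seen. The following plain-Java sketch models an entry processor as a function from the current value (null if the key is absent) to the new value, applied synchronously to a HashMap; it is not Hazelcast code. In a real job, the same check would go inside the process(Map.Entry) method of the EntryProcessor returned by toEntryProcessorFn. The sketch assumes that updates for a given key arrive in increasing sequence order; the class, method, and field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model: an "entry processor" is a function from the current value
// (null if the key is absent) to the value to store. Not Hazelcast code.
class EntryProcessorReplay {

    // Stored value: running total plus the sequence number of the last applied deposit.
    static final class Balance {
        final long lastSeq;
        final long total;

        Balance(long lastSeq, long total) {
            this.lastSeq = lastSeq;
            this.total = total;
        }
    }

    // Not idempotent: running it twice for the same deposit adds the amount twice.
    static UnaryOperator<Balance> naiveDeposit(long seq, long amount) {
        return cur -> new Balance(seq, (cur == null ? 0 : cur.total) + amount);
    }

    // Idempotent: a deposit whose sequence number was already applied is skipped.
    // Assumes deposits for one key arrive in increasing sequence order.
    static UnaryOperator<Balance> dedupingDeposit(long seq, long amount) {
        return cur -> {
            if (cur != null && seq <= cur.lastSeq) {
                return cur;
            }
            return new Balance(seq, (cur == null ? 0 : cur.total) + amount);
        };
    }

    // Applies one processor to the entry for `key`: one submission per item.
    static void submit(Map<String, Balance> map, String key, UnaryOperator<Balance> processor) {
        map.put(key, processor.apply(map.get(key)));
    }

    // Deposits 10 (seq 1) and 5 (seq 2), then re-delivers seq 2 as after a restart.
    static long totalAfterReplay(boolean deduping) {
        Map<String, Balance> map = new HashMap<>();
        long[][] deposits = {{1, 10}, {2, 5}, {2, 5}};
        for (long[] d : deposits) {
            submit(map, "acct-1", deduping ? dedupingDeposit(d[0], d[1]) : naiveDeposit(d[0], d[1]));
        }
        return map.get("acct-1").total;
    }

    public static void main(String[] args) {
        System.out.println(totalAfterReplay(false)); // 20
        System.out.println(totalAfterReplay(true));  // 15
    }
}
```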
    • mapWithEntryProcessor

      @Nonnull public static <T, K, V, R> Sink<T> mapWithEntryProcessor(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
      Returns a sink that uses the items it receives to create EntryProcessors it submits to the given Hazelcast IMap. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that those sinks solve efficiently, this one performs worse, so use it only when they are not applicable.

      If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.

      This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor were run on the same entry more than once.

      Note: Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this operation is lock-aware. If the key is locked, the EntryProcessor waits until it acquires the lock.

      The default local parallelism for this sink is 1.

      The given functions must be stateless and cooperative.

      Type Parameters:
      T - input item type
      K - key type
      V - value type
      Parameters:
      map - map to drain to
      toKeyFn - function that extracts the key from the input item
      toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
    • remoteMapWithEntryProcessor

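      @Nonnull public static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
      Returns a sink equivalent to mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.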
    • remoteMapWithEntryProcessor

      public static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
      The same as the remoteMapWithEntryProcessor(String, ClientConfig, FunctionEx, FunctionEx) method, except that instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.

      Since:
      5.4
    • cache

      @Nonnull public static <T extends Map.Entry> Sink<T> cache(@Nonnull String cacheName)
      Returns a sink that puts Map.Entrys it receives into a Hazelcast ICache with the specified name.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target cache.

      The default local parallelism for this sink is 2.

    • remoteCache

      @Nonnull public static <T extends Map.Entry> Sink<T> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
      Returns a sink that puts Map.Entrys it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.

      This sink provides the exactly-once guarantee thanks to idempotent updates: a value written under an existing key overwrites the previous value instead of being appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target cache.

      The default local parallelism for this sink is 2.

    • list

      @Nonnull public static <T> Sink<T> list(@Nonnull String listName)
      Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      The default local parallelism for this sink is 1.
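      Unlike a keyed put, an append cannot absorb a re-delivered item, which is why this sink is only at-least-once. The following plain-Java sketch (an ArrayList stands in for the IList; not Hazelcast code) appends the items processed before a failure and then appends everything from the snapshot position onward again, as a restart would. Duplicates appear only for items written after the last snapshot, hence "likely" rather than "always". The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an append-only sink under restart; not Hazelcast code.
class ListSinkReplay {

    static final List<String> SAMPLE = List.of("x", "y", "z");

    // Items [0, failedAt) were appended before the failure; after the restart,
    // items [snapshotAt, end) are appended again. Requires snapshotAt <= failedAt.
    static List<String> runWithRestart(List<String> items, int snapshotAt, int failedAt) {
        List<String> target = new ArrayList<>(items.subList(0, failedAt));
        target.addAll(items.subList(snapshotAt, items.size()));
        return target;
    }

    public static void main(String[] args) {
        System.out.println(runWithRestart(SAMPLE, 1, 2)); // [x, y, y, z]
        System.out.println(runWithRestart(SAMPLE, 2, 2)); // [x, y, z]
    }
}
```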

    • list

      @Nonnull public static <T> Sink<T> list(@Nonnull IList<? super T> list)
      Returns a sink that adds the items it receives to the specified Hazelcast IList.

      NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      The default local parallelism for this sink is 1.

    • remoteList

      @Nonnull public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
      Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      The default local parallelism for this sink is 1.

    • reliableTopic

      @Nonnull public static <T> Sink<T> reliableTopic(@Nonnull String reliableTopicName)
      Returns a sink which publishes the items it receives to a distributed reliable topic with the specified name.

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      Local parallelism for this sink is 1.

      Since:
      Jet 4.0
    • reliableTopic

      @Nonnull public static <T> Sink<T> reliableTopic(@Nonnull ITopic<Object> reliableTopic)
      Returns a sink which publishes the items it receives to the provided distributed reliable topic. More precisely, it takes the name of the given ITopic and then independently retrieves the ITopic with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the ITopic from the same cluster to which you will submit the pipeline.

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      Local parallelism for this sink is 1.

      Since:
      Jet 4.0
    • remoteReliableTopic

      @Nonnull public static <T> Sink<T> remoteReliableTopic(@Nonnull String reliableTopicName, @Nonnull ClientConfig clientConfig)
      Returns a sink which publishes the items it receives to a distributed reliable topic with the provided name in a remote cluster identified by the supplied ClientConfig.

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      Local parallelism for this sink is 1.

      Since:
      Jet 4.0
    • socket

      @Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T,? extends String> toStringFn, @Nonnull Charset charset)
      Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives. It converts an item to its string representation using the supplied toStringFn function and encodes the string using the supplied Charset. It follows each item with a newline character.
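      Here is a minimal sketch of the per-item encoding described above. It is not the Jet processor, and the class and method names are illustrative. A ByteArrayOutputStream stands in for the socket's output stream so that the bytes can be inspected. encodeWithDefaults mirrors the defaults of the convenience overloads: Object.toString and UTF-8.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

final class SocketLineEncoding {

    /** Writes toStringFn(item), encoded with charset, followed by a newline, for each item. */
    static <T> void writeItems(OutputStream out, List<T> items,
                               Function<? super T, ? extends String> toStringFn,
                               Charset charset) {
        try {
            // Not closed on purpose: closing the writer would close the (socket) stream.
            Writer writer = new OutputStreamWriter(out, charset);
            for (T item : items) {
                writer.write(toStringFn.apply(item));
                writer.write('\n'); // newline encoded with the same charset
            }
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Same defaults as the convenience overloads: Object.toString and UTF-8. */
    static <T> byte[] encodeWithDefaults(List<T> items) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeItems(buffer, items, Object::toString, StandardCharsets.UTF_8);
        return buffer.toByteArray();
    }
}
```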

      No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

      The default local parallelism for this sink is 1.

      Parameters:
      host - the host to connect to
      port - the target port
      toStringFn - a function to convert received items to string. It must be stateless and cooperative.
      charset - charset used to convert the string to bytes
    • socket

      @Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T,? extends String> toStringFn)
      Convenience for socket(String, int, FunctionEx, Charset) with UTF-8 as the charset.
    • socket

      @Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port)
      Convenience for socket(String, int, FunctionEx, Charset) with Object.toString as the conversion function and UTF-8 as the charset.
    • filesBuilder

      @Nonnull public static <T> FileSinkBuilder<T> filesBuilder(@Nonnull String directoryName)
      Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API. See javadoc of methods in FileSinkBuilder for more details.

      The sink writes the items it receives to files. Each processor writes to its own files, whose names contain the processor's global index (an integer unique to each processor of the vertex), but the same directory is used for all files on all cluster members. That directory can be shared over a network, because each processor creates globally unique file names.

      Fault tolerance

      If the job is running in exactly-once mode, Jet writes the items to temporary files (ending with a ".tmp" suffix). When Jet commits a snapshot, it atomically renames each file to remove this suffix. Thanks to the two-phase commit of the snapshot, the sink provides an exactly-once guarantee.
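      The commit step can be sketched with plain java.nio. This is an assumption about the general mechanism, not Jet's actual code, and the class name is hypothetical. Items are appended to a ".tmp" file, and the commit renames that file with ATOMIC_MOVE. A reader therefore never sees the final name pointing at a partially committed file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class TempFileCommit {
    static final String TEMP_SUFFIX = ".tmp";

    /** Appends a line to the uncommitted "<baseName>.tmp" file. */
    static Path writeUncommitted(Path dir, String baseName, String line) {
        Path tmp = dir.resolve(baseName + TEMP_SUFFIX);
        try {
            Files.writeString(tmp, line + "\n", StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return tmp;
    }

    /** On snapshot commit: drop the ".tmp" suffix in one atomic rename. */
    static Path commit(Path tmp) {
        String name = tmp.getFileName().toString();
        Path target = tmp.resolveSibling(name.substring(0, name.length() - TEMP_SUFFIX.length()));
        try {
            return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```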

      Because Jet starts a new file each time it snapshots the state, the sink produces many more small files, depending on the snapshot interval. If you want to avoid the temporary files or the high number of files but need exactly-once for the other processors in the job, call exactlyOnce(false) on the returned builder. This gives you an at-least-once guarantee for the sink and an unchanged guarantee for the other processors.

      For fault tolerance to work, the target file system must be a network file system. If you lose a member together with its files, you lose data. Even if that member rejoins later with the lost files, the job might have processed more transactions on the remaining members and will not commit the temporary files on the resurrected member.

      File name structure

      
       [<date>-]<global processor index>[-<sequence>][".tmp"]
       

      Description (parts in [] are optional):

      • <date>: the current date and time, see FileSinkBuilder.rollByDate(String). Not present if rolling by date is not used
      • <global processor index>: a processor index ensuring that each parallel processor writes to its own file
      • <sequence>: a sequence number starting from 0. Used if either the job runs in exactly-once mode or a maximum file size is set. The sequence is reset to 0 when the <date> changes.
      • ".tmp": the FileSinkBuilder.TEMP_FILE_SUFFIX, used if the file is not yet committed
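      The naming scheme can be sketched as a small string builder. The sketch is illustrative only, and the class and method names are made up. The exact date text depends on the pattern passed to FileSinkBuilder.rollByDate(String), so the sketch takes an already formatted date string.

```java
// Hypothetical helper: builds [<date>-]<global processor index>[-<sequence>][".tmp"].
final class FileNames {
    static final String TEMP_SUFFIX = ".tmp";

    /**
     * @param date           pre-formatted date, or null when rolling by date is not used
     * @param processorIndex global processor index
     * @param sequence       sequence number, or null when no sequence is used
     * @param committed      false while the file is still a temporary file
     */
    static String fileName(String date, int processorIndex, Integer sequence, boolean committed) {
        StringBuilder sb = new StringBuilder();
        if (date != null) {
            sb.append(date).append('-');
        }
        sb.append(processorIndex);
        if (sequence != null) {
            sb.append('-').append(sequence);
        }
        if (!committed) {
            sb.append(TEMP_SUFFIX);
        }
        return sb.toString();
    }
}
```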

      Notes

      The target directory is not deleted before the job starts. If file names clash, the existing files are appended to. This is needed to ensure at-least-once behavior. In exactly-once mode, file names never clash thanks to the sequence number in the file name: Jet always chooses a number higher than the highest sequence number found in the directory.

      For performance, the processor doesn't delete old files from the directory. If you have frequent snapshots, delete the old files from time to time to avoid a huge number of files in the directory. After a restart, Jet lists the files in the directory to find out which sequence number to use.
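      The restart rule can be sketched as a directory scan. This is a simplification under our own assumptions. The sketch only understands names without a date prefix, that is <index>-<sequence> with an optional ".tmp" suffix, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

final class NextSequence {

    /** Returns a sequence number higher than any found for this processor, or 0 if none. */
    static int nextSequence(Path dir, int processorIndex) {
        // Matches "<index>-<sequence>" with an optional ".tmp" suffix (no date prefix).
        Pattern pattern = Pattern.compile(processorIndex + "-(\\d+)(\\.tmp)?");
        try (Stream<Path> files = Files.list(dir)) {
            return files.map(f -> pattern.matcher(f.getFileName().toString()))
                        .filter(Matcher::matches)
                        .mapToInt(m -> Integer.parseInt(m.group(1)))
                        .max()
                        .orElse(-1) + 1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```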

      The default local parallelism for this sink is 1.

      Type Parameters:
      T - type of the items the sink accepts
    • files

      @Nonnull public static <T> Sink<T> files(@Nonnull String directoryName)
      Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.
    • json

      @Nonnull public static <T> Sink<T> json(@Nonnull String directoryName)
      Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files. The sink converts each item to a JSON string and adds it to the file as a line.
    • logger

      @Nonnull public static <T> Sink<T> logger(@Nonnull FunctionEx<? super T,String> toStringFn)
      Returns a sink that logs all the data items it receives at the INFO level to the log category WriteLoggerP. It also logs watermark items, but at the FINE level.
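      The following sketch uses java.util.logging and assumes Hazelcast is configured to log through it, since Hazelcast supports several logging frameworks. The category name is a placeholder, and the watermark message text is made up. The sketch shows that the FINE watermark lines stay hidden until the category's level is lowered below INFO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LoggerLevels {

    /** Logs one data item (INFO) and one watermark (FINE); returns what passed the level filter. */
    static List<String> logAll(Level categoryLevel) {
        Logger logger = Logger.getLogger("demo.WriteLoggerP"); // hypothetical category name
        logger.setUseParentHandlers(false);                    // keep the console quiet
        logger.setLevel(categoryLevel);
        List<String> captured = new ArrayList<>();
        Handler handler = new Handler() {
            @Override public void publish(LogRecord r) { captured.add(r.getLevel() + " " + r.getMessage()); }
            @Override public void flush() { }
            @Override public void close() { }
        };
        handler.setLevel(Level.ALL);
        logger.addHandler(handler);
        try {
            logger.info("item-1");        // data items: INFO
            logger.fine("watermark(5)");  // watermarks: FINE
        } finally {
            logger.removeHandler(handler);
        }
        return captured;
    }
}
```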

      The sink logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development, when running Jet on a local machine.

      The default local parallelism for this sink is 1.

      Type Parameters:
      T - stream item type
      Parameters:
      toStringFn - a function that returns a string representation of a stream item. It must be stateless and cooperative.
    • logger

      @Nonnull public static <T> Sink<T> logger()
      Convenience for logger(FunctionEx) with Object.toString() as the toStringFn.
    • noop

      @Nonnull public static <T> Sink<T> noop()
      Returns a sink which discards all received items.
    • jmsQueue

      @Nonnull public static <T> Sink<T> jmsQueue(@Nonnull String queueName, @Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
      Convenience for jmsQueueBuilder(SupplierEx). Creates a connection without any authentication parameters. If a received item is not an instance of jakarta.jms.Message, the sink wraps item.toString() into a TextMessage.
      Parameters:
      queueName - the name of the queue
      factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
    • jmsQueueBuilder

      @Nonnull public static <T> JmsSinkBuilder<T> jmsQueueBuilder(@Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
      Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. See javadoc for JmsSinkBuilder methods for more details.

      In exactly-once mode, the processor uses two-phase XA transactions to guarantee exactly-once delivery. The supplier is expected to return an XAConnectionFactory. The transaction is committed after all processors have finished processing the items and stored all their data in the snapshot. The processor can also finish the commit after a restart if the job fails midway through the commit process. This mode significantly increases latency because produced messages become visible only after they are committed. To avoid this, you can reduce the guarantee for this sink only by calling exactlyOnce(false) on the returned builder. If your messages have a unique identifier, some JMS brokers offer deduplication functionality, which you can use to avoid the latency penalty.
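      The two-phase flow can be pictured with a toy model. It is not the JMS or Jet API, and all names are hypothetical. Messages go into an open transaction, which is prepared while the processor saves its snapshot and committed only after the whole snapshot succeeds. Until then consumers see nothing, which is where the extra latency comes from.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a two-phase (XA-style) sink; not the JMS or Jet API.
final class TwoPhaseSinkModel {
    private final List<String> visible = new ArrayList<>(); // readable by consumers
    private final List<String> open = new ArrayList<>();    // current, unprepared transaction
    private List<String> prepared;                          // phase 1 done, awaiting phase 2

    void send(String message) {
        open.add(message);
    }

    /** Phase 1: runs while the processor stores its state in the snapshot. */
    void prepare() {
        prepared = new ArrayList<>(open);
        open.clear();
    }

    /** Phase 2: runs after all processors have stored their snapshot data. */
    void commitPrepared() {
        if (prepared != null) {
            visible.addAll(prepared);
            prepared = null;
        }
    }

    /**
     * The job failed after the snapshot completed but before this commit ran:
     * unprepared work is rolled back (the source replays it) and the
     * prepared transaction is committed on restart.
     */
    void restartAfterFailureDuringCommit() {
        open.clear();
        commitPrepared();
    }

    List<String> visibleMessages() {
        return List.copyOf(visible);
    }
}
```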

      In at-least-once mode or when the guarantee is off, the produced records are acknowledged immediately. We use transactions to produce messages in batches, but those transactions have a very short duration.

      IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job fails. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.

      Test the XA support of your broker

      JMS is an API, and some brokers don't implement XA transactions correctly. We run our stress tests with RabbitMQ and ActiveMQ. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your broker, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test

      Notes

      The default local parallelism for this sink is 1.

      Type Parameters:
      T - type of the items the sink accepts
      Parameters:
      factorySupplier - supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
    • jmsTopic

      @Nonnull public static <T> Sink<T> jmsTopic(@Nonnull String topicName, @Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
      Shortcut for:
      
            jmsTopicBuilder(factorySupplier)
                       .destinationName(topicName)
                       .build();
       

      See jmsTopicBuilder(SupplierEx) for more details.

      Parameters:
      topicName - the name of the topic
      factorySupplier - supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
    • jmsTopicBuilder

      @Nonnull public static <T> JmsSinkBuilder<T> jmsTopicBuilder(@Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
      Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API. See javadoc on JmsSinkBuilder methods for more details.

      In exactly-once mode, the processor uses two-phase XA transactions to guarantee exactly-once delivery. The supplier is expected to return an XAConnectionFactory. The transaction is committed after all processors have finished processing the items and stored all their data in the snapshot. The processor can also finish the commit after a restart if the job fails midway through the commit process. This mode significantly increases latency because produced messages become visible only after they are committed. To avoid this, you can reduce the guarantee for this sink only by calling exactlyOnce(false) on the returned builder.

      In at-least-once mode or when the guarantee is off, the produced records are acknowledged immediately. We use transactions to produce messages in batches, but those transactions have a very short duration.

      IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job fails. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.

      Test the XA support of your broker

      JMS is an API, and some brokers don't implement XA transactions correctly. We run our stress tests with RabbitMQ and ActiveMQ. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your broker, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test

      Notes

      The default local parallelism for this sink is 1.

      Type Parameters:
      T - type of the items the sink accepts
      Parameters:
      factorySupplier - supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
    • jdbc

      @Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull SupplierEx<? extends CommonDataSource> dataSourceSupplier, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
      A shortcut for:
      
           Sinks.<T>jdbcBuilder()
                   .updateQuery(updateQuery)
                   .dataSourceSupplier(dataSourceSupplier)
                   .bindFn(bindFn)
                   .build();
       

      See jdbcBuilder() for more information.

    • jdbc

      @Nonnull @Beta public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
      A shortcut for:
      
           Sinks.<T>jdbcBuilder()
                   .updateQuery(updateQuery)
                   .dataConnectionRef(dataConnectionRef)
                   .bindFn(bindFn)
                   .build();
       

      See jdbcBuilder() for more information.

      Since:
      5.2
    • jdbc

      @Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull String jdbcUrl, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
      A shortcut for:
      
           Sinks.<T>jdbcBuilder()
                   .updateQuery(updateQuery)
                   .jdbcUrl(jdbcUrl)
                   .bindFn(bindFn)
                   .build();
       

      See jdbcBuilder() for more information.

    • jdbcBuilder

      @Nonnull public static <T> JdbcSinkBuilder<T> jdbcBuilder()
      Returns a builder to build a sink that connects to a JDBC database, prepares an SQL statement and executes it for each item. On the returned builder you must specify a connection (using a JDBC URL, a data source or a data connection), the SQL statement and a bind function.

      Example

      
       stage.writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
           .updateQuery("INSERT INTO table (key, value) VALUES(?, ?)")
           .bindFn((stmt, item) -> {
               stmt.setInt(1, item.getKey());
               stmt.setString(2, item.getValue());
           })
           .jdbcUrl("jdbc:...")
           .build());
       

      Commit behavior

      The commit behavior depends on the job guarantee:

      • Exactly-once: XA transactions are used to commit the work in phase two of the snapshot, that is, after all other vertices in the job have performed the snapshot. Only a very small state is saved to the snapshot.
      • At-least-once or no guarantee: Records will be committed in batches. A batch is created from records that are readily available at the sink.
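      The at-least-once batching rule can be sketched with a plain queue standing in for the sink's inbox. Each batch takes whatever items are already waiting rather than waiting for more, and each batch would map to one short transaction. The maxBatch cap and all names here are assumptions for illustration, not JdbcSinkBuilder API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ReadyBatching {

    /** Splits the items already waiting in the inbox into batches of at most maxBatch items. */
    static <T> List<List<T>> drainBatches(Queue<T> inbox, int maxBatch) {
        List<List<T>> batches = new ArrayList<>();
        while (!inbox.isEmpty()) {
            List<T> batch = new ArrayList<>();
            while (batch.size() < maxBatch && !inbox.isEmpty()) {
                batch.add(inbox.poll());
            }
            // A JDBC sink would typically run executeBatch() and commit() per batch here.
            batches.add(batch);
        }
        return batches;
    }
}
```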

      In exactly-once mode, the overhead in the database and the output latency are higher. Jet doesn't commit the transaction until the next snapshot occurs, so the number of uncommitted records in a transaction can be very high. Latency is high because the changes become visible only after the transaction is committed. Configure the snapshot interval accordingly.

      If your driver doesn't support XA transactions or if you want to avoid the performance or latency penalty, you can decrease the guarantee just for this sink by calling exactlyOnce(false) on the returned builder.

      Notes

      In non-XA mode, if an SQLException occurs, the processor transparently reconnects and the job doesn't fail, unless the exception is an SQLNonTransientException or one of its subclasses. In XA mode, the job fails immediately.
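      The rule above can be expressed as a small classification function. JdbcErrorPolicy and its Action enum are hypothetical names, not part of the Jet API. The exception types come from java.sql: for example, SQLSyntaxErrorException is an SQLNonTransientException, while SQLTransientConnectionException is not.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientException;

// Hypothetical names for illustration; not part of the Jet API.
final class JdbcErrorPolicy {
    enum Action { RECONNECT_AND_RETRY, FAIL_JOB }

    static Action onError(SQLException e, boolean xaMode) {
        if (xaMode || e instanceof SQLNonTransientException) {
            return Action.FAIL_JOB;            // XA mode, or an error that retrying won't fix
        }
        return Action.RECONNECT_AND_RETRY;     // e.g. a dropped connection in non-XA mode
    }
}
```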

      Test the XA support of your database

      JDBC is an API, and some databases and drivers don't implement XA transactions correctly. We run our stress tests with PostgreSQL. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your database, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test

      Notes

      The default local parallelism for this sink is 1.

      Type Parameters:
      T - type of the items the sink accepts
      Since:
      Jet 4.1
    • observable

      @Nonnull public static <T> Sink<T> observable(String name)
      Returns a sink that publishes to the Observable with the provided name. To read the records sent to the observable, first get a handle to it with JetService.getObservable(String), then subscribe to the events using the methods on Observable.

      The Observable should be destroyed after using it. For the full description see the javadoc for Observable. Example:

      
         // jet is a JetService, for example obtained with hazelcastInstance.getJet()
         Observable<Integer> observable = jet.newObservable();
         CompletableFuture<List<Integer>> list = observable.toFuture(o -> o.collect(toList()));
      
         Pipeline pipeline = Pipeline.create();
         pipeline.readFrom(TestSources.items(1, 2, 3, 4))
                 .writeTo(Sinks.observable(observable));
      
         Job job = jet.newJob(pipeline);
      
         System.out.println(list.get());
         observable.destroy();
       
      This sink is cooperative and uses a local parallelism of 1.
      Since:
      Jet 4.0
    • observable

      @Nonnull public static <T> Sink<T> observable(Observable<? super T> observable)
      Returns a sink that publishes to the provided Observable. More precisely, it takes the name of the given Observable and then independently retrieves an Observable with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the Observable from the same cluster to which you will submit the pipeline.

      For more details, refer to observable(String).

      Since:
      Jet 4.0