Class Sinks
A pipeline stage with a sink transform has the type SinkStage and accepts no downstream stages.
The default local parallelism for the sinks in this class is typically 1; check the documentation of the individual methods for the exact value.
- Since:
- Jet 3.0
-
Method Summary
Modifier and Type    Method    Description

static <K, V> Sink<Map.Entry<K, V>> cache(String cacheName)
    Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name.
static <T> Sink<T> files(String directoryName)
    Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.
static <T> FileSinkBuilder<T> filesBuilder(String directoryName)
    Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API.
static <T> Sink<T> fromProcessor(String sinkName, ProcessorMetaSupplier metaSupplier)
    Returns a sink constructed directly from the given Core API processor meta-supplier.
static <T> Sink<T> fromProcessor(String sinkName, ProcessorMetaSupplier metaSupplier, FunctionEx<? super T, ?> partitionKeyFn)
    Returns a sink constructed directly from the given Core API processor meta-supplier.
static <T> Sink<T> jdbc(String updateQuery, SupplierEx<? extends CommonDataSource> dataSourceSupplier, BiConsumerEx<PreparedStatement, T> bindFn)
    A shortcut for:
static <T> Sink<T> jdbc(String updateQuery, DataConnectionRef dataConnectionRef, BiConsumerEx<PreparedStatement, T> bindFn)
    A shortcut for:
static <T> Sink<T> jdbc(String updateQuery, String jdbcUrl, BiConsumerEx<PreparedStatement, T> bindFn)
    A shortcut for:
static <T> JdbcSinkBuilder<T> jdbcBuilder()
    Returns a builder to build a sink that connects to a JDBC database, prepares an SQL statement and executes it for each item.
static <T> Sink<T> jmsQueue(String queueName, SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
    Convenience for jmsQueueBuilder(SupplierEx).
static <T> JmsSinkBuilder<T> jmsQueueBuilder(SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
    Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API.
static <T> Sink<T> jmsTopic(String topicName, SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
    Shortcut for:
static <T> JmsSinkBuilder<T> jmsTopicBuilder(SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
    Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API.
static <T> Sink<T> json(String directoryName)
    Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.
static <T> Sink<T> list(IList<? super T> list)
    Returns a sink that adds the items it receives to the specified Hazelcast IList.
static <T> Sink<T> list(String listName)
    Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.
static <T> Sink<T> logger()
static <T> Sink<T> logger(FunctionEx<? super T, String> toStringFn)
    Returns a sink that logs all the data items it receives, at the INFO level to the log category WriteLoggerP.
static <K, V> Sink<Map.Entry<K, V>> map(IMap<? super K, ? super V> map)
    Returns a sink that puts the Map.Entry items it receives into the given Hazelcast IMap.
static <T, K, V> Sink<T> map(IMap<? super K, ? super V> map, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to put to the given Hazelcast IMap.
static <K, V> Sink<Map.Entry<K, V>> map(String mapName)
    Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name.
static <T, K, V> Sink<T> map(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to put to a Hazelcast IMap with the specified name.
static <T, K, V> MapSinkBuilder<T, K, V> mapBuilder(String mapName)
    Returns a builder object that offers a step-by-step fluent API to build a custom map sink for the Pipeline API.
static <E, K, V, R> MapSinkEntryProcessorBuilder<E, K, V, R> mapEntryProcessorBuilder(String mapName)
    Returns a builder object that offers a step-by-step fluent API to build a custom map sink for the Pipeline API.
static <E, K, V, R> Sink<E> mapWithEntryProcessor(int maxParallelAsyncOps, String mapName, FunctionEx<? super E, ? extends K> toKeyFn, FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
    Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name.
static <T, K, V, R> Sink<T> mapWithEntryProcessor(IMap<? super K, ? super V> map, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
    Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name.
static <E, K, V, R> Sink<E> mapWithEntryProcessor(String mapName, FunctionEx<? super E, ? extends K> toKeyFn, FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
    Convenience for mapWithEntryProcessor(int, String, FunctionEx, FunctionEx) when the maximum number of async operations is not specified.
static <T, K, V> Sink<T> mapWithMerging(IMap<? super K, ? super V> map, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap.
static <K, V> Sink<Map.Entry<K, V>> mapWithMerging(IMap<? super K, V> map, BinaryOperatorEx<V> mergeFn)
    Convenience for mapWithMerging(IMap, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
static <K, V> Sink<Map.Entry<K, V>> mapWithMerging(String mapName, BinaryOperatorEx<V> mergeFn)
    Convenience for mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
static <T, K, V> Sink<T> mapWithMerging(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap.
static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(IMap<? super K, ? super V> map, BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
    Convenience for mapWithUpdating(IMap, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
static <T, K, V> Sink<T> mapWithUpdating(IMap<? super K, ? super V> map, FunctionEx<? super T, ? extends K> toKeyFn, BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
    Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap.
static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(String mapName, BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
    Convenience for mapWithUpdating(String, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
static <T, K, V> Sink<T> mapWithUpdating(String mapName, FunctionEx<? super T, ? extends K> toKeyFn, BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
    Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap.
static <T> Sink<T> noop()
    Returns a sink which discards all received items.
static <T> Sink<T> observable(Observable<? super T> observable)
    Returns a sink that publishes to the provided Observable.
static <T> Sink<T> observable(String name)
    Returns a sink that publishes to the Observable with the provided name.
static <T> Sink<T> reliableTopic(ITopic<Object> reliableTopic)
    Returns a sink which publishes the items it receives to the provided distributed reliable topic.
static <T> Sink<T> reliableTopic(String reliableTopicName)
    Returns a sink which publishes the items it receives to a distributed reliable topic with the specified name.
static <K, V> Sink<Map.Entry<K, V>> remoteCache(String cacheName, ClientConfig clientConfig)
    Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.
static <T> Sink<T> remoteList(String listName, ClientConfig clientConfig)
    Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.
static <K, V> Sink<Map.Entry<K, V>> remoteMap(String mapName, ClientConfig clientConfig)
    Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
static <T, K, V> Sink<T> remoteMap(String mapName, ClientConfig clientConfig, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to put to a Hazelcast IMap in a remote cluster identified by the supplied ClientConfig.
static <K, V> Sink<Map.Entry<K, V>> remoteMap(String mapName, DataConnectionRef dataConnectionRef)
    The same as the remoteMap(String, ClientConfig) method.
static <T, K, V> Sink<T> remoteMap(String mapName, DataConnectionRef dataConnectionRef, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn)
    The same as the remoteMap(String, ClientConfig, FunctionEx, FunctionEx) method.
static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(String mapName, ClientConfig clientConfig, FunctionEx<? super E, ? extends K> toKeyFn, FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
    Returns a sink equivalent to mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(String mapName, DataConnectionRef dataConnectionRef, FunctionEx<? super E, ? extends K> toKeyFn, FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
    The same as the remoteMapWithEntryProcessor(String, ClientConfig, FunctionEx, FunctionEx) method.
static <K, V> Sink<Map.Entry<K, V>> remoteMapWithMerging(String mapName, ClientConfig clientConfig, BinaryOperatorEx<V> mergeFn)
    Convenience for remoteMapWithMerging(String, ClientConfig, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
static <T, K, V> Sink<T> remoteMapWithMerging(String mapName, ClientConfig clientConfig, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    Returns a sink equivalent to mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
static <K, V> Sink<Map.Entry<K, V>> remoteMapWithMerging(String mapName, DataConnectionRef dataConnectionRef, BinaryOperatorEx<V> mergeFn)
    The same as the remoteMapWithMerging(String, ClientConfig, BinaryOperatorEx) method.
static <T, K, V> Sink<T> remoteMapWithMerging(String mapName, DataConnectionRef dataConnectionRef, FunctionEx<? super T, ? extends K> toKeyFn, FunctionEx<? super T, ? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    The same as the remoteMapWithMerging(String, ClientConfig, FunctionEx, FunctionEx, BinaryOperatorEx) method.
static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(String mapName, ClientConfig clientConfig, BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
static <T, K, V> Sink<T> remoteMapWithUpdating(String mapName, ClientConfig clientConfig, FunctionEx<? super T, ? extends K> toKeyFn, BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
    Returns a sink equivalent to mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(String mapName, DataConnectionRef dataConnectionRef, BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
    The same as the remoteMapWithUpdating(String, ClientConfig, BiFunctionEx) method.
static <T, K, V> Sink<T> remoteMapWithUpdating(String mapName, DataConnectionRef dataConnectionRef, FunctionEx<? super T, ? extends K> toKeyFn, BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
    The same as the remoteMapWithUpdating(String, ClientConfig, FunctionEx, BiFunctionEx) method.
static <T> Sink<T> remoteReliableTopic(String reliableTopicName, ClientConfig clientConfig)
    Returns a sink which publishes the items it receives to a distributed reliable topic with the provided name in a remote cluster identified by the supplied ClientConfig.
static <T> Sink<T> socket(String host, int port)
    Convenience for socket(String, int, FunctionEx, Charset) with Object.toString as the conversion function and UTF-8 as the charset.
static <T> Sink<T> socket(String host, int port, FunctionEx<? super T, ? extends String> toStringFn)
    Convenience for socket(String, int, FunctionEx, Charset) with UTF-8 as the charset.
static <T> Sink<T> socket(String host, int port, FunctionEx<? super T, ? extends String> toStringFn, Charset charset)
    Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives.
-
Method Details
-
fromProcessor
@Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a sink constructed directly from the given Core API processor meta-supplier.
The default local parallelism for this sink is specified inside the metaSupplier.
- Parameters:
sinkName - user-friendly sink name
metaSupplier - the processor meta-supplier
-
fromProcessor
@Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier, @Nullable FunctionEx<? super T, ?> partitionKeyFn)
Returns a sink constructed directly from the given Core API processor meta-supplier.
The default local parallelism for this sink is specified inside the metaSupplier.
- Parameters:
sinkName - user-friendly sink name
metaSupplier - the processor meta-supplier
partitionKeyFn - key extractor function for partitioning edges to the sink. It must be stateless and cooperative.
-
map
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> map(@Nonnull String mapName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates: a value with the same key is overwritten rather than appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.
The default local parallelism for this sink is 1.
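The overwrite behaviour behind the exactly-once guarantee described above can be illustrated with a plain-Java sketch. It does not use Hazelcast: a java.util.HashMap stands in for the IMap, and the class name IdempotentPutSketch and the method drain are hypothetical names chosen for this illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentPutSketch {

    // Stand-in for the sink's write step (an assumption, not Hazelcast code):
    // put each entry, overwriting any value already stored under the same key.
    static <K, V> void drain(Map<K, V> target, List<Map.Entry<K, V>> items) {
        for (Map.Entry<K, V> item : items) {
            target.put(item.getKey(), item.getValue());
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> batch = List.of(Map.entry("a", 1), Map.entry("b", 2));
        Map<String, Integer> target = new HashMap<>();

        drain(target, batch);
        Map<String, Integer> afterFirstRun = new HashMap<>(target);

        // Simulate a restart from a snapshot that replays the same items.
        drain(target, batch);

        System.out.println(target.equals(afterFirstRun)); // prints "true"
    }
}
```

Because a put with the same key and value leaves the map unchanged, replaying items after a restart has no visible effect, which is the property the sink relies on.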
-
map
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> map(@Nonnull IMap<? super K, ? super V> map)
Returns a sink that puts the Map.Entry items it receives into the given Hazelcast IMap.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink provides the exactly-once guarantee thanks to idempotent updates: a value with the same key is overwritten rather than appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.
The default local parallelism for this sink is 1.
-
map
@Nonnull public static <T, K, V> Sink<T> map(@Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn)
Returns a sink that uses the supplied functions to extract the key and value with which to put to a Hazelcast IMap with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates: a value with the same key is overwritten rather than appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Since:
- Jet 4.2
-
map
@Nonnull public static <T, K, V> Sink<T> map(@Nonnull IMap<? super K, ? super V> map, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn)
Returns a sink that uses the supplied functions to extract the key and value with which to put to the given Hazelcast IMap.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink provides the exactly-once guarantee thanks to idempotent updates: a value with the same key is overwritten rather than appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Since:
- Jet 4.2
-
remoteMap
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates: a value with the same key is overwritten rather than appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.
The default local parallelism for this sink is 1.
-
remoteMap
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> remoteMap(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef)
The same as the remoteMap(String, ClientConfig) method. The only difference is that, instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Parameters:
mapName - the name of the map
dataConnectionRef - the reference to DataConnectionConfig
- Since:
- 5.4
-
remoteMap
@Nonnull public static <T, K, V> Sink<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn)
Returns a sink that uses the supplied functions to extract the key and value with which to put to a Hazelcast IMap in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates: a value with the same key is overwritten rather than appended. After the job is restarted from a snapshot, duplicate items will not change the state of the target map.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Since:
- Jet 4.2
-
remoteMap
@Nonnull public static <T, K, V> Sink<T> remoteMap(@Nonnull String mapName, DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn)
The same as the remoteMap(String, ClientConfig, FunctionEx, FunctionEx) method. The only difference is that, instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Parameters:
mapName - the name of the map
dataConnectionRef - the reference to DataConnectionConfig
- Since:
- 5.4
-
mapBuilder
@Nonnull public static <T, K, V> MapSinkBuilder<T, K, V> mapBuilder(String mapName)
Returns a builder object that offers a step-by-step fluent API to build a custom map sink for the Pipeline API. See the javadoc of the methods in MapSinkBuilder for more details.
- Type Parameters:
T - type of the incoming items
K - type of the key extracted from each item
V - type of the value extracted from each item
- Parameters:
mapName - name of the map to sink into, must not be null
- Since:
- 5.4
-
mapEntryProcessorBuilder
@Nonnull public static <E, K, V, R> MapSinkEntryProcessorBuilder<E, K, V, R> mapEntryProcessorBuilder(String mapName)
Returns a builder object that offers a step-by-step fluent API to build a custom map sink for the Pipeline API. See the javadoc of the methods in MapSinkBuilder for more details.
- Type Parameters:
E - type of the incoming items
K - type of the key extracted from each item
V - type of the value extracted from each item
- Parameters:
mapName - name of the map to sink into, must not be null
- Since:
- 5.4
-
mapWithMerging
@Nonnull public static <T, K, V> Sink<T> mapWithMerging(@Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = toValueFn.apply(item);
    V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
    if (resolved == null)
        map.remove(key);
    else
        map.put(key, resolved);
This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule
    mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue)
for any e that was already observed.
Note: This operation is not lock-aware; it processes entries regardless of whether they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Type Parameters:
T - input item type
K - key type
V - value type
- Parameters:
mapName - name of the map
toKeyFn - function that extracts the key from the input item
toValueFn - function that extracts the value from the input item
mergeFn - function that merges the existing value with the value acquired from the received item
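The per-item merge rule described above can be tried out in plain Java. This is a minimal sketch under stated assumptions, not Hazelcast code: a java.util.HashMap stands in for the IMap, java.util.function.Function and BinaryOperator stand in for FunctionEx and BinaryOperatorEx, and the names MergeSketch and mergeItem are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class MergeSketch {

    // Applies one item to the map following the merge rule from the description.
    static <T, K, V> void mergeItem(Map<K, V> map, T item,
                                    Function<? super T, ? extends K> toKeyFn,
                                    Function<? super T, ? extends V> toValueFn,
                                    BinaryOperator<V> mergeFn) {
        K key = toKeyFn.apply(item);
        V oldValue = map.get(key);
        V newValue = toValueFn.apply(item);
        V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
        if (resolved == null) {
            map.remove(key);
        } else {
            map.put(key, resolved);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> maxByKey = new HashMap<>();
        // The last item repeats an earlier one, as a replay after a restart would.
        List<Map.Entry<String, Integer>> items =
                List.of(Map.entry("a", 3), Map.entry("a", 7), Map.entry("a", 5), Map.entry("a", 7));
        for (Map.Entry<String, Integer> item : items) {
            mergeItem(maxByKey, item, Map.Entry::getKey, Map.Entry::getValue, Integer::max);
        }
        System.out.println(maxByKey); // prints {a=7}
    }
}
```

Integer::max satisfies the idempotency rule because merging an already-observed value into the running maximum returns the same maximum. A summing merge function such as Integer::sum would not, since a replayed item would be added a second time.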
-
mapWithMerging
@Nonnull public static <T, K, V> Sink<T> mapWithMerging(@Nonnull IMap<? super K, ? super V> map, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = toValueFn.apply(item);
    V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
    if (resolved == null)
        map.remove(key);
    else
        map.put(key, resolved);
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule
    mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue)
for any e that was already observed.
Note: This operation is not lock-aware; it processes entries regardless of whether they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Type Parameters:
T - input item type
K - key type
V - value type
- Parameters:
map - the map to drain to
toKeyFn - function that extracts the key from the input item
toValueFn - function that extracts the value from the input item
mergeFn - function that merges the existing value with the value acquired from the received item
-
remoteMapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BinaryOperatorEx<V> mergeFn)
Convenience for remoteMapWithMerging(String, ClientConfig, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
-
remoteMapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BinaryOperatorEx<V> mergeFn)
The same as the remoteMapWithMerging(String, ClientConfig, BinaryOperatorEx) method. The only difference is that, instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Since:
- 5.4
-
remoteMapWithMerging
@Nonnull public static <T, K, V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
Returns a sink equivalent to mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Because of the API it uses, the remote cluster must run at least version 4.0.
-
remoteMapWithMerging
@Nonnull public static <T, K, V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
The same as the remoteMapWithMerging(String, ClientConfig, FunctionEx, FunctionEx, BinaryOperatorEx) method. The only difference is that, instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Since:
- 5.4
-
mapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> mapWithMerging(@Nonnull String mapName, @Nonnull BinaryOperatorEx<V> mergeFn)
Convenience for mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
-
mapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K, V>> mapWithMerging(@Nonnull IMap<? super K, V> map, @Nonnull BinaryOperatorEx<V> mergeFn)
Convenience for mapWithMerging(IMap, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
-
mapWithUpdating
@Nonnull public static <T, K, V> Sink<T> mapWithUpdating(@Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = updateFn.apply(oldValue, item);
    if (newValue == null)
        map.remove(key);
    else
        map.put(key, newValue);
This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule
    updateFn.apply(v, e).equals(v)
for any e that was already observed.
Note: This operation is not lock-aware; it processes entries regardless of whether they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Type Parameters:
T - input item type
K - key type
V - value type
- Parameters:
mapName - name of the map
toKeyFn - function that extracts the key from the input item
updateFn - function that receives the existing map value and the item and returns the new map value
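The update rule described above, including the removal of a key when the update function returns null, can be sketched in plain Java. The sketch makes the following assumptions and is not Hazelcast code: a java.util.HashMap stands in for the IMap, java.util.function.Function and BiFunction stand in for FunctionEx and BiFunctionEx, and the names UpdateSketch, updateItem and keepMax are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

class UpdateSketch {

    // Applies one item to the map following the update rule from the description.
    static <T, K, V> void updateItem(Map<K, V> map, T item,
                                     Function<? super T, ? extends K> toKeyFn,
                                     BiFunction<? super V, ? super T, ? extends V> updateFn) {
        K key = toKeyFn.apply(item);
        V oldValue = map.get(key);
        V newValue = updateFn.apply(oldValue, item);
        if (newValue == null) {
            map.remove(key);
        } else {
            map.put(key, newValue);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> highest = new HashMap<>();

        // Idempotent update: keep the highest reading seen so far.
        // oldValue is null when the key is not yet in the map.
        BiFunction<Integer, Map.Entry<String, Integer>, Integer> keepMax =
                (oldValue, e) -> oldValue == null ? e.getValue() : Math.max(oldValue, e.getValue());

        updateItem(highest, Map.entry("sensor-1", 20), Map.Entry::getKey, keepMax);
        updateItem(highest, Map.entry("sensor-1", 25), Map.Entry::getKey, keepMax);
        // Replaying an already-observed item leaves the stored value unchanged.
        updateItem(highest, Map.entry("sensor-1", 25), Map.Entry::getKey, keepMax);
        System.out.println(highest); // prints {sensor-1=25}

        // An update function that returns null removes the key.
        updateItem(highest, Map.entry("sensor-1", 0), Map.Entry::getKey, (oldValue, e) -> null);
        System.out.println(highest); // prints {}
    }
}
```

A counter-style update function such as (oldValue, e) -> oldValue == null ? 1 : oldValue + 1 would not satisfy the idempotency rule, because a replayed item would increment the stored value again.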
-
mapWithUpdating
@Nonnull public static <T, K, V> Sink<T> mapWithUpdating(@Nonnull IMap<? super K, ? super V> map, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = updateFn.apply(oldValue, item);
    if (newValue == null)
        map.remove(key);
    else
        map.put(key, newValue);
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule
    updateFn.apply(v, e).equals(v)
for any e that was already observed.
Note: This operation is not lock-aware; it processes entries even if they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Type Parameters:
T - input item type
K - key type
V - value type
- Parameters:
map - map to drain to
toKeyFn - function that extracts the key from the input item
updateFn - function that receives the existing map value and the item and returns the new map value
-
remoteMapWithUpdating
@Nonnull public static <T, K, V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
Returns a sink equivalent to mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Because of the API it uses, the remote cluster must run at least version 4.0.
-
remoteMapWithUpdating
@Nonnull public static <T, K, V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn)
The same as the remoteMapWithUpdating(String, ClientConfig, FunctionEx, BiFunctionEx) method. The only difference is that, instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Since:
- 5.4
-
mapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(@Nonnull String mapName, @Nonnull BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
Convenience for mapWithUpdating(String, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
-
mapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(@Nonnull IMap<? super K, ? super V> map, @Nonnull BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
Convenience for mapWithUpdating(IMap, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
-
remoteMapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
-
remoteMapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BiFunctionEx<? super V, ? super E, ? extends V> updateFn)
The same as the remoteMapWithUpdating(String, ClientConfig, BiFunctionEx) method. The only difference is that, instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a configured data connection.
The data connection caches the connection to the remote cluster so that it can be reused.
- Since:
- 5.4
-
mapWithEntryProcessor
@Nonnull public static <E, K, V, R> Sink<E> mapWithEntryProcessor(@Nonnull String mapName, @Nonnull FunctionEx<? super E, ? extends K> toKeyFn, @Nonnull FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
Convenience for mapWithEntryProcessor(int, String, FunctionEx, FunctionEx) when the maximum number of async operations is not specified.
-
mapWithEntryProcessor
@Nonnull public static <E, K, V, R> Sink<E> mapWithEntryProcessor(int maxParallelAsyncOps, @Nonnull String mapName, @Nonnull FunctionEx<? super E, ? extends K> toKeyFn, @Nonnull FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create EntryProcessor instances it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
As opposed to mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor was run on the same entry more than once.
Note: Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Type Parameters:
E - input item type
K - key type
V - value type
- Parameters:
maxParallelAsyncOps - maximum number of simultaneous entry processors affecting the map
mapName - name of the map
toKeyFn - function that extracts the key from the input item
toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
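The idempotence requirement above can be illustrated without any Hazelcast API. The following is a minimal plain-Java sketch, assuming an entry processor can be modelled as a function from the old value to the new value; the class name IdempotenceSketch and its helpers are hypothetical and only stand in for the real submission path. It applies the same update once and then twice, as would happen if an item were replayed after a restart.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model, not the Hazelcast API: an "entry processor" is reduced
// to a function from the old value (null if absent) to the new value.
final class IdempotenceSketch {

    // Applies the update to the current value of the key and stores the result.
    static void apply(Map<String, Integer> map, String key, UnaryOperator<Integer> update) {
        map.put(key, update.apply(map.get(key)));
    }

    // Applies the same update the given number of times to a fresh map
    // and returns the final value, simulating replayed items.
    static Integer replay(UnaryOperator<Integer> update, int times) {
        Map<String, Integer> map = new HashMap<>();
        for (int i = 0; i < times; i++) {
            apply(map, "k", update);
        }
        return map.get("k");
    }

    public static void main(String[] args) {
        // Idempotent: running it again leaves the same value.
        UnaryOperator<Integer> setToFive = old -> 5;
        // Not idempotent: every replay changes the value again.
        UnaryOperator<Integer> addFive = old -> (old == null ? 0 : old) + 5;

        System.out.println("set: " + replay(setToFive, 1) + " vs " + replay(setToFive, 2));
        System.out.println("add: " + replay(addFive, 1) + " vs " + replay(addFive, 2));
    }
}
```

Only an update of the first kind keeps the map unchanged when an item is processed more than once, which is the property the exactly-once statement relies on.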
-
mapWithEntryProcessor
@Nonnull public static <T, K, V, R> Sink<T> mapWithEntryProcessor(@Nonnull IMap<? super K, ? super V> map, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create EntryProcessor instances it submits to the specified Hazelcast IMap. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
As opposed to mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor was run on the same entry more than once.
Note: Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
- Type Parameters:
T - input item type
K - key type
V - value type
- Parameters:
map - map to drain to
toKeyFn - function that extracts the key from the input item
toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
-
remoteMapWithEntryProcessor
@Nonnull public static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super E, ? extends K> toKeyFn, @Nonnull FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
Returns a sink equivalent to mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
-
remoteMapWithEntryProcessor
public static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<? super E, ? extends K> toKeyFn, @Nonnull FunctionEx<? super E, ? extends EntryProcessor<K, V, R>> toEntryProcessorFn)
The same as the remoteMapWithEntryProcessor(String, ClientConfig, FunctionEx, FunctionEx) method. The only difference is that, instead of a ClientConfig used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a configured data connection.
The data connection caches the connection to the remote cluster so that it can be reused.
- Since:
- 5.4
-
cache
Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target cache.
The default local parallelism for this sink is 2.
-
remoteCache
@Nonnull public static <T extends Map.Entry> Sink<T> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target cache.
The default local parallelism for this sink is 2.
-
list
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
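To picture why a list sink ends up with duplicates after a restart while a keyed sink does not, here is a small plain-Java simulation. It is only a sketch under stated assumptions: the class ReplaySketch, the snapshot position, and the failure point are all hypothetical, and no Hazelcast API is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of an at-least-once restart, not Jet code.
final class ReplaySketch {

    // The job delivers the first failAfter items, fails, and after the restart
    // replays everything from snapshotPos (the position of the last snapshot).
    static List<Integer> deliveries(List<Integer> items, int snapshotPos, int failAfter) {
        List<Integer> delivered = new ArrayList<>(items.subList(0, failAfter));
        delivered.addAll(items.subList(snapshotPos, items.size()));
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> delivered = deliveries(List.of(1, 2, 3, 4), 1, 3);

        // List-like sink: every delivery is appended, so replayed items repeat.
        List<Integer> listSink = new ArrayList<>(delivered);

        // Map-like sink: a delivery overwrites the entry with the same key.
        Map<Integer, Integer> mapSink = new LinkedHashMap<>();
        delivered.forEach(i -> mapSink.put(i, i));

        System.out.println("list sink: " + listSink);
        System.out.println("map sink keys: " + mapSink.keySet());
    }
}
```

The list holds six elements for four distinct inputs, whereas the keyed structure still holds four entries.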
-
list
Returns a sink that adds the items it receives to the specified Hazelcast IList.
NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
-
remoteList
@Nonnull public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
-
reliableTopic
Returns a sink which publishes the items it receives to a distributed reliable topic with the specified name.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
- Since:
- Jet 4.0
-
reliableTopic
Returns a sink which publishes the items it receives to the provided distributed reliable topic. More precisely, it takes the name of the given ITopic and then independently retrieves the ITopic with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the ITopic from the same cluster to which you will submit the pipeline.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
- Since:
- Jet 4.0
-
remoteReliableTopic
@Nonnull public static <T> Sink<T> remoteReliableTopic(@Nonnull String reliableTopicName, @Nonnull ClientConfig clientConfig)
Returns a sink which publishes the items it receives to a distributed reliable topic with the provided name in a remote cluster identified by the supplied ClientConfig.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
- Since:
- Jet 4.0
-
socket
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T, ? extends String> toStringFn, @Nonnull Charset charset)
Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives. It converts an item to its string representation using the supplied toStringFn function and encodes the string using the supplied Charset. It follows each item with a newline character.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
- Parameters:
host - the host to connect to
port - the target port
toStringFn - a function to convert received items to string. It must be stateless and cooperative.
charset - charset used to convert the string to bytes
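The wire format described above (string conversion, charset encoding, trailing newline) can be sketched in plain Java. This is an illustration only, not the sink's implementation: the class SocketFormatSketch and its methods are hypothetical, and an in-memory stream stands in for the socket's output stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the line-oriented format, not the Jet processor.
final class SocketFormatSketch {

    // Writes each item as toStringFn(item) followed by '\n', encoded with charset.
    static <T> void writeItems(OutputStream out, List<T> items,
                               Function<? super T, String> toStringFn,
                               Charset charset) throws IOException {
        for (T item : items) {
            out.write((toStringFn.apply(item) + "\n").getBytes(charset));
        }
        out.flush();
    }

    // Renders the same bytes into memory so the format can be inspected.
    static <T> String render(List<T> items, Function<? super T, String> toStringFn,
                             Charset charset) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            writeItems(out, items, toStringFn, charset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), charset);
    }

    public static void main(String[] args) {
        System.out.print(render(List.of(1, 2, 3), i -> "item=" + i, StandardCharsets.UTF_8));
    }
}
```

A receiver can therefore split the stream on newline characters, provided toStringFn never emits a newline itself.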
-
socket
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T, ? extends String> toStringFn)
Convenience for socket(String, int, FunctionEx, Charset) with UTF-8 as the charset.
-
socket
Convenience for socket(String, int, FunctionEx, Charset) with Object.toString as the conversion function and UTF-8 as the charset.
-
filesBuilder
Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API. See javadoc of methods in FileSinkBuilder for more details.
The sink writes the items it receives to files. Each processor will write to its own files whose names contain the processor's global index (an integer unique to each processor of the vertex), but the same directory is used for all files, on all cluster members. That directory can be shared over a network, because each processor creates globally unique file names.
Fault tolerance
If the job is running in exactly-once mode, Jet writes the items to temporary files (ending with a ".tmp" suffix). When Jet commits a snapshot, it atomically renames the file to remove this suffix. Thanks to the two-phase commit of the snapshot, the sink provides an exactly-once guarantee.
Because Jet starts a new file each time it snapshots the state, the sink will produce many more small files, depending on the snapshot interval. If you want to avoid the temporary files or the high number of files but need to have exactly-once for other processors in the job, call exactlyOnce(false) on the returned builder. This will give you an at-least-once guarantee for this sink and an unchanged guarantee for other processors.
For the fault tolerance to work, the target file system must be a network file system. If you lose a member with its files, you'll obviously lose data. Even if that member rejoins later with the lost files, the job might have processed more transactions on the remaining members and will not commit the temporary files on the resurrected member.
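The write-then-rename commit described above can be pictured with the standard java.nio.file API. This is a minimal sketch, not Jet's implementation: the class TmpCommitSketch, the file name "0-0", and the temporary directory are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical two-step commit: write to "<name>.tmp", then rename atomically.
final class TmpCommitSketch {
    // Same literal as the ".tmp" suffix mentioned in the text.
    static final String TEMP_SUFFIX = ".tmp";

    // Step 1: write the items to "<name>.tmp" inside dir.
    static Path writeTemp(Path dir, String name, List<String> lines) throws IOException {
        return Files.write(dir.resolve(name + TEMP_SUFFIX), lines, StandardCharsets.UTF_8);
    }

    // Step 2: commit by atomically renaming the file to drop the suffix.
    static Path commit(Path tempFile) throws IOException {
        String tmpName = tempFile.getFileName().toString();
        String finalName = tmpName.substring(0, tmpName.length() - TEMP_SUFFIX.length());
        return Files.move(tempFile, tempFile.resolveSibling(finalName),
                StandardCopyOption.ATOMIC_MOVE);
    }

    // Runs both steps in a fresh temporary directory and reports the result.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("file-sink-sketch");
            Path committed = commit(writeTemp(dir, "0-0", List.of("a", "b")));
            return committed.getFileName() + " " + Files.readAllLines(committed);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Readers of the directory that ignore names ending in ".tmp" see a file only once it has been committed.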
File name structure
[<date>-]<global processor index>[-<sequence>][".tmp"]
Description (parts in [] are optional):
- <date>: the current date and time, see FileSinkBuilder.rollByDate(String). Not present if rolling by date is not used
- <global processor index>: a processor index ensuring that each parallel processor writes to its own file
- <sequence>: a sequence number starting from 0. Used if either:
  - running in exactly-once mode
  - maximum file size is set
  <date> changes.
- ".tmp": the FileSinkBuilder.TEMP_FILE_SUFFIX, used if the file is not yet committed
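The name pattern above can be assembled mechanically. The following plain-Java sketch shows one way to do so; the class FileNameSketch, its method, and the sample date string are hypothetical and are not taken from FileSinkBuilder.

```java
// Hypothetical helper that assembles a name following the pattern
// [<date>-]<global processor index>[-<sequence>][".tmp"].
final class FileNameSketch {
    static final String TEMP_SUFFIX = ".tmp";

    // date and sequence are optional parts: pass null to leave them out.
    // committed == false appends the temporary suffix.
    static String fileName(String date, int globalProcessorIndex,
                           Integer sequence, boolean committed) {
        StringBuilder sb = new StringBuilder();
        if (date != null) {
            sb.append(date).append('-');
        }
        sb.append(globalProcessorIndex);
        if (sequence != null) {
            sb.append('-').append(sequence);
        }
        if (!committed) {
            sb.append(TEMP_SUFFIX);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The date value is an arbitrary illustration, not a format Jet mandates.
        System.out.println(fileName(null, 3, null, true));
        System.out.println(fileName("2024-01-01", 3, 0, false));
    }
}
```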
Notes
The target directory is not deleted before the job starts. If file names clash, the existing files are appended to. This is needed to ensure at-least-once behavior. In exactly-once mode the file names never clash thanks to the sequence number in the file name: a number higher than the highest sequence number found in the directory is always chosen.
For performance, the processor doesn't delete old files from the directory. If you have frequent snapshots, you should delete the old files from time to time to avoid having a huge number of files in the directory. Jet lists the files in the directory after a restart to find out the sequence number to use.
The default local parallelism for this sink is 1.
- Type Parameters:
T - type of the items the sink accepts
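The notes say that after a restart the directory listing is used to pick a sequence number higher than any already present. A rough plain-Java sketch of that idea follows. It is an assumption-laden simplification: the class NextSequenceSketch is hypothetical, names with a <date> prefix are ignored, and the maximum is taken across all processors' files.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: derive the next sequence number from existing file names.
final class NextSequenceSketch {
    // Matches "<processor index>-<sequence>" with an optional ".tmp" suffix.
    // Simplification: names carrying a <date> prefix are not handled.
    private static final Pattern NAME = Pattern.compile("\\d+-(\\d+)(\\.tmp)?");

    // Returns a number one higher than the highest sequence found, or 0 if none.
    static int nextSequence(List<String> fileNames) {
        int highest = -1;
        for (String name : fileNames) {
            Matcher m = NAME.matcher(name);
            if (m.matches()) {
                highest = Math.max(highest, Integer.parseInt(m.group(1)));
            }
        }
        return highest + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextSequence(List.of("0-0", "0-1", "1-4.tmp", "notes.txt")));
    }
}
```

Because the scan cost grows with the number of files, the advice to prune old files periodically also keeps restarts fast.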
-
files
Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.
-
json
Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files. The sink converts each item to a JSON string and adds it to the file as a line.
-
logger
Returns a sink that logs all the data items it receives, at the INFO level to the log category WriteLoggerP. It also logs watermark items, but at FINE level.
The sink logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development, when running Jet on a local machine.
The default local parallelism for this sink is 1.
- Type Parameters:
T - stream item type
- Parameters:
toStringFn - a function that returns a string representation of a stream item. It must be stateless and cooperative.
-
logger
-
noop
Returns a sink which discards all received items.
-
jmsQueue
@Nonnull public static <T> Sink<T> jmsQueue(@Nonnull String queueName, @Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
Convenience for jmsQueueBuilder(SupplierEx). Creates a connection without any authentication parameters. If a received item is not an instance of jakarta.jms.Message, the sink wraps item.toString() into a TextMessage.
- Parameters:
queueName - the name of the queue
factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
-
jmsQueueBuilder
@Nonnull public static <T> JmsSinkBuilder<T> jmsQueueBuilder(@Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. See javadoc for JmsSinkBuilder methods for more details.
In exactly-once mode the processor uses two-phase XA transactions to guarantee exactly-once delivery. The supplier is expected to return an XAConnectionFactory. The transaction is committed after all processors have finished processing the items and stored all data to the snapshot. The processor is also able to finish the commit after a restart, should the job fail midway through the commit process. This mode significantly increases latency because produced messages are visible only after they are committed; if you want to avoid it, you can reduce the guarantee just for this sink. To do so, call exactlyOnce(false) on the returned builder. If your messages have a unique identifier, some JMS brokers have deduplication functionality, which you can use to avoid the latency penalty.
In at-least-once mode or when the guarantee is off, the produced records are acknowledged immediately. We use transactions to produce messages in batches, but those transactions have very short duration.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
Test the XA support of your broker
JMS is only an API, and some brokers don't implement XA transactions correctly. We run our stress tests with RabbitMQ and ActiveMQ. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your broker, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test
Notes
The default local parallelism for this processor is 1.
- Type Parameters:
T - type of the items the sink accepts
- Parameters:
factorySupplier - supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
-
jmsTopic
@Nonnull public static <T> Sink<T> jmsTopic(@Nonnull String topicName, @Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
Shortcut for:
    jmsTopicBuilder(factorySupplier)
            .destinationName(topicName)
            .build();
See jmsTopicBuilder(SupplierEx) for more details.
- Parameters:
topicName - the name of the topic
factorySupplier - supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
-
jmsTopicBuilder
@Nonnull public static <T> JmsSinkBuilder<T> jmsTopicBuilder(@Nonnull SupplierEx<jakarta.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API. See javadoc on JmsSinkBuilder methods for more details.
In exactly-once mode the processor uses two-phase XA transactions to guarantee exactly-once delivery. The supplier is expected to return an XAConnectionFactory. The transaction is committed after all processors have finished processing the items and stored all data to the snapshot. The processor is also able to finish the commit after a restart, should the job fail midway through the commit process. This mode significantly increases latency because produced messages are visible only after they are committed; if you want to avoid it, you can reduce the guarantee just for this sink. To do so, call exactlyOnce(false) on the returned builder.
In at-least-once mode or when the guarantee is off, the produced records are acknowledged immediately. We use transactions to produce messages in batches, but those transactions have very short duration.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
Test the XA support of your broker
JMS is only an API, and some brokers don't implement XA transactions correctly. We run our stress tests with RabbitMQ and ActiveMQ. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your broker, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test
Notes
The default local parallelism for this processor is 1.
- Type Parameters:
T - type of the items the sink accepts
- Parameters:
factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
-
jdbc
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull SupplierEx<? extends CommonDataSource> dataSourceSupplier, @Nonnull BiConsumerEx<PreparedStatement, T> bindFn)
A shortcut for:
    Sinks.<T>jdbcBuilder()
            .updateQuery(updateQuery)
            .dataSourceSupplier(dataSourceSupplier)
            .bindFn(bindFn)
            .build();
See jdbcBuilder() for more information.
-
jdbc
@Nonnull @Beta public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BiConsumerEx<PreparedStatement, T> bindFn)
A shortcut for:
    Sinks.<T>jdbcBuilder()
            .updateQuery(updateQuery)
            .dataConnectionRef(dataConnectionRef)
            .bindFn(bindFn)
            .build();
See jdbcBuilder() for more information.
- Since:
- 5.2
-
jdbc
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull String jdbcUrl, @Nonnull BiConsumerEx<PreparedStatement, T> bindFn)
A shortcut for:
    Sinks.<T>jdbcBuilder()
            .updateQuery(updateQuery)
            .jdbcUrl(jdbcUrl)
            .bindFn(bindFn)
            .build();
See jdbcBuilder() for more information.
-
jdbcBuilder
Returns a builder to build a sink that connects to a JDBC database, prepares an SQL statement and executes it for each item. On the returned builder you must specify a connection (either using a JDBC URL or using a datasource or using a dataConnection), the SQL statement and a bind function.
Example
    stage.writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
            .updateQuery("INSERT INTO table (key, value) VALUES(?, ?)")
            .bindFn((stmt, item) -> {
                stmt.setInt(1, item.getKey());
                stmt.setString(2, item.getValue());
            })
            .jdbcUrl("jdbc:...")
            .build());
Commit behavior
The commit behavior depends on the job guarantee:
- Exactly-once: XA transactions will be used to commit the work in phase two of the snapshot, that is after all other vertices in the job have performed the snapshot. Very small state will be saved to snapshot.
- At-least-once or no guarantee: Records will be committed in batches. A batch is created from records that are readily available at the sink.
If the job is in exactly-once mode, the overhead in the database and the output latency are higher. This is caused by the fact that Jet will not commit the transaction until the next snapshot occurs and the number of uncommitted records in the transactions can be very high. Latency is high because the changes are visible only after the transactions are committed. Configure the snapshot interval accordingly.
If your driver doesn't support XA transactions or if you want to avoid the performance or latency penalty, you can decrease the guarantee just for this sink by calling exactlyOnce(false) on the returned builder.
Notes
In non-XA mode, in case of an SQLException the processor will transparently reconnect and the job won't fail, except for an SQLNonTransientException subclass. In XA mode the job will fail immediately.
Test the XA support of your database
JDBC is only an API, and some databases and drivers don't implement XA transactions correctly. We run our stress tests with PostgreSQL. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your database, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test
Notes
The default local parallelism for this sink is 1.
- Type Parameters:
T - type of the items the sink accepts
- Since:
- Jet 4.1
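The error-handling rule from the notes (reconnect on SQLException in non-XA mode unless it is an SQLNonTransientException, fail immediately in XA mode) can be written as a small decision function. This sketch uses only the standard java.sql exception hierarchy; the class JdbcRetrySketch and its method are hypothetical and do not mirror the processor's internals.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLTransientConnectionException;

// Hypothetical model of the documented rule, not the Jet processor code.
final class JdbcRetrySketch {

    // Returns true if the failure should lead to a transparent reconnect,
    // false if the job should fail.
    static boolean shouldReconnect(SQLException e, boolean xaMode) {
        if (xaMode) {
            return false; // XA mode: the job fails immediately
        }
        // Non-XA mode: anything except SQLNonTransientException (or a subclass) is retried.
        return !(e instanceof SQLNonTransientException);
    }

    public static void main(String[] args) {
        // SQLTransientConnectionException is not a non-transient exception.
        System.out.println(shouldReconnect(new SQLTransientConnectionException("lost"), false));
        // SQLSyntaxErrorException extends SQLNonTransientException.
        System.out.println(shouldReconnect(new SQLSyntaxErrorException("bad SQL"), false));
    }
}
```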
-
observable
Returns a sink that publishes to the Observable with the provided name. The records that are sent to the observable can be read through first getting a handle to it through JetService.getObservable(String) and then subscribing to the events using the methods on Observable.
The Observable should be destroyed after using it. For the full description see the javadoc for Observable. Example:
    Observable<Integer> observable = jet.newObservable();
    CompletableFuture<List<Integer>> list = observable.toFuture(o -> o.collect(toList()));
    pipeline.readFrom(TestSources.items(1, 2, 3, 4))
            .writeTo(Sinks.observable(observable));
    Job job = jet.newJob(pipeline);
    System.out.println(list.get());
    observable.destroy();
This sink is cooperative and uses a local parallelism of 1.
- Since:
- Jet 4.0
-
observable
Returns a sink that publishes to the provided Observable. More precisely, it takes the name of the given Observable and then independently retrieves an Observable with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the Observable from the same cluster to which you will submit the pipeline.
For more details refer to observable(name).
- Since:
- Jet 4.0
-