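public final class Sinks extends Object
Contains factory methods for various types of pipeline sinks. A pipeline stage with a sink transform has the type SinkStage and accepts no downstream stages.
The default local parallelism for the sinks in this class is typically 1; check the documentation of individual methods.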
Modifier and Type | Method and Description |
---|---|
static <T extends Map.Entry> Sink<T> |
cache(String cacheName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast
ICache with the specified name. |
static <T> Sink<T> |
files(String directoryName)
Convenience for
filesBuilder(java.lang.String) with the UTF-8 charset and with
overwriting of existing files. |
static <T> FileSinkBuilder<T> |
filesBuilder(String directoryName)
Returns a builder object that offers a step-by-step fluent API to build
a custom file sink for the Pipeline API.
|
static <T> Sink<T> |
fromProcessor(String sinkName,
ProcessorMetaSupplier metaSupplier)
Returns a sink constructed directly from the given Core API processor
meta-supplier.
|
static <T> Sink<T> |
fromProcessor(String sinkName,
ProcessorMetaSupplier metaSupplier,
FunctionEx<? super T,?> partitionKeyFn)
Returns a sink constructed directly from the given Core API processor
meta-supplier.
|
static <T> Sink<T> |
jdbc(String updateQuery,
DataConnectionRef dataConnectionRef,
BiConsumerEx<PreparedStatement,T> bindFn)
A shortcut for:
|
static <T> Sink<T> |
jdbc(String updateQuery,
String jdbcUrl,
BiConsumerEx<PreparedStatement,T> bindFn)
A shortcut for:
|
static <T> Sink<T> |
jdbc(String updateQuery,
SupplierEx<? extends CommonDataSource> dataSourceSupplier,
BiConsumerEx<PreparedStatement,T> bindFn)
A shortcut for:
|
static <T> JdbcSinkBuilder<T> |
jdbcBuilder()
Returns a builder to build a sink that connects to a JDBC database,
prepares an SQL statement and executes it for each item.
|
static <T> Sink<T> |
jmsQueue(String queueName,
SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Convenience for
jmsQueueBuilder(SupplierEx) . |
static <T> JmsSinkBuilder<T> |
jmsQueueBuilder(SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build
a custom JMS queue sink for the Pipeline API.
|
static <T> Sink<T> |
jmsTopic(String topicName,
SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Shortcut for:
|
static <T> JmsSinkBuilder<T> |
jmsTopicBuilder(SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build
a custom JMS topic sink for the Pipeline API.
|
static <T> Sink<T> |
json(String directoryName)
Convenience for
filesBuilder(java.lang.String) with the UTF-8 charset and with
overwriting of existing files. |
static <T> Sink<T> |
list(IList<? super T> list)
Returns a sink that adds the items it receives to the specified
Hazelcast
IList . |
static <T> Sink<T> |
list(String listName)
Returns a sink that adds the items it receives to a Hazelcast
IList with the specified name. |
static <T> Sink<T> |
logger()
|
static <T> Sink<T> |
logger(FunctionEx<? super T,String> toStringFn)
Returns a sink that logs all the data items it receives, at the INFO
level to the log category
WriteLoggerP . |
static <K,V> Sink<Map.Entry<K,V>> |
map(IMap<? super K,? super V> map)
Returns a sink that puts the Map.Entry items it receives into the given
Hazelcast IMap. |
static <T,K,V> Sink<T> |
map(IMap<? super K,? super V> map,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends V> toValueFn)
Returns a sink that uses the supplied functions to extract the key
and value with which to put to the given Hazelcast IMap. |
static <K,V> Sink<Map.Entry<K,V>> |
map(String mapName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast
IMap with the specified name. |
static <T,K,V> Sink<T> |
map(String mapName,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends V> toValueFn)
Returns a sink that uses the supplied functions to extract the key
and value with which to put to a Hazelcast
IMap with the
specified name. |
static <T,K,V,R> Sink<T> |
mapWithEntryProcessor(IMap<? super K,? super V> map,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create
EntryProcessors it submits to a Hazelcast IMap with the
specified name. |
static <E,K,V,R> Sink<E> |
mapWithEntryProcessor(int maxParallelAsyncOps,
String mapName,
FunctionEx<? super E,? extends K> toKeyFn,
FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create
EntryProcessors it submits to a Hazelcast IMap with the
specified name. |
static <E,K,V,R> Sink<E> |
mapWithEntryProcessor(String mapName,
FunctionEx<? super E,? extends K> toKeyFn,
FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Convenience for
mapWithEntryProcessor(int, String, FunctionEx, FunctionEx)
when the maximum number of async operations is not specified. |
static <T,K,V> Sink<T> |
mapWithMerging(IMap<? super K,? super V> map,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends V> toValueFn,
BinaryOperatorEx<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key
and value with which to update a Hazelcast
IMap . |
static <K,V> Sink<Map.Entry<K,V>> |
mapWithMerging(IMap<? super K,V> map,
BinaryOperatorEx<V> mergeFn)
Convenience for
mapWithMerging(IMap, FunctionEx, FunctionEx,
BinaryOperatorEx) with Map.Entry as input item. |
static <K,V> Sink<Map.Entry<K,V>> |
mapWithMerging(String mapName,
BinaryOperatorEx<V> mergeFn)
Convenience for
mapWithMerging(String, FunctionEx, FunctionEx,
BinaryOperatorEx) with Map.Entry as input item. |
static <T,K,V> Sink<T> |
mapWithMerging(String mapName,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends V> toValueFn,
BinaryOperatorEx<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key
and value with which to update a Hazelcast
IMap . |
static <K,V,E extends Map.Entry<K,V>> Sink<E> |
mapWithUpdating(IMap<? super K,? super V> map,
BiFunctionEx<? super V,? super E,? extends V> updateFn)
Convenience for
mapWithUpdating(IMap, FunctionEx,
BiFunctionEx) with Map.Entry as the input item. |
static <T,K,V> Sink<T> |
mapWithUpdating(IMap<? super K,? super V> map,
FunctionEx<? super T,? extends K> toKeyFn,
BiFunctionEx<? super V,? super T,? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating
functions to update a Hazelcast
IMap . |
static <K,V,E extends Map.Entry<K,V>> Sink<E> |
mapWithUpdating(String mapName,
BiFunctionEx<? super V,? super E,? extends V> updateFn)
Convenience for
mapWithUpdating(String, FunctionEx,
BiFunctionEx) with Map.Entry as the input item. |
static <T,K,V> Sink<T> |
mapWithUpdating(String mapName,
FunctionEx<? super T,? extends K> toKeyFn,
BiFunctionEx<? super V,? super T,? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating
functions to update a Hazelcast
IMap . |
static <T> Sink<T> |
noop()
Returns a sink which discards all received items.
|
static <T> Sink<T> |
observable(Observable<? super T> observable)
Returns a sink that publishes to the provided
Observable . |
static <T> Sink<T> |
observable(String name)
Returns a sink that publishes to the
Observable with the
provided name. |
static <T> Sink<T> |
reliableTopic(ITopic<Object> reliableTopic)
Returns a sink which publishes the items it receives to the provided
distributed reliable topic.
|
static <T> Sink<T> |
reliableTopic(String reliableTopicName)
Returns a sink which publishes the items it receives to a distributed
reliable topic with the specified name.
|
static <T extends Map.Entry> Sink<T> |
remoteCache(String cacheName,
ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast
ICache with the specified name in a remote cluster identified by
the supplied ClientConfig. |
static <T> Sink<T> |
remoteList(String listName,
ClientConfig clientConfig)
Returns a sink that adds the items it receives to a Hazelcast
IList with the specified name in a remote cluster identified by the
supplied ClientConfig . |
static <K,V> Sink<Map.Entry<K,V>> |
remoteMap(String mapName,
ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast
IMap with the specified name in a remote cluster identified by
the supplied ClientConfig. |
static <T,K,V> Sink<T> |
remoteMap(String mapName,
ClientConfig clientConfig,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends V> toValueFn)
Returns a sink that uses the supplied functions to extract the key
and value with which to put to a Hazelcast
IMap in a remote
cluster identified by the supplied ClientConfig . |
static <E,K,V,R> Sink<E> |
remoteMapWithEntryProcessor(String mapName,
ClientConfig clientConfig,
FunctionEx<? super E,? extends K> toKeyFn,
FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Returns a sink equivalent to
mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map
in a remote Hazelcast cluster identified by the supplied ClientConfig. |
static <K,V> Sink<Map.Entry<K,V>> |
remoteMapWithMerging(String mapName,
ClientConfig clientConfig,
BinaryOperatorEx<V> mergeFn)
|
static <T,K,V> Sink<T> |
remoteMapWithMerging(String mapName,
ClientConfig clientConfig,
FunctionEx<? super T,? extends K> toKeyFn,
FunctionEx<? super T,? extends V> toValueFn,
BinaryOperatorEx<V> mergeFn)
Returns a sink equivalent to
mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx),
but for a map in a remote Hazelcast cluster identified by the supplied
ClientConfig. |
static <K,V,E extends Map.Entry<K,V>> Sink<E> |
remoteMapWithUpdating(String mapName,
ClientConfig clientConfig,
BiFunctionEx<? super V,? super E,? extends V> updateFn)
|
static <T,K,V> Sink<T> |
remoteMapWithUpdating(String mapName,
ClientConfig clientConfig,
FunctionEx<? super T,? extends K> toKeyFn,
BiFunctionEx<? super V,? super T,? extends V> updateFn)
Returns a sink equivalent to
mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map
in a remote Hazelcast cluster identified by the supplied ClientConfig. |
static <T> Sink<T> |
remoteReliableTopic(String reliableTopicName,
ClientConfig clientConfig)
Returns a sink which publishes the items it receives to a distributed
reliable topic with the provided name in a remote cluster identified by
the supplied
ClientConfig . |
static <T> Sink<T> |
socket(String host,
int port)
Convenience for
socket(String, int, FunctionEx,
Charset) with Object.toString as the conversion function and
UTF-8 as the charset. |
static <T> Sink<T> |
socket(String host,
int port,
FunctionEx<? super T,? extends String> toStringFn)
Convenience for
socket(String, int, FunctionEx,
Charset) with UTF-8 as the charset. |
static <T> Sink<T> |
socket(String host,
int port,
FunctionEx<? super T,? extends String> toStringFn,
Charset charset)
Returns a sink that connects to the specified TCP socket and writes to
it a string representation of the items it receives.
|
@Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a sink constructed directly from the given Core API processor meta-supplier.
The default local parallelism for this sink is specified inside the metaSupplier.
sinkName - user-friendly sink name
metaSupplier - the processor meta-supplier
@Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier, @Nullable FunctionEx<? super T,?> partitionKeyFn)
Returns a sink constructed directly from the given Core API processor meta-supplier.
The default local parallelism for this sink is specified inside the metaSupplier.
sinkName - user-friendly sink name
metaSupplier - the processor meta-supplier
partitionKeyFn - key extractor function for partitioning edges to the sink. It must be stateless and cooperative.
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> map(@Nonnull String mapName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
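The idempotent-overwrite argument can be illustrated without a cluster. The following is a minimal sketch that assumes a plain java.util.HashMap as a stand-in for the IMap; the class name MapSinkReplaySketch and the drain helper are hypothetical and are not part of the Sinks API. It shows only that putting the same entries a second time leaves the target unchanged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a java.util.Map plays the role of the target IMap.
final class MapSinkReplaySketch {

    // Per-item action assumed for the sink: put the entry, overwriting any existing value.
    static <K, V> void drain(Map<K, V> target, List<Map.Entry<K, V>> items) {
        for (Map.Entry<K, V> entry : items) {
            target.put(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> items =
                List.of(Map.entry("a", 1), Map.entry("b", 2));
        Map<String, Integer> target = new HashMap<>();

        drain(target, items);
        Map<String, Integer> afterFirstPass = new HashMap<>(target);

        // Replay the same items, as could happen after a restart from a snapshot.
        drain(target, items);
        System.out.println(afterFirstPass.equals(target));
    }
}
```

Because put overwrites rather than appends, the state after the replay equals the state after the first pass.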
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> map(@Nonnull IMap<? super K,? super V> map)
Returns a sink that puts the Map.Entry items it receives into the given Hazelcast IMap.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
@Nonnull public static <T,K,V> Sink<T> map(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
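Returns a sink that uses the supplied functions to extract the key and value with which to put to a Hazelcast IMap with the specified name.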
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
@Nonnull public static <T,K,V> Sink<T> map(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
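Returns a sink that uses the supplied functions to extract the key and value with which to put to the given Hazelcast IMap.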
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
@Nonnull public static <T,K,V> Sink<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn)
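Returns a sink that uses the supplied functions to extract the key and value with which to put to a Hazelcast IMap in a remote cluster identified by the supplied ClientConfig.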
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
@Nonnull public static <T,K,V> Sink<T> mapWithMerging(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = toValueFn.apply(item);
    V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
    if (resolved == null)
        map.remove(key);
    else
        map.put(key, resolved);
This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.
Note: This operation is NOT lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
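To make the idempotency rule for the merge function concrete, here is a minimal sketch that runs the per-item steps against a plain java.util.HashMap standing in for the IMap. The class MergingSinkSketch and its helpers are hypothetical names for illustration, and the java.util.function interfaces are used in place of FunctionEx and BinaryOperatorEx. It contrasts a summing merge, which is not idempotent, with a max merge, which is.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical stand-in: a java.util.Map plays the role of the IMap.
final class MergingSinkSketch {

    // Mirrors the per-item steps described for mapWithMerging.
    static <T, K, V> void accept(Map<K, V> map, T item,
                                 Function<? super T, ? extends K> toKeyFn,
                                 Function<? super T, ? extends V> toValueFn,
                                 BinaryOperator<V> mergeFn) {
        K key = toKeyFn.apply(item);
        V oldValue = map.get(key);
        V newValue = toValueFn.apply(item);
        V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
        if (resolved == null) {
            map.remove(key);
        } else {
            map.put(key, resolved);
        }
    }

    // Feeds the same items twice, imitating a replay after a restart.
    // Key = the word itself, value = its length.
    static Map<String, Integer> replayTwice(List<String> words, BinaryOperator<Integer> mergeFn) {
        Map<String, Integer> map = new HashMap<>();
        for (int pass = 0; pass < 2; pass++) {
            for (String word : words) {
                accept(map, word, w -> w, String::length, mergeFn);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> words = List.of("jet", "sink");
        // Summing is not idempotent: the replay counts every length twice.
        System.out.println(replayTwice(words, Integer::sum));
        // Taking the maximum is idempotent: the replay leaves the values unchanged.
        System.out.println(replayTwice(words, Integer::max));
    }
}
```

With Integer::max, mergeFn.apply(oldValue, newValue) returns oldValue for any item already seen, which is the condition the rule states; Integer::sum violates it on every replayed item.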
T - input item type
K - key type
V - value type
mapName - name of the map
toKeyFn - function that extracts the key from the input item
toValueFn - function that extracts the value from the input item
mergeFn - function that merges the existing value with the value acquired from the received item
@Nonnull public static <T,K,V> Sink<T> mapWithMerging(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = toValueFn.apply(item);
    V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
    if (resolved == null)
        map.remove(key);
    else
        map.put(key, resolved);
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.
Note: This operation is NOT lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
T - input item type
K - key type
V - value type
map - the map to drain to
toKeyFn - function that extracts the key from the input item
toValueFn - function that extracts the value from the input item
mergeFn - function that merges the existing value with the value acquired from the received item
@Nonnull public static <T,K,V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)
Returns a sink equivalent to mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Because of the API it uses, the remote cluster must run version 4.0 or later.
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull String mapName, @Nonnull BinaryOperatorEx<V> mergeFn)
Convenience for mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull IMap<? super K,V> map, @Nonnull BinaryOperatorEx<V> mergeFn)
Convenience for mapWithMerging(IMap, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as input item.
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BinaryOperatorEx<V> mergeFn)
@Nonnull public static <T,K,V> Sink<T> mapWithUpdating(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = updateFn.apply(oldValue, item);
    if (newValue == null)
        map.remove(key);
    else
        map.put(key, newValue);
This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.
Note: This operation is NOT lock-aware; it processes entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
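The update rule can be tried out with a small sketch, again assuming a java.util.HashMap in place of the IMap and java.util.function interfaces in place of FunctionEx and BiFunctionEx. UpdatingSinkSketch, countDeliveries and keepLongest are hypothetical names chosen for illustration. Note that updateFn receives null as the old value when the key is absent, so both sample functions handle that case.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in: a java.util.Map plays the role of the IMap.
final class UpdatingSinkSketch {

    // Mirrors the per-item steps described for mapWithUpdating.
    static <T, K, V> void accept(Map<K, V> map, T item,
                                 Function<? super T, ? extends K> toKeyFn,
                                 BiFunction<? super V, ? super T, ? extends V> updateFn) {
        K key = toKeyFn.apply(item);
        V oldValue = map.get(key); // null when the key is absent
        V newValue = updateFn.apply(oldValue, item);
        if (newValue == null) {
            map.remove(key);
        } else {
            map.put(key, newValue);
        }
    }

    // Not idempotent: counts every delivery, so a replay inflates the count.
    static Integer countDeliveries(Integer oldCount, String item) {
        return oldCount == null ? 1 : oldCount + 1;
    }

    // Idempotent: keeps the longest item seen; replaying a seen item changes nothing.
    static String keepLongest(String oldValue, String item) {
        return (oldValue == null || item.length() > oldValue.length()) ? item : oldValue;
    }

    // Feeds the items twice, keyed by first letter, imitating a replay after a restart.
    static <V> Map<String, V> replayTwice(List<String> items,
                                          BiFunction<? super V, ? super String, ? extends V> updateFn) {
        Map<String, V> map = new HashMap<>();
        for (int pass = 0; pass < 2; pass++) {
            for (String item : items) {
                accept(map, item, w -> w.substring(0, 1), updateFn);
            }
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> items = List.of("apple", "avocado", "banana");
        System.out.println(UpdatingSinkSketch.<Integer>replayTwice(items, UpdatingSinkSketch::countDeliveries));
        System.out.println(UpdatingSinkSketch.<String>replayTwice(items, UpdatingSinkSketch::keepLongest));
    }
}
```

Only keepLongest satisfies updateFn.apply(v, e).equals(v) for already observed items, so only it would preserve the map state across a replay.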
T - input item type
K - key type
V - value type
mapName - name of the map
toKeyFn - function that extracts the key from the input item
updateFn - function that receives the existing map value and the item and returns the new map value
@Nonnull public static <T,K,V> Sink<T> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:
    K key = toKeyFn.apply(item);
    V oldValue = map.get(key);
    V newValue = updateFn.apply(oldValue, item);
    if (newValue == null)
        map.remove(key);
    else
        map.put(key, newValue);
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.
Note: This operation is not lock-aware; it processes entries even if they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
T - input item type
K - key type
V - value type
map - map to drain to
toKeyFn - function that extracts the key from the input item
updateFn - function that receives the existing map value and the item and returns the new map value
@Nonnull public static <T,K,V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)
Returns a sink equivalent to mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Because of the API it uses, the remote cluster must run version 4.0 or later.
@Nonnull public static <K,V,E extends Map.Entry<K,V>> Sink<E> mapWithUpdating(@Nonnull String mapName, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
Convenience for mapWithUpdating(String, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
@Nonnull public static <K,V,E extends Map.Entry<K,V>> Sink<E> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
Convenience for mapWithUpdating(IMap, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
@Nonnull public static <K,V,E extends Map.Entry<K,V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)
@Nonnull public static <E,K,V,R> Sink<E> mapWithEntryProcessor(@Nonnull String mapName, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Convenience for mapWithEntryProcessor(int, String, FunctionEx, FunctionEx) when the maximum number of async operations is not specified.
@Nonnull public static <E,K,V,R> Sink<E> mapWithEntryProcessor(int maxParallelAsyncOps, @Nonnull String mapName, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
As opposed to mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor was run on the same entry more than once.
Note: Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
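The idempotency requirement on entry processors can be sketched locally. The example below assumes a java.util.HashMap as the map and defines its own minimal Processor interface as a hypothetical stand-in for Hazelcast's EntryProcessor; it does not model batching, locking, or remote execution. The factory methods setTo and addTo are illustrative names only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical local model of a sink that applies one entry processor per item.
final class EntryProcessorSinkSketch {

    // Stand-in for an entry processor: may mutate the entry and returns a result.
    interface Processor<K, V, R> {
        R process(Map.Entry<K, V> entry);
    }

    // For each item: derive the key and the processor, then apply the processor to that key's entry.
    static <E, K, V, R> R accept(Map<K, V> map, E item,
                                 Function<? super E, ? extends K> toKeyFn,
                                 Function<? super E, ? extends Processor<K, V, R>> toProcessorFn) {
        K key = toKeyFn.apply(item);
        Processor<K, V, R> processor = toProcessorFn.apply(item);
        Map.Entry<K, V> entry = new SimpleEntry<>(key, map.get(key));
        R result = processor.process(entry);
        if (entry.getValue() == null) {
            map.remove(key);
        } else {
            map.put(key, entry.getValue());
        }
        return result;
    }

    // Idempotent: running it twice on the same entry yields the same value.
    static Processor<String, Integer, Integer> setTo(Integer value) {
        return entry -> entry.setValue(value);
    }

    // Not idempotent: every run adds the delta again.
    static Processor<String, Integer, Integer> addTo(Integer delta) {
        return entry -> entry.setValue((entry.getValue() == null ? 0 : entry.getValue()) + delta);
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        accept(map, 5, item -> "k", EntryProcessorSinkSketch::setTo);
        accept(map, 5, item -> "k", EntryProcessorSinkSketch::setTo); // replay: value unchanged
        System.out.println(map);
        accept(map, 5, item -> "k", EntryProcessorSinkSketch::addTo);
        accept(map, 5, item -> "k", EntryProcessorSinkSketch::addTo); // replay: value changes again
        System.out.println(map);
    }
}
```

A processor shaped like setTo is safe to run again after a restart, whereas one shaped like addTo changes the stored value on every replay.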
E - input item type
K - key type
V - value type
maxParallelAsyncOps - maximum number of simultaneous entry processors affecting the map
mapName - name of the map
toKeyFn - function that extracts the key from the input item
toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
@Nonnull public static <T,K,V,R> Sink<T> mapWithEntryProcessor(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
As opposed to mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor was run on the same entry more than once.
Note: Unlike mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
The given functions must be stateless and cooperative.
T - input item type
K - key type
V - value type
map - map to drain to
toKeyFn - function that extracts the key from the input item
toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
@Nonnull public static <E,K,V,R> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
Returns a sink equivalent to mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
@Nonnull public static <T extends Map.Entry> Sink<T> cache(@Nonnull String cacheName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 2.
@Nonnull public static <T extends Map.Entry> Sink<T> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates. It means that the value with the same key is not appended, but overwritten. After the job is restarted from snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 2.
@Nonnull public static <T> Sink<T> list(@Nonnull String listName)
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> list(@Nonnull IList<? super T> list)
Returns a sink that adds the items it receives to the specified Hazelcast IList.
NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> reliableTopic(@Nonnull String reliableTopicName)
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> reliableTopic(@Nonnull ITopic<Object> reliableTopic)
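Returns a sink that publishes the items it receives to the provided reliable topic. More precisely, it takes the name of the given ITopic and then independently retrieves the ITopic with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the ITopic from the same cluster to which you will submit the pipeline.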
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> remoteReliableTopic(@Nonnull String reliableTopicName, @Nonnull ClientConfig clientConfig)
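Returns a sink that publishes the items it receives to a reliable topic with the specified name in a remote cluster identified by the supplied ClientConfig.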
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T,? extends String> toStringFn, @Nonnull Charset charset)
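Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives. It converts an item to its string representation using the supplied toStringFn function and encodes the string using the supplied Charset. It follows each item with a newline character.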
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
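The conversion described above (apply `toStringFn`, encode with the charset, append a newline) can be sketched with the JDK alone. This is only an illustration of the byte stream a receiver would see, not the sink's actual implementation: the class `LineEncodingDemo` and its `encode` method are hypothetical, and an in-memory buffer stands in for the socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration only; this is not the Hazelcast socket sink. */
final class LineEncodingDemo {

    /** Converts each item to a string, encodes it with the charset and ends it with a newline. */
    static <T> byte[] encode(Iterable<T> items,
                             Function<? super T, String> toStringFn,
                             Charset charset) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the writer flushes all encoded bytes into the buffer.
        try (Writer writer = new OutputStreamWriter(out, charset)) {
            for (T item : items) {
                writer.write(toStringFn.apply(item));
                writer.write('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<String> items = List.of("\u00e9", "b");
        // U+00E9 takes two bytes in UTF-8 but one byte in ISO-8859-1.
        System.out.println(encode(items, Object::toString, StandardCharsets.UTF_8).length);      // 5
        System.out.println(encode(items, Object::toString, StandardCharsets.ISO_8859_1).length); // 4
    }
}
```

The receiving side has to decode with the same charset and split on newlines, so a `toStringFn` that emits embedded newlines would make item boundaries ambiguous.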
host
- the host to connect to
port
- the target port
toStringFn
- a function to convert received items to string. It must be stateless and cooperative.
charset
- charset used to convert the string to bytes
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T,? extends String> toStringFn)
Convenience for socket(String, int, FunctionEx, Charset) with UTF-8 as the charset.
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port)
Convenience for socket(String, int, FunctionEx, Charset) with Object.toString as the conversion function and UTF-8 as the charset.
@Nonnull public static <T> FileSinkBuilder<T> filesBuilder(@Nonnull String directoryName)
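Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API. See the javadoc of the methods in FileSinkBuilder for more details.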
The sink writes the items it receives to files. Each processor will write to its own files whose names contain the processor's global index (an integer unique to each processor of the vertex), but the same directory is used for all files, on all cluster members. That directory can be shared over a network, because each processor creates globally unique file names.
Because Jet starts a new file each time it snapshots the state, the sink will produce many more small files, depending on the snapshot interval. If you want to avoid the temporary files or the high number of files but need to have exactly-once for other processors in the job, call exactlyOnce(false) on the returned builder. This will give you an at-least-once guarantee for the sink and an unchanged guarantee for other processors.
For the fault-tolerance to work, the target file system must be a network file system. If you lose a member with its files, you'll obviously lose data. Even if that member rejoins later with the lost files, the job might have processed more transactions on the remaining members and will not commit the temporary files on the resurrected member.
[<date>-]<global processor index>[-<sequence>][".tmp"]
Description (parts in [] are optional):
<date>
: the current date and time, see FileSinkBuilder.rollByDate(String). Not present if rolling by date is not used
<global processor index>
: a processor index ensuring that each parallel processor writes to its own file
<sequence>
: a sequence number starting from 0. Used if
either:<date>
changes.
".tmp"
: the FileSinkBuilder.TEMP_FILE_SUFFIX, used if the file is not yet committed
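The file name pattern above can be made concrete with a small sketch that assembles a name from its parts. It is an illustration of the documented pattern only, not the sink's code: the class `SinkFileNames` and its `fileName` method are hypothetical, and the date string used in the example is an assumed format, since the real one depends on the pattern passed to FileSinkBuilder.rollByDate(String).

```java
/** Hypothetical illustration only; this is not part of the Hazelcast API. */
final class SinkFileNames {

    /** Suffix of uncommitted files, as given in the description above. */
    static final String TEMP_SUFFIX = ".tmp";

    /**
     * Builds a name following [<date>-]<global processor index>[-<sequence>][".tmp"].
     *
     * @param date           formatted date, or null when rolling by date is not used
     * @param processorIndex global index of the writing processor
     * @param sequence       sequence number, or null when no sequence is used
     * @param committed      false while the file is still temporary
     */
    static String fileName(String date, int processorIndex, Integer sequence, boolean committed) {
        StringBuilder sb = new StringBuilder();
        if (date != null) {
            sb.append(date).append('-');
        }
        sb.append(processorIndex);
        if (sequence != null) {
            sb.append('-').append(sequence);
        }
        if (!committed) {
            sb.append(TEMP_SUFFIX);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(fileName(null, 3, null, true));          // 3
        System.out.println(fileName("2024-01-31", 3, 0, false));    // 2024-01-31-3-0.tmp
        System.out.println(fileName(null, 7, 2, true));             // 7-2
    }
}
```

A cleanup script for old files could rely on the same structure, for example by skipping any name that still ends in the temporary suffix.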
For performance, the processor doesn't delete old files from the directory. If you have frequent snapshots, you should delete the old files from time to time to avoid having a huge number of files in the directory. Jet lists the files in the directory after a restart to find out the sequence number to use.
The default local parallelism for this sink is 1.
T
- type of the items the sink accepts
@Nonnull public static <T> Sink<T> files(@Nonnull String directoryName)
Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.
@Nonnull public static <T> Sink<T> json(@Nonnull String directoryName)
Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files. The sink converts each item to a JSON string and adds it to the file as a line.
@Nonnull public static <T> Sink<T> logger(@Nonnull FunctionEx<? super T,String> toStringFn)
Returns a sink that logs all the data items it receives, at the INFO level to the log category WriteLoggerP. It also logs watermark items, but at FINE level.
The sink logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development, when running Jet on a local machine.
The default local parallelism for this sink is 1.
T
- stream item type
toStringFn
- a function that returns a string representation of a stream item. It must be stateless and cooperative.
@Nonnull public static <T> Sink<T> jmsQueue(@Nonnull String queueName, @Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Convenience for jmsQueueBuilder(SupplierEx). Creates a connection without any authentication parameters. If a received item is not an instance of javax.jms.Message, the sink wraps item.toString() into a TextMessage.
queueName
- the name of the queue
factorySupplier
- supplier to obtain JMS connection factory. It must be stateless.
@Nonnull public static <T> JmsSinkBuilder<T> jmsQueueBuilder(@Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. See the javadoc for JmsSinkBuilder methods for more details.
In exactly-once mode the processor uses two-phase XA transactions to guarantee exactly-once delivery. The supplier is expected to return an XAConnectionFactory. The transaction is committed after all processors have finished processing the items and stored all data to the snapshot. The processor is also able to finish the commit after a restart, should the job fail midway through the commit process. This mode significantly increases latency because produced messages are visible only after they are committed; if you want to avoid it, you can reduce the guarantee just for this sink. To do so, call exactlyOnce(false) on the returned builder. If your messages have a unique identifier, some JMS brokers have deduplication functionality that you can use to avoid the latency penalty.
In at-least-once mode or when the guarantee is off, the produced records are acknowledged immediately. We use transactions to produce messages in batches, but those transactions have very short duration.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
Test the XA support of your broker
JMS is only an API, and some brokers don't implement XA transactions correctly. We run our stress tests with RabbitMQ and ActiveMQ. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your broker, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test
Notes
The default local parallelism for this processor is 1.
T
- type of the items the sink accepts
factorySupplier
- supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
@Nonnull public static <T> Sink<T> jmsTopic(@Nonnull String topicName, @Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Shortcut for:
jmsTopicBuilder(factorySupplier)
.destinationName(topicName)
.build();
See jmsTopicBuilder(SupplierEx) for more details.
topicName
- the name of the topic
factorySupplier
- supplier to obtain JMS connection factory. For exactly-once the factory must implement XAConnectionFactory. It must be stateless.
@Nonnull public static <T> JmsSinkBuilder<T> jmsTopicBuilder(@Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API. See the javadoc for JmsSinkBuilder methods for more details.
In exactly-once mode the processor uses two-phase XA transactions to guarantee exactly-once delivery. The supplier is expected to return an XAConnectionFactory. The transaction is committed after all processors have finished processing the items and stored all data to the snapshot. The processor is also able to finish the commit after a restart, should the job fail midway through the commit process. This mode significantly increases latency because produced messages are visible only after they are committed; if you want to avoid it, you can reduce the guarantee just for this sink. To do so, call exactlyOnce(false) on the returned builder.
In at-least-once mode or when the guarantee is off, the produced records are acknowledged immediately. We use transactions to produce messages in batches, but those transactions have very short duration.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
Test the XA support of your broker
JMS is only an API, and some brokers don't implement XA transactions correctly. We run our stress tests with RabbitMQ and ActiveMQ. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your broker, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test
Notes
The default local parallelism for this processor is 1.
T
- type of the items the sink accepts
factorySupplier
- supplier to obtain JMS connection factory. It must be stateless.
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull SupplierEx<? extends CommonDataSource> dataSourceSupplier, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
A shortcut for:
Sinks.<T>jdbcBuilder()
.updateQuery(updateQuery)
.dataSourceSupplier(dataSourceSupplier)
.bindFn(bindFn)
.build();
See jdbcBuilder() for more information.
@Nonnull @Beta public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
A shortcut for:
Sinks.<T>jdbcBuilder()
.updateQuery(updateQuery)
.dataConnectionRef(dataConnectionRef)
.bindFn(bindFn)
.build();
See jdbcBuilder() for more information.
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull String jdbcUrl, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
A shortcut for:
Sinks.<T>jdbcBuilder()
.updateQuery(updateQuery)
.jdbcUrl(jdbcUrl)
.bindFn(bindFn)
.build();
See jdbcBuilder() for more information.
@Nonnull public static <T> JdbcSinkBuilder<T> jdbcBuilder()
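Returns a builder to build a sink that connects to a JDBC database, prepares an SQL statement and executes it for each item.
Example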
stage.writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
.updateQuery("INSERT INTO table (key, value) VALUES(?, ?)")
.bindFn((stmt, item) -> {
stmt.setInt(1, item.getKey());
stmt.setString(2, item.getValue());
})
.jdbcUrl("jdbc:...")
.build());
Commit behavior
The commit behavior depends on the job guarantee:
If the job is in exactly-once mode, the overhead in the database and the output latency are higher. This is caused by the fact that Jet will not commit the transaction until the next snapshot occurs and the number of uncommitted records in the transactions can be very high. Latency is high because the changes are visible only after the transactions are committed. Configure the snapshot interval accordingly.
If your driver doesn't support XA transactions or if you want to avoid the performance or latency penalty, you can decrease the guarantee just for this sink by calling exactlyOnce(false) on the returned builder.
Notes
In non-XA mode, in case of an SQLException the processor will transparently reconnect and the job won't fail, except for an SQLNonTransientException subclass. In XA mode the job will fail immediately.
Test the XA support of your database
JDBC is only an API, and some databases don't implement XA transactions correctly. We run our stress tests with PostgreSQL. The most common flaw is that a prepared transaction is rolled back if the client disconnects. To check your database, you can run the code in https://github.com/hazelcast/hazelcast-jet-contrib/xa-test
Notes
The default local parallelism for this sink is 1.
T
- type of the items the sink accepts
@Nonnull public static <T> Sink<T> observable(String name)
Returns a sink that publishes to the Observable with the provided name. The records that are sent to the observable can be read by first getting a handle to it through JetService.getObservable(String) and then subscribing to the events using the methods on Observable.
The Observable should be destroyed after using it. For the full description see the javadoc for Observable.
Example:
Observable<Integer> observable = jet.newObservable();
CompletableFuture<List<Integer>> list = observable.toFuture(o -> o.collect(toList()));
pipeline.readFrom(TestSources.items(1, 2, 3, 4))
.writeTo(Sinks.observable(observable));
Job job = jet.newJob(pipeline);
System.out.println(list.get());
observable.destroy();
This sink is cooperative and uses a local parallelism of 1.
@Nonnull public static <T> Sink<T> observable(Observable<? super T> observable)
Returns a sink that publishes to the provided Observable. More precisely, it takes the name of the given Observable and then independently retrieves an Observable with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the Observable from the same cluster to which you will submit the pipeline.
For more details refer to observable(name).
Copyright © 2024 Hazelcast, Inc. All rights reserved.