public final class Sinks extends Object
Contains factory methods for various types of pipeline sinks. Each sink stage is a SinkStage and accepts no downstream stages.
The default local parallelism for the sinks in this class is typically 1; check the documentation of the individual methods.
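As a quick orientation, a minimal pipeline that terminates in one of these sinks might look as follows; the list names and the uppercase mapping step are placeholders for illustration:

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class SinkExample {
    public static void main(String[] args) {
        JetInstance jet = Jet.newJetInstance();
        try {
            Pipeline p = Pipeline.create();
            // Read from the IList "input", transform, and terminate the
            // pipeline with a sink from this class. A sink stage accepts
            // no downstream stages.
            p.drawFrom(Sources.<String>list("input"))
             .map(String::toUpperCase)
             .drainTo(Sinks.list("output"));
            jet.newJob(p).join();
        } finally {
            Jet.shutdownAll();
        }
    }
}
```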
| Modifier and Type | Method and Description |
|---|---|
| static <T extends Map.Entry> Sink<T> | cache(String cacheName): Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name. |
| static <T> Sink<T> | files(String directoryName): Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files. |
| static <T> FileSinkBuilder<T> | filesBuilder(String directoryName): Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API. |
| static <T> Sink<T> | fromProcessor(String sinkName, ProcessorMetaSupplier metaSupplier): Returns a sink constructed directly from the given Core API processor meta-supplier. |
| static <T> Sink<T> | jdbc(String updateQuery, DistributedSupplier<Connection> connectionSupplier, DistributedBiConsumer<PreparedStatement,T> bindFn): Returns a sink that connects to the specified database using the given connectionSupplier, prepares a statement using the given updateQuery and inserts/updates the items. |
| static <T> Sink<T> | jdbc(String updateQuery, String connectionUrl, DistributedBiConsumer<PreparedStatement,T> bindFn): Convenience for jdbc(String, DistributedSupplier, DistributedBiConsumer). |
| static <T> Sink<T> | jmsQueue(DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier, String name): Convenience for jmsQueueBuilder(DistributedSupplier). |
| static <T> JmsSinkBuilder<T> | jmsQueueBuilder(DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier): Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. |
| static <T> Sink<T> | jmsTopic(DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier, String name): Convenience for jmsTopicBuilder(DistributedSupplier). |
| static <T> JmsSinkBuilder<T> | jmsTopicBuilder(DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier): Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API. |
| static <T> Sink<T> | list(IList<? super T> list): Returns a sink that adds the items it receives to the given Hazelcast IList. |
| static <T> Sink<T> | list(String listName): Returns a sink that adds the items it receives to a Hazelcast IList with the specified name. |
| static <T> Sink<T> | logger() |
| static <T> Sink<T> | logger(DistributedFunction<? super T,String> toStringFn): Returns a sink that logs all the data items it receives, at the INFO level to the log category WriteLoggerP. |
| static <K,V> Sink<Map.Entry<K,V>> | map(IMap<? super K,? super V> map): Returns a sink that puts the Map.Entry items it receives into the given Hazelcast IMap. |
| static <K,V> Sink<Map.Entry<K,V>> | map(String mapName): Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name. |
| static <T,K,V> Sink<T> | mapWithEntryProcessor(IMap<? super K,? super V> map, DistributedFunction<? super T,? extends K> toKeyFn, DistributedFunction<? super T,? extends EntryProcessor<K,V>> toEntryProcessorFn): Returns a sink that uses the items it receives to create EntryProcessors it submits to the given Hazelcast IMap. |
| static <E,K,V> Sink<E> | mapWithEntryProcessor(String mapName, DistributedFunction<? super E,? extends K> toKeyFn, DistributedFunction<? super E,? extends EntryProcessor<K,V>> toEntryProcessorFn): Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. |
| static <T,K,V> Sink<T> | mapWithMerging(IMap<? super K,? super V> map, DistributedFunction<? super T,? extends K> toKeyFn, DistributedFunction<? super T,? extends V> toValueFn, DistributedBinaryOperator<V> mergeFn): Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. |
| static <K,V,V_IN extends V> Sink<Map.Entry<K,V_IN>> | mapWithMerging(IMap<? super K,V> map, DistributedBinaryOperator<V> mergeFn): Convenience for mapWithMerging(IMap, DistributedFunction, DistributedFunction, DistributedBinaryOperator) with Map.Entry as the input item. |
| static <K,V> Sink<Map.Entry<K,V>> | mapWithMerging(String mapName, DistributedBinaryOperator<? super V> mergeFn): Convenience for mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator) with Map.Entry as the input item. |
| static <T,K,V> Sink<T> | mapWithMerging(String mapName, DistributedFunction<? super T,? extends K> toKeyFn, DistributedFunction<? super T,? extends V> toValueFn, DistributedBinaryOperator<V> mergeFn): Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. |
| static <K,V,E extends Map.Entry<K,V>> Sink<E> | mapWithUpdating(IMap<? super K,? super V> map, DistributedBiFunction<? super V,? super E,? extends V> updateFn): Convenience for mapWithUpdating(IMap, DistributedFunction, DistributedBiFunction) with Map.Entry as the input item. |
| static <T,K,V> Sink<T> | mapWithUpdating(IMap<? super K,? super V> map, DistributedFunction<? super T,? extends K> toKeyFn, DistributedBiFunction<? super V,? super T,? extends V> updateFn): Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. |
| static <K,V,E extends Map.Entry<K,V>> Sink<E> | mapWithUpdating(String mapName, DistributedBiFunction<? super V,? super E,? extends V> updateFn): Convenience for mapWithUpdating(String, DistributedFunction, DistributedBiFunction) with Map.Entry as the input item. |
| static <T,K,V> Sink<T> | mapWithUpdating(String mapName, DistributedFunction<? super T,? extends K> toKeyFn, DistributedBiFunction<? super V,? super T,? extends V> updateFn): Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. |
| static <T> Sink<T> | noop(): Returns a sink which discards all received items. |
| static <T extends Map.Entry> Sink<T> | remoteCache(String cacheName, com.hazelcast.client.config.ClientConfig clientConfig): Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig. |
| static <T> Sink<T> | remoteList(String listName, com.hazelcast.client.config.ClientConfig clientConfig): Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig. |
| static <K,V> Sink<Map.Entry<K,V>> | remoteMap(String mapName, com.hazelcast.client.config.ClientConfig clientConfig): Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig. |
| static <E,K,V> Sink<E> | remoteMapWithEntryProcessor(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedFunction<? super E,? extends K> toKeyFn, DistributedFunction<? super E,? extends EntryProcessor<K,V>> toEntryProcessorFn): Returns a sink equivalent to mapWithEntryProcessor(String, DistributedFunction, DistributedFunction), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig. |
| static <K,V> Sink<Map.Entry<K,V>> | remoteMapWithMerging(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedBinaryOperator<V> mergeFn): Convenience for remoteMapWithMerging(String, ClientConfig, DistributedFunction, DistributedFunction, DistributedBinaryOperator) with Map.Entry as the input item. |
| static <T,K,V> Sink<T> | remoteMapWithMerging(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedFunction<? super T,? extends K> toKeyFn, DistributedFunction<? super T,? extends V> toValueFn, DistributedBinaryOperator<V> mergeFn): Returns a sink equivalent to mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig. |
| static <K,V,E extends Map.Entry<K,V>> Sink<E> | remoteMapWithUpdating(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedBiFunction<? super V,? super E,? extends V> updateFn): Convenience for remoteMapWithUpdating(String, ClientConfig, DistributedFunction, DistributedBiFunction) with Map.Entry as the input item. |
| static <T,K,V> Sink<T> | remoteMapWithUpdating(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedFunction<? super T,? extends K> toKeyFn, DistributedBiFunction<? super V,? super T,? extends V> updateFn): Returns a sink equivalent to mapWithUpdating(String, DistributedFunction, DistributedBiFunction), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig. |
| static <T> Sink<T> | socket(String host, int port): Convenience for socket(String, int, DistributedFunction, Charset) with Object.toString as the conversion function and UTF-8 as the charset. |
| static <T> Sink<T> | socket(String host, int port, DistributedFunction<? super T,? extends String> toStringFn): Convenience for socket(String, int, DistributedFunction, Charset) with UTF-8 as the charset. |
| static <T> Sink<T> | socket(String host, int port, DistributedFunction<? super T,? extends String> toStringFn, Charset charset): Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives. |
@Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a sink constructed directly from the given Core API processor meta-supplier.
The default local parallelism for this sink is specified inside the metaSupplier.
Parameters:
sinkName - a user-friendly sink name
metaSupplier - the processor meta-supplier

@Nonnull public static <K,V> Sink<Map.Entry<K,V>> map(@Nonnull String mapName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates: a later entry with the same key overwrites the earlier value rather than appending to it. After the job is restarted from a snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
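For illustration, a sketch that builds Map.Entry items and drains them into an IMap; the list and map names are placeholders:

```java
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class MapSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Map each word to a Map.Entry(word, length) and put it into the
        // IMap "lengths". A later entry with the same key overwrites the
        // earlier value, which is what makes the updates idempotent.
        p.drawFrom(Sources.<String>list("words"))
         .map(w -> Util.entry(w, w.length()))
         .drainTo(Sinks.map("lengths"));
    }
}
```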
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> map(@Nonnull IMap<? super K,? super V> map)
Returns a sink that puts the Map.Entry items it receives into the given Hazelcast IMap.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink provides the exactly-once guarantee thanks to idempotent updates: a later entry with the same key overwrites the earlier value rather than appending to it. After the job is restarted from a snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
@Nonnull public static <K,V> Sink<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates: a later entry with the same key overwrites the earlier value rather than appending to it. After the job is restarted from a snapshot, duplicate items will not change the state in the target map.
The default local parallelism for this sink is 1.
@Nonnull public static <T,K,V> Sink<T> mapWithMerging(@Nonnull String mapName, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedFunction<? super T,? extends V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = toValueFn.apply(item);
V resolved = (oldValue == null)
               ? newValue
               : mergeFn.apply(oldValue, newValue);
if (resolved == null)
    map.remove(key);
else
    map.put(key, resolved);

This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries whether or not they are locked. Use mapWithEntryProcessor(String, DistributedFunction, DistributedFunction) if you need locking.
The default local parallelism for this sink is 1.
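As an illustration, a word-count sketch that merges by summing; the list and map names are placeholders, and note that this summing mergeFn is not idempotent, so it does not qualify for the exactly-once guarantee described above:

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class MergingSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Propose the value 1 for every word; when the key already exists,
        // the merge function adds the proposed 1 to the stored count.
        p.drawFrom(Sources.<String>list("words"))
         .drainTo(Sinks.mapWithMerging("counts",
                 word -> word,                         // toKeyFn
                 word -> 1L,                           // toValueFn: proposed value
                 (oldCount, one) -> oldCount + one));  // mergeFn: sum the counts
    }
}
```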
Type Parameters:
T - input item type
K - key type
V - value type
Parameters:
mapName - name of the map
toKeyFn - function that extracts the key from the input item
toValueFn - function that extracts the value from the input item
mergeFn - function that merges the existing value with the value acquired from the received item

@Nonnull public static <T,K,V> Sink<T> mapWithMerging(@Nonnull IMap<? super K,? super V> map, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedFunction<? super T,? extends V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn)
Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = toValueFn.apply(item);
V resolved = (oldValue == null)
               ? newValue
               : mergeFn.apply(oldValue, newValue);
if (resolved == null)
    map.remove(key);
else
    map.put(key, resolved);

NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries whether or not they are locked. Use mapWithEntryProcessor(String, DistributedFunction, DistributedFunction) if you need locking.
The default local parallelism for this sink is 1.
Type Parameters:
T - input item type
K - key type
V - value type
Parameters:
map - the map to drain to
toKeyFn - function that extracts the key from the input item
toValueFn - function that extracts the value from the input item
mergeFn - function that merges the existing value with the value acquired from the received item

@Nonnull public static <T,K,V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedFunction<? super T,? extends V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn)
Returns a sink equivalent to mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

@Nonnull public static <K,V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull String mapName, @Nonnull DistributedBinaryOperator<? super V> mergeFn)
Convenience for mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator) with Map.Entry as the input item.

@Nonnull public static <K,V,V_IN extends V> Sink<Map.Entry<K,V_IN>> mapWithMerging(@Nonnull IMap<? super K,V> map, @Nonnull DistributedBinaryOperator<V> mergeFn)
Convenience for mapWithMerging(IMap, DistributedFunction, DistributedFunction, DistributedBinaryOperator) with Map.Entry as the input item.

@Nonnull public static <K,V> Sink<Map.Entry<K,V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull DistributedBinaryOperator<V> mergeFn)
Convenience for remoteMapWithMerging(String, ClientConfig, DistributedFunction, DistributedFunction, DistributedBinaryOperator) with Map.Entry as the input item.

@Nonnull public static <T,K,V> Sink<T> mapWithUpdating(@Nonnull String mapName, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedBiFunction<? super V,? super T,? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = updateFn.apply(oldValue, item);
if (newValue == null)
    map.remove(key);
else
    map.put(key, newValue);

This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries whether or not they are locked. Use mapWithEntryProcessor(String, DistributedFunction, DistributedFunction) if you need locking.
The default local parallelism for this sink is 1.
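A sketch of counting occurrences with an update function; the names are placeholders, and note that this incrementing updateFn is not idempotent, so it does not qualify for the exactly-once guarantee described above:

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class UpdatingSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // For each event, fetch the current count (null if absent) and
        // store the incremented count under the event's key.
        p.drawFrom(Sources.<String>list("events"))
         .drainTo(Sinks.<String, String, Long>mapWithUpdating("eventCounts",
                 event -> event,
                 (Long oldCount, String event) ->
                         oldCount == null ? 1L : oldCount + 1));
    }
}
```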
Type Parameters:
T - input item type
K - key type
V - value type
Parameters:
mapName - name of the map
toKeyFn - function that extracts the key from the input item
updateFn - function that receives the existing map value and the item and returns the new map value

@Nonnull public static <T,K,V> Sink<T> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedBiFunction<? super V,? super T,? extends V> updateFn)
Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = updateFn.apply(oldValue, item);
if (newValue == null)
    map.remove(key);
else
    map.put(key, newValue);

NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries even if they are locked. Use mapWithEntryProcessor(String, DistributedFunction, DistributedFunction) if you need locking.
The default local parallelism for this sink is 1.
Type Parameters:
T - input item type
K - key type
V - value type
Parameters:
map - the map to drain to
toKeyFn - function that extracts the key from the input item
updateFn - function that receives the existing map value and the item and returns the new map value

@Nonnull public static <T,K,V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedBiFunction<? super V,? super T,? extends V> updateFn)
Returns a sink equivalent to mapWithUpdating(String, DistributedFunction, DistributedBiFunction), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

@Nonnull public static <K,V,E extends Map.Entry<K,V>> Sink<E> mapWithUpdating(@Nonnull String mapName, @Nonnull DistributedBiFunction<? super V,? super E,? extends V> updateFn)
Convenience for mapWithUpdating(String, DistributedFunction, DistributedBiFunction) with Map.Entry as the input item.

@Nonnull public static <K,V,E extends Map.Entry<K,V>> Sink<E> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull DistributedBiFunction<? super V,? super E,? extends V> updateFn)
Convenience for mapWithUpdating(IMap, DistributedFunction, DistributedBiFunction) with Map.Entry as the input item.

@Nonnull public static <K,V,E extends Map.Entry<K,V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull DistributedBiFunction<? super V,? super E,? extends V> updateFn)
Convenience for remoteMapWithUpdating(String, ClientConfig, DistributedFunction, DistributedBiFunction) with Map.Entry as the input item.
@Nonnull public static <E,K,V> Sink<E> mapWithEntryProcessor(@Nonnull String mapName, @Nonnull DistributedFunction<? super E,? extends K> toKeyFn, @Nonnull DistributedFunction<? super E,? extends EntryProcessor<K,V>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
As opposed to mapWithUpdating(String, DistributedFunction, DistributedBiFunction) and mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during long update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor were run on the same entry more than once.
Note: Unlike mapWithUpdating(String, DistributedFunction, DistributedBiFunction) and mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator), this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
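A sketch of the lock-aware variant, assuming Hazelcast's AbstractEntryProcessor base class is available; the map and list names and the processor itself are illustrative:

```java
import java.util.Map;

import com.hazelcast.map.AbstractEntryProcessor;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class EntryProcessorSinkExample {

    // Increments the value stored under a key, treating an absent value
    // as zero. Runs on the partition thread that owns the key and is
    // applied under the entry's lock.
    static class IncrementProcessor extends AbstractEntryProcessor<String, Long> {
        @Override
        public Object process(Map.Entry<String, Long> entry) {
            Long current = entry.getValue();
            entry.setValue(current == null ? 1L : current + 1);
            return null;
        }
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<String>list("words"))
         .drainTo(Sinks.mapWithEntryProcessor("counts",
                 word -> word,
                 word -> new IncrementProcessor()));
    }
}
```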
Type Parameters:
E - input item type
K - key type
V - value type
Parameters:
mapName - name of the map
toKeyFn - function that extracts the key from the input item
toEntryProcessorFn - function that returns the EntryProcessor to apply to the key

@Nonnull public static <T,K,V> Sink<T> mapWithEntryProcessor(@Nonnull IMap<? super K,? super V> map, @Nonnull DistributedFunction<? super T,? extends K> toKeyFn, @Nonnull DistributedFunction<? super T,? extends EntryProcessor<K,V>> toEntryProcessorFn)
Returns a sink that uses the items it receives to create EntryProcessors it submits to the given Hazelcast IMap. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
As opposed to mapWithUpdating(String, DistributedFunction, DistributedBiFunction) and mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse. It should be used only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during long update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if an entry processor were run on the same entry more than once.
Note: Unlike mapWithUpdating(String, DistributedFunction, DistributedBiFunction) and mapWithMerging(String, DistributedFunction, DistributedFunction, DistributedBinaryOperator), this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
Type Parameters:
T - input item type
K - key type
V - value type
Parameters:
map - the map to drain to
toKeyFn - function that extracts the key from the input item
toEntryProcessorFn - function that returns the EntryProcessor to apply to the key

@Nonnull public static <E,K,V> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull DistributedFunction<? super E,? extends K> toKeyFn, @Nonnull DistributedFunction<? super E,? extends EntryProcessor<K,V>> toEntryProcessorFn)
Returns a sink equivalent to mapWithEntryProcessor(String, DistributedFunction, DistributedFunction), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

@Nonnull public static <T extends Map.Entry> Sink<T> cache(@Nonnull String cacheName)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name.
This sink provides the exactly-once guarantee thanks to idempotent updates: a later entry with the same key overwrites the earlier value rather than appending to it. After the job is restarted from a snapshot, duplicate items will not change the state in the target cache.
The default local parallelism for this sink is 1.
@Nonnull public static <T extends Map.Entry> Sink<T> remoteCache(@Nonnull String cacheName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig)
Returns a sink that puts the Map.Entry items it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates: a later entry with the same key overwrites the earlier value rather than appending to it. After the job is restarted from a snapshot, duplicate items will not change the state in the target cache.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> list(@Nonnull String listName)
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> list(@Nonnull IList<? super T> list)
Returns a sink that adds the items it receives to the given Hazelcast IList.
NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig)
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull DistributedFunction<? super T,? extends String> toStringFn, @Nonnull Charset charset)
Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives. It converts each item to a string using the supplied toStringFn function and encodes the string using the supplied Charset. It follows each item with a newline character.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
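For illustration, a sketch that writes CSV lines over a socket; the host, port, and field layout are placeholders:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class SocketSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Render each entry as "key,value"; the sink appends the newline.
        p.drawFrom(Sources.<String, Long>map("counts"))
         .drainTo(Sinks.socket("localhost", 5252,
                 (Map.Entry<String, Long> e) -> e.getKey() + "," + e.getValue(),
                 StandardCharsets.UTF_8));
    }
}
```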
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull DistributedFunction<? super T,? extends String> toStringFn)
Convenience for socket(String, int, DistributedFunction, Charset) with UTF-8 as the charset.

@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port)
Convenience for socket(String, int, DistributedFunction, Charset) with Object.toString as the conversion function and UTF-8 as the charset.

@Nonnull public static <T> FileSinkBuilder<T> filesBuilder(@Nonnull String directoryName)
Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API. See FileSinkBuilder for more details.
The sink writes the items it receives to files. Each processor will write to its own file whose name is equal to the processor's global index (an integer unique to each processor of the vertex), but a single pathname is used to resolve the containing directory of all files, on all cluster members.
No state is saved to snapshot for this sink. If the job is restarted and appending is enabled, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
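A sketch of a customized file sink, assuming the FileSinkBuilder of the same release exposes charset and append steps; the directory and list names are placeholders:

```java
import java.nio.charset.StandardCharsets;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class FileSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<String>list("lines"))
         .drainTo(Sinks.<String>filesBuilder("output-dir")
                 .charset(StandardCharsets.UTF_8) // explicit; UTF-8 is the default
                 .append(true)                    // append instead of overwriting
                 .build());
    }
}
```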
Type Parameters:
T - type of the items the sink accepts

@Nonnull public static <T> Sink<T> files(@Nonnull String directoryName)
Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.

@Nonnull public static <T> Sink<T> logger(@Nonnull DistributedFunction<? super T,String> toStringFn)
Returns a sink that logs all the data items it receives, at the INFO level to the log category WriteLoggerP. It also logs watermark items, but at the FINE level.
The sink logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development, when running Jet on a local machine.
The default local parallelism for this sink is 1.
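For illustration, a custom toStringFn keeps the log lines compact; the list name and format are placeholders:

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class LoggerSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Each item is logged at INFO level on the member that receives it.
        p.drawFrom(Sources.<String>list("items"))
         .drainTo(Sinks.logger(item -> "item=" + item));
    }
}
```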
T
- stream item typetoStringFn
- a function that returns a string representation of a stream item@Nonnull public static <T> Sink<T> jmsQueue(@Nonnull DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
Convenience for jmsQueueBuilder(DistributedSupplier). Creates a connection without any authentication parameters and uses non-transacted sessions with Session.AUTO_ACKNOWLEDGE mode. If a received item is not an instance of javax.jms.Message, the sink wraps item.toString() into a TextMessage.
factorySupplier - supplier to obtain the JMS connection factory
name - the name of the queue
@Nonnull public static <T> JmsSinkBuilder<T> jmsQueueBuilder(@Nonnull DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. See the JmsSinkBuilder methods for more details.
Behavior on job restart: the processor is stateless. If the job is restarted, duplicate events can occur. If you need exactly-once behavior, you must ensure idempotence at the application level.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider's documentation for details.
The default local parallelism for this processor is 4 (or less if fewer CPUs are available).
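A sketch of the builder in use, assuming ActiveMQ as the JMS provider. The broker URL, credentials, list name and queue name are all hypothetical, and the builder steps shown (connectionParams, destinationName) are the authentication and destination setters:

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsQueueSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Unlike the jmsQueue() convenience, the builder lets us pass
        // credentials when the connection is created.
        p.drawFrom(Sources.<String>list("tickets"))
         .drainTo(Sinks.<String>jmsQueueBuilder(
                     () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
                 .connectionParams("admin", "secret")
                 .destinationName("ticketQueue")
                 .build());
    }
}
```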
T - type of the items the sink accepts
@Nonnull public static <T> Sink<T> jmsTopic(@Nonnull DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
Convenience for jmsTopicBuilder(DistributedSupplier). Creates a connection without any authentication parameters and uses non-transacted sessions with Session.AUTO_ACKNOWLEDGE mode. If a received item is not an instance of javax.jms.Message, the sink wraps item.toString() into a TextMessage.
factorySupplier - supplier to obtain the JMS connection factory
name - the name of the topic
@Nonnull public static <T> JmsSinkBuilder<T> jmsTopicBuilder(@Nonnull DistributedSupplier<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API. See the JmsSinkBuilder methods for more details.
Behavior on job restart: the processor is stateless. If the job is restarted, duplicate events can occur. If you need exactly-once behavior, you must ensure idempotence at the application level.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider's documentation for details.
The default local parallelism for this processor is 4 (or less if fewer CPUs are available).
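When no authentication or custom session settings are needed, the jmsTopic convenience described above is sufficient. A sketch assuming ActiveMQ as the provider, with a hypothetical broker URL, source list and topic name:

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsTopicSinkExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Each string is wrapped into a TextMessage and published
        // to the "alerts" topic.
        p.drawFrom(Sources.<String>list("alertList"))
         .drainTo(Sinks.jmsTopic(
                 () -> new ActiveMQConnectionFactory("tcp://localhost:61616"),
                 "alerts"));
    }
}
```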
T - type of the items the sink accepts
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull DistributedSupplier<Connection> connectionSupplier, @Nonnull DistributedBiConsumer<PreparedStatement,T> bindFn)
Returns a sink that connects to the specified database using the given connectionSupplier, prepares a statement using the given updateQuery and inserts/updates the items.
The updateQuery should contain a parametrized query. The bindFn will receive a PreparedStatement created for this query and should bind parameters to it. It should not execute the query, call commit, or invoke any other method.
The statements are executed in batch mode (if the driver supports it) and the records are committed after each batch. Auto-commit will be disabled on the connection.
Example:
p.drainTo(Sinks.jdbc(
        "REPLACE INTO table (id, name) VALUES (?, ?)",
        () -> DriverManager.getConnection("jdbc:..."),
        (stmt, item) -> {
            stmt.setInt(1, item.id);
            stmt.setString(2, item.name);
        }
));
In case of an SQLException the processor will automatically try to reconnect and the job won't fail, except when the exception is a SQLNonTransientException. The default local parallelism for this sink is 1.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee. For this reason you should not use an INSERT statement, which can fail on a duplicate primary key. Rather, use an insert-or-update statement that can tolerate duplicate writes.
T - type of the items the sink accepts
updateQuery - the SQL query which will do the insert/update
connectionSupplier - the supplier of the database connection
bindFn - the function to set the parameters of the statement for each item received
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull String connectionUrl, @Nonnull DistributedBiConsumer<PreparedStatement,T> bindFn)
Convenience for jdbc(String, DistributedSupplier, DistributedBiConsumer). The connection will be created from connectionUrl.
Copyright © 2018 Hazelcast, Inc. All rights reserved.