Class Sources
To start building a pipeline, pass a source to Pipeline.readFrom(BatchSource) and you will obtain the initial BatchStage. You can then attach further stages to it.
The same pipeline may contain more than one source, each starting its own branch. The branches may be merged with multiple-input transforms such as co-group and hash-join.
The default local parallelism for sources in this class is 1 or 2; check the documentation of the individual methods.
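As a minimal sketch of the flow described above, the following builds a pipeline with a single batch source and attaches two further stages. The map name "input-map", the key and value types, and the class name are hypothetical; building the pipeline only describes the job and does not contact a cluster.

```java
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

final class ReadFromSketch {
    // Builds a pipeline definition; "input-map" is a hypothetical map name.
    public static Pipeline build() {
        Pipeline p = Pipeline.create();
        // Passing a BatchSource to readFrom yields the initial BatchStage.
        BatchStage<Map.Entry<String, Integer>> entries =
                p.readFrom(Sources.<String, Integer>map("input-map"));
        // Further stages attach to the initial stage.
        entries.map(Map.Entry::getValue)
               .writeTo(Sinks.logger());
        return p;
    }
}
```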
- Since:
- Jet 3.0
-
Method Summary
Modifier and Type / Method
    Description
static <T> BatchSource<T> batchFromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier)
    Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier.
static <K, V> BatchSource<Map.Entry<K, V>> cache(String cacheName)
    Returns a source that fetches entries from a Hazelcast ICache with the given name and emits them as Map.Entry.
static <K, V> StreamSource<Map.Entry<K, V>> cacheJournal(String cacheName, JournalInitialPosition initialPos)
    Convenience for cacheJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry.
static <T, K, V> StreamSource<T> cacheJournal(String cacheName, JournalInitialPosition initialPos, FunctionEx<? super EventJournalCacheEvent<K, V>, ? extends T> projectionFn, PredicateEx<? super EventJournalCacheEvent<K, V>> predicateFn)
    Returns a source that will stream the EventJournalCacheEvent events of a Hazelcast ICache with the specified name.
static BatchSource<String> files(String directory)
    A source to read all files in a directory in a batch way.
static FileSourceBuilder filesBuilder(String directory)
    Returns a builder object that offers a step-by-step fluent API to build a custom source to read files for the Pipeline API.
static StreamSource<String> fileWatcher(String watchedDirectory)
    A source to stream lines added to files in a directory.
static <T> BatchSource<T> jdbc(SupplierEx<? extends Connection> newConnectionFn, ToResultSetFunction resultSetFn, FunctionEx<? super ResultSet, ? extends T> createOutputFn)
    Returns a source which connects to the specified database using the given newConnectionFn, queries the database and creates a result set using the given resultSetFn.
static <T> BatchSource<T> jdbc(DataConnectionRef dataConnectionRef, ToResultSetFunction resultSetFn, FunctionEx<? super ResultSet, ? extends T> createOutputFn)
    Returns a source which connects to the specified database using the given dataConnectionRef, queries the database and creates a result set using the given resultSetFn.
static <T> BatchSource<T> jdbc(String connectionURL, String query, FunctionEx<? super ResultSet, ? extends T> createOutputFn)
    Convenience for jdbc(SupplierEx, ToResultSetFunction, FunctionEx).
static <T> BatchSource<T> jdbc(String connectionURL, String query, Properties properties, FunctionEx<? super ResultSet, ? extends T> createOutputFn)
    Same as jdbc(String, String, FunctionEx).
static StreamSource<jakarta.jms.Message> jmsQueue(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier, String name)
    Deprecated. See jmsQueue(String, SupplierEx).
static StreamSource<jakarta.jms.Message> jmsQueue(String name, SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
    Shortcut equivalent to:
static JmsSourceBuilder jmsQueueBuilder(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
    Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API.
static StreamSource<jakarta.jms.Message> jmsTopic(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier, String name)
    Deprecated.
static StreamSource<jakarta.jms.Message> jmsTopic(String name, SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
    Shortcut equivalent to:
static JmsSourceBuilder jmsTopicBuilder(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
    Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API.
static BatchSource<Map<String, Object>> json(String directory)
    Convenience for json(String, Class) which converts each JSON string to a Map.
static <T> BatchSource<T> json(String directory, Class<T> type)
    A source to read all files in a directory in a batch way.
static StreamSource<Map<String, Object>> jsonWatcher(String watchedDirectory)
    Convenience for jsonWatcher(String, Class) which converts each line appended to the Map representation of the JSON string.
static <T> StreamSource<T> jsonWatcher(String watchedDirectory, Class<T> type)
    A source to stream lines added to files in a directory.
static <T> BatchSource<T> list(IList<? extends T> list)
    Returns a source that emits items retrieved from a Hazelcast IList.
static <T> BatchSource<T> list(String listName)
    Returns a source that emits items retrieved from a Hazelcast IList.
static <K, V> BatchSource<Map.Entry<K, V>> map(IMap<? extends K, ? extends V> map)
    Returns a source that fetches entries from the given Hazelcast IMap and emits them as Map.Entry.
static <T, K, V> BatchSource<T> map(IMap<? extends K, ? extends V> map, Predicate<K, V> predicate, Projection<? super Map.Entry<K, V>, ? extends T> projection)
    Returns a source that fetches entries from the given Hazelcast IMap.
static <K, V> BatchSource<Map.Entry<K, V>> map(String mapName)
    Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry.
static <T, K, V> BatchSource<T> map(String mapName, Predicate<K, V> predicate, Projection<? super Map.Entry<K, V>, ? extends T> projection)
    Returns a source that fetches entries from a local Hazelcast IMap with the specified name.
static <K, V> StreamSource<Map.Entry<K, V>> mapJournal(IMap<? extends K, ? extends V> map, JournalInitialPosition initialPos)
    Convenience for mapJournal(IMap, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
static <T, K, V> StreamSource<T> mapJournal(IMap<? extends K, ? extends V> map, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
    Returns a source that will stream EventJournalMapEvents of the given Hazelcast IMap.
static <K, V> StreamSource<Map.Entry<K, V>> mapJournal(String mapName, JournalInitialPosition initialPos)
    Convenience for mapJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
static <T, K, V> StreamSource<T> mapJournal(String mapName, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
    Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name.
static <K, V> BatchSource<Map.Entry<K, V>> remoteCache(String cacheName, ClientConfig clientConfig)
    Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
static <K, V> StreamSource<Map.Entry<K, V>> remoteCacheJournal(String cacheName, ClientConfig clientConfig, JournalInitialPosition initialPos)
    Convenience for remoteCacheJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry.
static <T, K, V> StreamSource<T> remoteCacheJournal(String cacheName, ClientConfig clientConfig, JournalInitialPosition initialPos, FunctionEx<? super EventJournalCacheEvent<K, V>, ? extends T> projectionFn, PredicateEx<? super EventJournalCacheEvent<K, V>> predicateFn)
    Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster.
static <T> BatchSource<T> remoteList(String listName, ClientConfig clientConfig)
    Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig.
static <K, V> BatchSource<Map.Entry<K, V>> remoteMap(String mapName, ClientConfig clientConfig)
    Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
static <T, K, V> BatchSource<T> remoteMap(String mapName, ClientConfig clientConfig, Predicate<K, V> predicate, Projection<? super Map.Entry<K, V>, ? extends T> projection)
    Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
static <K, V> BatchSource<Map.Entry<K, V>> remoteMap(String mapName, DataConnectionRef dataConnectionRef)
    The same as the remoteMap(String, ClientConfig, Predicate, Projection) method.
static <T, K, V> BatchSource<T> remoteMap(String mapName, DataConnectionRef dataConnectionRef, Predicate<K, V> predicate, Projection<? super Map.Entry<K, V>, ? extends T> projection)
    The same as the remoteMap(String, ClientConfig, Predicate, Projection) method.
static <K, V> RemoteMapSourceBuilder<K, V, Map.Entry<K, V>> remoteMapBuilder(String mapName)
    Returns a builder to build a source that fetches entries from a remote Hazelcast IMap with the specified name.
static <K, V> StreamSource<Map.Entry<K, V>> remoteMapJournal(String mapName, ClientConfig clientConfig, JournalInitialPosition initialPos)
    Convenience for remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
static <T, K, V> StreamSource<T> remoteMapJournal(String mapName, ClientConfig clientConfig, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
    Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster.
static <K, V> StreamSource<Map.Entry<K, V>> remoteMapJournal(String mapName, DataConnectionRef dataConnectionRef, JournalInitialPosition initialPos)
    Convenience for remoteMapJournal(String, DataConnectionRef, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
static <T, K, V> StreamSource<T> remoteMapJournal(String mapName, DataConnectionRef dataConnectionRef, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
    The same as the remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) method.
static StreamSource<String> socket(String host, int port)
    Convenience for socket(host, port, charset) with UTF-8 as the charset.
static StreamSource<String> socket(String host, int port, Charset charset)
    Returns a source which connects to the specified socket and emits lines of text received from it.
static <T> StreamSource<T> streamFromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier)
    Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier.
static <T> StreamSource<T> streamFromProcessorWithWatermarks(String sourceName, boolean supportsNativeTimestamps, FunctionEx<EventTimePolicy<? super T>, ProcessorMetaSupplier> metaSupplierFn)
    Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API.
-
Method Details
-
batchFromProcessor
@Nonnull public static <T> BatchSource<T> batchFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier.
- Parameters:
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
-
streamFromProcessorWithWatermarks
@Nonnull public static <T> StreamSource<T> streamFromProcessorWithWatermarks(@Nonnull String sourceName, boolean supportsNativeTimestamps, @Nonnull FunctionEx<EventTimePolicy<? super T>, ProcessorMetaSupplier> metaSupplierFn)
Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API. Jet will call the function you supply with an EventTimePolicy and it must return a meta-supplier of processors that will act according to the parameters in the policy and must emit the watermark items as the policy specifies.
If you are implementing a custom source processor, be sure to check out the EventTimeMapper class that will help you correctly implement watermark emission.
- Parameters:
sourceName - user-friendly source name
supportsNativeTimestamps - true, if the processor is able to work
metaSupplierFn - factory of processor meta-suppliers. Since Jet 4.3 this argument changed from Function to FunctionEx to support serializability.
-
streamFromProcessor
@Nonnull public static <T> StreamSource<T> streamFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier.
- Parameters:
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
-
map
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> map(@Nonnull String mapName)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys that were added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
-
map
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> map(@Nonnull IMap<? extends K, ? extends V> map)
Returns a source that fetches entries from the given Hazelcast IMap and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys that were added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
-
map
@Nonnull public static <T, K, V> BatchSource<T> map(@Nonnull String mapName, @Nonnull Predicate<K, V> predicate, @Nonnull Projection<? super Map.Entry<K, V>, ? extends T> projection)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
Due to the current limitations in the way Jet reads the map, it can't use any indexes on the map. It will always scan the map in full.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys that were added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
Predicate/projection class requirements
The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.
- Type Parameters:
T - type of emitted item
- Parameters:
mapName - the name of the map
predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
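To illustrate supplying the predicate and projection to the source itself, here is a minimal sketch that uses the Predicates and Projections factories mentioned above. The map name "people" and the attribute names "age" and "name" are hypothetical, as is the class name; the value type is left as Object because the sketch never touches the stored class.

```java
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.projection.Projections;
import com.hazelcast.query.Predicates;

final class MapPushdownSketch {
    // Emits only the "name" attribute of entries whose "age" attribute is >= 18.
    // Map name and attribute names are assumptions made for this example.
    public static BatchSource<String> adultNames() {
        return Sources.<String, String, Object>map(
                "people",
                Predicates.greaterEqual("age", 18),
                Projections.singleAttribute("name"));
    }
}
```

Because the filter and the projection run inside the source, entries that fail the predicate are never emitted by it.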
-
map
@Nonnull public static <T, K, V> BatchSource<T> map(@Nonnull IMap<? extends K, ? extends V> map, @Nonnull Predicate<K, V> predicate, @Nonnull Projection<? super Map.Entry<K, V>, ? extends T> projection)
Returns a source that fetches entries from the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
Due to the current limitations in the way Jet reads the map, it can't use any indexes on the map. It will always scan the map in full.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys that were added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
Predicate/projection class requirements
The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.
- Type Parameters:
T - type of emitted item
- Parameters:
map - the Hazelcast map to read data from
predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
-
mapJournal
@Nonnull public static <T, K, V> StreamSource<T> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
Predicate/projection class requirements
The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
- Type Parameters:
T - type of emitted item
- Parameters:
mapName - the name of the map
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
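The event journal requirement above can be sketched as follows: a member configuration that enables the journal for one map, and a journal source that reads it from the oldest retained event. The map name "orders", the capacity value, the key and value types, and the class name are hypothetical choices for illustration only.

```java
import com.hazelcast.config.Config;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import java.util.Map;

final class MapJournalSketch {
    // Member-side configuration: enable the event journal for the "orders" map.
    // The capacity is an arbitrary example value; once it is exceeded, the
    // oldest events are dropped.
    public static Config memberConfig() {
        Config config = new Config();
        config.getMapConfig("orders")
              .getEventJournalConfig()
              .setEnabled(true)
              .setCapacity(100_000);
        return config;
    }

    // Convenience variant: passes ADDED and UPDATED events as Map.Entry.
    public static StreamSource<Map.Entry<String, Long>> source() {
        return Sources.mapJournal("orders", JournalInitialPosition.START_FROM_OLDEST);
    }
}
```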
-
mapJournal
@Nonnull public static <T, K, V> StreamSource<T> mapJournal(@Nonnull IMap<? extends K, ? extends V> map, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
Returns a source that will stream EventJournalMapEvents of the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
Predicate/projection class requirements
The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
Issue when "catching up"
This processor does not coalesce watermarks from partitions. It reads partitions one by one: it emits events from one partition and then from another one in batches. This adds time disorder to events: it might emit a very recent event from partition1 while not yet emitting an old event from partition2, and it generates watermarks based on this order. Even if items in your partitions are ordered by timestamp, you can't use an allowed lag of 0. Most notably, the "catching up" happens after the job is restarted, when events since the last snapshot are reprocessed in a burst. In order to not lose any events, the lag should be configured to at least snapshotInterval + timeToRestart + normalEventLag. The reason for this behavior is that the default partition count in the cluster is pretty high and cannot be changed per object, and for low-traffic maps it takes long until all partitions see an event to allow emitting of a coalesced watermark.
- Type Parameters:
T - type of emitted item
- Parameters:
map - the map to read data from
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
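The minimum lag from the "catching up" paragraph can be worked through with concrete numbers. The sketch below is plain Java and uses hypothetical values (a 10-second snapshot interval, 5 seconds to restart, 2 seconds of normal event lag); the class and method names are illustrative and not part of the Jet API.

```java
import java.time.Duration;

final class AllowedLagSketch {
    // Lower bound on the allowed lag: snapshotInterval + timeToRestart + normalEventLag.
    public static Duration minimumAllowedLag(Duration snapshotInterval,
                                             Duration timeToRestart,
                                             Duration normalEventLag) {
        return snapshotInterval.plus(timeToRestart).plus(normalEventLag);
    }

    public static void main(String[] args) {
        Duration lag = minimumAllowedLag(
                Duration.ofSeconds(10),  // assumed snapshot interval
                Duration.ofSeconds(5),   // assumed time to restart the job
                Duration.ofSeconds(2));  // assumed normal event lag
        System.out.println(lag.toMillis()); // prints 17000
    }
}
```

The result in milliseconds is the kind of value you would supply as the allowed lag when assigning timestamps on the stream stage, for example through withTimestamps(timestampFn, allowedLag).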
-
mapJournal
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos)
Convenience for mapJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
-
mapJournal
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull IMap<? extends K, ? extends V> map, @Nonnull JournalInitialPosition initialPos)
Convenience for mapJournal(IMap, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
-
remoteMap
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries. If we detect a topology change, the job will fail, but the detection is only on a best-effort basis; we might still give incorrect results without reporting a failure. Concurrent mutation is not detected at all.
The default local parallelism for this processor is 1.
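A minimal sketch of reading from a remote cluster with a ClientConfig follows. The cluster name, member address, map name, entry types and class name are all placeholders chosen for the example.

```java
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

final class RemoteMapSketch {
    // Describes how to reach the remote cluster; both values are placeholders.
    public static ClientConfig remoteClusterConfig() {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName("remote-cluster");
        clientConfig.getNetworkConfig().addAddress("203.0.113.10:5701");
        return clientConfig;
    }

    // Batch source over the remote map "inventory" (hypothetical name).
    public static BatchSource<Map.Entry<String, Long>> source() {
        return Sources.remoteMap("inventory", remoteClusterConfig());
    }
}
```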
-
remoteMap
@Nonnull public static <T, K, V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<K, V> predicate, @Nonnull Projection<? super Map.Entry<K, V>, ? extends T> projection)
Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
See RemoteMapSourceBuilder for details on the remote map source.
- Type Parameters:
T - type of emitted item
- Parameters:
mapName - the name of the map
predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
-
remoteMap
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef)
The same as the remoteMap(String, ClientConfig, Predicate, Projection) method. The only difference is that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Parameters:
mapName - the name of the map
dataConnectionRef - the reference to DataConnectionConfig
- Since:
- 5.4
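As a sketch of the data-connection variant, the following refers to a data connection by name. It assumes a data connection called "my-hzclient-data-connection" has already been configured on the cluster; the map name, entry types and class name are hypothetical.

```java
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

final class RemoteMapDataConnectionSketch {
    // The referenced data connection must exist in the cluster configuration;
    // its name here is an assumption made for this example.
    public static BatchSource<Map.Entry<String, Long>> source() {
        DataConnectionRef ref =
                DataConnectionRef.dataConnectionRef("my-hzclient-data-connection");
        return Sources.remoteMap("inventory", ref);
    }
}
```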
-
remoteMap
@Nonnull public static <T, K, V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull Predicate<K, V> predicate, @Nonnull Projection<? super Map.Entry<K, V>, ? extends T> projection)
The same as the remoteMap(String, ClientConfig, Predicate, Projection) method. The only difference is that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.
- Parameters:
mapName - the name of the map
dataConnectionRef - the reference to DataConnectionConfig
- Since:
- 5.4
-
remoteMapBuilder
@Nonnull public static <K, V> RemoteMapSourceBuilder<K, V, Map.Entry<K, V>> remoteMapBuilder(String mapName)
Returns a builder to build a source that fetches entries from a remote Hazelcast IMap with the specified name. It provides a fluent API to build the source using the optional parameters.
See RemoteMapSourceBuilder for details on the remote map source.
- Type Parameters:
K - the type of the key in the map
V - the type of the value in the map
- Parameters:
mapName - the name of the map
- Returns:
- builder for remote map source
- Since:
- 5.4
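The fluent style might look like the sketch below. The builder method names used here (clientConfig and build) are assumed from the 5.4 API and should be checked against the RemoteMapSourceBuilder documentation; the map name, entry types and class name are hypothetical.

```java
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

final class RemoteMapBuilderSketch {
    // Builds a remote map source step by step. clientConfig(...) and build()
    // are assumed builder methods; verify them against RemoteMapSourceBuilder.
    public static BatchSource<Map.Entry<String, Long>> build(ClientConfig clientConfig) {
        return Sources.<String, Long>remoteMapBuilder("inventory")
                .clientConfig(clientConfig)
                .build();
    }
}
```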
-
remoteMapJournal
@Nonnull public static <T,K, StreamSource<T> remoteMapJournalV> (@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn) Returns a source that will stream theEventJournalMapEvent
events of the HazelcastIMap
with the specified name from a remote cluster. By supplying apredicate
andprojection
here instead of in separatemap/filter
transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.To use an
IMap
as a streaming source, you mustconfigure the event journal
for it. The journal has fixed capacity and will drop events if it overflows.The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give different name to this source.
The default local parallelism for this processor is 1.
Predicate/projection class requirements
The classes implementing predicateFn and projectionFn need to be available on the remote cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use remoteMapJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
- Type Parameters:
K
- type of key
V
- type of value
T
- type of emitted item
- Parameters:
mapName
- the name of the map
clientConfig
- configuration for the client to connect to the remote cluster
initialPos
- describes which event to start receiving from
projectionFn
- the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn
- the predicate to filter the events. You may use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
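The "fixed capacity" caveat above is easier to see with a toy model. The following is a minimal sketch in plain Java, not Hazelcast's implementation: the class `JournalOverflowSketch` and its methods are hypothetical names invented for illustration. It models a ring buffer that overwrites its oldest slot when full, so a reader resuming from a saved offset that has already been overwritten silently skips the lost events.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixed-capacity journal; an illustration only, not Hazelcast code. */
class JournalOverflowSketch {
    private final String[] slots;
    private long nextSeq = 0; // sequence number of the next event to be written

    JournalOverflowSketch(int capacity) {
        slots = new String[capacity];
    }

    /** Appends an event, overwriting the oldest slot once the buffer is full. */
    void append(String event) {
        slots[(int) (nextSeq % slots.length)] = event;
        nextSeq++;
    }

    /** Sequence number of the oldest event that is still retained. */
    long oldestSeq() {
        return Math.max(0, nextSeq - slots.length);
    }

    /** Reads from a saved offset; events older than oldestSeq() are no longer available. */
    List<String> readFrom(long savedOffset) {
        List<String> out = new ArrayList<>();
        for (long seq = Math.max(savedOffset, oldestSeq()); seq < nextSeq; seq++) {
            out.add(slots[(int) (seq % slots.length)]);
        }
        return out;
    }

    public static void main(String[] args) {
        JournalOverflowSketch journal = new JournalOverflowSketch(3);
        for (int i = 0; i < 5; i++) {
            journal.append("event-" + i);
        }
        // A reader that saved offset 0 has lost event-0 and event-1 to overflow.
        System.out.println(journal.readFrom(0)); // prints [event-2, event-3, event-4]
    }
}
```

In this model the exactly-once guarantee holds only while the saved offset is still at or after `oldestSeq()`, which mirrors the "unless the journal has overflowed" qualifier in the text.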
-
remoteMapJournal
@Nonnull public static <T,K,V> StreamSource<T> remoteMapJournal(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K, V>, ? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K, V>> predicateFn)
The same as the remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) method. The only difference is that, instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.
The data connection caches the connection to the remote cluster, so that it can be reused.
(Prerequisite) External data connection configuration: use HazelcastDataConnection.CLIENT_XML for an XML string or HazelcastDataConnection.CLIENT_YML for a YAML string.
    Config config = ...;
    String xmlString = ...;
    DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
        .setName("my-hzclient-data-connection")
        .setType("Hz")
        .setProperty(HazelcastDataConnection.CLIENT_XML, xmlString);
    config.addDataConnectionConfig(dataConnectionConfig);
Pipeline configuration
    PredicateEx<EventJournalMapEvent<String, Integer>> predicate = ...;
    p.readFrom(Sources.remoteMapJournal(
        mapName,
        DataConnectionRef.dataConnectionRef("my-hzclient-data-connection"),
        JournalInitialPosition.START_FROM_OLDEST,
        EventJournalMapEvent::getNewValue,
        predicate
    ));
- Type Parameters:
T
- the type of the items emitted by the stream
K
- the key type of EventJournalMapEvent
V
- the value type of EventJournalMapEvent
- Parameters:
mapName
- the name of the map
dataConnectionRef
- the reference to the DataConnectionConfig
initialPos
- describes which event to start receiving from
projectionFn
- the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn
- the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
- Returns:
- a stream that can be used as a source
- Since:
- 5.3
-
remoteMapJournal
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
Convenience for remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
-
remoteMapJournal
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull JournalInitialPosition initialPos)
Convenience for remoteMapJournal(String, DataConnectionRef, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
- Since:
- 5.3
-
cache
Returns a source that fetches entries from a Hazelcast ICache with the given name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the cache during the job, the keys which have been deleted or added since the job started will be emitted at most once; all other keys are emitted exactly once.
The default local parallelism for this processor is 1.
-
cacheJournal
@Nonnull public static <T,K,V> StreamSource<T> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalCacheEvent<K, V>, ? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalCacheEvent<K, V>> predicateFn)
Returns a source that will stream the EventJournalCacheEvent events of a Hazelcast ICache with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an ICache as a streaming source, you must configure the event journal for it. The journal has a fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
Predicate/projection class requirements
The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the cache itself. If you cannot meet these conditions, use cacheJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
- Type Parameters:
T
- type of emitted item
- Parameters:
cacheName
- the name of the cache
initialPos
- describes which event to start receiving from
projectionFn
- the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn
- the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
-
cacheJournal
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos)
Convenience for cacheJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry.
-
remoteCache
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the cache during the job, the keys which have been deleted or added since the job started will be emitted at most once; all other keys are emitted exactly once.
The default local parallelism for this processor is 1.
-
remoteCacheJournal
@Nonnull public static <T,K,V> StreamSource<T> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalCacheEvent<K, V>, ? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalCacheEvent<K, V>> predicateFn)
Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
To use an ICache as a streaming source, you must configure the event journal for it. The journal has a fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give a different name to this source.
The default local parallelism for this processor is 1.
Predicate/projection class requirements
The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the cache itself. If you cannot meet these conditions, use remoteCacheJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
- Type Parameters:
T
- type of emitted item
- Parameters:
cacheName
- the name of the cache
clientConfig
- configuration for the client to connect to the remote cluster
initialPos
- describes which event to start receiving from
projectionFn
- the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn
- the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
-
remoteCacheJournal
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
Convenience for remoteCacheJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry.
-
list
Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member: the one where the entire list is stored by the IMDG.
If the IList is modified while being read, the source may miss and/or duplicate some entries.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
One instance of this processor runs only on the member that owns the list.
-
list
Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member: the one where the entire list is stored by the IMDG.
If the IList is modified while being read, the source may miss and/or duplicate some entries.
NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
One instance of this processor runs only on the member that owns the list.
-
remoteList
@Nonnull public static <T> BatchSource<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. All elements are emitted on a single member.
If the IList is modified while being read, the source may miss and/or duplicate some entries.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Only 1 instance of this processor runs in the cluster.
-
socket
@Nonnull public static StreamSource<String> socket(@Nonnull String host, int port, @Nonnull Charset charset)
Returns a source which connects to the specified socket and emits lines of text received from it. It decodes the text using the supplied charset.
Each underlying processor opens its own TCP connection, so there will be clusterSize * localParallelism open connections to the server.
The source completes when the server closes the socket. It never attempts to reconnect. Any IOException will cause the job to fail.
The source does not save any state to snapshot. On job restart, it will emit whichever items the server sends. The implementation uses a non-blocking API; the processor is cooperative.
The default local parallelism for this processor is 1.
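To make the "emit lines until the server closes the socket" behavior concrete, here is a minimal sketch using only the JDK. It is not the Jet implementation: it uses blocking I/O (the text above states that the real source is non-blocking), and the class `SocketLinesSketch` with its methods `readLines` and `demo` are hypothetical names. The `demo` method starts a throwaway loopback server that sends two lines and then closes the connection.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical blocking equivalent of a line-oriented socket reader. */
class SocketLinesSketch {

    /** Connects, decodes with the given charset, and collects lines until the peer closes. */
    static List<String> readLines(String host, int port, Charset charset) {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), charset))) {
            String line;
            while ((line = in.readLine()) != null) { // null means the server closed the socket
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // no reconnect attempt, the failure propagates
        }
        return lines;
    }

    /** Runs a one-shot loopback server and reads what it sends. */
    static List<String> demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread serverThread = new Thread(() -> {
                try (Socket client = server.accept();
                     Writer out = new OutputStreamWriter(
                             client.getOutputStream(), StandardCharsets.UTF_8)) {
                    out.write("hello\nworld\n");
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();
            List<String> lines = readLines(
                    loopback.getHostAddress(), server.getLocalPort(), StandardCharsets.UTF_8);
            serverThread.join();
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [hello, world]
    }
}
```

Each call to `readLines` opens one connection, which is why a source running on several members with local parallelism above 1 opens `clusterSize * localParallelism` connections in total.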
-
socket
Convenience for socket(host, port, charset) with UTF-8 as the charset.
- Parameters:
host
- the hostname to connect to
port
- the port to connect to
-
filesBuilder
-
files
A source to read all files in a directory in a batch way.
This method is a shortcut for:
    filesBuilder(directory)
        .charset(UTF_8)
        .glob(GLOB_WILDCARD)
        .sharedFileSystem(false)
        .build((fileName, line) -> line)
If files are appended to while being read, the appended data may or may not be emitted, or only part of a line may be emitted. If files are modified in more complex ways, the behavior is undefined.
See filesBuilder(String).
-
json
A source to read all files in a directory in a batch way. The source expects the content of the files as streaming JSON content, where each JSON string is separated by a new-line. The JSON string itself can span multiple lines. The source converts each JSON string to an object of the given type.
This method is a shortcut for:
    filesBuilder(directory)
        .charset(UTF_8)
        .glob(GLOB_WILDCARD)
        .sharedFileSystem(false)
        .build(path -> JsonUtil.beanSequenceFrom(path, type))
If files are appended to while being read, the appended data may or may not be emitted, or only part of a line may be emitted. If files are modified in more complex ways, the behavior is undefined.
- Since:
- Jet 4.2
-
json
Convenience for json(String, Class) which converts each JSON string to a Map. It will throw ClassCastException if the JSON string is just a primitive (String, Number, Boolean) or a JSON array (List).
- Since:
- Jet 4.2
-
fileWatcher
A source to stream lines added to files in a directory. This is a streaming source: it watches the directory and emits lines as they are appended to files in that directory.
This method is a shortcut for:
    filesBuilder(directory)
        .charset(UTF_8)
        .glob(GLOB_WILDCARD)
        .sharedFileSystem(false)
        .buildWatcher((fileName, line) -> line)
Appending lines using a text editor
If you're testing this source, you might think of using a text editor to append the lines. However, it might not work as expected because some editors write to a temp file and then rename it, or append an extra newline character at the end which gets overwritten if more text is added in the editor. The best way to append is to use echo text >> yourFile.
See filesBuilder(String).
-
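If you would rather drive a test from Java than from a shell, the following sketch shows one way to get the same effect as `echo text >> yourFile`. It uses only the JDK; the class `AppendLineSketch` and its methods are hypothetical names, and the temporary directory stands in for whatever directory you are watching.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical helper that appends whole lines in place, without a temp-file rename. */
class AppendLineSketch {

    /** Appends one newline-terminated line, creating the file if it does not exist. */
    static void appendLine(Path file, String text) {
        try {
            Files.write(file,
                    (text + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends two lines to a fresh file in a temp directory and reads them back. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("watched"); // stand-in for the watched directory
            Path file = dir.resolve("yourFile");
            appendLine(file, "first");
            appendLine(file, "second");
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```

Opening the file with `APPEND` writes to the existing file rather than replacing it, which avoids the temp-file-and-rename behavior of some editors described above.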
jsonWatcher
@Nonnull public static <T> StreamSource<T> jsonWatcher(@Nonnull String watchedDirectory, @Nonnull Class<T> type)
A source to stream lines added to files in a directory. This is a streaming source: it watches the directory and emits objects of the given type by converting each line as it is appended to files in that directory.
This method is a shortcut for:
    filesBuilder(directory)
        .charset(UTF_8)
        .glob(GLOB_WILDCARD)
        .sharedFileSystem(false)
        .buildWatcher((fileName, line) -> JsonUtil.beanFrom(line, type))
Appending lines using a text editor
If you're testing this source, you might think of using a text editor to append the lines. However, it might not work as expected because some editors write to a temp file and then rename it, or append an extra newline character at the end which gets overwritten if more text is added in the editor. The best way to append is to use echo text >> yourFile.
- Since:
- Jet 4.2
-
jsonWatcher
@Nonnull public static StreamSource<Map<String,Object>> jsonWatcher(@Nonnull String watchedDirectory)
Convenience for jsonWatcher(String, Class) which converts each appended line to the Map representation of the JSON string.
- Since:
- Jet 4.2
-
jmsQueue
@Nonnull @Deprecated public static StreamSource<jakarta.jms.Message> jmsQueue(@Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier, @Nonnull String name) Deprecated. -
jmsQueue
@Nonnull public static StreamSource<jakarta.jms.Message> jmsQueue(@Nonnull String name, @Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
Shortcut equivalent to:
    return jmsQueueBuilder(factorySupplier)
        .destinationName(name)
        .build();
This version creates a connection without any authentication parameters. JMS Message objects are emitted downstream.
Note: Message might not be serializable. In that case you can use the builder and add a projection.
- Parameters:
name
- the name of the queue
factorySupplier
- supplier to obtain JMS connection factory. It must be stateless.
- Since:
- Jet 4.1
-
jmsQueueBuilder
@Nonnull public static JmsSourceBuilder jmsQueueBuilder(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. See javadoc on JmsSourceBuilder methods for more details.
This source uses the JMS message timestamp as the native timestamp, if enabled.
This source supports exactly-once and at-least-once mode, see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee) for more information.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
The default local parallelism for this processor is 1.
- Parameters:
factorySupplier
- supplier to obtain JMS connection factory. It must be stateless.
-
jmsTopic
@Nonnull @Deprecated public static StreamSource<jakarta.jms.Message> jmsTopic(@Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier, @Nonnull String name) Deprecated. -
jmsTopic
@Nonnull public static StreamSource<jakarta.jms.Message> jmsTopic(@Nonnull String name, @Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
Shortcut equivalent to:
    return jmsTopicBuilder(factorySupplier)
        .destinationName(name)
        .build();
This version creates a connection without any authentication parameters. A non-durable, non-shared consumer is used, so only one member will connect to the broker. JMS Message objects are emitted downstream.
Note: Message might not be serializable. In that case you can use the builder and add a projection.
- Parameters:
name
- the name of the topic
factorySupplier
- supplier to obtain JMS connection factory. It must be stateless.
- Since:
- Jet 4.1
-
jmsTopicBuilder
@Nonnull public static JmsSourceBuilder jmsTopicBuilder(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. See javadoc on JmsSourceBuilder methods for more details.
By default, a non-shared consumer is used. This forces the source to run on only one member of the cluster. You can use JmsSourceBuilder.consumerFn(FunctionEx) to create a shared consumer.
This source uses the JMS message timestamp as the native timestamp, if enabled.
This source supports exactly-once and at-least-once mode for durable topic subscriptions, see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee) for more information.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
The default local parallelism for this processor is 1.
- Parameters:
factorySupplier
- supplier to obtain JMS connection factory. It must be stateless.
-
jdbc
@Nonnull public static <T> BatchSource<T> jdbc(@Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet, ? extends T> createOutputFn)
Returns a source which connects to the specified database using the given newConnectionFn, queries the database and creates a result set using the given resultSetFn. It creates output objects from the ResultSet using the given createOutputFn and emits them downstream.
resultSetFn gets the created connection, total parallelism (local parallelism * member count) and global processor index as arguments and produces a result set. The parallelism and processor index arguments should be used to fetch a part of the whole result set specific to the processor. If the table itself isn't partitioned by the same key, then running multiple queries might not really be faster than using the simpler version of this method; do your own testing.
createOutputFn gets the ResultSet and creates the desired output object. The function is called for each row of the result set; the user should not call ResultSet.next() or any other cursor-navigating functions.
Example:
    p.readFrom(Sources.jdbc(
        () -> DriverManager.getConnection(DB_CONNECTION_URL),
        (con, parallelism, index) -> {
            PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
            stmt.setInt(1, parallelism);
            stmt.setInt(2, index);
            return stmt.executeQuery();
        },
        resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
If the underlying table is modified while being read, the source may miss and/or duplicate some entries, because multiple queries for parts of the data on multiple members will be executed.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Any SQLException will cause the job to fail.
The default local parallelism for this processor is 1.
The given functions must be stateless.
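The `MOD(id, ?) = ?` predicate in the example splits the rows so that each processor reads a disjoint slice and every row is read exactly once. The sketch below simulates that split in plain Java with no database. The class `ModPartitionSketch` and its method `assign` are hypothetical names, and the code assumes non-negative integer ids, since SQL dialects differ in how `MOD` treats negative operands.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of WHERE MOD(id, parallelism) = index. */
class ModPartitionSketch {

    /** Groups ids by the global processor index whose query would select them. */
    static Map<Integer, List<Integer>> assign(List<Integer> ids, int totalParallelism) {
        Map<Integer, List<Integer>> byProcessor = new TreeMap<>();
        for (int index = 0; index < totalParallelism; index++) {
            byProcessor.put(index, new ArrayList<>());
        }
        for (int id : ids) {
            // Assumption: ids are non-negative, so floorMod matches SQL MOD here.
            byProcessor.get(Math.floorMod(id, totalParallelism)).add(id);
        }
        return byProcessor;
    }

    public static void main(String[] args) {
        // Total parallelism 3, for example local parallelism 1 on a 3-member cluster.
        System.out.println(assign(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        // prints {0=[3, 6], 1=[1, 4, 7], 2=[2, 5]}
    }
}
```

The slices are disjoint and together cover all ids, but each real query still evaluates the predicate against the table, which is why the text advises measuring whether the parallel form is actually faster.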
-
jdbc
@Nonnull public static <T> BatchSource<T> jdbc(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet, ? extends T> createOutputFn)
Returns a source which connects to the specified database using the given dataConnectionRef, queries the database and creates a result set using the given resultSetFn. It creates output objects from the ResultSet using the given createOutputFn and emits them downstream.
Example:
(Prerequisite) Data connection configuration:
    Config config = smallInstanceConfig();
    Properties properties = new Properties();
    properties.setProperty("jdbcUrl", jdbcUrl);
    properties.setProperty("username", username);
    properties.setProperty("password", password);
    DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
        .setName("my-jdbc-data-connection")
        .setType("Jdbc")
        .setProperties(properties);
    config.addDataConnectionConfig(dataConnectionConfig);
Pipeline configuration
    p.readFrom(Sources.jdbc(
        DataConnectionRef.dataConnectionRef("my-jdbc-data-connection"),
        (con, parallelism, index) -> {
            PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
            stmt.setInt(1, parallelism);
            stmt.setInt(2, index);
            return stmt.executeQuery();
        },
        resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
- Since:
- 5.3
-
jdbc
@Nonnull public static <T> BatchSource<T> jdbc(@Nonnull String connectionURL, @Nonnull String query, @Nonnull FunctionEx<? super ResultSet, ? extends T> createOutputFn)
Convenience for jdbc(SupplierEx, ToResultSetFunction, FunctionEx). A non-distributed, single-worker source which fetches the whole result set with a single query on a single member.
This method executes exactly one query in the target database. If the underlying table is modified while being read, the behavior depends on the configured transaction isolation level in the target database. Refer to the documentation for the target database system.
Example:
    p.readFrom(Sources.jdbc(
        DB_CONNECTION_URL,
        "select ID, NAME from PERSON",
        resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
The given function must be stateless.
-
jdbc
@Nonnull public static <T> BatchSource<T> jdbc(@Nonnull String connectionURL, @Nonnull String query, @Nonnull Properties properties, @Nonnull FunctionEx<? super ResultSet, ? extends T> createOutputFn)
Same as jdbc(String, String, FunctionEx).
It is not always possible to use the default properties. This overload allows passing some properties to the JDBC driver.
Example for PostgreSQL to specify fetchSize: PostgreSQL requires autocommit to be disabled. The backend closes cursors at the end of transactions, so with autocommit enabled the backend will have closed the cursor before anything can be fetched from it.
    Properties properties = new Properties();
    properties.setProperty(JdbcPropertyKeys.FETCH_SIZE, "5");
    properties.setProperty(JdbcPropertyKeys.AUTO_COMMIT, "false");
    p.readFrom(Sources.jdbc(
        "jdbc:postgresql://localhost:5432/mydatabase",
        "select ID, NAME from PERSON",
        properties,
        resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
Example for MySQL to specify fetchSize: the database connection URL should have the "&useCursorFetch=true" parameter to enable cursor-based fetching. This means that the JDBC driver will fetch a set of rows from the database at a time, rather than fetching all the rows in the result set at once.
    Properties properties = new Properties();
    properties.setProperty(JdbcPropertyKeys.FETCH_SIZE, "5");
    p.readFrom(Sources.jdbc(
        "jdbc:mysql://localhost:3306/mydatabase?useCursorFetch=true",
        "select ID, NAME from PERSON",
        properties,
        resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
-