public final class Sources extends Object
To start building a pipeline, pass a source to Pipeline.readFrom(BatchSource) and you will obtain the initial BatchStage. You can then attach further stages to it.
The same pipeline may contain more than one source, each starting its own branch. The branches may be merged with multiple-input transforms such as co-group and hash-join.
The default local parallelism for sources in this class is 1 or 2; check the documentation of the individual methods.
Modifier and Type | Method | Description |
---|---|---|
static <T> BatchSource<T> | batchFromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier) | Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier. |
static <K,V> BatchSource<Map.Entry<K,V>> | cache(String cacheName) | Returns a source that fetches entries from a Hazelcast ICache with the given name and emits them as Map.Entry. |
static <K,V> StreamSource<Map.Entry<K,V>> | cacheJournal(String cacheName, JournalInitialPosition initialPos) | Convenience for cacheJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry. |
static <T,K,V> StreamSource<T> | cacheJournal(String cacheName, JournalInitialPosition initialPos, FunctionEx<? super EventJournalCacheEvent<K,V>,? extends T> projectionFn, PredicateEx<? super EventJournalCacheEvent<K,V>> predicateFn) | Returns a source that will stream the EventJournalCacheEvent events of a Hazelcast ICache with the specified name. |
static BatchSource<String> | files(String directory) | A source to read all files in a directory in a batch way. |
static FileSourceBuilder | filesBuilder(String directory) | Returns a builder object that offers a step-by-step fluent API to build a custom source to read files for the Pipeline API. |
static StreamSource<String> | fileWatcher(String watchedDirectory) | A source to stream lines added to files in a directory. |
static <T> BatchSource<T> | jdbc(DataConnectionRef dataConnectionRef, ToResultSetFunction resultSetFn, FunctionEx<? super ResultSet,? extends T> createOutputFn) | Returns a source which connects to the specified database using the given dataConnectionRef, queries the database and creates a result set using the given resultSetFn. |
static <T> BatchSource<T> | jdbc(String connectionURL, String query, FunctionEx<? super ResultSet,? extends T> createOutputFn) | Convenience for jdbc(SupplierEx, ToResultSetFunction, FunctionEx). |
static <T> BatchSource<T> | jdbc(SupplierEx<? extends Connection> newConnectionFn, ToResultSetFunction resultSetFn, FunctionEx<? super ResultSet,? extends T> createOutputFn) | Returns a source which connects to the specified database using the given newConnectionFn, queries the database and creates a result set using the given resultSetFn. |
static StreamSource<javax.jms.Message> | jmsQueue(String name, SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier) | Shortcut equivalent to: |
static StreamSource<javax.jms.Message> | jmsQueue(SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier, String name) | Deprecated. |
static JmsSourceBuilder | jmsQueueBuilder(SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier) | Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. |
static StreamSource<javax.jms.Message> | jmsTopic(String name, SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier) | Shortcut equivalent to: |
static StreamSource<javax.jms.Message> | jmsTopic(SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier, String name) | Deprecated. |
static JmsSourceBuilder | jmsTopicBuilder(SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier) | Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. |
static BatchSource<Map<String,Object>> | json(String directory) | Convenience for json(String, Class) which converts each JSON string to a Map. |
static <T> BatchSource<T> | json(String directory, Class<T> type) | A source to read all files in a directory in a batch way. |
static StreamSource<Map<String,Object>> | jsonWatcher(String watchedDirectory) | Convenience for jsonWatcher(String, Class) which converts each appended line to the Map representation of the JSON string. |
static <T> StreamSource<T> | jsonWatcher(String watchedDirectory, Class<T> type) | A source to stream lines added to files in a directory. |
static <T> BatchSource<T> | list(IList<? extends T> list) | Returns a source that emits items retrieved from a Hazelcast IList. |
static <T> BatchSource<T> | list(String listName) | Returns a source that emits items retrieved from a Hazelcast IList. |
static <K,V> BatchSource<Map.Entry<K,V>> | map(IMap<? extends K,? extends V> map) | Returns a source that fetches entries from the given Hazelcast IMap and emits them as Map.Entry. |
static <T,K,V> BatchSource<T> | map(IMap<? extends K,? extends V> map, Predicate<K,V> predicate, Projection<? super Map.Entry<K,V>,? extends T> projection) | Returns a source that fetches entries from the given Hazelcast IMap. |
static <K,V> BatchSource<Map.Entry<K,V>> | map(String mapName) | Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. |
static <T,K,V> BatchSource<T> | map(String mapName, Predicate<K,V> predicate, Projection<? super Map.Entry<K,V>,? extends T> projection) | Returns a source that fetches entries from a local Hazelcast IMap with the specified name. |
static <K,V> StreamSource<Map.Entry<K,V>> | mapJournal(IMap<? extends K,? extends V> map, JournalInitialPosition initialPos) | Convenience for mapJournal(IMap, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry. |
static <T,K,V> StreamSource<T> | mapJournal(IMap<? extends K,? extends V> map, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn) | Returns a source that will stream EventJournalMapEvents of the given Hazelcast IMap. |
static <K,V> StreamSource<Map.Entry<K,V>> | mapJournal(String mapName, JournalInitialPosition initialPos) | Convenience for mapJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry. |
static <T,K,V> StreamSource<T> | mapJournal(String mapName, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn) | Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name. |
static <K,V> BatchSource<Map.Entry<K,V>> | remoteCache(String cacheName, ClientConfig clientConfig) | Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry. |
static <K,V> StreamSource<Map.Entry<K,V>> | remoteCacheJournal(String cacheName, ClientConfig clientConfig, JournalInitialPosition initialPos) | Convenience for remoteCacheJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry. |
static <T,K,V> StreamSource<T> | remoteCacheJournal(String cacheName, ClientConfig clientConfig, JournalInitialPosition initialPos, FunctionEx<? super EventJournalCacheEvent<K,V>,? extends T> projectionFn, PredicateEx<? super EventJournalCacheEvent<K,V>> predicateFn) | Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. |
static <T> BatchSource<T> | remoteList(String listName, ClientConfig clientConfig) | Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. |
static <K,V> BatchSource<Map.Entry<K,V>> | remoteMap(String mapName, ClientConfig clientConfig) | Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry. |
static <T,K,V> BatchSource<T> | remoteMap(String mapName, ClientConfig clientConfig, Predicate<K,V> predicate, Projection<? super Map.Entry<K,V>,? extends T> projection) | Returns a source that fetches entries from a remote Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig. |
static <K,V> StreamSource<Map.Entry<K,V>> | remoteMapJournal(String mapName, ClientConfig clientConfig, JournalInitialPosition initialPos) | Convenience for remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry. |
static <T,K,V> StreamSource<T> | remoteMapJournal(String mapName, ClientConfig clientConfig, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn) | Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. |
static <K,V> StreamSource<Map.Entry<K,V>> | remoteMapJournal(String mapName, DataConnectionRef dataConnectionRef, JournalInitialPosition initialPos) | Convenience for remoteMapJournal(String, DataConnectionRef, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry. |
static <T,K,V> StreamSource<T> | remoteMapJournal(String mapName, DataConnectionRef dataConnectionRef, JournalInitialPosition initialPos, FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn) | The same as the remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) method. |
static StreamSource<String> | socket(String host, int port) | Convenience for socket(host, port, charset) with UTF-8 as the charset. |
static StreamSource<String> | socket(String host, int port, Charset charset) | Returns a source which connects to the specified socket and emits lines of text received from it. |
static <T> StreamSource<T> | streamFromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier) | Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier. |
static <T> StreamSource<T> | streamFromProcessorWithWatermarks(String sourceName, boolean supportsNativeTimestamps, FunctionEx<EventTimePolicy<? super T>,ProcessorMetaSupplier> metaSupplierFn) | Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API. |
@Nonnull public static <T> BatchSource<T> batchFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier.
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
@Nonnull public static <T> StreamSource<T> streamFromProcessorWithWatermarks(@Nonnull String sourceName, boolean supportsNativeTimestamps, @Nonnull FunctionEx<EventTimePolicy<? super T>,ProcessorMetaSupplier> metaSupplierFn)
Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API. Jet will call the function you supply with the EventTimePolicy and it must return a meta-supplier of processors that will act according to the parameters in the policy and must emit the watermark items as the policy specifies.
If you are implementing a custom source processor, be sure to check out the EventTimeMapper class that will help you correctly implement watermark emission.
sourceName - user-friendly source name
supportsNativeTimestamps - true, if the processor is able to work
metaSupplierFn - factory of processor meta-suppliers. Since Jet 4.3 this argument changed from Function to FunctionEx to support serializability.
@Nonnull public static <T> StreamSource<T> streamFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier.
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> map(@Nonnull String mapName)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys which have been deleted or added since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> map(@Nonnull IMap<? extends K,? extends V> map)
Returns a source that fetches entries from the given Hazelcast IMap and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys which have been deleted or added since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
@Nonnull public static <T,K,V> BatchSource<T> map(@Nonnull String mapName, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
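To make the early filtering and projection concrete, here is a plain-Java model of the idea. It is a sketch under stated assumptions, not Jet code: SourceSideFilter, scan and demo are hypothetical names, and java.util.function.Predicate and Function stand in for Hazelcast's Predicate and Projection. It assumes, as the parameter descriptions of this method state, that an item whose projection is null is filtered out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a map source; it only models the documented semantics.
final class SourceSideFilter {

    // Scans every entry (no index is used), tests the predicate first and
    // projects only the entries that passed. A null projection result is dropped.
    static <K, V, T> List<T> scan(Map<K, V> map,
                                  Predicate<Map.Entry<K, V>> predicate,
                                  Function<Map.Entry<K, V>, T> projection) {
        List<T> emitted = new ArrayList<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            if (!predicate.test(entry)) {
                continue; // rejected before any output is generated
            }
            T item = projection.apply(entry);
            if (item != null) {
                emitted.add(item);
            }
        }
        return emitted;
    }

    static List<String> demo() {
        Map<String, Integer> ages = new LinkedHashMap<>();
        ages.put("alice", 34);
        ages.put("bob", 17);
        ages.put("carol", 52);
        // Keep values of 18 or more and emit only the key;
        // "carol" is projected to null and therefore dropped.
        return scan(ages,
                e -> e.getValue() >= 18,
                e -> e.getKey().equals("carol") ? null : e.getKey());
    }
}
```

In this model demo() emits only alice: bob fails the predicate and carol is projected to null, so neither reaches the downstream stage.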
Due to the current limitations in the way Jet reads the map it can't use any indexes on the map. It will always scan the map in full.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys which have been deleted or added since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.
T - type of emitted item
mapName - the name of the map
predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
@Nonnull public static <T,K,V> BatchSource<T> map(@Nonnull IMap<? extends K,? extends V> map, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
Returns a source that fetches entries from the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
Due to the current limitations in the way Jet reads the map it can't use any indexes on the map. It will always scan the map in full.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the map during the job, the keys which have been deleted or added since the job started will be emitted at most once; all other keys will be emitted exactly once.
The default local parallelism for this processor is 1.
The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.
T - type of emitted item
map - the Hazelcast map to read data from
predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
@Nonnull public static <T,K,V> StreamSource<T> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
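The interaction between a fixed-capacity journal and saved offsets can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Hazelcast's event journal: BoundedJournal, append, lostSince and readFrom are hypothetical names, and the model assumes that the oldest events are the ones discarded when the journal is full.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a fixed-capacity journal; not the Hazelcast implementation.
final class BoundedJournal {
    private final int capacity;
    private final Deque<String> events = new ArrayDeque<>();
    private long oldestSequence; // sequence number of the oldest retained event

    BoundedJournal(int capacity) {
        this.capacity = capacity;
    }

    // Appends an event; when the journal is full the oldest event is discarded
    // (assumption of this model).
    void append(String event) {
        if (events.size() == capacity) {
            events.removeFirst();
            oldestSequence++;
        }
        events.addLast(event);
    }

    // Number of events that a reader resuming at savedOffset can no longer see.
    long lostSince(long savedOffset) {
        return Math.max(0L, oldestSequence - savedOffset);
    }

    // Returns all retained events whose sequence number is >= savedOffset.
    List<String> readFrom(long savedOffset) {
        List<String> result = new ArrayList<>();
        long sequence = oldestSequence;
        for (String event : events) {
            if (sequence >= savedOffset) {
                result.add(event);
            }
            sequence++;
        }
        return result;
    }
}
```

With a capacity of 3 and five appended events, a reader that saved offset 3 resumes without loss, while a reader that saved offset 1 finds that one event has already been discarded. The second case corresponds to the overflow exception to the exactly-once guarantee.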
If you start a new job from an exported state, you can change the source parameters as needed.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
T - type of emitted item
mapName - the name of the map
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
@Nonnull public static <T,K,V> StreamSource<T> mapJournal(@Nonnull IMap<? extends K,? extends V> map, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
Returns a source that will stream EventJournalMapEvents of the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
snapshotInterval + timeToRestart + normalEventLag. The reason for this behavior is that the default partition count in the cluster is pretty high and cannot be changed per object, and for low-traffic maps it takes long until all partitions see an event to allow emitting of a coalesced watermark.
T - type of emitted item
map - the map to read data from
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos)
Convenience for mapJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull IMap<? extends K,? extends V> map, @Nonnull JournalInitialPosition initialPos)
Convenience for mapJournal(IMap, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
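What this convenience overload does to the event stream can be modeled in plain Java. The following is an illustrative sketch only: JournalEvent, EventType and PutEvents.putEventsAsEntries are hypothetical stand-ins for EventJournalMapEvent, its event type, and the combination of Util.mapPutEvents() with Util.mapEventToEntry(). The REMOVED constant is an assumed example of an event type that is not passed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; the real types live in the Hazelcast API.
enum EventType { ADDED, UPDATED, REMOVED }

final class JournalEvent<K, V> {
    final EventType type;
    final K key;
    final V newValue;

    JournalEvent(EventType type, K key, V newValue) {
        this.type = type;
        this.key = key;
        this.newValue = newValue;
    }
}

final class PutEvents {
    // Passes only ADDED and UPDATED events and projects each one
    // to an entry made of the event's key and new value.
    static <K, V> List<Map.Entry<K, V>> putEventsAsEntries(List<JournalEvent<K, V>> events) {
        List<Map.Entry<K, V>> entries = new ArrayList<>();
        for (JournalEvent<K, V> event : events) {
            if (event.type == EventType.ADDED || event.type == EventType.UPDATED) {
                entries.add(new SimpleImmutableEntry<>(event.key, event.newValue));
            }
        }
        return entries;
    }

    static List<Map.Entry<String, Integer>> demo() {
        List<JournalEvent<String, Integer>> events = Arrays.asList(
                new JournalEvent<String, Integer>(EventType.ADDED, "a", 1),
                new JournalEvent<String, Integer>(EventType.UPDATED, "a", 2),
                new JournalEvent<String, Integer>(EventType.REMOVED, "a", null));
        return putEventsAsEntries(events);
    }
}
```

In this model demo() yields two entries for key a, with values 1 and 2; the removal event is not emitted.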
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries. If a topology change is detected, the job will fail, but the detection is only on a best-effort basis: the source might still give incorrect results without reporting a failure. Concurrent mutation is not detected at all.
The default local parallelism for this processor is 1.
@Nonnull public static <T,K,V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
Returns a source that fetches entries from a remote Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
Due to the current limitations in the way Jet reads the map it can't use any indexes on the map. It will always scan the map in full.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries. If a topology change is detected, the job will fail, but the detection is only on a best-effort basis: the source might still give incorrect results without reporting a failure. Concurrent mutation is not detected at all.
The default local parallelism for this processor is 1.
The classes implementing predicate and projection need to be available on the remote cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these conditions, use remoteMap(String, ClientConfig) and add a subsequent map or filter stage.
T - type of emitted item
mapName - the name of the map
predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
@Nonnull public static <T,K,V> StreamSource<T> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give this source a different name.
The default local parallelism for this processor is 1.
The classes implementing predicateFn and projectionFn need to be available on the remote cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use remoteMapJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
K - type of key
V - type of value
T - type of emitted item
mapName - the name of the map
clientConfig - configuration for the client to connect to the remote cluster
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. You may use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
@Nonnull @Beta public static <T,K,V> StreamSource<T> remoteMapJournal(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
The same as the remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) method. The only difference is that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a reference to a DataConnectionConfig.
The DataConnectionConfig caches the connection to the remote cluster, so that it can be reused.
(Prerequisite) External dataConnection configuration:
Use HazelcastDataConnection.CLIENT_XML for an XML string or HazelcastDataConnection.CLIENT_YML for a YAML string.
Config config = ...;
String xmlString = ...;
DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
.setName("my-hzclient-data-connection")
.setType("Hz")
.setProperty(HazelcastDataConnection.CLIENT_XML, xmlString);
config.addDataConnectionConfig(dataConnectionConfig);
Pipeline configuration:
// mapName is the name of the IMap in the remote cluster
Pipeline p = Pipeline.create();
PredicateEx<EventJournalMapEvent<String, Integer>> predicate = ...;
p.readFrom(Sources.remoteMapJournal(
        mapName,
        DataConnectionRef.dataConnectionRef("my-hzclient-data-connection"),
        JournalInitialPosition.START_FROM_OLDEST,
        EventJournalMapEvent::getNewValue,
        predicate
));
T - the return type of the stream
K - the key type of EventJournalMapEvent
V - the value type of EventJournalMapEvent
mapName - the name of the map
dataConnectionRef - the reference to DataConnectionConfig
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
Convenience for remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx)
which will pass only ADDED and UPDATED events and will
project the event's key and new value into a Map.Entry.
@Nonnull @Beta public static <K,V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull JournalInitialPosition initialPos)
Convenience for remoteMapJournal(String, DataConnectionRef, JournalInitialPosition, FunctionEx, PredicateEx)
which will pass only ADDED and UPDATED events and will
project the event's key and new value into a Map.Entry.
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> cache(@Nonnull String cacheName)
Returns a source that fetches entries from a Hazelcast ICache
with the given name and emits them as Map.Entry. It leverages
data locality by making each of the underlying processors fetch only
those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the cache during the job, the keys which have been deleted or added since the job started will be emitted at most once; other keys must be emitted exactly once.
The default local parallelism for this processor is 1.
@Nonnull public static <T,K,V> StreamSource<T> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalCacheEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalCacheEvent<K,V>> predicateFn)
Returns a source that will stream the EventJournalCacheEvent
events of a Hazelcast ICache with the specified name. By
supplying a predicate and projection here instead of
in separate map/filter transforms, you allow the source to apply
these functions early, before generating any output, with the potential
of significantly reducing data traffic.
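The effect of pushing the predicate and projection into the source can be pictured with a small plain-Java model. This is only a sketch and not Hazelcast code: JournalEvent is a hypothetical stand-in for EventJournalCacheEvent, and the model assumes the predicate is evaluated before the projection and that a null projection result drops the item, as the parameter descriptions state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of "filter and project inside the source"; not Hazelcast code.
final class JournalFilterSketch {

    enum Type { CREATED, UPDATED, REMOVED }

    // Hypothetical stand-in for EventJournalCacheEvent<String, Integer>.
    static final class JournalEvent {
        final Type type;
        final String key;
        final Integer newValue;

        JournalEvent(Type type, String key, Integer newValue) {
            this.type = type;
            this.key = key;
            this.newValue = newValue;
        }
    }

    // Applies the predicate first, then the projection; null projections are dropped.
    static <T> List<T> emit(List<JournalEvent> journal,
                            Predicate<JournalEvent> predicateFn,
                            Function<JournalEvent, T> projectionFn) {
        List<T> out = new ArrayList<>();
        for (JournalEvent event : journal) {
            if (!predicateFn.test(event)) {
                continue; // rejected events never reach the projection
            }
            T item = projectionFn.apply(event);
            if (item != null) {
                out.add(item);
            }
        }
        return out;
    }

    static List<Integer> demo() {
        List<JournalEvent> journal = List.of(
                new JournalEvent(Type.CREATED, "a", 1),
                new JournalEvent(Type.REMOVED, "a", null),
                new JournalEvent(Type.UPDATED, "b", 7),
                new JournalEvent(Type.CREATED, "c", -3));
        // Keep only put-like events, then keep only positive values.
        return emit(journal,
                event -> event.type != Type.REMOVED,
                event -> event.newValue > 0 ? event.newValue : null);
    }
}
```

In this model only two of the four journal events leave the source, which is the traffic reduction the paragraph above refers to.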
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an ICache
as a streaming source, you must configure the event journal
for it. The journal has fixed capacity and
will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
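Why an overflow weakens the exactly-once guarantee can be shown with a toy fixed-capacity journal. The class below is a hypothetical plain-Java model, not the Hazelcast event journal: it assumes each event gets an increasing sequence number and that the oldest event is discarded when the journal is full.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a fixed-capacity journal; names and behavior are illustrative assumptions.
final class BoundedJournalSketch {
    private final int capacity;
    private final Deque<String> events = new ArrayDeque<>();
    private long headSequence = 0; // sequence number of the oldest retained event

    BoundedJournalSketch(int capacity) {
        this.capacity = capacity;
    }

    void append(String event) {
        if (events.size() == capacity) {
            events.removeFirst(); // overflow: the oldest event is dropped
            headSequence++;
        }
        events.addLast(event);
    }

    // Number of events that a reader resuming at savedOffset can no longer read.
    long lostSince(long savedOffset) {
        return Math.max(0, headSequence - savedOffset);
    }

    static long demo() {
        BoundedJournalSketch journal = new BoundedJournalSketch(3);
        for (int i = 0; i < 5; i++) {
            journal.append("event-" + i);
        }
        // A snapshot saved offset 1, but the journal now starts at sequence 2.
        return journal.lostSince(1);
    }
}
```

A reader whose saved offset is still inside the retained range loses nothing; one whose offset has already been overwritten cannot recover the dropped events.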
If you start a new job from an exported state, you can change the source parameters as needed.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
predicateFn and projectionFn need to be available on the cluster's classpath or loaded using
Hazelcast User Code Deployment. It's not enough to add them to
the job classpath in JobConfig. The same is true for the class
of the objects stored in the cache itself. If you cannot meet these
conditions, use cacheJournal(String, JournalInitialPosition)
and add a subsequent map or filter stage.
T - type of emitted item
cacheName - the name of the cache
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos)
Convenience for cacheJournal(String, JournalInitialPosition, FunctionEx, PredicateEx)
which will pass only CREATED and UPDATED
events and will project the event's key and new value into a Map.Entry.
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast ICache
with the specified name in a remote cluster identified by the supplied
ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If entries are added to or removed from the cache during the job, the keys which have been deleted or added since the job started will be emitted at most once; other keys must be emitted exactly once.
The default local parallelism for this processor is 1.
@Nonnull public static <T,K,V> StreamSource<T> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalCacheEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalCacheEvent<K,V>> predicateFn)
Returns a source that will stream the EventJournalCacheEvent
events of the Hazelcast ICache with the specified name from a
remote cluster. By supplying a predicate and projection
here instead of in separate map/filter transforms, you allow the
source to apply these functions early, before generating any output,
with the potential of significantly reducing data traffic.
To use an ICache
as a streaming source, you must configure the event journal
for it. The journal has fixed capacity and
will drop events if it overflows.
The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).
If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give a different name to this source.
The default local parallelism for this processor is 1.
predicateFn and projectionFn need to be available on the cluster's classpath or loaded using
Hazelcast User Code Deployment. It's not enough to add them to
the job classpath in JobConfig. The same is true for the class
of the objects stored in the cache itself. If you cannot meet these
conditions, use remoteCacheJournal(String, ClientConfig, JournalInitialPosition)
and add a subsequent map or filter stage.
T - type of emitted item
cacheName - the name of the cache
clientConfig - configuration for the client to connect to the remote cluster
initialPos - describes which event to start receiving from
projectionFn - the projection to map the events. If the projection returns null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
Convenience for remoteCacheJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx)
which will pass only CREATED and UPDATED
events and will project the event's key and new value
into a Map.Entry.
@Nonnull public static <T> BatchSource<T> list(@Nonnull String listName)
Returns a source that emits the elements of a Hazelcast IList.
All elements are emitted on a single member: the one
where the entire list is stored by the IMDG.
If the IList is modified while being read, the source may miss
and/or duplicate some entries.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
One instance of this processor runs only on the member that owns the list.
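The "miss and/or duplicate" caveat can be reproduced with an ordinary java.util.List. The sketch below is a plain-Java model rather than the Hazelcast implementation; it assumes, for illustration only, that the list is read in index-based batches and that a modification happens between two batches.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of reading a list in index-based batches while it is being modified.
final class ListReadSketch {

    // Reads the list batch by batch; betweenBatches runs once, right after the first batch.
    static List<String> read(List<String> list, int batchSize, Runnable betweenBatches) {
        List<String> emitted = new ArrayList<>();
        boolean firstBatch = true;
        for (int from = 0; from < list.size(); from += batchSize) {
            int to = Math.min(from + batchSize, list.size());
            emitted.addAll(list.subList(from, to));
            if (firstBatch) {
                betweenBatches.run(); // simulated concurrent modification
                firstBatch = false;
            }
        }
        return emitted;
    }

    // Removing the head shifts later elements left, so "c" is skipped.
    static List<String> missDemo() {
        List<String> list = new ArrayList<>(List.of("a", "b", "c", "d"));
        return read(list, 2, () -> list.remove(0));
    }

    // Inserting at the head shifts elements right, so "b" is read twice.
    static List<String> duplicateDemo() {
        List<String> list = new ArrayList<>(List.of("a", "b", "c", "d"));
        return read(list, 2, () -> list.add(0, "x"));
    }
}
```

In the second case the newly inserted element "x" is also never emitted, so a single modification can cause both a duplicate and a miss.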
@Nonnull public static <T> BatchSource<T> list(@Nonnull IList<? extends T> list)
Returns a source that emits the elements of the supplied Hazelcast IList.
All elements are emitted on a single member: the one
where the entire list is stored by the IMDG.
If the IList is modified while being read, the source may miss
and/or duplicate some entries.
NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
One instance of this processor runs only on the member that owns the list.
@Nonnull public static <T> BatchSource<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
Returns a source that emits the elements of a Hazelcast IList
in a remote cluster identified by the supplied ClientConfig.
All elements are emitted on a single member.
If the IList is modified while being read, the source may miss
and/or duplicate some entries.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Only 1 instance of this processor runs in the cluster.
@Nonnull public static StreamSource<String> socket(@Nonnull String host, int port, @Nonnull Charset charset)
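Returns a source that connects to the specified socket and emits the lines
of text received from it, decoded using the supplied charset.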
Each underlying processor opens its own TCP connection, so there will be
clusterSize * localParallelism
open connections to the server.
The source completes when the server closes the socket. It never attempts
to reconnect. Any IOException
will cause the job to fail.
The source does not save any state to snapshot. On job restart, it will emit whichever items the server sends. The implementation uses a non-blocking API, so the processor is cooperative.
The default local parallelism for this processor is 1.
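Two points above lend themselves to a short illustration: the number of TCP connections the server should expect, and the "decode lines until the peer closes" behavior. The following plain-JDK sketch is not the Hazelcast implementation (which, as noted, is non-blocking); it uses blocking stream I/O and a ByteArrayInputStream as a stand-in for the socket, and all names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Blocking, single-threaded model of a line-oriented socket reader; illustrative only.
final class SocketSourceSketch {

    // Each processor opens its own connection, so the server sees this many clients.
    static int openConnections(int clusterSize, int localParallelism) {
        return clusterSize * localParallelism;
    }

    // Decodes the stream with the given charset and returns one item per line.
    // Reading ends when the stream reports end-of-input, i.e. the peer closed it.
    static List<String> readLines(InputStream in, Charset charset) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            // The real source fails the job on IOException; here we just rethrow.
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    static List<String> demo() {
        byte[] payload = "alpha\nbeta\n".getBytes(StandardCharsets.UTF_8);
        return readLines(new ByteArrayInputStream(payload), StandardCharsets.UTF_8);
    }
}
```

For example, a three-member cluster with the default local parallelism of 1 would open three connections under this formula.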
@Nonnull public static StreamSource<String> socket(@Nonnull String host, int port)
Convenience for socket(host, port, charset)
with UTF-8 as the charset.
host - the hostname to connect to
port - the port to connect to
@Nonnull public static FileSourceBuilder filesBuilder(@Nonnull String directory)
@Nonnull public static BatchSource<String> files(@Nonnull String directory)
This method is a shortcut for:
filesBuilder(directory)
    .charset(UTF_8)
    .glob(GLOB_WILDCARD)
    .sharedFileSystem(false)
    .build((fileName, line) -> line)
If files are appended to while being read, the addition might or might
not be emitted or part of a line can be emitted. If files are modified
in more complex ways, the behavior is undefined.
See filesBuilder(String).
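As a rough picture of what the shortcut above amounts to on a single machine, the sketch below reads every regular file matching a glob in a directory, decodes it as UTF-8, and maps each (fileName, line) pair to an output item. It is a plain-JDK approximation, not the Hazelcast source: it ignores distribution across members, and the "*" glob is an assumption about what GLOB_WILDCARD stands for.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Single-machine approximation of a "read all lines of all files" batch source.
final class FilesSourceSketch {

    static <T> List<T> readAll(Path directory, String glob,
                               BiFunction<String, String, T> mapOutputFn) {
        List<T> items = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(directory, glob)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue; // skip subdirectories
                }
                String fileName = file.getFileName().toString();
                for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                    items.add(mapOutputFn.apply(fileName, line));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return items;
    }

    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("files-source-sketch");
            Files.write(dir.resolve("a.txt"), List.of("one", "two"), StandardCharsets.UTF_8);
            // Same mapping function as the shortcut: keep the line, drop the file name.
            return readAll(dir, "*", (fileName, line) -> line);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing a different mapping function, for instance one that prefixes each line with its file name, corresponds to supplying a custom function to build(...) on the builder.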
@Nonnull public static <T> BatchSource<T> json(@Nonnull String directory, @Nonnull Class<T> type)
This method is a shortcut for:
filesBuilder(directory)
    .charset(UTF_8)
    .glob(GLOB_WILDCARD)
    .sharedFileSystem(false)
    .build(path -> JsonUtil.beanSequenceFrom(path, type))
If files are appended to while being read, the addition might or might not be emitted or part of a line can be emitted. If files are modified in more complex ways, the behavior is undefined.
@Nonnull public static BatchSource<Map<String,Object>> json(@Nonnull String directory)
Convenience for json(String, Class) which converts each
JSON string to a Map. It will throw ClassCastException
if the JSON string is just a primitive (String, Number,
Boolean) or a JSON array (List).
@Nonnull public static StreamSource<String> fileWatcher(@Nonnull String watchedDirectory)
This method is a shortcut for:
filesBuilder(directory)
    .charset(UTF_8)
    .glob(GLOB_WILDCARD)
    .sharedFileSystem(false)
    .buildWatcher((fileName, line) -> line)
echo text >> yourFile
.
See filesBuilder(String).
@Nonnull public static <T> StreamSource<T> jsonWatcher(@Nonnull String watchedDirectory, @Nonnull Class<T> type)
type
by converting each line as they are appended to files in
that directory.
This method is a shortcut for:
filesBuilder(directory)
    .charset(UTF_8)
    .glob(GLOB_WILDCARD)
    .sharedFileSystem(false)
    .buildWatcher((fileName, line) -> JsonUtil.beanFrom(line, type))
echo text >> yourFile
.
See filesBuilder(String), fileWatcher(String).
@Nonnull public static StreamSource<Map<String,Object>> jsonWatcher(@Nonnull String watchedDirectory)
Convenience for jsonWatcher(String, Class) which converts each
appended line to the Map representation of the JSON string.
@Nonnull @Deprecated public static StreamSource<javax.jms.Message> jmsQueue(@Nonnull SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
@Nonnull public static StreamSource<javax.jms.Message> jmsQueue(@Nonnull String name, @Nonnull SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier)
return jmsQueueBuilder(factorySupplier)
    .destinationName(name)
    .build();
This version creates a connection without any authentication parameters. JMS
Message objects are emitted downstream.
Note: Message might not be serializable. In
that case you can use the builder and add a projection.
name - the name of the queue
factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
@Nonnull public static JmsSourceBuilder jmsQueueBuilder(SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build
a custom JMS StreamSource for the Pipeline API. See javadoc on
JmsSourceBuilder methods for more details.
This source uses the JMS' message timestamp as the native timestamp, if enabled.
This source supports exactly-once and at-least-once mode; see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee)
for more information.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
The default local parallelism for this processor is 1.
factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
@Nonnull @Deprecated public static StreamSource<javax.jms.Message> jmsTopic(@Nonnull SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
@Nonnull public static StreamSource<javax.jms.Message> jmsTopic(@Nonnull String name, @Nonnull SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier)
return jmsTopicBuilder(factorySupplier)
    .destinationName(name)
    .build();
This version creates a connection without any authentication parameters. A non-durable, non-shared consumer is used, so only one member will connect to the broker. JMS
Message objects are emitted downstream.
Note: Message might not be serializable. In
that case you can use the builder and add a projection.
name - the name of the topic
factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
@Nonnull public static JmsSourceBuilder jmsTopicBuilder(SupplierEx<? extends javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build
a custom JMS StreamSource for the Pipeline API. See javadoc on
JmsSourceBuilder methods for more details.
By default, a non-shared consumer is used. This forces the source to
run on only one member of the cluster. You can use JmsSourceBuilder.consumerFn(FunctionEx)
to create a shared consumer.
This source uses the JMS' message timestamp as the native timestamp, if enabled.
This source supports exactly-once and at-least-once mode for durable
topic subscriptions; see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee)
for more information.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
The default local parallelism for this processor is 1.
factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
public static <T> BatchSource<T> jdbc(@Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
Returns a source which connects to the specified database using the given
newConnectionFn, queries the database and creates a result set
using the given resultSetFn. It creates output objects from the
ResultSet using the given createOutputFn and emits them downstream.
resultSetFn gets the created connection, total parallelism (local
parallelism * member count) and global processor index as arguments and
produces a result set. The parallelism and processor index arguments
should be used to fetch a part of the whole result set specific to the
processor. If the table itself isn't partitioned by the same key, then
running multiple queries might not really be faster than using the
simpler version of this method, so do your own testing.
createOutputFn gets the ResultSet and creates the desired
output object. The function is called for each row of the result set;
it should not call ResultSet.next() or any other
cursor-navigating functions.
Example:
p.readFrom(Sources.jdbc(
    () -> DriverManager.getConnection(DB_CONNECTION_URL),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
If the underlying table is modified while being read, the source may miss and/or duplicate some entries, because multiple queries for parts of the data on multiple members will be executed.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Any SQLException
will cause the job to fail.
The default local parallelism for this processor is 1.
The given functions must be stateless.
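How the MOD(id, ?) = ? predicate in the example splits the table between processors can be checked with a small plain-Java model. The class below is hypothetical and contains no JDBC; it assumes non-negative integer ids, for which SQL MOD and Java's % operator agree.

```java
import java.util.ArrayList;
import java.util.List;

// Models which ids each processor would fetch with "WHERE MOD(id, parallelism) = index".
final class ModPartitionSketch {

    // Ids selected by the processor with the given global index.
    static List<Integer> idsFor(List<Integer> allIds, int parallelism, int index) {
        List<Integer> selected = new ArrayList<>();
        for (int id : allIds) {
            if (id % parallelism == index) {
                selected.add(id);
            }
        }
        return selected;
    }

    // One sub-list per processor index, in index order.
    static List<List<Integer>> partition(List<Integer> allIds, int parallelism) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int index = 0; index < parallelism; index++) {
            parts.add(idsFor(allIds, parallelism, index));
        }
        return parts;
    }

    static List<List<Integer>> demo() {
        // Seven rows read by a total parallelism of 3.
        return partition(List.of(1, 2, 3, 4, 5, 6, 7), 3);
    }
}
```

Every id lands in exactly one sub-list, so the processors together read each row once, provided the table does not change between the individual queries.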
@Beta public static <T> BatchSource<T> jdbc(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
Returns a source which connects to the specified database using the given
dataConnectionRef, queries the database and creates a result set
using the given resultSetFn. It creates output objects from the
ResultSet using the given createOutputFn and emits them downstream.
Example:
(Prerequisite) Data connection configuration:
Config config = smallInstanceConfig();
Properties properties = new Properties();
properties.put("jdbcUrl", jdbcUrl);
properties.put("username", username);
properties.put("password", password);
DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
    .setName("my-jdbc-data-connection")
    .setType("Jdbc")
    .setProperties(properties);
config.addDataConnectionConfig(dataConnectionConfig);
Pipeline configuration
p.readFrom(Sources.jdbc(
    DataConnectionRef.dataConnectionRef("my-jdbc-data-connection"),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
public static <T> BatchSource<T> jdbc(@Nonnull String connectionURL, @Nonnull String query, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
Convenience for jdbc(SupplierEx, ToResultSetFunction, FunctionEx).
A non-distributed, single-worker source which fetches the whole result set
with a single query on a single member.
This method executes exactly one query in the target database. If the underlying table is modified while being read, the behavior depends on the configured transaction isolation level in the target database. Refer to the documentation for the target database system.
Example:
p.readFrom(Sources.jdbc(
    DB_CONNECTION_URL,
    "select ID, NAME from PERSON",
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
The given function must be stateless.
Copyright © 2024 Hazelcast, Inc. All rights reserved.