public final class Sources extends Object
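Contains factory methods for various types of pipeline sources. To start building a pipeline, pass a source to Pipeline.drawFrom(Source) and you will obtain the initial ComputeStage. You can then attach further stages to it.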
The same pipeline may contain more than one source, each starting its own branch. The branches may be merged with multiple-input transforms such as co-group and hash-join.
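The following plain-Java sketch shows what merging two branches with a hash-join means. It is an in-memory model, not the Jet API. One branch is loaded into a hash table (the build phase), and every item of the other branch is enriched by a lookup (the probe phase). `HashJoinModel` and `Trade` are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a hash-join between two pipeline branches.
// Not the Jet API: HashJoinModel and Trade are hypothetical names for this sketch.
final class HashJoinModel {

    // An item of the primary branch: a trade that refers to a product by its id.
    static final class Trade {
        final int productId;
        final int quantity;

        Trade(int productId, int quantity) {
            this.productId = productId;
            this.quantity = quantity;
        }
    }

    // Enriches every trade with the product name found in the second branch.
    // In this sketch, trades without a matching product are kept and labeled "?".
    static List<String> join(List<Trade> trades,
                             List<? extends Map.Entry<Integer, String>> products) {
        // Build phase: load the whole enriching branch into a hash table.
        Map<Integer, String> nameById = new HashMap<>();
        for (Map.Entry<Integer, String> product : products) {
            nameById.put(product.getKey(), product.getValue());
        }
        // Probe phase: stream the primary branch and look each key up.
        List<String> joined = new ArrayList<>();
        for (Trade trade : trades) {
            joined.add(nameById.getOrDefault(trade.productId, "?") + " x" + trade.quantity);
        }
        return joined;
    }
}
```

Keeping unmatched items with a placeholder is a choice made for this sketch only. It is not a statement about how Jet's own join transforms treat missing keys.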
Modifier and Type | Method and Description |
---|---|
static <K,V> Source<Map.Entry<K,V>> | cache(String cacheName): Returns a source that fetches entries from the Hazelcast ICache with the specified name and emits them as Map.Entry. |
static <K,V> Source<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> | cacheJournal(String cacheName, boolean startFromLatestSequence): Convenience for cacheJournal(String, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. |
static <K,V,T> Source<T> | cacheJournal(String cacheName, DistributedPredicate<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> predicateFn, DistributedFunction<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>,T> projectionFn, boolean startFromLatestSequence): Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name. |
static Source<String> | files(String directory): Convenience for files(directory, UTF_8, "*"). |
static Source<String> | files(String directory, Charset charset, String glob): A source that emits lines from files in a directory (but not its subdirectories). |
static Source<String> | fileWatcher(String watchedDirectory): Convenience for fileWatcher(watchedDirectory, UTF_8, "*"). |
static Source<String> | fileWatcher(String watchedDirectory, Charset charset, String glob): A source that emits a stream of lines of text coming from files in the watched directory (but not its subdirectories). |
static <T> Source<T> | fromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier): Returns a source constructed directly from the given Core API processor meta-supplier. |
static <E> Source<E> | list(String listName): Returns a source that emits items retrieved from a Hazelcast IList. |
static <K,V> Source<Map.Entry<K,V>> | map(String mapName): Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. |
static <K,V,T> Source<T> | map(String mapName, com.hazelcast.query.Predicate<K,V> predicate, DistributedFunction<Map.Entry<K,V>,T> projectionFn): Convenience for map(String, Predicate, Projection) which uses a DistributedFunction as the projection function. |
static <K,V,T> Source<T> | map(String mapName, com.hazelcast.query.Predicate<K,V> predicate, com.hazelcast.projection.Projection<Map.Entry<K,V>,T> projection): Returns a source that fetches entries from a local Hazelcast IMap with the specified name. |
static <K,V> Source<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> | mapJournal(String mapName, boolean startFromLatestSequence): Convenience for mapJournal(String, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. |
static <K,V,T> Source<T> | mapJournal(String mapName, DistributedPredicate<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> predicateFn, DistributedFunction<com.hazelcast.map.journal.EventJournalMapEvent<K,V>,T> projectionFn, boolean startFromLatestSequence): Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name. |
static <K,V> Source<Map.Entry<K,V>> | remoteCache(String cacheName, com.hazelcast.client.config.ClientConfig clientConfig): Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry. |
static <K,V> Source<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> | remoteCacheJournal(String cacheName, com.hazelcast.client.config.ClientConfig clientConfig, boolean startFromLatestSequence): Convenience for remoteCacheJournal(String, ClientConfig, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. |
static <K,V,T> Source<T> | remoteCacheJournal(String cacheName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedPredicate<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> predicateFn, DistributedFunction<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>,T> projectionFn, boolean startFromLatestSequence): Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. |
static <E> Source<E> | remoteList(String listName, com.hazelcast.client.config.ClientConfig clientConfig): Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. |
static <K,V> Source<Map.Entry<K,V>> | remoteMap(String mapName, com.hazelcast.client.config.ClientConfig clientConfig): Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry. |
static <K,V,T> Source<T> | remoteMap(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, com.hazelcast.query.Predicate<K,V> predicate, DistributedFunction<Map.Entry<K,V>,T> projectionFn): Convenience for remoteMap(String, ClientConfig, Predicate, Projection) which uses a DistributedFunction as the projection function. |
static <K,V,T> Source<T> | remoteMap(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, com.hazelcast.query.Predicate<K,V> predicate, com.hazelcast.projection.Projection<Map.Entry<K,V>,T> projection): Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig. |
static <K,V> Source<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> | remoteMapJournal(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, boolean startFromLatestSequence): Convenience for remoteMapJournal(String, ClientConfig, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. |
static <K,V,T> Source<T> | remoteMapJournal(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, DistributedPredicate<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> predicateFn, DistributedFunction<com.hazelcast.map.journal.EventJournalMapEvent<K,V>,T> projectionFn, boolean startFromLatestSequence): Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. |
static Source<String> | socket(String host, int port, Charset charset): Returns a source which connects to the specified socket and emits lines of text received from it. |
public static <T> Source<T> fromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a source constructed directly from the given Core API processor meta-supplier.
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
public static <K,V> Source<Map.Entry<K,V>> map(@Nonnull String mapName)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries.
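The following hypothetical model shows why a locality-aware read can miss or duplicate entries when partitions migrate. Each member emits only the keys of the partitions it owns. If ownership changes between two members' reads, a partition can be read twice or not at all. The partitioning function is a simple stand-in, not Hazelcast's real one. Members are read one after another here, whereas the real processors run concurrently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a locality-aware read: each member emits only the entries
// of the partitions it owns. The partitioning function is a simple stand-in,
// not Hazelcast's actual one.
final class LocalReadModel {

    static int partitionOf(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Members read one after another. Members up to lastMemberBeforeMigration see
    // the ownership table ownerBefore; the remaining members see ownerAfter, which
    // simulates a partition migration in the middle of the read.
    static List<String> readAll(List<String> keys, int memberCount,
                                int[] ownerBefore, int[] ownerAfter,
                                int lastMemberBeforeMigration) {
        List<String> emitted = new ArrayList<>();
        for (int member = 0; member < memberCount; member++) {
            int[] owner = member <= lastMemberBeforeMigration ? ownerBefore : ownerAfter;
            for (String key : keys) {
                // Only local entries: the key's partition must be owned by this member.
                if (owner[partitionOf(key, owner.length)] == member) {
                    emitted.add(key);
                }
            }
        }
        return emitted;
    }
}
```

With a stable ownership table, the union of all members' output contains every key exactly once. Moving a partition to a member that has already been read duplicates its entries. Moving it to a member that has not been read yet, in the opposite direction, loses them.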
public static <K,V,T> Source<T> map(@Nonnull String mapName, @Nonnull com.hazelcast.query.Predicate<K,V> predicate, @Nonnull com.hazelcast.projection.Projection<Map.Entry<K,V>,T> projection)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when you use Projections.singleAttribute() or Projections.multiAttribute() to create your projection instance and the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries.
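The following model shows the effect of supplying the predicate and projection to the source instead of adding separate filter and map stages. It counts the items that leave the source in each case. `java.util.function.Predicate` and `Function` stand in for Hazelcast's Predicate and Projection types. `PushdownModel` is a hypothetical name, and the sketch does not describe how Jet implements the optimization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model comparing "filter/project at the source" with
// "emit everything, then filter/project downstream".
final class PushdownModel {

    static final class Result<T> {
        final List<T> output;          // what the pipeline finally sees
        final int itemsLeavingSource;  // items crossing the source -> downstream boundary

        Result(List<T> output, int itemsLeavingSource) {
            this.output = output;
            this.itemsLeavingSource = itemsLeavingSource;
        }
    }

    // Predicate and projection run inside the source: only matching, projected items leave it.
    static <K, V, T> Result<T> pushedDown(Map<K, V> data, Predicate<Map.Entry<K, V>> predicate,
                                          Function<Map.Entry<K, V>, T> projection) {
        List<T> out = new ArrayList<>();
        for (Map.Entry<K, V> e : data.entrySet()) {
            if (predicate.test(e)) {
                out.add(projection.apply(e));
            }
        }
        return new Result<>(out, out.size());
    }

    // The source emits every entry; separate filter and map stages run afterwards.
    static <K, V, T> Result<T> downstream(Map<K, V> data, Predicate<Map.Entry<K, V>> predicate,
                                          Function<Map.Entry<K, V>, T> projection) {
        List<Map.Entry<K, V>> emitted = new ArrayList<>(data.entrySet());
        List<T> out = new ArrayList<>();
        for (Map.Entry<K, V> e : emitted) {
            if (predicate.test(e)) {
                out.add(projection.apply(e));
            }
        }
        return new Result<>(out, emitted.size());
    }
}
```

Both variants produce the same output. With a selective predicate, however, far fewer items leave the source when the functions are pushed down.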
public static <K,V,T> Source<T> map(@Nonnull String mapName, @Nonnull com.hazelcast.query.Predicate<K,V> predicate, @Nonnull DistributedFunction<Map.Entry<K,V>,T> projectionFn)
Convenience for map(String, Predicate, Projection) which uses a DistributedFunction as the projection function.
@Nonnull public static <K,V,T> Source<T> mapJournal(@Nonnull String mapName, @Nullable DistributedPredicate<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> predicateFn, @Nullable DistributedFunction<com.hazelcast.map.journal.EventJournalMapEvent<K,V>,T> projectionFn, boolean startFromLatestSequence)
Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
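The following sketch models a fixed-capacity journal and a reader. The reader can start from the latest or the earliest retained sequence, resume from a saved offset, and detect that the journal has overflowed. It covers a single partition and is a hypothetical model (`JournalModel` is not a Hazelcast class). A null predicate means no filtering and a null projection means the events are emitted unchanged, which matches the "can be null" parameters below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical single-partition model of a fixed-capacity event journal.
final class JournalModel<E> {
    private final Object[] ring;
    private long nextSeq = 0; // sequence number the next appended event will get

    JournalModel(int capacity) {
        ring = new Object[capacity];
    }

    // Appending past capacity silently overwrites the oldest event.
    void append(E event) {
        ring[(int) (nextSeq % ring.length)] = event;
        nextSeq++;
    }

    long oldestSeq() {
        return Math.max(0, nextSeq - ring.length);
    }

    // startFromLatestSequence == true: only events appended from now on;
    // false: the oldest event still retained in the journal.
    long startOffset(boolean startFromLatestSequence) {
        return startFromLatestSequence ? nextSeq : oldestSeq();
    }

    // Reads every event from `offset` on (e.g. an offset restored from a snapshot).
    // Fails if some of those events were already overwritten, i.e. the journal overflowed.
    @SuppressWarnings("unchecked")
    <T> List<T> readFrom(long offset, Predicate<E> predicate, Function<E, T> projection) {
        if (offset < oldestSeq()) {
            throw new IllegalStateException(
                "journal overflowed: events " + offset + ".." + (oldestSeq() - 1) + " were lost");
        }
        List<T> out = new ArrayList<>();
        for (long seq = offset; seq < nextSeq; seq++) {
            E event = (E) ring[(int) (seq % ring.length)];
            if (predicate == null || predicate.test(event)) {
                out.add(projection == null ? (T) event : projection.apply(event));
            }
        }
        return out;
    }
}
```

A reader that has saved offset 1 receives each later event exactly once, as long as the journal still holds that offset. Once enough new events overwrite it, the gap can no longer be filled. This is the "unless the journal has overflowed" caveat above.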
snapshotInterval + timeToRestart + normalEventLag.
We plan to address this issue in a future release.
T - type of emitted item
mapName - the name of the map
predicateFn - the predicate to filter the events, can be null
projectionFn - the projection to map the events, can be null
startFromLatestSequence - starting point of the events in event journal: true to start from the latest, false to start from the earliest
@Nonnull public static <K,V> Source<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> mapJournal(@Nonnull String mapName, boolean startFromLatestSequence)
Convenience for mapJournal(String, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. It emits EventJournalMapEvent objects.
@Nonnull public static <K,V> Source<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries.
public static <K,V,T> Source<T> remoteMap(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull com.hazelcast.query.Predicate<K,V> predicate, @Nonnull com.hazelcast.projection.Projection<Map.Entry<K,V>,T> projection)
Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when you use Projections.singleAttribute() or Projections.multiAttribute() to create your projection instance and the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries.
public static <K,V,T> Source<T> remoteMap(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull com.hazelcast.query.Predicate<K,V> predicate, @Nonnull DistributedFunction<Map.Entry<K,V>,T> projectionFn)
Convenience for remoteMap(String, ClientConfig, Predicate, Projection) which uses a DistributedFunction as the projection function.
@Nonnull public static <K,V,T> Source<T> remoteMapJournal(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nullable DistributedPredicate<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> predicateFn, @Nullable DistributedFunction<com.hazelcast.map.journal.EventJournalMapEvent<K,V>,T> projectionFn, boolean startFromLatestSequence)
Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
K - type of key
V - type of value
T - type of emitted item
mapName - the name of the map
clientConfig - configuration for the client to connect to the remote cluster
predicateFn - the predicate to filter the events, can be null
projectionFn - the projection to map the events, can be null
startFromLatestSequence - starting point of the events in event journal: true to start from the latest, false to start from the oldest
@Nonnull public static <K,V> Source<com.hazelcast.map.journal.EventJournalMapEvent<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, boolean startFromLatestSequence)
Convenience for remoteMapJournal(String, ClientConfig, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. It emits EventJournalMapEvent objects.
@Nonnull public static <K,V> Source<Map.Entry<K,V>> cache(@Nonnull String cacheName)
Returns a source that fetches entries from the Hazelcast ICache with the specified name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the ICache is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries.
@Nonnull public static <K,V,T> Source<T> cacheJournal(@Nonnull String cacheName, @Nullable DistributedPredicate<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> predicateFn, @Nullable DistributedFunction<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>,T> projectionFn, boolean startFromLatestSequence)
Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an ICache as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
T - type of emitted item
cacheName - the name of the cache
predicateFn - the predicate to filter the events, can be null
projectionFn - the projection to map the events, can be null
startFromLatestSequence - starting point of the events in event journal: true to start from the latest, false to start from the oldest
@Nonnull public static <K,V> Source<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> cacheJournal(@Nonnull String cacheName, boolean startFromLatestSequence)
Convenience for cacheJournal(String, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. It emits EventJournalCacheEvent objects.
@Nonnull public static <K,V> Source<Map.Entry<K,V>> remoteCache(@Nonnull String cacheName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the ICache is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries.
@Nonnull public static <K,V,T> Source<T> remoteCacheJournal(@Nonnull String cacheName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nullable DistributedPredicate<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> predicateFn, @Nullable DistributedFunction<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>,T> projectionFn, boolean startFromLatestSequence)
Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.
To use an ICache as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
T - type of emitted item
cacheName - the name of the cache
clientConfig - configuration for the client to connect to the remote cluster
predicateFn - the predicate to filter the events, can be null
projectionFn - the projection to map the events, can be null
startFromLatestSequence - starting point of the events in event journal: true to start from the latest, false to start from the oldest
@Nonnull public static <K,V> Source<com.hazelcast.cache.journal.EventJournalCacheEvent<K,V>> remoteCacheJournal(@Nonnull String cacheName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, boolean startFromLatestSequence)
Convenience for remoteCacheJournal(String, ClientConfig, DistributedPredicate, DistributedFunction, boolean) with no projection or filtering. It emits EventJournalCacheEvent objects.
@Nonnull public static <E> Source<E> list(@Nonnull String listName)
Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member: the one where the entire list is stored by the IMDG.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
@Nonnull public static <E> Source<E> remoteList(@Nonnull String listName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig)
Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. All elements are emitted on a single member.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
@Nonnull public static Source<String> socket(@Nonnull String host, int port, @Nonnull Charset charset)
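Returns a source which connects to the specified socket and emits lines of text received from it. It decodes the text using the supplied charset.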
Each underlying processor opens its own TCP connection, so there will be clusterSize * localParallelism open connections to the server.
The source completes when the server closes the socket. It never attempts to reconnect. Any IOException will cause the job to fail.
The source does not save any state to snapshot. On job restart, it will emit whichever items the server sends. The implementation uses a non-blocking API, so the processor is cooperative.
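The following sketch covers two points from the description above: the connection count seen by the server, and line-by-line decoding with the given charset until the peer closes the stream. It reads from a plain InputStream with blocking calls for simplicity, unlike the non-blocking, cooperative implementation described above. `SocketSourceModel` is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, blocking model of what each socket-source processor does.
final class SocketSourceModel {

    // Connections the server sees: one per processor on every member.
    static int openConnections(int clusterSize, int localParallelism) {
        return clusterSize * localParallelism;
    }

    // Decodes the stream with the given charset and collects lines until end of stream,
    // which is what a reader observes when the server closes the socket.
    // An IOException is rethrown, mirroring "any IOException will cause the job to fail".
    static List<String> readLines(InputStream in, Charset charset) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
            for (String line; (line = reader.readLine()) != null; ) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

For example, a 3-member cluster with a local parallelism of 4 keeps 12 connections open to the same server.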
@Nonnull public static Source<String> files(@Nonnull String directory, @Nonnull Charset charset, @Nonnull String glob)
A source that emits lines from files in a directory (but not its subdirectories).
To be useful, the source should be configured to read data local to each member. For example, if the pathname resolves to a shared network filesystem visible to multiple members, they will emit duplicate data.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Any IOException will cause the job to fail.
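The glob parameter is a JDK globbing mask. The following sketch uses `FileSystems.getDefault().getPathMatcher("glob:" + glob)` to show which directory entries a mask would select. It assumes that the mask is matched against the bare file name of each entry, as the "not its subdirectories" wording suggests. `GlobFilterSketch` is a hypothetical helper, not part of Jet.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Sketch of how a glob mask such as "*.csv" selects files via the JDK's PathMatcher.
// Assumption: the mask is applied to the bare file name of each directory entry.
final class GlobFilterSketch {

    static List<String> select(List<String> fileNames, String glob) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> selected = new ArrayList<>();
        for (String name : fileNames) {
            Path fileName = Paths.get(name).getFileName();
            if (matcher.matches(fileName)) {
                selected.add(name);
            }
        }
        return selected;
    }
}
```

With the names a.csv, b.txt and c.csv, the mask "*.csv" selects the two CSV files, "*" selects all three, and "{a,b}.*" selects a.csv and b.txt.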
directory - parent directory of the files
charset - charset to use to decode the files
glob - the globbing mask, see getPathMatcher(). Use "*" for all files.
public static Source<String> files(@Nonnull String directory)
Convenience for files(directory, UTF_8, "*").
public static Source<String> fileWatcher(@Nonnull String watchedDirectory, @Nonnull Charset charset, @Nonnull String glob)
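A source that emits a stream of lines of text coming from files in the watched directory (but not its subdirectories).
To be useful, the source should be configured to read data local to each member. For example, if the pathname resolves to a shared network filesystem visible to multiple members, they will emit duplicate data.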
If, during the scanning phase, the source observes a file that doesn't end with a newline, it will assume that there is a line just being written. This line won't appear in its output.
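The following hypothetical helper shows one way to keep an unterminated line out of the output. It splits newly read text into complete lines and holds back any trailing fragment that has no newline yet, because that line may still be being written. It is a model of the described behavior, not Jet's implementation, and `LineAssembler` is an invented name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns chunks of newly read text into complete lines and holds back
// a trailing fragment without a newline, since that line may still be being written.
final class LineAssembler {
    private final StringBuilder pending = new StringBuilder();

    // Appends a chunk and returns every line that the chunk completed.
    List<String> accept(String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < pending.length(); i++) {
            if (pending.charAt(i) == '\n') {
                // Drop a preceding carriage return so that CRLF line endings are handled too.
                int end = (i > start && pending.charAt(i - 1) == '\r') ? i - 1 : i;
                complete.add(pending.substring(start, end));
                start = i + 1;
            }
        }
        pending.delete(0, start); // keep only the unterminated tail
        return complete;
    }

    // The fragment that has not been terminated by a newline yet.
    String pendingFragment() {
        return pending.toString();
    }
}
```

Feeding "first\nsec" yields only "first". The fragment "sec" is emitted as "second" once the rest of the line and its newline arrive.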
The source completes when the directory is deleted. However, in order to delete the directory, all files in it must be deleted, and if you delete a file that is currently being read from, the job may encounter an IOException. The directory must be deleted on all nodes.
Any IOException will cause the job to fail.
The source does not save any state to snapshot. If the job is restarted, lines added after the restart will be emitted, which gives at-most-once behavior.
On Windows, the WatchService is not notified of appended lines until the file is closed. If the file-writing process keeps the file open while appending, the processor may fail to observe the changes. It will be notified if any process tries to open that file, such as looking at the file in Explorer. This holds for Windows 10 with the NTFS file system and might change in the future. You are advised to do your own testing on your target Windows platform.
The underlying JDK API (java.nio.file.WatchService) has a history of unreliability, and this source may experience infinite blocking, missed, or duplicate events as a result. Such problems may be resolved by upgrading the JRE to the latest version.
watchedDirectory - pathname to the source directory
charset - charset to use to decode the files
glob - the globbing mask, see getPathMatcher(). Use "*" for all files.
public static Source<String> fileWatcher(@Nonnull String watchedDirectory)
Convenience for fileWatcher(watchedDirectory, UTF_8, "*").
Copyright © 2017 Hazelcast, Inc. All Rights Reserved.