public final class Sources extends Object
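Contains factory methods for various types of pipeline sources. To start building a pipeline, pass a source to Pipeline.drawFrom(BatchSource)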
and you will obtain the initial BatchStage. You can then
attach further stages to it.
The same pipeline may contain more than one source, each starting its own branch. The branches may be merged with multiple-input transforms such as co-group and hash-join.
The default local parallelism for sources in this class is 1 or 2; check the documentation of individual methods.
| Modifier and Type | Method and Description |
|---|---|
| static <T> BatchSource<T> | batchFromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier): Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier. |
| static <K,V> BatchSource<Map.Entry<K,V>> | cache(String cacheName): Returns a source that fetches entries from the Hazelcast ICache with the specified name and emits them as Map.Entry. |
| static <T,K,V> StreamSource<T> | cacheJournal(String cacheName, DistributedPredicate<EventJournalCacheEvent<K,V>> predicateFn, DistributedFunction<EventJournalCacheEvent<K,V>,T> projectionFn, JournalInitialPosition initialPos): Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name. |
| static <K,V> StreamSource<Map.Entry<K,V>> | cacheJournal(String cacheName, JournalInitialPosition initialPos): Convenience for cacheJournal(String, DistributedPredicate, DistributedFunction, JournalInitialPosition) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry. |
| static BatchSource<String> | files(String directory): Convenience for files(String, Charset, String, DistributedBiFunction) which uses UTF-8 encoding, matches all the files in the directory and emits lines of text in the files. |
| static <R> BatchSource<R> | files(String directory, Charset charset, String glob, DistributedBiFunction<String,String,? extends R> mapOutputFn): A source that emits lines from files in a directory (but not its subdirectories). |
| static StreamSource<String> | fileWatcher(String watchedDirectory): Convenience for fileWatcher(watchedDirectory, UTF_8, "*"). |
| static StreamSource<String> | fileWatcher(String watchedDirectory, Charset charset, String glob): A source that emits a stream of lines of text coming from files in the watched directory (but not its subdirectories). |
| static <T> BatchSource<T> | list(String listName): Returns a source that emits items retrieved from a Hazelcast IList. |
| static <K,V> BatchSource<Map.Entry<K,V>> | map(String mapName): Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. |
| static <T,K,V> BatchSource<T> | map(String mapName, Predicate<K,V> predicate, DistributedFunction<Map.Entry<K,V>,T> projectionFn): Convenience for map(String, Predicate, Projection) which uses a DistributedFunction as the projection function. |
| static <T,K,V> BatchSource<T> | map(String mapName, Predicate<K,V> predicate, Projection<Map.Entry<K,V>,T> projection): Returns a source that fetches entries from a local Hazelcast IMap with the specified name. |
| static <T,K,V> StreamSource<T> | mapJournal(String mapName, DistributedPredicate<EventJournalMapEvent<K,V>> predicateFn, DistributedFunction<EventJournalMapEvent<K,V>,T> projectionFn, JournalInitialPosition initialPos): Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name. |
| static <K,V> StreamSource<Map.Entry<K,V>> | mapJournal(String mapName, JournalInitialPosition initialPos): Convenience for mapJournal(String, DistributedPredicate, DistributedFunction, JournalInitialPosition) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry. |
| static <K,V> BatchSource<Map.Entry<K,V>> | remoteCache(String cacheName, ClientConfig clientConfig): Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry. |
| static <T,K,V> StreamSource<T> | remoteCacheJournal(String cacheName, ClientConfig clientConfig, DistributedPredicate<EventJournalCacheEvent<K,V>> predicateFn, DistributedFunction<EventJournalCacheEvent<K,V>,T> projectionFn, JournalInitialPosition initialPos): Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. |
| static <K,V> StreamSource<Map.Entry<K,V>> | remoteCacheJournal(String cacheName, ClientConfig clientConfig, JournalInitialPosition initialPos): Convenience for remoteCacheJournal(String, ClientConfig, DistributedPredicate, DistributedFunction, JournalInitialPosition) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry. |
| static <T> BatchSource<T> | remoteList(String listName, ClientConfig clientConfig): Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. |
| static <K,V> BatchSource<Map.Entry<K,V>> | remoteMap(String mapName, ClientConfig clientConfig): Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry. |
| static <T,K,V> BatchSource<T> | remoteMap(String mapName, ClientConfig clientConfig, Predicate<K,V> predicate, DistributedFunction<Map.Entry<K,V>,T> projectionFn): Convenience for remoteMap(String, ClientConfig, Predicate, Projection) which uses a DistributedFunction as the projection function. |
| static <T,K,V> BatchSource<T> | remoteMap(String mapName, ClientConfig clientConfig, Predicate<K,V> predicate, Projection<Map.Entry<K,V>,T> projection): Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig. |
| static <T,K,V> StreamSource<T> | remoteMapJournal(String mapName, ClientConfig clientConfig, DistributedPredicate<EventJournalMapEvent<K,V>> predicateFn, DistributedFunction<EventJournalMapEvent<K,V>,T> projectionFn, JournalInitialPosition initialPos): Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. |
| static <K,V> StreamSource<Map.Entry<K,V>> | remoteMapJournal(String mapName, ClientConfig clientConfig, JournalInitialPosition initialPos): Convenience for remoteMapJournal(String, ClientConfig, DistributedPredicate, DistributedFunction, JournalInitialPosition) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry. |
| static StreamSource<String> | socket(String host, int port): Convenience for socket(host, port, charset) with UTF-8 as the charset. |
| static StreamSource<String> | socket(String host, int port, Charset charset): Returns a source which connects to the specified socket and emits lines of text received from it. |
| static <T> StreamSource<T> | streamFromProcessor(String sourceName, ProcessorMetaSupplier metaSupplier): Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier. |
| static <T> StreamSource<T> | streamFromProcessorWithWatermarks(String sourceName, Function<WatermarkGenerationParams<T>,ProcessorMetaSupplier> metaSupplierFn): Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API. |
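@Nonnull public static <T> BatchSource<T> batchFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier.
Parameters:
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
@Nonnull public static <T> StreamSource<T> streamFromProcessorWithWatermarks(@Nonnull String sourceName, @Nonnull Function<WatermarkGenerationParams<T>,ProcessorMetaSupplier> metaSupplierFn)
Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API.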
If you are implementing a custom source processor, be sure to check out
the WatermarkSourceUtil class that will
help you correctly implement watermark item emission.
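Parameters:
sourceName - user-friendly source name
metaSupplierFn - factory of processor meta-suppliers
@Nonnull public static <T> StreamSource<T> streamFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier.
Parameters:
sourceName - user-friendly source name
metaSupplier - the processor meta-supplier
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> map(@Nonnull String mapName)
Returns a source that fetches entries from a local Hazelcast
IMap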
with the specified name and emits them as Map.Entry. It leverages
data locality by making each of the underlying processors fetch only those
entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a
cluster topology change (triggering data migration), the source may
miss and/or duplicate some entries.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
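To make the locality claim concrete, here is a minimal plain-Java model with no Hazelcast dependency. Everything in it is hypothetical: keys are assigned to partitions by hash modulo a partition count, and partitions are assigned to members round-robin, which is a simplification of how Hazelcast actually places data. Each simulated member emits only the entries in partitions it owns, so as long as no migration happens during the read, every entry is emitted exactly once across the cluster. The helper defaultLocalParallelism mirrors the "2 (or 1 if just 1 CPU is available)" rule stated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model of a locality-aware batch read.
// The partitioning scheme is a simplification, not Hazelcast's own.
class LocalityModel {

    // "2 (or 1 if just 1 CPU is available)"
    static int defaultLocalParallelism(int availableCpus) {
        return Math.min(2, availableCpus);
    }

    // Simplified partitioning: key hash modulo the partition count.
    static int partitionOf(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Simplified ownership: partitions assigned to members round-robin.
    static int ownerOf(int partition, int memberCount) {
        return partition % memberCount;
    }

    // Entries emitted by the reader on `member`. The model scans the whole map
    // and keeps the locally owned entries; a real member would iterate only
    // the partitions it stores.
    static <K, V> List<Map.Entry<K, V>> readLocal(
            Map<K, V> map, int member, int memberCount, int partitionCount) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            if (ownerOf(partitionOf(e.getKey(), partitionCount), memberCount) == member) {
                out.add(e);
            }
        }
        return out;
    }

    // Concatenates every member's output, as the downstream stage would see it.
    static <K, V> List<Map.Entry<K, V>> readAllMembers(
            Map<K, V> map, int memberCount, int partitionCount) {
        List<Map.Entry<K, V>> all = new ArrayList<>();
        for (int m = 0; m < memberCount; m++) {
            all.addAll(readLocal(map, m, memberCount, partitionCount));
        }
        return all;
    }
}
```

If ownership changed between two members' reads (a migration), a partition could be read by both members or by neither, which is how the miss/duplicate caveat above shows up in this model.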
@Nonnull public static <T,K,V> BatchSource<T> map(@Nonnull String mapName, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<Map.Entry<K,V>,T> projection)
Returns a source that fetches entries from a local Hazelcast IMap with the specified name. By supplying a predicate and
projection here instead of in separate map/filter
transforms you allow the source to apply these functions early, before
generating any output, with the potential of significantly reducing
data traffic. If your data is stored in the IMDG using the
portable serialization format, there are additional optimizations
available when using Projections.singleAttribute() and Projections.multiAttribute() to create your projection instance and
using the GenericPredicates factory or
PredicateBuilder to create
the predicate. In this case Jet can test the predicate and apply the
projection without deserializing the whole object.
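The effect of passing the predicate and projection to the source, rather than to later filter/map stages, can be sketched in plain Java. In this hypothetical model, java.util.function.Predicate and Function stand in for Hazelcast's Predicate and Projection, and "shipped" counts the items that would leave the source. The model also applies the documented rule that an item whose projection returns null is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model comparing source-side filtering/projection with
// filtering after the data has been shipped. Not Jet code.
class PushdownModel {

    // Result of one read: the emitted items and how many items were shipped.
    static final class Result<T> {
        final List<T> items;
        final int shipped;

        Result(List<T> items, int shipped) {
            this.items = items;
            this.shipped = shipped;
        }
    }

    // Predicate and projection applied at the source: only survivors are shipped.
    static <K, V, T> Result<T> atSource(Map<K, V> map,
                                        Predicate<Map.Entry<K, V>> predicate,
                                        Function<Map.Entry<K, V>, T> projection) {
        List<T> out = new ArrayList<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            if (!predicate.test(e)) {
                continue;
            }
            T t = projection.apply(e);
            if (t != null) {          // a null projection result filters the item out
                out.add(t);
            }
        }
        return new Result<>(out, out.size());
    }

    // Whole entries are shipped first, then filtered and mapped downstream.
    static <K, V, T> Result<T> downstream(Map<K, V> map,
                                          Predicate<Map.Entry<K, V>> predicate,
                                          Function<Map.Entry<K, V>, T> projection) {
        int shipped = map.size();
        List<T> out = new ArrayList<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            if (predicate.test(e)) {
                T t = projection.apply(e);
                if (t != null) {
                    out.add(t);
                }
            }
        }
        return new Result<>(out, shipped);
    }
}
```

With ten entries and a predicate that keeps even keys, atSource ships 5 items and downstream ships 10, while both emit the same 5 values.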
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a
cluster topology change (triggering data migration), the source may
miss and/or duplicate some entries.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
Note: The predicate and projection need
to be available on the cluster's classpath, or loaded using
Hazelcast User Code Deployment. It's not enough to add them to the
job classpath in JobConfig. The same is
true for the class of the objects stored in the map itself. If you
cannot fulfill these conditions, use map(String) and add a
subsequent map or filter stage.
Type Parameters:
T - type of emitted item
Parameters:
mapName - the name of the map
predicate - the predicate to filter the entries. If you want to specify just the projection, use GenericPredicates.alwaysTrue() as a pass-through predicate.
projection - the projection to map the entries. If the projection returns null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity().
@Nonnull public static <T,K,V> BatchSource<T> map(@Nonnull String mapName, @Nonnull Predicate<K,V> predicate, @Nonnull DistributedFunction<Map.Entry<K,V>,T> projectionFn)
Convenience for
map(String, Predicate, Projection)
which uses a DistributedFunction as the projection function.
@Nonnull public static <T,K,V> StreamSource<T> mapJournal(@Nonnull String mapName, @Nonnull DistributedPredicate<EventJournalMapEvent<K,V>> predicateFn, @Nonnull DistributedFunction<EventJournalMapEvent<K,V>,T> projectionFn, @Nonnull JournalInitialPosition initialPos)
Returns a source that will stream
EventJournalMapEvents of the
Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions
early, before generating any output, with the potential of significantly
reducing data traffic.
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an IMap as a streaming source, you must configure the event journal
for it. The journal has fixed capacity and will drop events if it
overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
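The two journal properties above (a fixed capacity that drops the oldest events, and resuming from an offset saved in a snapshot) can be modeled with a ring buffer. This is a hypothetical sketch, not Hazelcast's event journal: JournalModel, readFrom, headSeq and tailSeq are illustrative names. A reader that resumes from its saved offset sees each later event exactly once, unless the events at that offset were already overwritten; the model reports that case with an exception, which is a choice of the sketch rather than documented behavior of the source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a fixed-capacity event journal; not Hazelcast code.
class JournalModel<E> {
    private final Object[] ring;
    private long nextSeq = 0;   // sequence number the next appended event gets

    JournalModel(int capacity) {
        this.ring = new Object[capacity];
    }

    // Appends an event, overwriting the oldest slot once the buffer is full.
    void append(E event) {
        ring[(int) (nextSeq % ring.length)] = event;
        nextSeq++;
    }

    // Oldest sequence still retained; anything older has been dropped.
    long headSeq() {
        return Math.max(0, nextSeq - ring.length);
    }

    // Sequence the next event will get; a reader that has consumed
    // everything would save this value as its snapshot offset.
    long tailSeq() {
        return nextSeq;
    }

    // Events from `offset` (inclusive) up to the tail. Throws if the events
    // at `offset` were already overwritten, i.e. the journal overflowed.
    @SuppressWarnings("unchecked")
    List<E> readFrom(long offset) {
        if (offset < headSeq()) {
            throw new IllegalStateException("journal overflowed: events "
                    + offset + ".." + (headSeq() - 1) + " were dropped");
        }
        List<E> out = new ArrayList<>();
        for (long s = offset; s < nextSeq; s++) {
            out.add((E) ring[(int) (s % ring.length)]);
        }
        return out;
    }
}
```

In the test sequence below, the reader saves offset 2 in a journal of capacity 3; after four more events, sequences 0 to 2 have been overwritten, so resuming from offset 2 is no longer possible without a gap.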
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
Note: The predicateFn and projectionFn
need to be available on the cluster's classpath, or loaded using
Hazelcast User Code Deployment. It's not enough to add them to the
job classpath in JobConfig. The same is
true for the class of the objects stored in the map itself. If you
cannot fulfill these conditions, use mapJournal(String,
JournalInitialPosition) and add a subsequent map or filter stage.
Type Parameters:
T - type of emitted item
Parameters:
mapName - the name of the map
predicateFn - the predicate to filter the events. You may use Util.mapPutEvents() to pass only ADDED and UPDATED events.
projectionFn - the projection to map the events. If the projection returns null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value.
initialPos - describes which event to start receiving from
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos)
Convenience for
mapJournal(String, DistributedPredicate,
DistributedFunction, JournalInitialPosition)
which will pass only ADDED and UPDATED
events and will project the event's key and new value into a Map.Entry.
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast
IMap
with the specified name in a remote cluster identified by the supplied
ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a
cluster topology change (triggering data migration), the source may
miss and/or duplicate some entries.
The default local parallelism for this processor is 1.
@Nonnull public static <T,K,V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<Map.Entry<K,V>,T> projection)
Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the
supplied ClientConfig. By supplying a predicate and
projection here instead of in separate map/filter
transforms you allow the source to apply these functions early, before
generating any output, with the potential of significantly reducing
data traffic. If your data is stored in the IMDG using the
portable serialization format, there are additional optimizations
available when using Projections.singleAttribute() and Projections.multiAttribute() to create your projection instance and
using the GenericPredicates factory or
PredicateBuilder to create
the predicate. In this case Jet can test the predicate and apply the
projection without deserializing the whole object.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the IMap is modified while being read, or if there is a
cluster topology change (triggering data migration), the source may
miss and/or duplicate some entries.
The default local parallelism for this processor is 1.
Note: The predicate and projection need
to be available on the remote cluster's classpath, or loaded using
Hazelcast User Code Deployment. It's not enough to add them to the
job classpath in JobConfig. The same is
true for the class of the objects stored in the map itself. If you
cannot fulfill these conditions, use remoteMap(String,
ClientConfig) and add a subsequent map or
filter stage.
Type Parameters:
T - type of emitted item
Parameters:
mapName - the name of the map
clientConfig - configuration for the client to connect to the remote cluster
predicate - the predicate to filter the entries. If you want to specify just the projection, use GenericPredicates.alwaysTrue() as a pass-through predicate.
projection - the projection to map the entries. If the projection returns null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity().
@Nonnull public static <T,K,V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<K,V> predicate, @Nonnull DistributedFunction<Map.Entry<K,V>,T> projectionFn)
Convenience for
remoteMap(String, ClientConfig, Predicate, Projection)
which uses a DistributedFunction as the projection function.
@Nonnull public static <T,K,V> StreamSource<T> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedPredicate<EventJournalMapEvent<K,V>> predicateFn, @Nonnull DistributedFunction<EventJournalMapEvent<K,V>,T> projectionFn, @Nonnull JournalInitialPosition initialPos)
Returns a source that will stream the
EventJournalMapEvent
events of the Hazelcast IMap with the specified name from a
remote cluster. By supplying a predicate and projection
here instead of in separate map/filter transforms you allow the
source to apply these functions early, before generating any output,
with the potential of significantly reducing data traffic.
To use an IMap as a streaming source, you must configure the event journal
for it. The journal has fixed capacity and will drop events if it
overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
The default local parallelism for this processor is 1.
Note: The predicateFn and projectionFn
need to be available on the remote cluster's classpath, or loaded using
Hazelcast User Code Deployment. It's not enough to add them to the
job classpath in JobConfig. The same is
true for the class of the objects stored in the map itself. If you
cannot fulfill these conditions, use remoteMapJournal(String,
ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
Type Parameters:
K - type of key
V - type of value
T - type of emitted item
Parameters:
mapName - the name of the map
clientConfig - configuration for the client to connect to the remote cluster
predicateFn - the predicate to filter the events. You may use Util.mapPutEvents() to pass only ADDED and UPDATED events.
projectionFn - the projection to map the events. If the projection returns null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value.
initialPos - describes which event to start receiving from
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
Convenience for
remoteMapJournal(String, ClientConfig,
DistributedPredicate, DistributedFunction, JournalInitialPosition)
which will pass only ADDED
and UPDATED events and will
project the event's key and new value into a Map.Entry.
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> cache(@Nonnull String cacheName)
Returns a source that fetches entries from the Hazelcast
ICache
with the specified name and emits them as Map.Entry. It
leverages data locality by making each of the underlying processors
fetch only those entries that are stored on the member where it is
running.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the ICache is modified while being read, or if there is a
cluster topology change (triggering data migration), the source may
miss and/or duplicate some entries.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
@Nonnull public static <T,K,V> StreamSource<T> cacheJournal(@Nonnull String cacheName, @Nonnull DistributedPredicate<EventJournalCacheEvent<K,V>> predicateFn, @Nonnull DistributedFunction<EventJournalCacheEvent<K,V>,T> projectionFn, @Nonnull JournalInitialPosition initialPos)
Returns a source that will stream the
EventJournalCacheEvent
events of the Hazelcast ICache with the specified name. By
supplying a predicate and projection here instead of
in separate map/filter transforms you allow the source to apply
these functions early, before generating any output, with the potential
of significantly reducing data traffic.
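The predicate/projection pair used by the convenience overload cacheJournal(String, JournalInitialPosition), which passes CREATED and UPDATED events and projects each to its key and new value, can be sketched without Hazelcast types. CacheEvent, CacheEventType (a subset of event types chosen for illustration) and JournalFilterModel are hypothetical stand-ins for EventJournalCacheEvent, DistributedPredicate and DistributedFunction; the code illustrates the role of Util.cachePutEvents() and Util.cacheEventToEntry(), not their implementation.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for cache event types (a subset, for illustration).
enum CacheEventType { CREATED, UPDATED, REMOVED, EXPIRED }

// Hypothetical stand-in for EventJournalCacheEvent: type, key and new value only.
final class CacheEvent<K, V> {
    final CacheEventType type;
    final K key;
    final V newValue;

    CacheEvent(CacheEventType type, K key, V newValue) {
        this.type = type;
        this.key = key;
        this.newValue = newValue;
    }
}

class JournalFilterModel {

    // Passes only CREATED and UPDATED events.
    static <K, V> Predicate<CacheEvent<K, V>> putEvents() {
        return e -> e.type == CacheEventType.CREATED || e.type == CacheEventType.UPDATED;
    }

    // Projects an event to a (key, new value) entry.
    static <K, V> Function<CacheEvent<K, V>, Map.Entry<K, V>> toEntry() {
        return e -> new SimpleImmutableEntry<>(e.key, e.newValue);
    }

    // Applies the predicate, then the projection; a null projection result drops the event.
    static <E, T> List<T> apply(List<E> events,
                                Predicate<? super E> predicate,
                                Function<? super E, ? extends T> projection) {
        List<T> out = new ArrayList<>();
        for (E e : events) {
            if (predicate.test(e)) {
                T t = projection.apply(e);
                if (t != null) {
                    out.add(t);
                }
            }
        }
        return out;
    }
}
```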
The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.
To use an ICache as a streaming source, you must configure the event journal
for it. The journal has fixed capacity and will drop events if it
overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
Note: The predicateFn and projectionFn
need to be available on the cluster's classpath, or loaded using
Hazelcast User Code Deployment. It's not enough to add them to the
job classpath in JobConfig. The same is
true for the class of the objects stored in the cache itself. If you
cannot fulfill these conditions, use cacheJournal(String,
JournalInitialPosition) and add a subsequent map or filter stage.
Type Parameters:
T - type of emitted item
Parameters:
cacheName - the name of the cache
predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events.
projectionFn - the projection to map the events. If the projection returns null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value.
initialPos - describes which event to start receiving from
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos)
Convenience for
cacheJournal(String, DistributedPredicate,
DistributedFunction, JournalInitialPosition)
which will pass only CREATED and UPDATED
events and will project the event's key and new value into a Map.Entry.
@Nonnull public static <K,V> BatchSource<Map.Entry<K,V>> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
Returns a source that fetches entries from the Hazelcast
ICache
with the specified name in a remote cluster identified by the supplied
ClientConfig and emits them as Map.Entry.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
If the ICache is modified while being read, or if there is a
cluster topology change (triggering data migration), the source may
miss and/or duplicate some entries.
The default local parallelism for this processor is 1.
@Nonnull public static <T,K,V> StreamSource<T> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedPredicate<EventJournalCacheEvent<K,V>> predicateFn, @Nonnull DistributedFunction<EventJournalCacheEvent<K,V>,T> projectionFn, @Nonnull JournalInitialPosition initialPos)
Returns a source that will stream the
EventJournalCacheEvent
events of the Hazelcast ICache with the specified name from a
remote cluster. By supplying a predicate and projection
here instead of in separate map/filter transforms you allow the
source to apply these functions early, before generating any output,
with the potential of significantly reducing data traffic.
To use an ICache as a streaming source, you must configure the event journal
for it. The journal has fixed capacity and will drop events if it
overflows.
The source saves the journal offset to the snapshot. If the job restarts, it starts emitting from the saved offset with an exactly-once guarantee (unless the journal has overflowed).
The default local parallelism for this processor is 1.
Note: The predicateFn and projectionFn
need to be available on the remote cluster's classpath, or loaded using
Hazelcast User Code Deployment. It's not enough to add them to the
job classpath in JobConfig. The same is
true for the class of the objects stored in the cache itself. If you
cannot fulfill these conditions, use remoteCacheJournal(String,
ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
Type Parameters:
T - type of emitted item
Parameters:
cacheName - the name of the cache
clientConfig - configuration for the client to connect to the remote cluster
predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events.
projectionFn - the projection to map the events. If the projection returns null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value.
initialPos - describes which event to start receiving from
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
Convenience for
remoteCacheJournal(String, ClientConfig,
DistributedPredicate, DistributedFunction, JournalInitialPosition)
which will pass only CREATED and UPDATED
events and will project the event's key and new value
into a Map.Entry.
@Nonnull public static <T> BatchSource<T> list(@Nonnull String listName)
Returns a source that emits items retrieved from a Hazelcast
IList. All elements are emitted on a single member: the one
where the entire list is stored by the IMDG.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
The default local parallelism for this processor is 1.
@Nonnull public static <T> BatchSource<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. All elements are emitted on a single member.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
The default local parallelism for this processor is 1.
@Nonnull public static StreamSource<String> socket(@Nonnull String host, int port, @Nonnull Charset charset)
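Returns a source which connects to the specified socket and emits lines of text received from it. It decodes the text using the supplied charset.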
Each underlying processor opens its own TCP connection, so there will be
clusterSize * localParallelism open connections to the server.
The source completes when the server closes the socket. It never attempts
to reconnect. Any IOException will cause the job to fail.
The source does not save any state to snapshot. On job restart, it will emit whichever items the server sends. The implementation uses a non-blocking API, so the processor is cooperative.
The default local parallelism for this processor is 1.
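Two properties of this source can be illustrated with plain Java: the number of open connections is clusterSize * localParallelism, and each connection yields decoded lines until the server closes it. In the sketch below, an InputStream stands in for the socket's input, and a blocking BufferedReader is used for brevity, whereas the actual source uses a non-blocking API. SocketSourceModel and its methods are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers modeling the socket source; not Jet code.
class SocketSourceModel {

    // Each processor on each member opens its own connection.
    static int openConnections(int clusterSize, int localParallelism) {
        return clusterSize * localParallelism;
    }

    // Decodes lines with the given charset until end of stream, which is what a
    // socket's InputStream reports when the server closes the connection.
    // No reconnect is attempted; an IOException is rethrown (unchecked here).
    static List<String> readLines(InputStream in, Charset charset) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

With the default local parallelism of 1, a three-member cluster keeps openConnections(3, 1) = 3 connections open to the server.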
@Nonnull public static StreamSource<String> socket(@Nonnull String host, int port)
Convenience for socket(host, port, charset) with
UTF-8 as the charset.
Parameters:
host - the hostname to connect to
port - the port to connect to
@Nonnull public static <R> BatchSource<R> files(@Nonnull String directory, @Nonnull Charset charset, @Nonnull String glob, @Nonnull DistributedBiFunction<String,String,? extends R> mapOutputFn)
A source that emits lines from files in a directory (but not its subdirectories).
To be useful, the source should be configured to read data local to each member. For example, if the pathname resolves to a shared network filesystem visible to multiple members, they will emit duplicate data.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Any IOException will cause the job to fail.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
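A single-member approximation of this source in plain Java: it lists the directory without entering subdirectories, keeps the files whose names match the glob using java.nio's PathMatcher, decodes each file with the given charset and passes (fileName, line) to mapOutputFn. FilesSourceModel is hypothetical and does not model how the real source divides files among processors or members.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical single-member model of the files source; not Jet code.
class FilesSourceModel {

    static <R> List<R> read(Path directory, Charset charset, String glob,
                            BiFunction<String, String, ? extends R> mapOutputFn) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<R> out = new ArrayList<>();
        // newDirectoryStream lists direct children only, so subdirectories are not entered.
        try (DirectoryStream<Path> children = Files.newDirectoryStream(directory)) {
            for (Path file : children) {
                Path name = file.getFileName();
                if (!Files.isRegularFile(file) || !matcher.matches(name)) {
                    continue;
                }
                for (String line : Files.readAllLines(file, charset)) {
                    out.add(mapOutputFn.apply(name.toString(), line));
                }
            }
        } catch (IOException e) {
            // The documented source fails the job on any IOException.
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

Matching the glob against the file name alone means "*" selects every regular file directly in the directory, as the parameter description says.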
Parameters:
directory - parent directory of the files
charset - charset to use to decode the files
glob - the globbing mask, see getPathMatcher(). Use "*" for all files.
mapOutputFn - function to create output items. Parameters are fileName and line.
@Nonnull public static BatchSource<String> files(@Nonnull String directory)
Convenience for files(String, Charset, String, DistributedBiFunction) which uses UTF-8 encoding, matches all
the files in the directory and emits lines of text in the files.
@Nonnull public static StreamSource<String> fileWatcher(@Nonnull String watchedDirectory, @Nonnull Charset charset, @Nonnull String glob)
A source that emits a stream of lines of text coming from files in the watched directory (but not its subdirectories).
To be useful, the source should be configured to read data local to each member. For example, if the pathname resolves to a shared network filesystem visible to multiple members, they will emit duplicate data.
If, during the scanning phase, the source observes a file that doesn't end with a newline, it will assume that there is a line just being written. This line won't appear in its output.
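The rule about a trailing line without a newline can be modeled as a buffer that emits only terminated lines and holds the remainder back until more text arrives. LineAssembler is a hypothetical sketch of the described behavior, not the source's implementation; treating "\n" as the terminator and stripping a preceding "\r" are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: emit only complete lines, hold back a partial last line.
class LineAssembler {
    private final StringBuilder pending = new StringBuilder();

    // Feeds newly read text; returns the lines that this text completed.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int start = 0;
        int nl;
        while ((nl = pending.indexOf("\n", start)) >= 0) {
            int end = nl;
            if (end > start && pending.charAt(end - 1) == '\r') {
                end--;   // tolerate CRLF line endings
            }
            complete.add(pending.substring(start, end));
            start = nl + 1;
        }
        pending.delete(0, start);   // keep only the unterminated remainder
        return complete;
    }

    // Text seen so far that is not yet terminated by a newline.
    String pending() {
        return pending.toString();
    }
}
```

Feeding "one\ntw" yields only "one" and keeps "tw" pending; a later "o\r\nthree" completes "two" and leaves "three" pending.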
The source completes when the directory is deleted. However, in order
to delete the directory, all files in it must be deleted and if you
delete a file that is currently being read from, the job may encounter
an IOException. The directory must be deleted on all nodes.
Any IOException will cause the job to fail.
The source does not save any state to snapshot. If the job is restarted, lines added after the restart will be emitted, which gives at-most-once behavior.
The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).
On Windows, the WatchService is not notified of appended lines
until the file is closed. If the file-writing process keeps the file
open while appending, the processor may fail to observe the changes.
It will be notified if any process tries to open that file, such as
looking at the file in Explorer. This holds for Windows 10 with the NTFS
file system and might change in the future. You are advised to do your own
testing on your target Windows platform.
The underlying JDK API (WatchService) has a
history of unreliability and this source may experience infinite
blocking, missed, or duplicate events as a result. Such problems may be
resolved by upgrading the JRE to the latest version.
Parameters:
watchedDirectory - pathname to the source directory
charset - charset to use to decode the files
glob - the globbing mask, see getPathMatcher(). Use "*" for all files.
@Nonnull public static StreamSource<String> fileWatcher(@Nonnull String watchedDirectory)
Convenience for fileWatcher(watchedDirectory, UTF_8, "*").
Copyright © 2018 Hazelcast, Inc. All rights reserved.