Class Sources

java.lang.Object
com.hazelcast.jet.pipeline.Sources

public final class Sources extends Object
Contains factory methods for various types of pipeline sources. To start building a pipeline, pass a source to Pipeline.readFrom(BatchSource) and you will obtain the initial BatchStage. You can then attach further stages to it.

The same pipeline may contain more than one source, each starting its own branch. The branches may be merged with multiple-input transforms such as co-group and hash-join.

The default local parallelism for sources in this class is 1 or 2; check the documentation of individual methods.
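
As a minimal sketch of the flow described above, the following builds a pipeline whose initial BatchStage comes from Sources.map(String). The map name "inventory", the key and value types, and the class name are assumptions made for illustration only.

```java
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

public class ReadFromMapSketch {

    static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // "inventory" is a hypothetical IMap name; readFrom returns the initial stage
        BatchStage<Map.Entry<String, Integer>> entries =
                p.readFrom(Sources.<String, Integer>map("inventory"));
        // Attach further stages to the initial stage, then terminate with a sink
        entries.filter(e -> e.getValue() > 0)
               .writeTo(Sinks.logger());
        return p;
    }
}
```

Building the pipeline only describes the computation; nothing is read until the pipeline is submitted as a job.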

Since:
Jet 3.0
  • Method Details

    • batchFromProcessor

      @Nonnull public static <T> BatchSource<T> batchFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
      Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier.
      Parameters:
      sourceName - user-friendly source name
      metaSupplier - the processor meta-supplier
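
      A minimal sketch of wrapping a Core API processor as a batch source. The CountdownP processor, the source name "countdown" and the class name are hypothetical and exist only for illustration.

```java
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;

public class CountdownSourceSketch {

    /** Hypothetical processor: emits 3, 2, 1 and then completes. */
    static class CountdownP extends AbstractProcessor {
        private int next = 3;

        @Override
        public boolean complete() {
            while (next > 0) {
                // tryEmit returns false when the outbox is full; resume on the next call
                if (!tryEmit(next)) {
                    return false;
                }
                next--;
            }
            return true; // nothing left to emit, so this processor is done
        }
    }

    static BatchSource<Integer> countdownSource() {
        // One processor per member; every processor instance emits the full sequence
        return Sources.batchFromProcessor("countdown",
                ProcessorMetaSupplier.preferLocalParallelismOne(CountdownP::new));
    }
}
```

      Because each processor instance emits the whole sequence, a multi-member cluster would see the items once per member in this sketch.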
    • streamFromProcessorWithWatermarks

      @Nonnull public static <T> StreamSource<T> streamFromProcessorWithWatermarks(@Nonnull String sourceName, boolean supportsNativeTimestamps, @Nonnull FunctionEx<EventTimePolicy<? super T>,ProcessorMetaSupplier> metaSupplierFn)
      Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API. Jet will call the function you supply with an EventTimePolicy and it must return a meta-supplier of processors that will act according to the parameters in the policy and must emit the watermark items as the policy specifies.

      If you are implementing a custom source processor, be sure to check out the EventTimeMapper class that will help you correctly implement watermark emission.

      Parameters:
      sourceName - user-friendly source name
      supportsNativeTimestamps - true, if the processor is able to work with native timestamps
      metaSupplierFn - factory of processor meta-suppliers. Since Jet 4.3 this argument changed from Function to FunctionEx to support serializability.
    • streamFromProcessor

      @Nonnull public static <T> StreamSource<T> streamFromProcessor(@Nonnull String sourceName, @Nonnull ProcessorMetaSupplier metaSupplier)
      Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier.
      Parameters:
      sourceName - user-friendly source name
      metaSupplier - the processor meta-supplier
    • map

      @Nonnull public static <K, V> BatchSource<Map.Entry<K,V>> map(@Nonnull String mapName)
      Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If entries are added to or removed from the map during the job, keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

      The default local parallelism for this processor is 1.

    • map

      @Nonnull public static <K, V> BatchSource<Map.Entry<K,V>> map(@Nonnull IMap<? extends K,? extends V> map)
      Returns a source that fetches entries from the given Hazelcast IMap and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If entries are added to or removed from the map during the job, keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

      The default local parallelism for this processor is 1.

    • map

      @Nonnull public static <T, K, V> BatchSource<T> map(@Nonnull String mapName, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
      Returns a source that fetches entries from a local Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.

      Due to the current limitations in the way Jet reads the map it can't use any indexes on the map. It will always scan the map in full.

      The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If entries are added to or removed from the map during the job, keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

      The default local parallelism for this processor is 1.

      Predicate/projection class requirements

      The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.
      Type Parameters:
      T - type of emitted item
      Parameters:
      mapName - the name of the map
      predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
      projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
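
      A minimal sketch of pushing a predicate and a projection into the map source. The Employee class, its "age" and "name" attributes, the map name "employees" and the class name are assumptions for illustration; as noted above, the value class would have to be available on the cluster's classpath.

```java
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.projection.Projection;
import com.hazelcast.projection.Projections;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.Predicates;
import java.io.Serializable;
import java.util.Map;

public class FilteredMapSourceSketch {

    /** Hypothetical value class stored in the map. */
    public static class Employee implements Serializable {
        public String name;
        public int age;
    }

    static BatchSource<String> namesOfEmployeesOver30() {
        // Built with the Predicates factory, as the description recommends
        Predicate<String, Employee> olderThan30 = Predicates.greaterThan("age", 30);
        // Extract a single attribute of the value instead of the whole entry
        Projection<Map.Entry<String, Employee>, String> nameOnly =
                Projections.singleAttribute("name");
        return Sources.map("employees", olderThan30, nameOnly);
    }
}
```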
    • map

      @Nonnull public static <T, K, V> BatchSource<T> map(@Nonnull IMap<? extends K,? extends V> map, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
      Returns a source that fetches entries from the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.

      Due to the current limitations in the way Jet reads the map it can't use any indexes on the map. It will always scan the map in full.

      The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If entries are added to or removed from the map during the job, keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

      The default local parallelism for this processor is 1.

      Predicate/projection class requirements

      The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.

      Type Parameters:
      T - type of emitted item
      Parameters:
      map - the Hazelcast map to read data from
      predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
      projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
    • mapJournal

      @Nonnull public static <T, K, V> StreamSource<T> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
      Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

      The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

      The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

      If you start a new job from an exported state, you can change the source parameters as needed.

      The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).

      Predicate/projection class requirements

      The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
      Type Parameters:
      T - type of emitted item
      Parameters:
      mapName - the name of the map
      initialPos - describes which event to start receiving from
      projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
      predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
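
      A minimal sketch of the two steps described above: enabling the event journal on the member and reading the journal with an explicit projection and predicate. The map name "prices", the capacity of 10,000, the key and value types and the class name are assumptions for illustration.

```java
import com.hazelcast.config.Config;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.map.EventJournalMapEvent;
import java.util.Map;

public class MapJournalSketch {

    /** Member configuration with the event journal enabled for the hypothetical "prices" map. */
    static Config memberConfig() {
        Config config = new Config();
        config.getMapConfig("prices")
              .getEventJournalConfig()
              .setEnabled(true)
              .setCapacity(10_000); // fixed capacity: older events are dropped on overflow
        return config;
    }

    static Pipeline buildPipeline() {
        // Project each event to its key and new value
        FunctionEx<EventJournalMapEvent<String, Long>, Map.Entry<String, Long>> toEntry =
                Util.mapEventToEntry();
        // Pass only ADDED and UPDATED events
        PredicateEx<EventJournalMapEvent<String, Long>> putsOnly = Util.mapPutEvents();

        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal("prices",
                        JournalInitialPosition.START_FROM_OLDEST, toEntry, putsOnly))
         .withIngestionTimestamps()
         .writeTo(Sinks.logger());
        return p;
    }
}
```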
    • mapJournal

      @Nonnull public static <T, K, V> StreamSource<T> mapJournal(@Nonnull IMap<? extends K,? extends V> map, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
      Returns a source that will stream EventJournalMapEvents of the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

      The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

      If you start a new job from an exported state, you can change the source parameters as needed.

      The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).

      Predicate/projection class requirements

      The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.

      Issue when "catching up"

      This processor does not coalesce watermarks from partitions. It reads partitions one by one: it emits events from one partition and then from another one in batches. This adds time disorder to events: it might emit a very recent event from partition1 while not yet emitting an old event from partition2, and it generates watermarks based on this order. Even if items in your partitions are ordered by timestamp, you can't use an allowed lag of 0. Most notably, the "catching up" happens after the job is restarted, when events since the last snapshot are reprocessed in a burst. In order to not lose any events, the lag should be configured to at least snapshotInterval + timeToRestart + normalEventLag. The reason for this behavior is that the default partition count in the cluster is pretty high and cannot be changed per object, and for low-traffic maps it takes a long time until all partitions see an event to allow emitting a coalesced watermark.
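
      The lower bound on the allowed lag stated above is a plain sum. The sketch below works it out for assumed values (10 s snapshot interval, 5 s restart time, 500 ms normal event lag); the class and method names are hypothetical.

```java
import java.time.Duration;

public class AllowedLagSketch {

    /** Returns snapshotInterval + timeToRestart + normalEventLag in milliseconds. */
    static long minimumAllowedLagMillis(Duration snapshotInterval,
                                        Duration timeToRestart,
                                        Duration normalEventLag) {
        return snapshotInterval.plus(timeToRestart).plus(normalEventLag).toMillis();
    }

    public static void main(String[] args) {
        long lag = minimumAllowedLagMillis(
                Duration.ofSeconds(10),   // assumed snapshot interval
                Duration.ofSeconds(5),    // assumed time to restart the job
                Duration.ofMillis(500));  // assumed lag during normal operation
        System.out.println(lag); // prints 15500
    }
}
```

      The resulting value would typically be supplied as the allowedLag argument when assigning timestamps to the stage created from this source.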
      Type Parameters:
      T - type of emitted item
      Parameters:
      map - the map to read data from
      initialPos - describes which event to start receiving from
      projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
      predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
    • mapJournal

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull String mapName, @Nonnull JournalInitialPosition initialPos)
      Convenience for mapJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
    • mapJournal

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> mapJournal(@Nonnull IMap<? extends K,? extends V> map, @Nonnull JournalInitialPosition initialPos)
      Convenience for mapJournal(IMap, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.

      NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

    • remoteMap

      @Nonnull public static <K, V> BatchSource<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)
      Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries. If we detect a topology change, the job will fail, but the detection is only on a best-effort basis - we might still give incorrect results without reporting a failure. Concurrent mutation is not detected at all.

      The default local parallelism for this processor is 1.
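
      A minimal sketch of creating a remote map source. The cluster name "remote-cluster", the address "10.0.0.1:5701", the map name "orders" and the class name are placeholders chosen for illustration.

```java
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

public class RemoteMapSketch {

    static BatchSource<Map.Entry<String, Integer>> remoteOrders() {
        // Client configuration identifying the remote cluster (placeholder values)
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName("remote-cluster");
        clientConfig.getNetworkConfig().addAddress("10.0.0.1:5701");
        // No connection is made here; the source only records how to connect
        return Sources.remoteMap("orders", clientConfig);
    }
}
```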

    • remoteMap

      @Nonnull public static <T, K, V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
      Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.

      See RemoteMapSourceBuilder for details on the remote map source.

      Type Parameters:
      T - type of emitted item
      Parameters:
      mapName - the name of the map
      predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
      projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
    • remoteMap

      @Nonnull public static <K, V> BatchSource<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef)
      The same as the remoteMap(String, ClientConfig) method. The only difference is that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.

      Parameters:
      mapName - the name of the map
      dataConnectionRef - the reference to DataConnectionConfig
      Since:
      5.4
    • remoteMap

      @Nonnull public static <T, K, V> BatchSource<T> remoteMap(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull Predicate<K,V> predicate, @Nonnull Projection<? super Map.Entry<K,V>,? extends T> projection)
      The same as the remoteMap(String, ClientConfig, Predicate, Projection) method. The only difference is that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.

      Parameters:
      mapName - the name of the map
      dataConnectionRef - the reference to DataConnectionConfig
      Since:
      5.4
    • remoteMapBuilder

      @Nonnull public static <K, V> RemoteMapSourceBuilder<K,V,Map.Entry<K,V>> remoteMapBuilder(String mapName)
      Returns a builder to build a source that fetches entries from a remote Hazelcast IMap with the specified name. It provides a fluent API to build the source using the optional parameters.

      See RemoteMapSourceBuilder for details on the remote map source.

      Type Parameters:
      K - the type of the key in the map
      V - the type of the value in the map
      Parameters:
      mapName - the name of the map
      Returns:
      builder for remote map source
      Since:
      5.4
    • remoteMapJournal

      @Nonnull public static <T, K, V> StreamSource<T> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
      Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

      To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

      The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

      If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give a different name to this source.

      The default local parallelism for this processor is 1.

      Predicate/projection class requirements

      The classes implementing predicateFn and projectionFn need to be available on the remote cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use remoteMapJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
      Type Parameters:
      K - type of key
      V - type of value
      T - type of emitted item
      Parameters:
      mapName - the name of the map
      clientConfig - configuration for the client to connect to the remote cluster
      initialPos - describes which event to start receiving from
      projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
      predicateFn - the predicate to filter the events. You may use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
    • remoteMapJournal

      @Nonnull public static <T, K, V> StreamSource<T> remoteMapJournal(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalMapEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalMapEvent<K,V>> predicateFn)
      The same as the remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) method. The only difference is that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that refers to a DataConnectionConfig.

      The DataConnectionConfig caches the connection to the remote cluster so that it can be reused.

      Prerequisite: configure an external data connection. Use HazelcastDataConnection.CLIENT_XML for an XML string or HazelcastDataConnection.CLIENT_YML for a YAML string.

      
       Config config = ...;
       String xmlString = ...;
       DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
           .setName("my-hzclient-data-connection")
           .setType("Hz")
           .setProperty(HazelcastDataConnection.CLIENT_XML, xmlString);
       config.addDataConnectionConfig(dataConnectionConfig);
        

      Pipeline configuration

      
       PredicateEx<EventJournalMapEvent<String, Integer>> predicate = ...;
       p.readFrom(Sources.remoteMapJournal(
           mapName,
           DataConnectionRef.dataConnectionRef("my-hzclient-data-connection"),
           JournalInitialPosition.START_FROM_OLDEST,
           EventJournalMapEvent::getNewValue,
           predicate
        ));
        
      Type Parameters:
      T - is the return type of the stream
      K - is the key type of EventJournalMapEvent
      V - is the value type of EventJournalMapEvent
      Parameters:
      mapName - the name of the map
      dataConnectionRef - the reference to DataConnectionConfig
      initialPos - describes which event to start receiving from
      projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
      predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
      Returns:
      a stream that can be used as a source
      Since:
      5.3
    • remoteMapJournal

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
      Convenience for remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
    • remoteMapJournal

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> remoteMapJournal(@Nonnull String mapName, @Nonnull DataConnectionRef dataConnectionRef, @Nonnull JournalInitialPosition initialPos)
      Convenience for remoteMapJournal(String, DataConnectionRef, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.
      Since:
      5.3
    • cache

      @Nonnull public static <K, V> BatchSource<Map.Entry<K,V>> cache(@Nonnull String cacheName)
      Returns a source that fetches entries from a Hazelcast ICache with the given name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If entries are added to or removed from the cache during the job, keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

      The default local parallelism for this processor is 1.

    • cacheJournal

      @Nonnull public static <T, K, V> StreamSource<T> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalCacheEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalCacheEvent<K,V>> predicateFn)
      Returns a source that will stream the EventJournalCacheEvent events of a Hazelcast ICache with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

      The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

      To use an ICache as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

      The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

      If you start a new job from an exported state, you can change the source parameters as needed.

      The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).

      Predicate/projection class requirements

      The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the cache itself. If you cannot meet these conditions, use cacheJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
      Type Parameters:
      T - type of emitted item
      Parameters:
      cacheName - the name of the cache
      initialPos - describes which event to start receiving from
      projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
      predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
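
      A minimal sketch of streaming an ICache journal with the helper functions named in the parameter descriptions above. The cache name "sessions", the key and value types and the class name are assumptions for illustration, and the event journal is assumed to be already enabled for that cache.

```java
import com.hazelcast.cache.EventJournalCacheEvent;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

public class CacheJournalSketch {

    static Pipeline buildPipeline() {
        // Project each event to its key and new value
        FunctionEx<EventJournalCacheEvent<String, String>, Map.Entry<String, String>> toEntry =
                Util.cacheEventToEntry();
        // Pass only CREATED and UPDATED events
        PredicateEx<EventJournalCacheEvent<String, String>> putsOnly = Util.cachePutEvents();

        Pipeline p = Pipeline.create();
        p.readFrom(Sources.cacheJournal("sessions",
                        JournalInitialPosition.START_FROM_CURRENT, toEntry, putsOnly))
         .withoutTimestamps() // this sketch does no windowing, so timestamps are not needed
         .writeTo(Sinks.logger());
        return p;
    }
}
```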
    • cacheJournal

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> cacheJournal(@Nonnull String cacheName, @Nonnull JournalInitialPosition initialPos)
      Convenience for cacheJournal(String, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry.
    • remoteCache

      @Nonnull public static <K, V> BatchSource<Map.Entry<K,V>> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)
      Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      If entries are added to or removed from the cache during the job, keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

      The default local parallelism for this processor is 1.

    • remoteCacheJournal

      @Nonnull public static <T, K, V> StreamSource<T> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos, @Nonnull FunctionEx<? super EventJournalCacheEvent<K,V>,? extends T> projectionFn, @Nonnull PredicateEx<? super EventJournalCacheEvent<K,V>> predicateFn)
      Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

      To use an ICache as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

      The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

      If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give this source a different name.

      The default local parallelism for this processor is 1.

      Predicate/projection class requirements

      The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the cache itself. If you cannot meet these conditions, use remoteCacheJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
      Type Parameters:
      T - type of emitted item
      Parameters:
      cacheName - the name of the cache
      clientConfig - configuration for the client to connect to the remote cluster
      initialPos - describes which event to start receiving from
      projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
      predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
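
      The interplay of predicateFn and projectionFn, including the rule that a null projection result drops the item, can be sketched with plain java.util.function types. This is an illustration only: EarlyFilterSketch and filterAndProject are hypothetical names, events are modelled as strings, and applying the predicate before the projection is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical illustration of filter-then-project semantics; not Hazelcast code. */
final class EarlyFilterSketch {

    static <E, T> List<T> filterAndProject(
            List<E> events,
            Predicate<? super E> predicateFn,
            Function<? super E, ? extends T> projectionFn) {
        List<T> out = new ArrayList<>();
        for (E event : events) {
            if (!predicateFn.test(event)) {
                continue; // rejected by the predicate
            }
            T item = projectionFn.apply(event);
            if (item != null) { // a null projection result filters the item out
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> events = List.of("PUT:a", "REMOVE:b", "PUT:");
        List<String> keys = filterAndProject(
                events,
                e -> e.startsWith("PUT:"),
                e -> e.length() > 4 ? e.substring(4) : null);
        System.out.println(keys); // [a]
    }
}
```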
    • remoteCacheJournal

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> remoteCacheJournal(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull JournalInitialPosition initialPos)
      Convenience for remoteCacheJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only CREATED and UPDATED events and will project the event's key and new value into a Map.Entry.
    • list

      @Nonnull public static <T> BatchSource<T> list(@Nonnull String listName)
      Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member — the one where the entire list is stored by the IMDG.

      If the IList is modified while being read, the source may miss and/or duplicate some entries.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      One instance of this processor runs only on the member that owns the list.

    • list

      @Nonnull public static <T> BatchSource<T> list(@Nonnull IList<? extends T> list)
      Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member — the one where the entire list is stored by the IMDG.

      If the IList is modified while being read, the source may miss and/or duplicate some entries.

      NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      One instance of this processor runs only on the member that owns the list.

    • remoteList

      @Nonnull public static <T> BatchSource<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)
      Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. All elements are emitted on a single member.

      If the IList is modified while being read, the source may miss and/or duplicate some entries.

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      Only 1 instance of this processor runs in the cluster.

    • socket

      @Nonnull public static StreamSource<String> socket(@Nonnull String host, int port, @Nonnull Charset charset)
      Returns a source which connects to the specified socket and emits lines of text received from it. It decodes the text using the supplied charset.

      Each underlying processor opens its own TCP connection, so there will be clusterSize * localParallelism open connections to the server.

      The source completes when the server closes the socket. It never attempts to reconnect. Any IOException will cause the job to fail.
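
      The line-oriented contract (decode with the given charset, emit one item per line, complete at end of stream, fail on any IOException) can be sketched with blocking JDK classes. This is only an illustration of the semantics: LineDecodingSketch is a hypothetical name, it reads from any InputStream rather than a real socket, and it does not reflect the non-blocking implementation of the actual source. BufferedReader also returns a final unterminated line, which is a property of this sketch and not a statement about the source.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical blocking illustration of charset-aware line reading. */
final class LineDecodingSketch {

    /** Reads lines until the stream ends; any IOException is rethrown unchecked. */
    static List<String> readLines(InputStream in, Charset charset) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "first\nsecond\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(
                readLines(new ByteArrayInputStream(data), StandardCharsets.UTF_8));
        // [first, second]
    }
}
```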

      The source does not save any state to snapshot. On job restart, it will emit whichever items the server sends. The implementation uses a non-blocking API, so the processor is cooperative.

      The default local parallelism for this processor is 1.

    • socket

      @Nonnull public static StreamSource<String> socket(@Nonnull String host, int port)
      Convenience for socket(host, port, charset) with UTF-8 as the charset.
      Parameters:
      host - the hostname to connect to
      port - the port to connect to
    • filesBuilder

      @Nonnull public static FileSourceBuilder filesBuilder(@Nonnull String directory)
      Returns a builder object that offers a step-by-step fluent API to build a custom source to read files for the Pipeline API. The source reads lines from files in a directory (but not its subdirectories). Using this builder you can build a batching or a streaming reader.
    • files

      @Nonnull public static BatchSource<String> files(@Nonnull String directory)
      A source to read all files in a directory in a batch way.

      This method is a shortcut for:

      
         filesBuilder(directory)
            .charset(UTF_8)
            .glob(GLOB_WILDCARD)
            .sharedFileSystem(false)
            .build((fileName, line) -> line)
       

      If files are appended to while being read, the added content might or might not be emitted, or only part of a line may be emitted. If files are modified in more complex ways, the behavior is undefined.

      See filesBuilder(String).

    • json

      @Nonnull public static <T> BatchSource<T> json(@Nonnull String directory, @Nonnull Class<T> type)
      A source to read all files in a directory in a batch way. The source expects the files to contain streaming JSON content, where each JSON string is separated by a newline. A JSON string itself can span multiple lines. The source converts each JSON string to an object of the given type.

      This method is a shortcut for:

      
         filesBuilder(directory)
            .charset(UTF_8)
            .glob(GLOB_WILDCARD)
            .sharedFileSystem(false)
            .build(path -> JsonUtil.beanSequenceFrom(path, type))
       

      If files are appended to while being read, the added content might or might not be emitted, or only part of a line may be emitted. If files are modified in more complex ways, the behavior is undefined.

      See filesBuilder(String), files(String).

      Since:
      Jet 4.2
    • json

      @Nonnull public static BatchSource<Map<String,Object>> json(@Nonnull String directory)
      Convenience for json(String, Class) which converts each JSON string to a Map. It will throw a ClassCastException if the JSON string is just a primitive (String, Number, Boolean) or a JSON array (List).
      Since:
      Jet 4.2
    • fileWatcher

      @Nonnull public static StreamSource<String> fileWatcher(@Nonnull String watchedDirectory)
      A source to stream lines added to files in a directory. This is a streaming source: it watches the directory and emits lines as they are appended to files in that directory.

      This method is a shortcut for:

      
         filesBuilder(watchedDirectory)
            .charset(UTF_8)
            .glob(GLOB_WILDCARD)
            .sharedFileSystem(false)
            .buildWatcher((fileName, line) -> line)
       

      Appending lines using a text editor

      If you're testing this source, you might think of using a text editor to append the lines. However, it might not work as expected because some editors write to a temp file and then rename it, or append an extra newline character at the end which gets overwritten if more text is added in the editor. The best way to append is to use echo text >> yourFile.
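
      When a shell is not convenient, the same kind of append can be done from Java. The following is a minimal sketch using only the JDK: AppendLineSketch and appendLine are hypothetical names, and the code simply writes the line and its terminating newline in a single append to the end of the file, creating the file if needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper that appends one newline-terminated line to a file. */
final class AppendLineSketch {

    static void appendLine(Path file, String line) {
        try {
            // CREATE + APPEND: never truncates or renames the existing file.
            Files.write(file,
                    (line + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: AppendLineSketch <file> <text>");
            return;
        }
        appendLine(Paths.get(args[0]), args[1]);
    }
}
```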

      See filesBuilder(String).

    • jsonWatcher

      @Nonnull public static <T> StreamSource<T> jsonWatcher(@Nonnull String watchedDirectory, @Nonnull Class<T> type)
      A source to stream lines added to files in a directory. This is a streaming source: it watches the directory and, as lines are appended to files in it, converts each line to an object of the given type and emits it.

      This method is a shortcut for:

      
         filesBuilder(watchedDirectory)
            .charset(UTF_8)
            .glob(GLOB_WILDCARD)
            .sharedFileSystem(false)
            .buildWatcher((fileName, line) -> JsonUtil.beanFrom(line, type))
       

      Appending lines using a text editor

      If you're testing this source, you might think of using a text editor to append the lines. However, it might not work as expected because some editors write to a temp file and then rename it, or append an extra newline character at the end which gets overwritten if more text is added in the editor. The best way to append is to use echo text >> yourFile.

      See filesBuilder(String), fileWatcher(String).

      Since:
      Jet 4.2
    • jsonWatcher

      @Nonnull public static StreamSource<Map<String,Object>> jsonWatcher(@Nonnull String watchedDirectory)
      Convenience for jsonWatcher(String, Class) which converts each appended line to the Map representation of the JSON string.
      Since:
      Jet 4.2
    • jmsQueue

      @Nonnull @Deprecated public static StreamSource<jakarta.jms.Message> jmsQueue(@Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
    • jmsQueue

      @Nonnull public static StreamSource<jakarta.jms.Message> jmsQueue(@Nonnull String name, @Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
      Shortcut equivalent to:
               return jmsQueueBuilder(factorySupplier)
                       .destinationName(name)
                       .build();
       
      This version creates a connection without any authentication parameters. JMS Message objects are emitted to downstream.

      Note: Message might not be serializable. In that case you can use the builder and add a projection.

      Parameters:
      name - the name of the queue
      factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
      Since:
      Jet 4.1
    • jmsQueueBuilder

      @Nonnull public static JmsSourceBuilder jmsQueueBuilder(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
      Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. See javadoc on JmsSourceBuilder methods for more details.

      This source uses the JMS message timestamp as the native timestamp, if enabled.

      This source supports exactly-once and at-least-once mode, see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee) for more information.

      IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most of the providers offer a configuration parameter to enable auto-reconnection, refer to provider documentation for details.

      The default local parallelism for this processor is 1.

      Parameters:
      factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
    • jmsTopic

      @Nonnull @Deprecated public static StreamSource<jakarta.jms.Message> jmsTopic(@Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
    • jmsTopic

      @Nonnull public static StreamSource<jakarta.jms.Message> jmsTopic(@Nonnull String name, @Nonnull SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
      Shortcut equivalent to:
               return jmsTopicBuilder(factorySupplier)
                       .destinationName(name)
                       .build();
       
      This version creates a connection without any authentication parameters. A non-durable, non-shared consumer is used, so only one member will connect to the broker. JMS Message objects are emitted to downstream.

      Note: Message might not be serializable. In that case you can use the builder and add a projection.

      Parameters:
      name - the name of the topic
      factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
      Since:
      Jet 4.1
    • jmsTopicBuilder

      @Nonnull public static JmsSourceBuilder jmsTopicBuilder(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
      Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. See javadoc on JmsSourceBuilder methods for more details.

      By default, a non-shared consumer is used. This forces the source to run on only one member of the cluster. You can use JmsSourceBuilder.consumerFn(FunctionEx) to create a shared consumer.

      This source uses the JMS message timestamp as the native timestamp, if enabled.

      This source supports exactly-once and at-least-once mode for durable topic subscriptions, see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee) for more information.

      IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most of the providers offer a configuration parameter to enable auto-reconnection, refer to provider documentation for details.

      The default local parallelism for this processor is 1.

      Parameters:
      factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
    • jdbc

      @Nonnull public static <T> BatchSource<T> jdbc(@Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
      Returns a source which connects to the specified database using the given newConnectionFn, queries the database and creates a result set using the given resultSetFn. It creates output objects from the ResultSet using the given createOutputFn and emits them to downstream.

      resultSetFn gets the created connection, total parallelism (local parallelism * member count) and global processor index as arguments and produces a result set. The parallelism and processor index arguments should be used to fetch a part of the whole result set specific to the processor. If the table itself isn't partitioned by the same key, then running multiple queries might not really be faster than using the simpler version of this method; do your own testing.

      createOutputFn gets the ResultSet and creates the desired output object. The function is called for each row of the result set; it should not call ResultSet.next() or any other cursor-navigating functions.

      Example:

      
           p.readFrom(Sources.jdbc(
               () -> DriverManager.getConnection(DB_CONNECTION_URL),
               (con, parallelism, index) -> {
                    PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
                    stmt.setInt(1, parallelism);
                    stmt.setInt(2, index);
                    return stmt.executeQuery();
               },
               resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
       

      If the underlying table is modified while being read, the source may miss and/or duplicate some entries, because multiple queries for parts of the data on multiple members will be executed.
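
      How the MOD(id, ?) = ? condition splits rows among processors can be sketched without a database. This is a minimal illustration, assuming non-negative integer ids: ModuloPartitionSketch and idsFor are hypothetical names, and the member count and local parallelism are example values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of MOD-based slicing of a result set. */
final class ModuloPartitionSketch {

    /**
     * Ids that the processor with the given global index would select with
     * "MOD(id, totalParallelism) = processorIndex". Assumes non-negative ids.
     */
    static List<Integer> idsFor(List<Integer> allIds, int totalParallelism,
                                int processorIndex) {
        List<Integer> slice = new ArrayList<>();
        for (int id : allIds) {
            if (id % totalParallelism == processorIndex) {
                slice.add(id);
            }
        }
        return slice;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        int memberCount = 2;       // example value
        int localParallelism = 2;  // example value
        int totalParallelism = memberCount * localParallelism;
        for (int index = 0; index < totalParallelism; index++) {
            System.out.println(index + " -> " + idsFor(ids, totalParallelism, index));
        }
        // 0 -> [4]
        // 1 -> [1, 5]
        // 2 -> [2, 6]
        // 3 -> [3, 7]
    }
}
```

      Every id lands in exactly one slice, so the slices are disjoint and together cover the table as long as it does not change between the individual queries.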

      The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

      Any SQLException will cause the job to fail.

      The default local parallelism for this processor is 1.

      The given functions must be stateless.

      Type Parameters:
      T - type of output objects
      Parameters:
      newConnectionFn - creates the connection
      resultSetFn - creates a ResultSet using the connection, total parallelism and index
      createOutputFn - creates output objects from ResultSet
    • jdbc

      @Nonnull public static <T> BatchSource<T> jdbc(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
      Returns a source which connects to the specified database using the given dataConnectionRef, queries the database and creates a result set using the given resultSetFn. It creates output objects from the ResultSet using the given createOutputFn and emits them to downstream.

      Example:

      (Prerequisite) Data connection configuration:

      
            Config config = smallInstanceConfig();
            Properties properties = new Properties();
            properties.put("jdbcUrl", jdbcUrl);
            properties.put("username", username);
            properties.put("password", password);
            DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
                    .setName("my-jdbc-data-connection")
                    .setType("Jdbc")
                    .setProperties(properties);
            config.getDataConnectionConfigs().put("my-jdbc-data-connection", dataConnectionConfig);
       

      Pipeline configuration

      
           p.readFrom(Sources.jdbc(
               DataConnectionRef.dataConnectionRef("my-jdbc-data-connection"),
               (con, parallelism, index) -> {
                    PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
                    stmt.setInt(1, parallelism);
                    stmt.setInt(2, index);
                    return stmt.executeQuery();
               },
               resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
       

      See also jdbc(SupplierEx, ToResultSetFunction, FunctionEx).

      Since:
      5.3
    • jdbc

      @Nonnull public static <T> BatchSource<T> jdbc(@Nonnull String connectionURL, @Nonnull String query, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
      Convenience for jdbc(SupplierEx, ToResultSetFunction, FunctionEx). A non-distributed, single-worker source which fetches the whole result set with a single query on a single member.

      This method executes exactly one query in the target database. If the underlying table is modified while being read, the behavior depends on the configured transaction isolation level in the target database. Refer to the documentation for the target database system.

      Example:

      
           p.readFrom(Sources.jdbc(
               DB_CONNECTION_URL,
               "select ID, NAME from PERSON",
               resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
       
      The given function must be stateless.
    • jdbc

      @Nonnull public static <T> BatchSource<T> jdbc(@Nonnull String connectionURL, @Nonnull String query, @Nonnull Properties properties, @Nonnull FunctionEx<? super ResultSet,? extends T> createOutputFn)
      Same as jdbc(String, String, FunctionEx).

      It is not always possible to use the default properties. This overload allows passing some properties to the JDBC driver.

      Example for PostgreSQL to specify fetchSize: PostgreSQL requires autocommit to be disabled, because the backend closes cursors at the end of transactions. With autocommit enabled, the backend will have closed the cursor before anything can be fetched from it.

      
              Properties properties = new Properties();
              properties.put(JdbcPropertyKeys.FETCH_SIZE, "5");
              properties.put(JdbcPropertyKeys.AUTO_COMMIT, "false");
              p.readFrom(Sources.jdbc(
                  "jdbc:postgresql://localhost:5432/mydatabase",
                  "select ID, NAME from PERSON",
                  properties,
                  resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
          

      Example for MySQL to specify fetchSize: The database connection URL should have the "useCursorFetch=true" parameter to enable cursor-based fetching. This means that the JDBC driver will fetch a set of rows from the database at a time, rather than fetching all the rows in the result set at once.

      
              Properties properties = new Properties();
              properties.put(JdbcPropertyKeys.FETCH_SIZE, "5");
              p.readFrom(Sources.jdbc(
                  "jdbc:mysql://localhost:3306/mydatabase?useCursorFetch=true",
                  "select ID, NAME from PERSON",
                  properties,
                  resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))