Class Sources


  • public final class Sources
    extends java.lang.Object
    Contains factory methods for various types of pipeline sources. To start building a pipeline, pass a source to Pipeline.readFrom(BatchSource) and you will obtain the initial BatchStage. You can then attach further stages to it.

    The same pipeline may contain more than one source, each starting its own branch. The branches may be merged with multiple-input transforms such as co-group and hash-join.

    The default local parallelism for the sources in this class is 1 or 2; check the documentation of the individual methods.

    Since:
    Jet 3.0
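
    For example, a minimal pipeline reading from an IMap source might look like this (the map name "my-map" and the logger sink are placeholder choices, not prescribed by this class):

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class ReadMapExample {
    public static void main(String[] args) {
        // The source emits the map's entries as Map.Entry items
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.map("my-map"))
         .map(e -> e.getKey() + "=" + e.getValue())
         .writeTo(Sinks.logger());
        // Submit with: hazelcastInstance.getJet().newJob(p).join();
    }
}
```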
    • Method Detail

      • batchFromProcessor

        @Nonnull
        public static <T> BatchSource<T> batchFromProcessor​(@Nonnull
                                                            java.lang.String sourceName,
                                                            @Nonnull
                                                            ProcessorMetaSupplier metaSupplier)
        Returns a bounded (batch) source constructed directly from the given Core API processor meta-supplier.
        Parameters:
        sourceName - user-friendly source name
        metaSupplier - the processor meta-supplier
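
        As a sketch of how this method can be used, the following wraps a hypothetical Core API processor (TenIntsP) in a batch source. Note that every parallel processor instance runs this code, so a real distributed source would also decide which instance emits which items:

```java
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import java.util.stream.IntStream;

public class CustomBatchSource {

    // Hypothetical processor that emits the integers 0..9, then completes
    static class TenIntsP extends AbstractProcessor {
        private final Traverser<Integer> traverser =
                Traversers.traverseStream(IntStream.range(0, 10).boxed());

        @Override
        public boolean complete() {
            // emitFromTraverser() cooperates with the outbox: it returns false
            // while the outbox is full, and Jet calls complete() again later
            return emitFromTraverser(traverser);
        }
    }

    static BatchSource<Integer> tenInts() {
        return Sources.batchFromProcessor("ten-ints",
                ProcessorMetaSupplier.of(TenIntsP::new));
    }
}
```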
      • streamFromProcessorWithWatermarks

        @Nonnull
        public static <T> StreamSource<T> streamFromProcessorWithWatermarks​(@Nonnull
                                                                            java.lang.String sourceName,
                                                                            boolean supportsNativeTimestamps,
                                                                            @Nonnull
                                                                            FunctionEx<EventTimePolicy<? super T>,​ProcessorMetaSupplier> metaSupplierFn)
        Returns an unbounded (event stream) source that will use the supplied function to create processor meta-suppliers as required by the Core API. Jet will call your function with an EventTimePolicy; it must return a meta-supplier of processors that act according to the parameters in the policy and emit watermark items as the policy specifies.

        If you are implementing a custom source processor, be sure to check out the EventTimeMapper class that will help you correctly implement watermark emission.

        Parameters:
        sourceName - user-friendly source name
        supportsNativeTimestamps - true if the processor is able to work with native timestamps
        metaSupplierFn - factory of processor meta-suppliers. Since Jet 4.3 this argument changed from Function to FunctionEx to support serializability.
      • streamFromProcessor

        @Nonnull
        public static <T> StreamSource<T> streamFromProcessor​(@Nonnull
                                                              java.lang.String sourceName,
                                                              @Nonnull
                                                              ProcessorMetaSupplier metaSupplier)
        Returns an unbounded (event stream) source constructed directly from the given Core API processor meta-supplier.
        Parameters:
        sourceName - user-friendly source name
        metaSupplier - the processor meta-supplier
      • map

        @Nonnull
        public static <K,​V> BatchSource<java.util.Map.Entry<K,​V>> map​(@Nonnull
                                                                                  java.lang.String mapName)
        Returns a source that fetches entries from a local Hazelcast IMap with the specified name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If entries are added to or removed from the map while the job runs, the keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

        The default local parallelism for this processor is 1.

      • map

        @Nonnull
        public static <K,​V> BatchSource<java.util.Map.Entry<K,​V>> map​(@Nonnull
                                                                                  IMap<? extends K,​? extends V> map)
        Returns a source that fetches entries from the given Hazelcast IMap and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If entries are added to or removed from the map while the job runs, the keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

        The default local parallelism for this processor is 1.

      • map

        @Nonnull
        public static <T,​K,​V> BatchSource<T> map​(@Nonnull
                                                             java.lang.String mapName,
                                                             @Nonnull
                                                             Predicate<K,​V> predicate,
                                                             @Nonnull
                                                             Projection<? super java.util.Map.Entry<K,​V>,​? extends T> projection)
        Returns a source that fetches entries from a local Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms, you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic. If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.

        Due to current limitations in the way Jet reads the map, it can't use any indexes on the map; it will always scan the map in full.

        The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If entries are added to or removed from the map while the job runs, the keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

        The default local parallelism for this processor is 1.

        Predicate/projection class requirements

        The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.
        Type Parameters:
        T - type of emitted item
        Parameters:
        mapName - the name of the map
        predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
        projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
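
        For illustration, assuming a map named "trades" whose values expose amount and symbol attributes (hypothetical names, not part of this API), the source could filter and project member-side like this:

```java
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.projection.Projections;
import com.hazelcast.query.Predicates;

public class FilteredMapSource {
    // Emits only the "symbol" attribute of entries whose "amount" exceeds 1000;
    // the predicate and projection run on the members holding the data
    static BatchSource<String> bigTradeSymbols() {
        return Sources.map(
                "trades",
                Predicates.greaterThan("amount", 1_000),
                Projections.singleAttribute("symbol"));
    }
}
```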
      • map

        @Nonnull
        public static <T,​K,​V> BatchSource<T> map​(@Nonnull
                                                             IMap<? extends K,​? extends V> map,
                                                             @Nonnull
                                                             Predicate<K,​V> predicate,
                                                             @Nonnull
                                                             Projection<? super java.util.Map.Entry<K,​V>,​? extends T> projection)
        Returns a source that fetches entries from the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

        NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

        If your data is stored in the IMDG using the portable serialization format, there are additional optimizations available when using Projections.singleAttribute(java.lang.String) and Projections.multiAttribute(java.lang.String...) to create your projection instance and using the Predicates factory or PredicateBuilder to create the predicate. In this case Jet can test the predicate and apply the projection without deserializing the whole object.

        Due to current limitations in the way Jet reads the map, it can't use any indexes on the map; it will always scan the map in full.

        The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If entries are added to or removed from the map while the job runs, the keys that have been added or deleted since the job started will be emitted at most once; all other keys will be emitted exactly once.

        The default local parallelism for this processor is 1.

        Predicate/projection class requirements

        The classes implementing predicate and projection need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use map(String) and add a subsequent map or filter stage.

        Type Parameters:
        T - type of emitted item
        Parameters:
        map - the Hazelcast map to read data from
        predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
        projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
      • mapJournal

        @Nonnull
        public static <T,​K,​V> StreamSource<T> mapJournal​(@Nonnull
                                                                     java.lang.String mapName,
                                                                     @Nonnull
                                                                     JournalInitialPosition initialPos,
                                                                     @Nonnull
                                                                     FunctionEx<? super EventJournalMapEvent<K,​V>,​? extends T> projectionFn,
                                                                     @Nonnull
                                                                     PredicateEx<? super EventJournalMapEvent<K,​V>> predicateFn)
        Returns a source that will stream EventJournalMapEvents of the Hazelcast IMap with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

        The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

        The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

        If you start a new job from an exported state, you can change the source parameters as needed.

        The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).

        Predicate/projection class requirements

        The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
        Type Parameters:
        T - type of emitted item
        Parameters:
        mapName - the name of the map
        initialPos - describes which event to start receiving from
        projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
        predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
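
        A sketch of a journal-based streaming stage using the Util convenience functions mentioned above (the map name is a placeholder; the logger sink is an arbitrary choice for illustration):

```java
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import static com.hazelcast.jet.pipeline.JournalInitialPosition.START_FROM_CURRENT;

public class MapJournalExample {
    public static Pipeline build() {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal(
                 "my-map",                 // placeholder map name
                 START_FROM_CURRENT,       // ignore pre-existing entries
                 Util.mapEventToEntry(),   // project event -> Map.Entry(key, newValue)
                 Util.mapPutEvents()))     // keep only ADDED and UPDATED events
         .withIngestionTimestamps()
         .writeTo(Sinks.logger());
        return p;
    }
}
```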
      • mapJournal

        @Nonnull
        public static <T,​K,​V> StreamSource<T> mapJournal​(@Nonnull
                                                                     IMap<? extends K,​? extends V> map,
                                                                     @Nonnull
                                                                     JournalInitialPosition initialPos,
                                                                     @Nonnull
                                                                     FunctionEx<? super EventJournalMapEvent<K,​V>,​? extends T> projectionFn,
                                                                     @Nonnull
                                                                     PredicateEx<? super EventJournalMapEvent<K,​V>> predicateFn)
        Returns a source that will stream EventJournalMapEvents of the given Hazelcast IMap. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

        NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

        The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

        The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

        If you start a new job from an exported state, you can change the source parameters as needed.

        The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).

        Predicate/projection class requirements

        The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use mapJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.

        Issue when "catching up"

        This processor does not coalesce watermarks from partitions. It reads the partitions one by one: it emits events from one partition, then from another, in batches. This adds time disorder to the events: it might emit a very recent event from partition1 while an old event from partition2 has not been emitted yet, and it generates watermarks based on this order. Even if the items in your partitions are ordered by timestamp, you can't use an allowed lag of 0. Most notably, this "catching up" happens after the job is restarted, when the events since the last snapshot are reprocessed in a burst. In order not to lose any events, the lag should be configured to at least snapshotInterval + timeToRestart + normalEventLag. The reason for this behavior is that the default partition count in the cluster is quite high and cannot be changed per object, and for low-traffic maps it would take a long time until all partitions saw an event, which would delay emitting a coalesced watermark.
        Type Parameters:
        T - type of emitted item
        Parameters:
        map - the map to read data from
        initialPos - describes which event to start receiving from
        projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
        predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
      • mapJournal

        @Nonnull
        public static <K,​V> StreamSource<java.util.Map.Entry<K,​V>> mapJournal​(@Nonnull
                                                                                          IMap<? extends K,​? extends V> map,
                                                                                          @Nonnull
                                                                                          JournalInitialPosition initialPos)
        Convenience for mapJournal(IMap, JournalInitialPosition, FunctionEx, PredicateEx) which will pass only ADDED and UPDATED events and will project the event's key and new value into a Map.Entry.

        NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.

      • remoteMap

        @Nonnull
        public static <K,​V> BatchSource<java.util.Map.Entry<K,​V>> remoteMap​(@Nonnull
                                                                                        java.lang.String mapName,
                                                                                        @Nonnull
                                                                                        ClientConfig clientConfig)
        Returns a source that fetches entries from the Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If the IMap is modified while being read, or if there is a cluster topology change (triggering data migration), the source may miss and/or duplicate some entries. If we detect a topology change, the job will fail, but the detection is only on a best-effort basis - we might still give incorrect results without reporting a failure. Concurrent mutation is not detected at all.

        The default local parallelism for this processor is 1.
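
        A minimal sketch (the cluster name and address are placeholders for your remote cluster's actual settings):

```java
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Map;

public class RemoteMapExample {
    static BatchSource<Map.Entry<Object, Object>> remoteEntries() {
        // ClientConfig pointing at the remote cluster
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setClusterName("remote-cluster");
        clientConfig.getNetworkConfig().addAddress("10.0.0.1:5701");
        return Sources.remoteMap("my-map", clientConfig);
    }
}
```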

      • remoteMap

        @Nonnull
        public static <T,​K,​V> BatchSource<T> remoteMap​(@Nonnull
                                                                   java.lang.String mapName,
                                                                   @Nonnull
                                                                   ClientConfig clientConfig,
                                                                   @Nonnull
                                                                   Predicate<K,​V> predicate,
                                                                   @Nonnull
                                                                   Projection<? super java.util.Map.Entry<K,​V>,​? extends T> projection)
        Returns a source that fetches entries from a remote Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.

        See RemoteMapSourceBuilder for details on the remote map source.

        Type Parameters:
        T - type of emitted item
        Parameters:
        mapName - the name of the map
        predicate - the predicate to filter the events. If you want to specify just the projection, use Predicates.alwaysTrue() as a pass-through predicate. It must be stateless and cooperative.
        projection - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. If you want to specify just the predicate, use Projections.identity(). It must be stateless and cooperative.
      • remoteMap

        @Nonnull
        public static <K,​V> BatchSource<java.util.Map.Entry<K,​V>> remoteMap​(@Nonnull
                                                                                        java.lang.String mapName,
                                                                                        @Nonnull
                                                                                        DataConnectionRef dataConnectionRef)
        The same as the remoteMap(String, ClientConfig) method, except that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that references a configured DataConnectionConfig.

        The DataConnectionConfig caches the connection to the remote cluster so that it can be re-used.

        Parameters:
        mapName - the name of the map
        dataConnectionRef - the reference to DataConnectionConfig
        Since:
        5.4
      • remoteMap

        @Nonnull
        public static <T,​K,​V> BatchSource<T> remoteMap​(@Nonnull
                                                                   java.lang.String mapName,
                                                                   @Nonnull
                                                                   DataConnectionRef dataConnectionRef,
                                                                   @Nonnull
                                                                   Predicate<K,​V> predicate,
                                                                   @Nonnull
                                                                   Projection<? super java.util.Map.Entry<K,​V>,​? extends T> projection)
        The same as the remoteMap(String, ClientConfig, Predicate, Projection) method, except that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that references a configured DataConnectionConfig.

        The DataConnectionConfig caches the connection to the remote cluster so that it can be re-used.

        Parameters:
        mapName - the name of the map
        dataConnectionRef - the reference to DataConnectionConfig
        Since:
        5.4
      • remoteMapBuilder

        @Nonnull
        public static <K,​V> RemoteMapSourceBuilder<K,​V,​java.util.Map.Entry<K,​V>> remoteMapBuilder​(java.lang.String mapName)
        Returns a builder to build a source that fetches entries from a remote Hazelcast IMap with the specified name. It provides a fluent API to build the source using the optional parameters.

        See RemoteMapSourceBuilder for details on the remote map source.

        Type Parameters:
        K - the type of the key in the map
        V - the type of the value in the map
        Parameters:
        mapName - the name of the map
        Returns:
        builder for remote map source
        Since:
        5.4
      • remoteMapJournal

        @Nonnull
        public static <T,​K,​V> StreamSource<T> remoteMapJournal​(@Nonnull
                                                                           java.lang.String mapName,
                                                                           @Nonnull
                                                                           ClientConfig clientConfig,
                                                                           @Nonnull
                                                                           JournalInitialPosition initialPos,
                                                                           @Nonnull
                                                                           FunctionEx<? super EventJournalMapEvent<K,​V>,​? extends T> projectionFn,
                                                                           @Nonnull
                                                                           PredicateEx<? super EventJournalMapEvent<K,​V>> predicateFn)
        Returns a source that will stream the EventJournalMapEvent events of the Hazelcast IMap with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

        To use an IMap as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

        The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

        If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same offsets will be used. To avoid this, give a different name to this source.

        The default local parallelism for this processor is 1.

        Predicate/projection class requirements

        The classes implementing predicateFn and projectionFn need to be available on the remote cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the map itself. If you cannot meet these requirements, use remoteMapJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
        Type Parameters:
        K - type of key
        V - type of value
        T - type of emitted item
        Parameters:
        mapName - the name of the map
        clientConfig - configuration for the client to connect to the remote cluster
        initialPos - describes which event to start receiving from
        projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
        predicateFn - the predicate to filter the events. You may use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
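
        A sketch combining this source with the convenience functions from Util (the map name is a placeholder; the ClientConfig is assumed to be configured for the remote cluster):

```java
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import java.util.Map;
import static com.hazelcast.jet.pipeline.JournalInitialPosition.START_FROM_OLDEST;

public class RemoteJournalExample {
    static StreamSource<Map.Entry<Object, Object>> remoteJournal(ClientConfig clientConfig) {
        return Sources.remoteMapJournal(
                "my-map",                 // placeholder map name
                clientConfig,
                START_FROM_OLDEST,        // replay the journal from its oldest event
                Util.mapEventToEntry(),   // project event -> Map.Entry(key, newValue)
                Util.mapPutEvents());     // keep only ADDED and UPDATED events
    }
}
```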
      • remoteMapJournal

        @Nonnull
        public static <T,​K,​V> StreamSource<T> remoteMapJournal​(@Nonnull
                                                                           java.lang.String mapName,
                                                                           @Nonnull
                                                                           DataConnectionRef dataConnectionRef,
                                                                           @Nonnull
                                                                           JournalInitialPosition initialPos,
                                                                           @Nonnull
                                                                           FunctionEx<? super EventJournalMapEvent<K,​V>,​? extends T> projectionFn,
                                                                           @Nonnull
                                                                           PredicateEx<? super EventJournalMapEvent<K,​V>> predicateFn)
        The same as the remoteMapJournal(String, ClientConfig, JournalInitialPosition, FunctionEx, PredicateEx) method, except that instead of a ClientConfig parameter used to connect to the remote cluster, this method receives a DataConnectionRef that references a configured DataConnectionConfig.

        The DataConnectionConfig caches the connection to the remote cluster so that it can be re-used.

        (Prerequisite) External data connection configuration: use HazelcastDataConnection.CLIENT_XML for an XML string or HazelcastDataConnection.CLIENT_YML for a YAML string.

        
         Config config = ...;
         String xmlString = ...;
         DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
             .setName("my-hzclient-data-connection")
             .setType("Hz")
             .setProperty(HazelcastDataConnection.CLIENT_XML, xmlString);
         config.addDataConnectionConfig(dataConnectionConfig);

        Pipeline configuration

        
         PredicateEx<EventJournalMapEvent<String, Integer>> predicate = ...;
         p.readFrom(Sources.remoteMapJournal(
             mapName,
             DataConnectionRef.dataConnectionRef("my-hzclient-data-connection"),
             JournalInitialPosition.START_FROM_OLDEST,
             EventJournalMapEvent::getNewValue,
             predicate
          ));
          
        Type Parameters:
        T - the type of the emitted item
        K - the key type of the EventJournalMapEvent
        V - the value type of the EventJournalMapEvent
        Parameters:
        mapName - the name of the map
        dataConnectionRef - the reference to DataConnectionConfig
        initialPos - describes which event to start receiving from
        projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.mapEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
        predicateFn - the predicate to filter the events. If you want to specify just the projection, use Util.mapPutEvents() to pass only ADDED and UPDATED events. It must be stateless and cooperative.
        Returns:
        a stream that can be used as a source
        Since:
        5.3
      • cache

        @Nonnull
        public static <K,​V> BatchSource<java.util.Map.Entry<K,​V>> cache​(@Nonnull
                                                                                    java.lang.String cacheName)
        Returns a source that fetches entries from a Hazelcast ICache with the given name and emits them as Map.Entry. It leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If entries are added to or removed from the cache while the job is reading it, the keys deleted or added since the job started will be emitted at most once; the other keys will be emitted exactly once.

        The default local parallelism for this processor is 1.
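        As a sketch of how this source plugs into a pipeline (the cache name "ticker-cache" and the String/Integer types are illustrative assumptions, not part of the API):

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Minimal sketch: read an ICache as a batch source and log its entries.
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String, Integer>cache("ticker-cache"))
 .map(e -> e.getKey() + "=" + e.getValue())
 .writeTo(Sinks.logger());
// Submit the pipeline to a running Hazelcast instance to execute it.
```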

      • cacheJournal

        @Nonnull
        public static <T,​K,​V> StreamSource<T> cacheJournal​(@Nonnull
                                                                       java.lang.String cacheName,
                                                                       @Nonnull
                                                                       JournalInitialPosition initialPos,
                                                                       @Nonnull
                                                                       FunctionEx<? super EventJournalCacheEvent<K,​V>,​? extends T> projectionFn,
                                                                       @Nonnull
                                                                       PredicateEx<? super EventJournalCacheEvent<K,​V>> predicateFn)
        Returns a source that will stream the EventJournalCacheEvent events of a Hazelcast ICache with the specified name. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

        The source leverages data locality by making each of the underlying processors fetch only those entries that are stored on the member where it is running.

        To use an ICache as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

        The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

        If you start a new job from an exported state, you can change the source parameters as needed.

        The default local parallelism for this processor is 2 (or 1 if just 1 CPU is available).

        Predicate/projection class requirements

        The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the cache itself. If you cannot meet these conditions, use cacheJournal(String, JournalInitialPosition) and add a subsequent map or filter stage.
        Type Parameters:
        T - type of emitted item
        Parameters:
        cacheName - the name of the cache
        initialPos - describes which event to start receiving from
        projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
        predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
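        A usage sketch that combines the Util convenience functions mentioned above (the cache name is illustrative):

```java
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.Util;

// Sketch: stream the cache's event journal, starting from the current event.
Pipeline p = Pipeline.create();
p.readFrom(Sources.cacheJournal(
        "ticker-cache",
        JournalInitialPosition.START_FROM_CURRENT,
        Util.cacheEventToEntry(),   // project each event to a (key, newValue) entry
        Util.cachePutEvents()))     // keep only CREATED and UPDATED events
 .withoutTimestamps()
 .writeTo(Sinks.logger());
```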
      • remoteCache

        @Nonnull
        public static <K,​V> BatchSource<java.util.Map.Entry<K,​V>> remoteCache​(@Nonnull
                                                                                          java.lang.String cacheName,
                                                                                          @Nonnull
                                                                                          ClientConfig clientConfig)
        Returns a source that fetches entries from the Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig and emits them as Map.Entry.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        If entries are added to or removed from the cache while the job is reading it, the keys deleted or added since the job started will be emitted at most once; the other keys will be emitted exactly once.

        The default local parallelism for this processor is 1.

      • remoteCacheJournal

        @Nonnull
        public static <T,​K,​V> StreamSource<T> remoteCacheJournal​(@Nonnull
                                                                             java.lang.String cacheName,
                                                                             @Nonnull
                                                                             ClientConfig clientConfig,
                                                                             @Nonnull
                                                                             JournalInitialPosition initialPos,
                                                                             @Nonnull
                                                                             FunctionEx<? super EventJournalCacheEvent<K,​V>,​? extends T> projectionFn,
                                                                             @Nonnull
                                                                             PredicateEx<? super EventJournalCacheEvent<K,​V>> predicateFn)
        Returns a source that will stream the EventJournalCacheEvent events of the Hazelcast ICache with the specified name from a remote cluster. By supplying a predicate and projection here instead of in separate map/filter transforms you allow the source to apply these functions early, before generating any output, with the potential of significantly reducing data traffic.

        To use an ICache as a streaming source, you must configure the event journal for it. The journal has fixed capacity and will drop events if it overflows.

        The source saves the journal offsets to the snapshot. If the job restarts, it starts emitting from the saved offsets with an exactly-once guarantee (unless the journal has overflowed).

        If you start a new job from an exported state, you can change the source parameters as needed. If you connect to another cluster, keep in mind that the same saved offsets will be used. To avoid this, give this source a different name.

        The default local parallelism for this processor is 1.

        Predicate/projection class requirements

        The classes implementing predicateFn and projectionFn need to be available on the cluster's classpath or loaded using Hazelcast User Code Deployment. It's not enough to add them to the job classpath in JobConfig. The same is true for the class of the objects stored in the cache itself. If you cannot meet these conditions, use remoteCacheJournal(String, ClientConfig, JournalInitialPosition) and add a subsequent map or filter stage.
        Type Parameters:
        T - type of emitted item
        Parameters:
        cacheName - the name of the cache
        clientConfig - configuration for the client to connect to the remote cluster
        initialPos - describes which event to start receiving from
        projectionFn - the projection to map the events. If the projection returns a null for an item, that item will be filtered out. You may use Util.cacheEventToEntry() to extract just the key and the new value. It must be stateless and cooperative.
        predicateFn - the predicate to filter the events. You may use Util.cachePutEvents() to pass only CREATED and UPDATED events. It must be stateless and cooperative.
      • list

        @Nonnull
        public static <T> BatchSource<T> list​(@Nonnull
                                              java.lang.String listName)
        Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member — the one where the entire list is stored by the IMDG.

        If the IList is modified while being read, the source may miss and/or duplicate some entries.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        One instance of this processor runs only on the member that owns the list.
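        A minimal usage sketch (the list name is illustrative):

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: read all items of an IList as a batch and log the non-empty ones.
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list("input-list"))
 .filter(line -> !line.isEmpty())
 .writeTo(Sinks.logger());
```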

      • list

        @Nonnull
        public static <T> BatchSource<T> list​(@Nonnull
                                              IList<? extends T> list)
        Returns a source that emits items retrieved from a Hazelcast IList. All elements are emitted on a single member — the one where the entire list is stored by the IMDG.

        If the IList is modified while being read, the source may miss and/or duplicate some entries.

        NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        One instance of this processor runs only on the member that owns the list.

      • remoteList

        @Nonnull
        public static <T> BatchSource<T> remoteList​(@Nonnull
                                                    java.lang.String listName,
                                                    @Nonnull
                                                    ClientConfig clientConfig)
        Returns a source that emits items retrieved from a Hazelcast IList in a remote cluster identified by the supplied ClientConfig. All elements are emitted on a single member.

        If the IList is modified while being read, the source may miss and/or duplicate some entries.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        Only 1 instance of this processor runs in the cluster.

      • socket

        @Nonnull
        public static StreamSource<java.lang.String> socket​(@Nonnull
                                                            java.lang.String host,
                                                            int port,
                                                            @Nonnull
                                                            java.nio.charset.Charset charset)
        Returns a source which connects to the specified socket and emits lines of text received from it. It decodes the text using the supplied charset.

        Each underlying processor opens its own TCP connection, so there will be clusterSize * localParallelism open connections to the server.

        The source completes when the server closes the socket. It never attempts to reconnect. Any IOException will cause the job to fail.

        The source does not save any state to snapshot. On job restart, it will emit whichever items the server sends. The implementation uses a non-blocking API; the processor is cooperative.

        The default local parallelism for this processor is 1.
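        A sketch that tails a line-oriented TCP server, for example one started with "nc -l 5252" (host and port are illustrative):

```java
import java.nio.charset.StandardCharsets;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: emit each UTF-8 line received from localhost:5252.
Pipeline p = Pipeline.create();
p.readFrom(Sources.socket("localhost", 5252, StandardCharsets.UTF_8))
 .withoutTimestamps()
 .writeTo(Sinks.logger());
```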

      • socket

        @Nonnull
        public static StreamSource<java.lang.String> socket​(@Nonnull
                                                            java.lang.String host,
                                                            int port)
        Convenience for socket(host, port, charset) with UTF-8 as the charset.
        Parameters:
        host - the hostname to connect to
        port - the port to connect to
      • filesBuilder

        @Nonnull
        public static FileSourceBuilder filesBuilder​(@Nonnull
                                                     java.lang.String directory)
        Returns a builder object that offers a step-by-step fluent API to build a custom file-reading source for the Pipeline API. The source reads lines from files in a directory (but not its subdirectories). Using this builder you can build either a batching or a streaming reader.
      • files

        @Nonnull
        public static BatchSource<java.lang.String> files​(@Nonnull
                                                          java.lang.String directory)
        A source that reads all files in a directory as a batch.

        This method is a shortcut for:

        
           filesBuilder(directory)
              .charset(UTF_8)
              .glob(GLOB_WILDCARD)
              .sharedFileSystem(false)
              .build((fileName, line) -> line)
         

        If a file is appended to while being read, the addition might or might not be emitted, or only part of a line might be emitted. If files are modified in more complex ways, the behavior is undefined.

        See filesBuilder(String).
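        A sketch that filters the lines read from a directory (the path and the filter condition are illustrative):

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: read every file in the directory and keep only error lines.
Pipeline p = Pipeline.create();
p.readFrom(Sources.files("/var/log/app"))
 .filter(line -> line.contains("ERROR"))
 .writeTo(Sinks.logger());
```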

      • json

        @Nonnull
        public static <T> BatchSource<T> json​(@Nonnull
                                              java.lang.String directory,
                                              @Nonnull
                                              java.lang.Class<T> type)
        A source that reads all files in a directory as a batch. The source expects the content of the files to be streaming JSON, where each JSON string is separated by a new line. A JSON string itself can span multiple lines. The source converts each JSON string to an object of the given type.

        This method is a shortcut for:

        
           filesBuilder(directory)
              .charset(UTF_8)
              .glob(GLOB_WILDCARD)
              .sharedFileSystem(false)
              .build(path -> JsonUtil.beanSequenceFrom(path, type))
         

        If a file is appended to while being read, the addition might or might not be emitted, or only part of a line might be emitted. If files are modified in more complex ways, the behavior is undefined.

        See filesBuilder(String), files(String).

        Since:
        Jet 4.2
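        A usage sketch, assuming a Person bean whose properties match the JSON keys (the directory and the bean are illustrative):

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: deserialize each JSON document in the directory into a Person.
// Person is a hypothetical bean with a getName() accessor.
Pipeline p = Pipeline.create();
p.readFrom(Sources.json("/data/people", Person.class))
 .map(Person::getName)
 .writeTo(Sinks.logger());
```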
      • json

        @Nonnull
        public static BatchSource<java.util.Map<java.lang.String,​java.lang.Object>> json​(@Nonnull
                                                                                               java.lang.String directory)
        Convenience for json(String, Class) which converts each JSON string to a Map. It will throw ClassCastException if the JSON string is just a primitive (String, Number, Boolean) or a JSON array (List).
        Since:
        Jet 4.2
      • fileWatcher

        @Nonnull
        public static StreamSource<java.lang.String> fileWatcher​(@Nonnull
                                                                 java.lang.String watchedDirectory)
        A source that streams lines added to files in a directory. This is a streaming source: it watches the directory and emits lines as they are appended to files in it.

        This method is a shortcut for:

        
           filesBuilder(directory)
              .charset(UTF_8)
              .glob(GLOB_WILDCARD)
              .sharedFileSystem(false)
              .buildWatcher((fileName, line) -> line)
         

        Appending lines using a text editor

        If you're testing this source, you might think of using a text editor to append the lines. However, it might not work as expected because some editors write to a temp file and then rename it, or they append an extra newline character at the end which gets overwritten if more text is added in the editor. The most reliable way to append is echo text >> yourFile.

        See filesBuilder(String).
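        A streaming sketch that counts the lines appended per second (the watched directory is illustrative; ingestion timestamps are used since this source has no native ones):

```java
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;

// Sketch: watch a directory and emit a per-second count of appended lines.
Pipeline p = Pipeline.create();
p.readFrom(Sources.fileWatcher("/tmp/watched"))
 .withIngestionTimestamps()
 .window(WindowDefinition.tumbling(1_000))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());
```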

      • jsonWatcher

        @Nonnull
        public static <T> StreamSource<T> jsonWatcher​(@Nonnull
                                                      java.lang.String watchedDirectory,
                                                      @Nonnull
                                                      java.lang.Class<T> type)
        A source that streams lines added to files in a directory. This is a streaming source: it watches the directory and, as lines are appended to files in it, converts each line to an object of the given type and emits it.

        This method is a shortcut for:

        
           filesBuilder(directory)
              .charset(UTF_8)
              .glob(GLOB_WILDCARD)
              .sharedFileSystem(false)
              .buildWatcher((fileName, line) -> JsonUtil.beanFrom(line, type))
         

        Appending lines using a text editor

        If you're testing this source, you might think of using a text editor to append the lines. However, it might not work as expected because some editors write to a temp file and then rename it, or they append an extra newline character at the end which gets overwritten if more text is added in the editor. The most reliable way to append is echo text >> yourFile.

        See filesBuilder(String), fileWatcher(String).

        Since:
        Jet 4.2
      • jsonWatcher

        @Nonnull
        public static StreamSource<java.util.Map<java.lang.String,​java.lang.Object>> jsonWatcher​(@Nonnull
                                                                                                       java.lang.String watchedDirectory)
        Convenience for jsonWatcher(String, Class) which converts each appended line, treated as a JSON string, to its Map representation.
        Since:
        Jet 4.2
      • jmsQueue

        @Nonnull
        @Deprecated
        public static StreamSource<jakarta.jms.Message> jmsQueue​(@Nonnull
                                                                 SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier,
                                                                 @Nonnull
                                                                 java.lang.String name)
      • jmsQueue

        @Nonnull
        public static StreamSource<jakarta.jms.Message> jmsQueue​(@Nonnull
                                                                 java.lang.String name,
                                                                 @Nonnull
                                                                 SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
        Shortcut equivalent to:
                 return jmsQueueBuilder(factorySupplier)
                         .destinationName(name)
                         .build();
         
        This version creates a connection without any authentication parameters. JMS Message objects are emitted downstream.

        Note: a Message might not be serializable. In that case you can use the builder and add a projection.

        Parameters:
        name - the name of the queue
        factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
        Since:
        Jet 4.1
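        A sketch using ActiveMQ Artemis as the provider (the factory class, broker URL and queue name are assumptions; any jakarta.jms.ConnectionFactory works):

```java
import jakarta.jms.TextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: consume text messages from the "orders" queue.
Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsQueue(
        "orders",
        () -> new ActiveMQJMSConnectionFactory("tcp://localhost:61616")))
 .withoutTimestamps()
 // FunctionEx tolerates the checked JMSException thrown by getText()
 .map(msg -> ((TextMessage) msg).getText())
 .writeTo(Sinks.logger());
```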
      • jmsQueueBuilder

        @Nonnull
        public static JmsSourceBuilder jmsQueueBuilder​(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
        Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. See javadoc on JmsSourceBuilder methods for more details.

        This source uses the JMS' message timestamp as the native timestamp, if enabled.

        This source supports exactly-once and at-least-once mode, see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee) for more information.

        IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most of the providers offer a configuration parameter to enable auto-reconnection, refer to provider documentation for details.

        The default local parallelism for this processor is 1.

        Parameters:
        factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
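        A builder sketch that sets a processing guarantee and projects messages to a serializable type (the queue name is illustrative and createConnectionFactory() is a hypothetical stateless factory you supply):

```java
import jakarta.jms.TextMessage;

import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: build a JMS queue source with an at-least-once guarantee.
Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsQueueBuilder(() -> createConnectionFactory())
        .destinationName("orders")
        .maxGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
        // projecting to String avoids emitting non-serializable Message objects
        .build(msg -> ((TextMessage) msg).getText()))
 .withoutTimestamps()
 .writeTo(Sinks.logger());
```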
      • jmsTopic

        @Nonnull
        @Deprecated
        public static StreamSource<jakarta.jms.Message> jmsTopic​(@Nonnull
                                                                 SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier,
                                                                 @Nonnull
                                                                 java.lang.String name)
      • jmsTopic

        @Nonnull
        public static StreamSource<jakarta.jms.Message> jmsTopic​(@Nonnull
                                                                 java.lang.String name,
                                                                 @Nonnull
                                                                 SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
        Shortcut equivalent to:
                 return jmsTopicBuilder(factorySupplier)
                         .destinationName(name)
                         .build();
         
        This version creates a connection without any authentication parameters. A non-durable, non-shared consumer is used, so only one member will connect to the broker. JMS Message objects are emitted downstream.

        Note: a Message might not be serializable. In that case you can use the builder and add a projection.

        Parameters:
        name - the name of the topic
        factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
        Since:
        Jet 4.1
      • jmsTopicBuilder

        @Nonnull
        public static JmsSourceBuilder jmsTopicBuilder​(SupplierEx<? extends jakarta.jms.ConnectionFactory> factorySupplier)
        Returns a builder object that offers a step-by-step fluent API to build a custom JMS StreamSource for the Pipeline API. See javadoc on JmsSourceBuilder methods for more details.

        By default, a non-shared consumer is used. This forces the source to run on only one member of the cluster. You can use JmsSourceBuilder.consumerFn(FunctionEx) to create a shared consumer.

        This source uses the JMS' message timestamp as the native timestamp, if enabled.

        This source supports exactly-once and at-least-once mode for durable topic subscriptions, see JmsSourceBuilder.maxGuarantee(ProcessingGuarantee) for more information.

        IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most of the providers offer a configuration parameter to enable auto-reconnection, refer to provider documentation for details.

        The default local parallelism for this processor is 1.

        Parameters:
        factorySupplier - supplier to obtain JMS connection factory. It must be stateless.
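        A sketch of a shared durable subscription, which lets the source run on every member (the topic and subscription names are illustrative and createConnectionFactory() is a hypothetical stateless factory you supply):

```java
import jakarta.jms.TextMessage;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

// Sketch: consume a topic via a shared durable consumer on all members.
Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsTopicBuilder(() -> createConnectionFactory())
        .sharedConsumer(true)   // allow the source to run on every member
        .consumerFn(session -> session.createSharedDurableConsumer(
                session.createTopic("prices"), "jet-price-sub"))
        .build(msg -> ((TextMessage) msg).getText()))
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());
```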
      • jdbc

        @Nonnull
        public static <T> BatchSource<T> jdbc​(@Nonnull
                                              SupplierEx<? extends java.sql.Connection> newConnectionFn,
                                              @Nonnull
                                              ToResultSetFunction resultSetFn,
                                              @Nonnull
                                              FunctionEx<? super java.sql.ResultSet,​? extends T> createOutputFn)
        Returns a source which connects to the specified database using the given newConnectionFn, queries the database and creates a result set using the given resultSetFn. It creates output objects from the ResultSet using the given createOutputFn and emits them downstream.

        resultSetFn receives the created connection, the total parallelism (local parallelism * member count) and the global processor index as arguments and produces a result set. Use the parallelism and processor-index arguments to fetch the part of the whole result set specific to that processor. If the table itself isn't partitioned by the same key, running multiple queries might not actually be faster than the simpler overload of this method; do your own testing.

        createOutputFn receives the ResultSet and creates the desired output object. The function is called once for each row of the result set; it should not call ResultSet.next() or any other cursor-navigating methods.

        Example:

        
             p.readFrom(Sources.jdbc(
                 () -> DriverManager.getConnection(DB_CONNECTION_URL),
                 (con, parallelism, index) -> {
                       PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
                      stmt.setInt(1, parallelism);
                      stmt.setInt(2, index);
                      return stmt.executeQuery();
                 },
                 resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
         

        If the underlying table is modified while being read, the source may miss and/or duplicate some entries, because multiple queries for parts of the data on multiple members will be executed.

        The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

        Any SQLException will cause the job to fail.

        The default local parallelism for this processor is 1.

        The given functions must be stateless.

        Type Parameters:
        T - type of output objects
        Parameters:
        newConnectionFn - creates the connection
        resultSetFn - creates a ResultSet using the connection, total parallelism and index
        createOutputFn - creates output objects from ResultSet
      • jdbc

        @Nonnull
        public static <T> BatchSource<T> jdbc​(@Nonnull
                                              DataConnectionRef dataConnectionRef,
                                              @Nonnull
                                              ToResultSetFunction resultSetFn,
                                              @Nonnull
                                              FunctionEx<? super java.sql.ResultSet,​? extends T> createOutputFn)
        Returns a source which connects to the specified database using the given dataConnectionRef, queries the database and creates a result set using the given resultSetFn. It creates output objects from the ResultSet using the given createOutputFn and emits them downstream.

        Example:

        (Prerequisite) Data connection configuration:

        
              Config config = smallInstanceConfig();
              Properties properties = new Properties();
              properties.put("jdbcUrl", jdbcUrl);
              properties.put("username", username);
              properties.put("password", password);
              DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
                      .setName("my-jdbc-data-connection")
                      .setType("Jdbc")
                      .setProperties(properties);
              config.addDataConnectionConfig(dataConnectionConfig);
         

        Pipeline configuration

        
             p.readFrom(Sources.jdbc(
                 DataConnectionRef.dataConnectionRef("my-jdbc-data-connection"),
                 (con, parallelism, index) -> {
                       PreparedStatement stmt = con.prepareStatement("SELECT * FROM TABLE WHERE MOD(id, ?) = ?");
                      stmt.setInt(1, parallelism);
                      stmt.setInt(2, index);
                      return stmt.executeQuery();
                 },
                 resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
         

        See also jdbc(SupplierEx, ToResultSetFunction, FunctionEx).

        Since:
        5.3
      • jdbc

        @Nonnull
        public static <T> BatchSource<T> jdbc​(@Nonnull
                                              java.lang.String connectionURL,
                                              @Nonnull
                                              java.lang.String query,
                                              @Nonnull
                                              FunctionEx<? super java.sql.ResultSet,​? extends T> createOutputFn)
        Convenience for jdbc(SupplierEx, ToResultSetFunction, FunctionEx). A non-distributed, single-worker source which fetches the whole result set with a single query on a single member.

        This method executes exactly one query in the target database. If the underlying table is modified while being read, the behavior depends on the configured transaction isolation level in the target database. Refer to the documentation for the target database system.

        Example:

        
             p.readFrom(Sources.jdbc(
                 DB_CONNECTION_URL,
                 "select ID, NAME from PERSON",
                 resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
         
        The given function must be stateless.
      • jdbc

        @Nonnull
        public static <T> BatchSource<T> jdbc​(@Nonnull
                                              java.lang.String connectionURL,
                                              @Nonnull
                                              java.lang.String query,
                                              @Nonnull
                                              java.util.Properties properties,
                                              @Nonnull
                                              FunctionEx<? super java.sql.ResultSet,​? extends T> createOutputFn)
        Same as jdbc(String, String, FunctionEx).

        It is not always possible to use the default driver settings. This overload allows passing properties to the JDBC driver.

        Example for PostgreSQL to specify fetchSize: PostgreSQL requires autocommit to be disabled. The backend closes cursors at the end of a transaction, so with autocommit enabled the backend would close the cursor before anything could be fetched from it.

        
                Properties properties = new Properties();
                properties.put(JdbcPropertyKeys.FETCH_SIZE, "5");
                properties.put(JdbcPropertyKeys.AUTO_COMMIT, "false");
                p.readFrom(Sources.jdbc(
                    "jdbc:postgresql://localhost:5432/mydatabase",
                    "select ID, NAME from PERSON",
                    properties,
                    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))
            

        Example for MySQL to specify fetchSize: The database connection URL should have the "&useCursorFetch=true" parameter to enable cursor-based fetching. The JDBC driver will then fetch a batch of rows from the database at a time rather than fetching the whole result set at once.

        
                Properties properties = new Properties();
                properties.put(JdbcPropertyKeys.FETCH_SIZE, "5");
                p.readFrom(Sources.jdbc(
                    "jdbc:mysql://localhost:3306/mydatabase?useCursorFetch=true",
                    "select ID, NAME from PERSON",
                    properties,
                    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))))