Class SourceBuilder<C>

java.lang.Object
com.hazelcast.jet.pipeline.SourceBuilder<C>
Type Parameters:
C - type of the context object

public final class SourceBuilder<C> extends Object
Top-level class for Jet custom source builders. It is associated with a context object that holds any necessary state and resources you need to make your source work.

For further details refer to the factory methods: batch(), stream() and timestampedStream().

Since:
Jet 3.0
  • Method Details

    • batch

      @Nonnull public static <C> SourceBuilder<C>.Batch<Void> batch(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
      Returns a fluent-API builder with which you can create a batch source for a Jet pipeline. The source will use non-cooperative processors.

      Each parallel processor that drives your source has its private instance of a context object it got from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object.

      Your function should add some items to the buffer, ideally only those it has ready without having to block. A hundred items per call is enough to eliminate Jet's per-call overhead. If it has no items ready, it may also return without adding anything. In any case, the function should return within about a second; otherwise you risk interfering with Jet's coordination mechanisms and degrading performance.

      Once it has emitted all the data, fillBufferFn must call buffer.close(). This signals Jet to not call fillBufferFn again and at some later point it will call the destroyFn with the context object.
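      To make the call protocol described above concrete, here is a plain-Java model with no Jet dependency. ModelBuffer and BatchContractModel are hypothetical stand-ins, not Jet classes, and the driver loop only mimics the sequence the text describes (repeated fillBufferFn calls until close(), then cleanup); it does not reflect how Jet actually schedules processors. The fill function also follows the batching advice by adding at most 100 lines per call.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the buffer passed to fillBufferFn; not a Jet class. */
final class ModelBuffer<T> {
    final List<T> items = new ArrayList<>();
    boolean closed;

    void add(T item) {
        items.add(item);
    }

    void close() {
        closed = true;
    }
}

/** Plain-Java model of the batch call protocol; not Jet's real scheduling. */
final class BatchContractModel {
    static final int MAX_ITEMS_PER_CALL = 100;

    private BatchContractModel() { }

    /** A fillBufferFn body: adds at most 100 lines and closes the buffer at end of input. */
    static void fill(BufferedReader in, ModelBuffer<String> buf) throws IOException {
        for (int i = 0; i < MAX_ITEMS_PER_CALL; i++) {
            String line = in.readLine();
            if (line == null) {
                buf.close(); // all data emitted: no further fill calls should follow
                return;
            }
            buf.add(line);
        }
    }

    /**
     * Calls fill() until the buffer is closed, then closes the reader,
     * which plays the role of destroyFn. Returns the number of fill() calls.
     */
    static int drive(BufferedReader context, ModelBuffer<String> buf) {
        int calls = 0;
        try (BufferedReader in = context) {
            while (!buf.closed) {
                fill(in, buf);
                calls++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return calls;
    }
}
```

      With 250 input lines the model makes three calls (100, 100 and 50 items), closing the buffer on the third.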

      Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls it exactly once with each globalProcessorIndex from 0 to totalParallelism - 1 and each of the resulting context objects must emit its distinct slice of the total source data.
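      One way a createFn could compute its distinct slice is to split the data into contiguous ranges of nearly equal size. The helper below, SliceMath.slice, is hypothetical and uses no Jet classes; in a distributed source you would pass it the values of processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Striding (processor i takes every item whose index modulo totalParallelism equals i) would work equally well.

```java
/**
 * Hypothetical helper (no Jet dependency) that splits itemCount items into
 * totalParallelism contiguous slices whose sizes differ by at most one.
 */
final class SliceMath {
    private SliceMath() { }

    /** Returns {start, end}, end exclusive, of the slice owned by globalProcessorIndex. */
    static long[] slice(long itemCount, int totalParallelism, int globalProcessorIndex) {
        if (itemCount < 0 || totalParallelism <= 0
                || globalProcessorIndex < 0 || globalProcessorIndex >= totalParallelism) {
            throw new IllegalArgumentException("invalid slice arguments");
        }
        long base = itemCount / totalParallelism;
        long remainder = itemCount % totalParallelism;
        // The first `remainder` processors take one extra item each
        long start = globalProcessorIndex * base + Math.min(globalProcessorIndex, remainder);
        long size = base + (globalProcessorIndex < remainder ? 1 : 0);
        return new long[] {start, start + size};
    }
}
```

      Each context object created this way would then emit only the items in its [start, end) range, so together the processors cover the data exactly once.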

      Here's an example that builds a simple, non-distributed source that reads the lines from a single text file. Since you can't control which member of the Jet cluster the source's processor will run on, the file should be available on all members. The source emits one line per fillBufferFn call.

      
       BatchSource<String> fileSource = SourceBuilder
               .batch("file-source", processorContext ->
                   new BufferedReader(new FileReader("input.txt")))
               .<String>fillBufferFn((in, buf) -> {
                   String line = in.readLine();
                   if (line != null) {
                       buf.add(line);
                   } else {
                       buf.close();
                   }
               })
               .destroyFn(BufferedReader::close)
               .build();
       Pipeline p = Pipeline.create();
       BatchStage<String> srcStage = p.readFrom(fileSource);
       
      Type Parameters:
      C - type of the context object
      Parameters:
      name - a descriptive name for the source (for diagnostic purposes)
      createFn - a function that creates the source's context object. It must be stateless.
      Since:
      Jet 3.0
    • stream

      @Nonnull public static <C> SourceBuilder<C>.Stream<Void> stream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
      Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. The source will use non-cooperative processors.

      Each parallel processor that drives your source has its own private instance of a context object it got from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object.

      Your function should add some items to the buffer, ideally only those it has ready without having to block. A hundred items per call is enough to eliminate Jet's per-call overhead. If it has no items ready, it may also return without adding anything. In any case, the function should return within about a second; otherwise you risk interfering with Jet's coordination mechanisms and degrading performance.
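      For a stream source, one way to follow the "add what's ready, don't block" advice is to let the context object own a queue that some background thread keeps filling, and have fillBufferFn drain it. In this plain-Java sketch the queue-owning context and the NonBlockingFill helper are assumptions, not part of Jet, and a Consumer stands in for the buffer's add() method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical helper: a non-blocking fillBufferFn body for an unbounded source. */
final class NonBlockingFill {
    static final int MAX_BATCH = 100;

    private NonBlockingFill() { }

    /**
     * Moves whatever is already queued, at most MAX_BATCH items, into the buffer.
     * drainTo() returns immediately, so the call never waits for new data.
     */
    static int fill(BlockingQueue<String> queue, Consumer<String> bufferAdd) {
        List<String> batch = new ArrayList<>(MAX_BATCH);
        queue.drainTo(batch, MAX_BATCH);
        batch.forEach(bufferAdd);
        return batch.size(); // zero is fine: the processor will simply call again
    }
}
```

      In a real source this might be wired up along the lines of .<String>fillBufferFn((queue, buf) -> NonBlockingFill.fill(queue, buf::add)), assuming createFn returns such a queue; starting the producer thread and stopping it in destroyFn are left out of the sketch.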

      Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls it exactly once with each globalProcessorIndex from 0 to totalParallelism - 1 and each of the resulting context objects must emit its distinct slice of the total source data.

      Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response:

      
       StreamSource<String> httpSource = SourceBuilder
           .stream("http-source", processorContext -> HttpClients.createDefault())
           .<String>fillBufferFn((httpc, buf) -> {
               // Close the response and its stream so the connection is released
               try (CloseableHttpResponse response =
                            httpc.execute(new HttpGet("http://localhost:8008"));
                    BufferedReader reader = new BufferedReader(new InputStreamReader(
                            response.getEntity().getContent(), StandardCharsets.UTF_8))) {
                   reader.lines().forEach(buf::add);
               }
           })
           .destroyFn(Closeable::close)
           .build();
       Pipeline p = Pipeline.create();
       StreamStage<String> srcStage = p.readFrom(httpSource);
       
      Type Parameters:
      C - type of the context object
      Parameters:
      name - a descriptive name for the source (for diagnostic purposes)
      createFn - a function that creates the source's context object. It must be stateless.
      Since:
      Jet 3.0
    • timestampedStream

      @Nonnull public static <C> SourceBuilder<C>.TimestampedStream<Void> timestampedStream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
      Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. It will use non-cooperative processors.

      When emitting an item, the source can explicitly assign a timestamp to it. You can tell a Jet pipeline to use those timestamps by calling sourceStage.withNativeTimestamps(allowedLag).

      Each parallel processor that drives your source has its private instance of a context object it gets from the given createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object. The buffer's add() method takes two arguments: the item and the timestamp in milliseconds.

      Your function should add some items to the buffer, ideally only those it has ready without having to block. A hundred items per call is enough to eliminate Jet's per-call overhead. If it has no items ready, it may also return without adding anything. In any case, the function should return within about a second; otherwise you risk interfering with Jet's coordination mechanisms and degrading performance.

      Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls it exactly once with each globalProcessorIndex from 0 to totalParallelism - 1 and each of the resulting context objects must emit its distinct slice of the total source data.

      Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response, interpreting the first 9 characters as the timestamp.

      
       StreamSource<String> httpSource = SourceBuilder
           .timestampedStream("http-source",
               processorContext -> HttpClients.createDefault())
           .<String>fillBufferFn((httpc, buf) -> {
               // Close the response and its stream so the connection is released
               try (CloseableHttpResponse response =
                            httpc.execute(new HttpGet("http://localhost:8008"));
                    BufferedReader reader = new BufferedReader(new InputStreamReader(
                            response.getEntity().getContent(), StandardCharsets.UTF_8))) {
                   reader.lines().forEach(line -> {
                       // The first 9 characters hold the timestamp, the rest is the item
                       long timestamp = Long.parseLong(line.substring(0, 9));
                       buf.add(line.substring(9), timestamp);
                   });
               }
           })
           .destroyFn(Closeable::close)
           .build();
       Pipeline p = Pipeline.create();
       StreamStage<String> srcStage = p.readFrom(httpSource)
               .withNativeTimestamps(SECONDS.toMillis(5));
       

      NOTE: If the data source you're adapting to Jet is partitioned, you may run into issues with event skew between the partitions assigned to a given parallel processor: the timestamps you get from one partition may be significantly behind those you already got from another. If you use native timestamps and the skew exceeds the allowed lag, events from the lagging partition risk being late. Use a custom processor if you need to coalesce watermarks from multiple partitions.
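      The effect described in the note can be illustrated with a small plain-Java model. It assumes a lag-based watermark that trails the highest timestamp seen so far by the allowed lag, and counts an event as late when its timestamp is below the current watermark. SkewModel is hypothetical and uses no Jet classes. It contrasts one watermark shared by all partitions with a coalesced watermark, taken as the minimum over per-partition watermarks, which is the kind of logic a custom processor would have to implement.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of partition skew versus a lag-based watermark; not Jet code. */
final class SkewModel {
    private SkewModel() { }

    /** Each event is {partitionId, timestampMillis}. One watermark tracks the top timestamp of all partitions. */
    static int lateWithSharedWatermark(long[][] events, long allowedLag) {
        long top = Long.MIN_VALUE;
        int late = 0;
        for (long[] e : events) {
            if (top != Long.MIN_VALUE && e[1] < top - allowedLag) {
                late++;
            }
            top = Math.max(top, e[1]);
        }
        return late;
    }

    /** Same events, but the watermark is the minimum of the per-partition watermarks. */
    static int lateWithCoalescedWatermark(long[][] events, long allowedLag, int partitionCount) {
        Map<Long, Long> topPerPartition = new HashMap<>();
        int late = 0;
        for (long[] e : events) {
            if (e[1] < coalescedWatermark(topPerPartition, allowedLag, partitionCount)) {
                late++;
            }
            topPerPartition.merge(e[0], e[1], Long::max);
        }
        return late;
    }

    private static long coalescedWatermark(Map<Long, Long> top, long allowedLag, int partitionCount) {
        if (top.size() < partitionCount) {
            return Long.MIN_VALUE; // hold the watermark back until every partition has reported
        }
        long min = Long.MAX_VALUE;
        for (long t : top.values()) {
            min = Math.min(min, t);
        }
        return min - allowedLag;
    }
}
```

      For example, with events {0, 20000}, {1, 10000}, {0, 21000}, {1, 11000} and an allowed lag of 5000 ms, partition 1 lags by 10 seconds: the shared watermark marks both of its events late, while the coalesced watermark marks none.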

      Type Parameters:
      C - type of the context object
      Parameters:
      name - a descriptive name for the source (for diagnostic purposes)
      createFn - a function that creates the source's context object. It must be stateless.
      Since:
      Jet 3.0