C - type of the context object
public final class SourceBuilder<C> extends Object
For further details refer to the factory methods:
| Modifier and Type | Class | Description |
|---|---|---|
| class | SourceBuilder.Batch<T> | |
| class | SourceBuilder.FaultTolerant<B,S> | Represents a step in building a custom source where you add a restoreSnapshotFn after adding a createSnapshotFn. |
| static interface | SourceBuilder.SourceBuffer<T> | The buffer object that the fillBufferFn gets on each call. |
| class | SourceBuilder.Stream<T> | |
| static interface | SourceBuilder.TimestampedSourceBuffer<T> | The buffer object that the fillBufferFn gets on each call. |
| class | SourceBuilder.TimestampedStream<T> | |
| Modifier and Type | Method | Description |
|---|---|---|
| static <C> SourceBuilder.Batch<Void> | batch(String name, FunctionEx<? super Processor.Context,? extends C> createFn) | Returns a fluent-API builder with which you can create a batch source for a Jet pipeline. |
| static <C> SourceBuilder.Stream<Void> | stream(String name, FunctionEx<? super Processor.Context,? extends C> createFn) | Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. |
| static <C> SourceBuilder.TimestampedStream<Void> | timestampedStream(String name, FunctionEx<? super Processor.Context,? extends C> createFn) | Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. |
@Nonnull public static <C> SourceBuilder.Batch<Void> batch(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
 Each parallel processor that drives your source has its private instance
 of a context object it got from your createFn. To get
 the data items to emit to the pipeline, the processor repeatedly calls
 your fillBufferFn with the context object and a buffer object.
 
Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete; otherwise you risk interfering with Jet's coordination mechanisms and degrading performance.

 Once it has emitted all the data, fillBufferFn must call buffer.close(). This signals Jet not to call fillBufferFn again; at some later point it will call the destroyFn with the context object.
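
The batching advice above can be sketched in plain Java. This is only an illustration under assumptions: the class BatchedFill, the constant MAX_BATCH and the method fill are hypothetical names, the context object is assumed to be a BufferedReader, and the buffer is modelled by a Consumer (standing in for the buffer's add) and a Runnable (standing in for the buffer's close) so the sketch runs without Jet on the classpath.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Jet API.
final class BatchedFill {
    // Roughly the batch size the text suggests is enough to hide per-call overhead.
    static final int MAX_BATCH = 100;

    // Adds at most MAX_BATCH lines per call; runs close once the input is exhausted.
    static void fill(BufferedReader in, Consumer<String> add, Runnable close) {
        try {
            for (int i = 0; i < MAX_BATCH; i++) {
                String line = in.readLine();
                if (line == null) {
                    // End of data: tell the caller not to ask again.
                    close.run();
                    return;
                }
                add.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside a real source, such a helper could presumably be called from fillBufferFn as (in, buf) -> BatchedFill.fill(in, buf::add, buf::close). Each call returns after at most a hundred lines, so it stays short even for large files.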
 
 Unless you call builder.distributed(),
 Jet will create just a single processor that should emit all the data.
 If you do call it, make sure your distributed source takes care of
 splitting the data between processors. Your createFn should
 consult processorContext.totalParallelism()
 and processorContext.globalProcessorIndex().
 Jet calls it exactly once with each globalProcessorIndex from 0
 to totalParallelism - 1 and each of the resulting context
 objects must emit its distinct slice of the total source data.
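
One possible way to split the data, shown as a plain-Java sketch: each processor takes every totalParallelism-th item, starting at its own globalProcessorIndex. The class SliceAssignment and the method sliceFor are hypothetical names, and the round-robin scheme is just one assumption about how a source might partition its input; any scheme works as long as the slices are disjoint and together cover all the data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Jet API.
final class SliceAssignment {
    // Returns the items that the processor with the given index should emit.
    // Items are dealt out round-robin: index, index + total, index + 2 * total, ...
    static <T> List<T> sliceFor(List<T> all, int globalProcessorIndex, int totalParallelism) {
        List<T> slice = new ArrayList<>();
        for (int i = globalProcessorIndex; i < all.size(); i += totalParallelism) {
            slice.add(all.get(i));
        }
        return slice;
    }
}
```

A createFn could call such a helper with the two values it reads from the processor context and keep the resulting slice in its context object.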
 
 Here's an example that builds a simple, non-distributed source that
 reads the lines from a single text file. Since you can't control on
 which member of the Jet cluster the source's processor will run, the
 file should be available on all members. The source emits one line per
 fillBufferFn call.
 
 BatchSource<String> fileSource = SourceBuilder
         .batch("file-source", processorContext ->
             new BufferedReader(new FileReader("input.txt")))
         .<String>fillBufferFn((in, buf) -> {
             String line = in.readLine();
             if (line != null) {
                 buf.add(line);
             } else {
                 buf.close();
             }
         })
         .destroyFn(BufferedReader::close)
         .build();
 Pipeline p = Pipeline.create();
 BatchStage<String> srcStage = p.readFrom(fileSource);
C - type of the context object
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the source's context object. It must be stateless.
@Nonnull public static <C> SourceBuilder.Stream<Void> stream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
 Each parallel processor that drives your source has its private instance
 of a context object it got from your createFn. To get
 the data items to emit to the pipeline, the processor repeatedly calls
 your fillBufferFn with the context object and a buffer object.
 
Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete; otherwise you risk interfering with Jet's coordination mechanisms and degrading performance.

 Unless you call builder.distributed(),
 Jet will create just a single processor that should emit all the data.
 If you do call it, make sure your distributed source takes care of
 splitting the data between processors. Your createFn should
 consult processorContext.totalParallelism()
 and processorContext.globalProcessorIndex().
 Jet calls it exactly once with each globalProcessorIndex from 0
 to totalParallelism - 1 and each of the resulting context objects
 must emit its distinct slice of the total source data.
 
Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response:
 StreamSource<String> httpSource = SourceBuilder
     .stream("http-source", processorContext -> HttpClients.createDefault())
     .<String>fillBufferFn((httpc, buf) -> {
         // Use an absolute URI with a scheme so the client can determine the target host.
         new BufferedReader(new InputStreamReader(
             httpc.execute(new HttpGet("http://localhost:8008"))
                  .getEntity().getContent()))
             .lines()
             .forEach(buf::add);
     })
     .destroyFn(Closeable::close)
     .build();
 Pipeline p = Pipeline.create();
 StreamStage<String> srcStage = p.readFrom(httpSource);
C - type of the context object
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the source's context object. It must be stateless.
@Nonnull public static <C> SourceBuilder.TimestampedStream<Void> timestampedStream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
 When emitting an item, the source can explicitly assign a timestamp to
 it. You can tell a Jet pipeline to use those timestamps by calling
 sourceStage.withNativeTimestamps().
 
 Each parallel processor that drives your source has its private instance
 of a context object it gets from the given createFn. To get
 the data items to emit to the pipeline, the processor repeatedly calls
 your fillBufferFn with the context object and a buffer object. The
 buffer's add() method
 takes two arguments: the item and the timestamp in milliseconds.
 
Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete; otherwise you risk interfering with Jet's coordination mechanisms and degrading performance.

 Unless you call builder.distributed(), Jet will create just a single processor that
 should emit all the data. If you do call it, make sure your distributed
 source takes care of splitting the data between processors. Your createFn should consult procContext.totalParallelism() and procContext.globalProcessorIndex(). Jet calls it exactly once with each
 globalProcessorIndex from 0 to totalParallelism - 1 and
 each of the resulting context objects must emit its distinct slice of the
 total source data.
 
Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response, interpreting the first 9 characters as the timestamp.
 StreamSource<String> httpSource = SourceBuilder
     .timestampedStream("http-source",
         processorContext -> HttpClients.createDefault())
     .<String>fillBufferFn((httpc, buf) -> {
         // Use an absolute URI with a scheme so the client can determine the target host.
         new BufferedReader(new InputStreamReader(
             httpc.execute(new HttpGet("http://localhost:8008"))
                  .getEntity().getContent()))
             .lines()
             .forEach(line -> {
                 // First 9 characters: the timestamp; the rest: the item itself.
                 long timestamp = Long.parseLong(line.substring(0, 9));
                 buf.add(line.substring(9), timestamp);
             });
     })
     .destroyFn(Closeable::close)
     .build();
 Pipeline p = Pipeline.create();
 // SECONDS is a static import of java.util.concurrent.TimeUnit.SECONDS.
 StreamStage<String> srcStage = p.readFrom(httpSource)
         .withNativeTimestamps(SECONDS.toMillis(5));
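
The timestamp-prefix convention used in the example above can be isolated into a small helper, which also makes it easier to reject malformed lines early. This is a plain-Java sketch: the class TimestampPrefix, the constant PREFIX_LENGTH and the method parse are hypothetical names, and throwing on short lines is an assumption about the desired error handling.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper, not part of the Jet API.
final class TimestampPrefix {
    // Number of leading characters that hold the timestamp, as in the example above.
    static final int PREFIX_LENGTH = 9;

    // Splits a line into (timestamp, payload); the key is the timestamp, the value the item.
    static Map.Entry<Long, String> parse(String line) {
        if (line.length() < PREFIX_LENGTH) {
            throw new IllegalArgumentException("Line shorter than timestamp prefix: " + line);
        }
        // Throws NumberFormatException if the prefix is not a valid long.
        long timestamp = Long.parseLong(line.substring(0, PREFIX_LENGTH));
        return new SimpleImmutableEntry<>(timestamp, line.substring(PREFIX_LENGTH));
    }
}
```

A fillBufferFn could then pass the entry's value and key to the buffer's two-argument add().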

NOTE: if the data source you're adapting to Jet is partitioned, you may run into event skew between the partitions assigned to a given parallel processor: the timestamp you get from one partition may be significantly behind one you already got from another. If the skew exceeds the allowed lag and you have configured native timestamps, some events may be treated as late. Use a custom processor if you need to coalesce watermarks from multiple partitions.
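
To make the lateness condition in the note concrete, here is a deliberately simplified model in plain Java: an event counts as late when its timestamp is behind the highest timestamp seen so far by more than the allowed lag. The class LatenessCheck and its method observe are hypothetical names, and the model is an assumption for illustration only; it does not reproduce Jet's actual watermark handling.

```java
// Hypothetical, simplified model of lateness; not Jet's implementation.
final class LatenessCheck {
    private final long allowedLagMillis;
    private long topTimestamp = Long.MIN_VALUE;

    LatenessCheck(long allowedLagMillis) {
        this.allowedLagMillis = allowedLagMillis;
    }

    // Records the timestamp and reports whether it lags the highest one seen
    // so far by more than the allowed lag.
    boolean observe(long timestamp) {
        boolean late = topTimestamp != Long.MIN_VALUE
                && timestamp < topTimestamp - allowedLagMillis;
        topTimestamp = Math.max(topTimestamp, timestamp);
        return late;
    }
}
```

With an allowed lag of 5000 ms, an event stamped 4000 that arrives after one stamped 10000 is reported as late, which is the situation two skewed partitions feeding one processor can produce.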
C - type of the context object
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the source's context object. It must be stateless.

Copyright © 2024 Hazelcast, Inc. All rights reserved.