Class SourceBuilder<C>
- Type Parameters:
  C - type of the context object

For further details refer to the factory methods batch, stream, and timestampedStream.
- Since:
  Jet 3.0

Nested Class Summary
Modifier and Type / Class / Description
- final class SourceBuilder.Batch
- final class SourceBuilder.FaultTolerant: Represents a step in building a custom source where you add a restoreSnapshotFn after adding a createSnapshotFn.
- static interface SourceBuilder.SourceBuffer: The buffer object that the fillBufferFn gets on each call.
- final class SourceBuilder.Stream
- static interface SourceBuilder.TimestampedSourceBuffer: The buffer object that the fillBufferFn gets on each call.
- final class SourceBuilder.TimestampedStream

Method Summary
Modifier and Type / Method / Description
- static <C> SourceBuilder<C>.Batch<Void> batch(String name, FunctionEx<? super Processor.Context, ? extends C> createFn): Returns a fluent-API builder with which you can create a batch source for a Jet pipeline.
- static <C> SourceBuilder<C>.Stream<Void> stream(String name, FunctionEx<? super Processor.Context, ? extends C> createFn): Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline.
- static <C> SourceBuilder<C>.TimestampedStream<Void> timestampedStream(String name, FunctionEx<? super Processor.Context, ? extends C> createFn): Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline.

Method Details

batch
@Nonnull public static <C> SourceBuilder<C>.Batch<Void> batch(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn)

Returns a fluent-API builder with which you can create a batch source for a Jet pipeline. The source will use non-cooperative processors.

Each parallel processor that drives your source has its own private instance of a context object it got from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object.

Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete, otherwise you risk interfering with Jet's coordination mechanisms and getting bad performance.
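
The guidance above (add only what is ready, cap each call at roughly a hundred items, never block) can be sketched in plain Java, independent of the Jet API. The class `BoundedFill` and its method `drainReady` are hypothetical names used only for illustration. Inside a real fillBufferFn, the items would go into the buffer object rather than into a `List`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper, not part of Jet: moves only the items that are already
// available, up to a fixed cap, and never waits for more to arrive.
final class BoundedFill {
    static final int MAX_ITEMS_PER_CALL = 100;

    // Returns the items taken from the queue; an empty list means nothing was ready.
    static <T> List<T> drainReady(BlockingQueue<T> ready) {
        List<T> batch = new ArrayList<>(MAX_ITEMS_PER_CALL);
        // drainTo does not block: it transfers at most the given number of
        // currently queued elements and returns immediately.
        ready.drainTo(batch, MAX_ITEMS_PER_CALL);
        return batch;
    }
}
```

With 250 items queued, successive calls would return 100, 100, and 50 items, and then an empty list once the queue is drained.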

Once it has emitted all the data, fillBufferFn must call buffer.close(). This signals Jet to not call fillBufferFn again, and at some later point it will call the destroyFn with the context object.

Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls it exactly once with each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting context objects must emit its distinct slice of the total source data.

Here's an example that builds a simple, non-distributed source that reads the lines from a single text file. Since you can't control on which member of the Jet cluster the source's processor will run, the file should be available on all members. The source emits one line per fillBufferFn call.

    BatchSource<String> fileSource = SourceBuilder
        // The context object is the reader over the input file.
        .batch("file-source", processorContext ->
            new BufferedReader(new FileReader("input.txt")))
        .<String>fillBufferFn((in, buf) -> {
            String line = in.readLine();
            if (line != null) {
                buf.add(line);
            } else {
                // End of file: tell Jet there is no more data.
                buf.close();
            }
        })
        .destroyFn(BufferedReader::close)
        .build();
    Pipeline p = Pipeline.create();
    BatchStage<String> srcStage = p.readFrom(fileSource);
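
The requirement that each processor of a distributed source emits its own distinct slice can be sketched as a round-robin assignment by index. This is plain Java with no Jet dependency. `SourceSlicing` and `sliceFor` are hypothetical names, and round-robin is only one possible splitting scheme. In a real source, the createFn would pass in the values of processorContext.globalProcessorIndex() and processorContext.totalParallelism() and keep the resulting slice in the context object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Jet: picks the items that one parallel
// processor is responsible for, using round-robin assignment by index.
final class SourceSlicing {
    static <T> List<T> sliceFor(List<T> allItems, int globalProcessorIndex, int totalParallelism) {
        if (totalParallelism <= 0
                || globalProcessorIndex < 0
                || globalProcessorIndex >= totalParallelism) {
            throw new IllegalArgumentException("index must be in [0, totalParallelism)");
        }
        List<T> slice = new ArrayList<>();
        // Item i belongs to the processor whose index equals i % totalParallelism,
        // so every item is emitted by exactly one processor.
        for (int i = globalProcessorIndex; i < allItems.size(); i += totalParallelism) {
            slice.add(allItems.get(i));
        }
        return slice;
    }
}
```

For ten items numbered 0 to 9 and a total parallelism of 3, processor 0 would get items 0, 3, 6, 9, processor 1 would get 1, 4, 7, and processor 2 would get 2, 5, 8.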
- Type Parameters:
  C - type of the context object
- Parameters:
  name - a descriptive name for the source (for diagnostic purposes)
  createFn - a function that creates the source's context object. It must be stateless.
- Since:
  Jet 3.0

stream
@Nonnull public static <C> SourceBuilder<C>.Stream<Void> stream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn)

Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. The source will use non-cooperative processors.

Each parallel processor that drives your source has its own private instance of a context object it got from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object.

Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete, otherwise you risk interfering with Jet's coordination mechanisms and getting bad performance.
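
One way to respect the "no more than a second or so" advice with a slow or unbounded upstream is to give each call an explicit time budget in addition to an item cap. The following is a minimal sketch in plain Java, not Jet code. `TimeBoundedFill`, `fill`, and its parameters are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Jet: copies items until either the item cap
// or the time budget is reached, whichever comes first.
final class TimeBoundedFill {
    static <T> List<T> fill(Iterator<T> items, int maxItems, long budgetMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
        List<T> added = new ArrayList<>();
        // Re-check the clock before each item so that a long run of items
        // cannot keep the call going past the budget.
        while (added.size() < maxItems
                && System.nanoTime() - deadline < 0
                && items.hasNext()) {
            added.add(items.next());
        }
        return added;
    }
}
```

The budget is only checked between items, so a single `next()` call that blocks can still overrun it. The sketch assumes that fetching one item is fast.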

Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls it exactly once with each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting context objects must emit its distinct slice of the total source data.

Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response:

    StreamSource<String> httpSource = SourceBuilder
        .stream("http-source", processorContext -> HttpClients.createDefault())
        .<String>fillBufferFn((httpc, buf) -> {
            // Poll the URL and emit every line of the response body.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    httpc.execute(new HttpGet("http://localhost:8008"))
                         .getEntity().getContent()))) {
                reader.lines().forEach(buf::add);
            }
        })
        .destroyFn(Closeable::close)
        .build();
    Pipeline p = Pipeline.create();
    StreamStage<String> srcStage = p.readFrom(httpSource);
- Type Parameters:
  C - type of the context object
- Parameters:
  name - a descriptive name for the source (for diagnostic purposes)
  createFn - a function that creates the source's context object. It must be stateless.
- Since:
  Jet 3.0

timestampedStream
@Nonnull public static <C> SourceBuilder<C>.TimestampedStream<Void> timestampedStream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context, ? extends C> createFn)

Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. It will use non-cooperative processors.

When emitting an item, the source can explicitly assign a timestamp to it. You can tell a Jet pipeline to use those timestamps by calling sourceStage.withNativeTimestamps().

Each parallel processor that drives your source has its own private instance of a context object it gets from the given createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object. The buffer's add() method takes two arguments: the item and the timestamp in milliseconds.

Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete, otherwise you risk interfering with Jet's coordination mechanisms and getting bad performance.

Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls it exactly once with each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting context objects must emit its distinct slice of the total source data.

Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response, interpreting the first 9 characters as the timestamp.

    StreamSource<String> httpSource = SourceBuilder
        .timestampedStream("http-source", processorContext -> HttpClients.createDefault())
        .<String>fillBufferFn((httpc, buf) -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    httpc.execute(new HttpGet("http://localhost:8008"))
                         .getEntity().getContent()))) {
                reader.lines().forEach(line -> {
                    // The first 9 characters hold the timestamp in milliseconds.
                    long timestamp = Long.parseLong(line.substring(0, 9));
                    buf.add(line.substring(9), timestamp);
                });
            }
        })
        .destroyFn(Closeable::close)
        .build();
    Pipeline p = Pipeline.create();
    StreamStage<String> srcStage = p.readFrom(httpSource)
        .withNativeTimestamps(SECONDS.toMillis(5));
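
The timestamp-prefix convention used in the example can be isolated into a small parsing step that rejects malformed lines instead of failing with an index error. This is a sketch in plain Java, not part of Jet. `TimestampedLine`, `parse`, and `TIMESTAMP_WIDTH` are hypothetical names, and the choice to throw on bad input is an assumption about the desired error handling.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper, not part of Jet: splits a line into its fixed-width
// timestamp prefix and the remaining payload.
final class TimestampedLine {
    static final int TIMESTAMP_WIDTH = 9;

    // Returns an entry of (payload, timestamp in milliseconds).
    static Map.Entry<String, Long> parse(String line) {
        if (line.length() < TIMESTAMP_WIDTH) {
            throw new IllegalArgumentException("line shorter than timestamp prefix: " + line);
        }
        // parseLong throws NumberFormatException (a subclass of
        // IllegalArgumentException) if the prefix is not a valid decimal number.
        long timestamp = Long.parseLong(line.substring(0, TIMESTAMP_WIDTH));
        return new SimpleImmutableEntry<>(line.substring(TIMESTAMP_WIDTH), timestamp);
    }
}
```

Inside a fillBufferFn, the two parts of the returned entry would be passed to the buffer's add() method as the item and the timestamp.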

NOTE: if the data source you're adapting to Jet is partitioned, you may run into event skew between the partitions assigned to a given parallel processor: the timestamp you get from one partition may be significantly behind the timestamp you already got from another one. If the skew exceeds the allowed lag and you have configured native timestamps, you risk that some events will be treated as late. Use a custom processor if you need to coalesce watermarks from multiple partitions.
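
The effect of partition skew on lateness can be illustrated with a deliberately simplified model: the watermark trails the highest timestamp seen so far by the allowed lag, and any event older than the watermark counts as late. This is an assumption-laden sketch in plain Java, not Jet's actual watermark implementation. `LatenessModel` and `observeAndCheckLate` are hypothetical names.

```java
// Simplified model, not Jet's implementation: tracks the highest timestamp seen
// and treats anything older than (highest - allowedLag) as late.
final class LatenessModel {
    private final long allowedLagMillis;
    private long highestTimestamp = Long.MIN_VALUE;

    LatenessModel(long allowedLagMillis) {
        this.allowedLagMillis = allowedLagMillis;
    }

    // Observes one event and reports whether it falls behind the watermark
    // implied by the events observed before it.
    boolean observeAndCheckLate(long timestampMillis) {
        boolean late = highestTimestamp != Long.MIN_VALUE
                && timestampMillis < highestTimestamp - allowedLagMillis;
        highestTimestamp = Math.max(highestTimestamp, timestampMillis);
        return late;
    }
}
```

With an allowed lag of 5000 ms, an event at 20000 from one partition followed by an event at 12000 from a slower partition makes the second event late in this model, because 12000 is below the watermark of 15000.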
- Type Parameters:
  C - type of the context object
- Parameters:
  name - a descriptive name for the source (for diagnostic purposes)
  createFn - a function that creates the source's context object. It must be stateless.
- Since:
  Jet 3.0