C - type of the context object
public final class SourceBuilder<C> extends Object
For further details refer to the factory methods batch, stream, and timestampedStream, described below.
Modifier and Type | Class and Description |
---|---|
class | SourceBuilder.Batch<T> |
class | SourceBuilder.FaultTolerant<B,S>: Represents a step in building a custom source where you add a restoreSnapshotFn(BiConsumerEx<? super C, ? super List<S>>) after adding a createSnapshotFn. |
static interface | SourceBuilder.SourceBuffer<T>: The buffer object that the fillBufferFn gets on each call. |
class | SourceBuilder.Stream<T> |
static interface | SourceBuilder.TimestampedSourceBuffer<T>: The buffer object that the fillBufferFn gets on each call. |
class | SourceBuilder.TimestampedStream<T> |
Modifier and Type | Method and Description |
---|---|
static <C> SourceBuilder.Batch<Void> | batch(String name, FunctionEx<? super Processor.Context,? extends C> createFn): Returns a fluent-API builder with which you can create a batch source for a Jet pipeline. |
static <C> SourceBuilder.Stream<Void> | stream(String name, FunctionEx<? super Processor.Context,? extends C> createFn): Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. |
static <C> SourceBuilder.TimestampedStream<Void> | timestampedStream(String name, FunctionEx<? super Processor.Context,? extends C> createFn): Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. |
@Nonnull public static <C> SourceBuilder.Batch<Void> batch(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
Each parallel processor that drives your source has its private instance of a context object it got from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object.
Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete, otherwise you risk interfering with Jet's coordination mechanisms and getting bad performance.
Once it has emitted all the data, fillBufferFn must call buffer.close(). This signals Jet not to call fillBufferFn again; at some later point Jet will call the destroyFn with the context object.
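The batching and close-on-exhaustion contract described above can be sketched without any Jet dependency. In this minimal illustration the `Buffer` interface is a hypothetical stand-in for the two buffer methods the text mentions (`add` and `close`), and `RecordingBuffer`, `fillBuffer`, `drain`, and the limit of 100 items per call are names and values chosen for the example, not part of the Jet API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class BatchFillSketch {

    /** Hypothetical stand-in for the buffer: only add() and close() are modeled. */
    interface Buffer<T> {
        void add(T item);
        void close();
    }

    /** In-memory Buffer that records the items it receives and whether it was closed. */
    static final class RecordingBuffer implements Buffer<String> {
        final List<String> items = new ArrayList<>();
        boolean closed;

        @Override public void add(String item) { items.add(item); }
        @Override public void close() { closed = true; }
    }

    // Assumed batch size, following the "a hundred items at a time" guidance.
    static final int MAX_ITEMS_PER_CALL = 100;

    /** Adds up to MAX_ITEMS_PER_CALL lines; closes the buffer once the reader is exhausted. */
    static void fillBuffer(BufferedReader in, Buffer<String> buf) throws IOException {
        for (int i = 0; i < MAX_ITEMS_PER_CALL; i++) {
            String line = in.readLine();
            if (line == null) {
                buf.close();
                return;
            }
            buf.add(line);
        }
    }

    /** Calls fillBuffer repeatedly until the buffer is closed; returns the number of calls. */
    static int drain(BufferedReader in, RecordingBuffer buf) {
        int calls = 0;
        try {
            while (!buf.closed) {
                fillBuffer(in, buf);
                calls++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 250; i++) {
            text.append("line-").append(i).append('\n');
        }
        RecordingBuffer buf = new RecordingBuffer();
        int calls = drain(new BufferedReader(new StringReader(text.toString())), buf);
        System.out.println(calls + " calls, " + buf.items.size() + " items, closed=" + buf.closed);
    }
}
```

Emitting a bounded batch per call keeps each invocation short while amortizing per-call overhead; the loop in `drain` only imitates the repeated calls a processor would make.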
Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls createFn exactly once with each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting context objects must emit its distinct slice of the total source data.
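One possible way to give each processor a distinct slice is a round-robin split by position. The sketch below is plain Java with no Jet dependency: `sliceFor` is a hypothetical helper, and the two integer arguments stand for the values a createFn would read from processorContext.globalProcessorIndex() and processorContext.totalParallelism(). The file names are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

public class SliceSketch {

    /**
     * Returns the items owned by one processor: every item whose position,
     * modulo totalParallelism, equals globalProcessorIndex.
     */
    static <T> List<T> sliceFor(List<T> allItems, int globalProcessorIndex, int totalParallelism) {
        List<T> slice = new ArrayList<>();
        for (int i = globalProcessorIndex; i < allItems.size(); i += totalParallelism) {
            slice.add(allItems.get(i));
        }
        return slice;
    }

    public static void main(String[] args) {
        // Hypothetical work items; a real source might split files, partitions, or key ranges.
        List<String> files = List.of("a.txt", "b.txt", "c.txt", "d.txt", "e.txt", "f.txt", "g.txt");
        int totalParallelism = 3;
        for (int index = 0; index < totalParallelism; index++) {
            System.out.println("processor " + index + " -> " + sliceFor(files, index, totalParallelism));
        }
    }
}
```

Because every index from 0 to totalParallelism - 1 is used exactly once, the slices are disjoint and together cover the whole input.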
Here's an example that builds a simple, non-distributed source that reads the lines from a single text file. Since you can't control on which member of the Jet cluster the source's processor will run, the file should be available on all members. The source emits one line per fillBufferFn call.
BatchSource<String> fileSource = SourceBuilder
    .batch("file-source", processorContext ->
        new BufferedReader(new FileReader("input.txt")))
    .<String>fillBufferFn((in, buf) -> {
        String line = in.readLine();
        if (line != null) {
            buf.add(line);
        } else {
            buf.close();
        }
    })
    .destroyFn(BufferedReader::close)
    .build();
Pipeline p = Pipeline.create();
BatchStage<String> srcStage = p.readFrom(fileSource);
C - type of the context object
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the source's context object. It must be stateless.
@Nonnull public static <C> SourceBuilder.Stream<Void> stream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
Each parallel processor that drives your source has its private instance of a context object it got from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object.
Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete, otherwise you risk interfering with Jet's coordination mechanisms and getting bad performance.
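A minimal sketch of the "add what is ready, never block" advice, under the assumption that some background thread places incoming items on a `BlockingQueue`. The class and method names are hypothetical, the output list stands in for the source buffer, and the limit of 100 follows the guidance above. `BlockingQueue.drainTo` transfers only items that are already available and returns immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class NonBlockingFillSketch {

    // Assumed batch size, following the "a hundred items at a time" guidance.
    static final int MAX_ITEMS_PER_CALL = 100;

    /**
     * Moves up to MAX_ITEMS_PER_CALL already-available items from the queue to
     * the output list without waiting. Returns how many items were moved,
     * which is 0 when nothing is ready.
     */
    static <T> int fillFrom(BlockingQueue<T> ready, List<T> out) {
        return ready.drainTo(out, MAX_ITEMS_PER_CALL);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> ready = new LinkedBlockingQueue<>();
        for (int i = 0; i < 130; i++) {
            ready.add(i);
        }
        List<Integer> out = new ArrayList<>();
        System.out.println("first call moved " + fillFrom(ready, out));
        System.out.println("second call moved " + fillFrom(ready, out));
        System.out.println("third call moved " + fillFrom(ready, out));
    }
}
```

Returning zero items from a call is acceptable here, which matches the statement that the function may return without adding anything.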
Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls createFn exactly once with each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting context objects must emit its distinct slice of the total source data.
Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response:
StreamSource<String> httpSource = SourceBuilder
    .stream("http-source", processorContext -> HttpClients.createDefault())
    .<String>fillBufferFn((httpc, buf) -> {
        // Fetch the URL and add every line of the response to the buffer
        new BufferedReader(new InputStreamReader(
            httpc.execute(new HttpGet("http://localhost:8008"))
                 .getEntity().getContent()))
            .lines()
            .forEach(buf::add);
    })
    .destroyFn(Closeable::close)
    .build();
Pipeline p = Pipeline.create();
StreamStage<String> srcStage = p.readFrom(httpSource);
C - type of the context object
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the source's context object. It must be stateless.
@Nonnull public static <C> SourceBuilder.TimestampedStream<Void> timestampedStream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends C> createFn)
When emitting an item, the source can explicitly assign a timestamp to it. You can tell a Jet pipeline to use those timestamps by calling sourceStage.withNativeTimestamps().
Each parallel processor that drives your source has its private instance of a context object it gets from the given createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the context object and a buffer object. The buffer's add() method takes two arguments: the item and the timestamp in milliseconds.
Your function should add some items to the buffer, ideally those it has ready without having to block. A hundred items at a time is enough to eliminate any per-call overheads within Jet. If it doesn't have any items ready, it may also return without adding anything. In any case the function should not take more than a second or so to complete, otherwise you risk interfering with Jet's coordination mechanisms and getting bad performance.
Unless you call builder.distributed(), Jet will create just a single processor that should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult processorContext.totalParallelism() and processorContext.globalProcessorIndex(). Jet calls createFn exactly once with each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting context objects must emit its distinct slice of the total source data.
Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response, interpreting the first 9 characters as the timestamp.
StreamSource<String> httpSource = SourceBuilder
    .timestampedStream("http-source",
        processorContext -> HttpClients.createDefault())
    .<String>fillBufferFn((httpc, buf) -> {
        new BufferedReader(new InputStreamReader(
            httpc.execute(new HttpGet("http://localhost:8008"))
                 .getEntity().getContent()))
            .lines()
            .forEach(line -> {
                // The first 9 characters hold the timestamp, the rest is the item
                long timestamp = Long.parseLong(line.substring(0, 9));
                buf.add(line.substring(9), timestamp);
            });
    })
    .destroyFn(Closeable::close)
    .build();
Pipeline p = Pipeline.create();
StreamStage<String> srcStage = p.readFrom(httpSource)
    .withNativeTimestamps(SECONDS.toMillis(5));
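The prefix parsing used in the example can be isolated and checked on its own. The following is a minimal sketch in plain Java: `TimestampPrefixSketch`, `parse`, and the sample line are hypothetical names and data chosen for illustration, and the added length check is an assumption about how one might handle short lines, not something the example above does.

```java
import java.util.Map;

public class TimestampPrefixSketch {

    // Width of the timestamp prefix, as in the example above.
    static final int TIMESTAMP_WIDTH = 9;

    /**
     * Splits a line into (timestamp, payload). Throws IllegalArgumentException
     * if the line is too short or the prefix is not a number
     * (NumberFormatException is a subclass of IllegalArgumentException).
     */
    static Map.Entry<Long, String> parse(String line) {
        if (line.length() < TIMESTAMP_WIDTH) {
            throw new IllegalArgumentException("Line shorter than timestamp prefix: " + line);
        }
        long timestamp = Long.parseLong(line.substring(0, TIMESTAMP_WIDTH));
        return Map.entry(timestamp, line.substring(TIMESTAMP_WIDTH));
    }

    public static void main(String[] args) {
        Map.Entry<Long, String> parsed = parse("000012345hello");
        System.out.println(parsed.getKey() + " / " + parsed.getValue());
    }
}
```

Inside a fillBufferFn, the two parts of the result would be passed to the buffer's two-argument add() method as the item and its timestamp.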
NOTE: if the data source you're adapting to Jet is partitioned, you may run into issues with event skew between the partitions assigned to a given parallel processor. The timestamp you get from one partition may be significantly behind the timestamp you already got from another one. If the skew exceeds the allowed lag and you have configured native timestamps, the events from the lagging partition may be treated as late. Use a custom processor if you need to coalesce watermarks from multiple partitions.
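To make the skew problem concrete, here is a simplified model, not Jet's actual implementation: it assumes the watermark trails the highest timestamp seen so far by the allowed lag, and that an event is late when its timestamp falls below that watermark. The class name, method name, and timestamps are hypothetical.

```java
public class LatenessSketch {

    private final long allowedLagMillis;
    private long topTimestamp = Long.MIN_VALUE;

    LatenessSketch(long allowedLagMillis) {
        this.allowedLagMillis = allowedLagMillis;
    }

    /**
     * Records an event timestamp and returns true if the event is behind the
     * current watermark (highest timestamp seen so far minus the allowed lag).
     */
    boolean observeAndCheckLate(long timestamp) {
        boolean late = topTimestamp != Long.MIN_VALUE
                && timestamp < topTimestamp - allowedLagMillis;
        topTimestamp = Math.max(topTimestamp, timestamp);
        return late;
    }

    public static void main(String[] args) {
        LatenessSketch tracker = new LatenessSketch(5_000);
        // Partition A is ahead; partition B lags by 8 seconds.
        System.out.println("A@20000 late? " + tracker.observeAndCheckLate(20_000));
        System.out.println("B@16000 late? " + tracker.observeAndCheckLate(16_000));
        System.out.println("B@12000 late? " + tracker.observeAndCheckLate(12_000));
    }
}
```

With a 5-second lag, the event at 16000 is still within bounds after 20000 has been seen, while the event at 12000 falls below the watermark of 15000 and counts as late in this model.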
C - type of the context object
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the source's context object. It must be stateless.
Copyright © 2021 Hazelcast, Inc. All rights reserved.