T - type of emitted objects
public final class SourceBuilder.Stream<T> extends Object
Modifier and Type | Method and Description |
---|---|
StreamSource<T> | build() - Builds and returns the unbounded stream source. |
<S> SourceBuilder.FaultTolerant<SourceBuilder.Stream<T>,S> | createSnapshotFn(FunctionEx<? super C,? extends S> createSnapshotFn) - Sets the function Jet calls when it's creating a snapshot of the current job state. |
SourceBuilder.Stream<T> | destroyFn(ConsumerEx<? super C> pDestroyFn) - Sets the function that Jet will call when it is done cleaning up after an execution. |
SourceBuilder.Stream<T> | distributed(int preferredLocalParallelism) - Declares that you're creating a distributed source. |
<T_NEW> SourceBuilder.Stream<T_NEW> | fillBufferFn(BiConsumerEx<? super C,? super SourceBuilder.SourceBuffer<T_NEW>> fillBufferFn) - Sets the function that Jet will call whenever it needs more data from your source. |
SourceBuilder.Stream<T> | permission(Permission permission) - Sets the permission required to use this source when security is enabled. |
@Nonnull public <T_NEW> SourceBuilder.Stream<T_NEW> fillBufferFn(@Nonnull BiConsumerEx<? super C,? super SourceBuilder.SourceBuffer<T_NEW>> fillBufferFn)
Sets the function that Jet will call whenever it needs more data from
your source. The function receives the context object obtained from
createFn and Jet's buffer object. It should add some items
to the buffer, ideally those it can produce without making any blocking
calls. On any given invocation the function may also choose not to add
any items. If the previous call didn't add any items to the buffer, Jet
automatically applies an exponential backoff strategy to avoid calling
your function in a tight loop.
The given SourceBuilder.SourceBuffer isn't thread-safe, so you shouldn't
pass it to other threads. For example, you shouldn't add to it in a
callback of an asynchronous operation.
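One way to respect both constraints (no blocking calls, no access to the buffer from other threads) is to let asynchronous callbacks hand items to a thread-safe queue and have the fill function drain it. The sketch below models this in plain Java with no Jet dependency: `QueueBackedContext`, `onAsyncItem` and `fillBuffer` are hypothetical names, a `List<T>` stands in for the SourceBuilder.SourceBuffer, and the batch limit of 128 is an arbitrary choice.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical context object, playing the role of the object returned by createFn. */
final class QueueBackedContext<T> {
    // Arbitrary cap on items handed over per call, so a single call stays short.
    private static final int MAX_BATCH = 128;

    // Thread-safe hand-off point between callback threads and the fill function.
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();

    /** Safe to call from any thread, for example from an asynchronous client callback. */
    void onAsyncItem(T item) {
        pending.add(item);
    }

    /**
     * Models the body of a fill function: moves already-available items into the
     * buffer without blocking. Only the calling thread touches the buffer.
     * Returns the number of items added, which may be zero.
     */
    int fillBuffer(List<T> buffer) {
        int added = 0;
        T item;
        while (added < MAX_BATCH && (item = pending.poll()) != null) {
            buffer.add(item);
            added++;
        }
        return added;
    }
}
```

In an actual source, the function passed to fillBufferFn would delegate to a method like `fillBuffer`, so the function itself stays stateless and all state lives in the context object.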
T_NEW - type of the emitted items
fillBufferFn - function that fills the buffer with source data. It must be stateless.
@Nonnull public SourceBuilder.Stream<T> destroyFn(@Nonnull ConsumerEx<? super C> pDestroyFn)
Sets the function that Jet will call when it is done cleaning up after an execution. The function must be stateless.
@Nonnull public SourceBuilder.Stream<T> distributed(int preferredLocalParallelism)
Declares that you're creating a distributed source. On each cluster
member, Jet will create as many processors as you request with the
preferredLocalParallelism parameter. If you call this, you must
ensure that all the source processors are coordinated and not emitting
duplicated data. The createFn can consult processorContext.totalParallelism()
and Processor.Context#globalProcessorIndex(). Jet calls createFn
exactly once with each globalProcessorIndex from 0 to totalParallelism - 1,
and you can use this to make all the instances agree on which part of
the data to emit.
If you don't call this method, there will be only one processor instance running on an arbitrary member.
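A common way to make the instances agree is to split the input by index modulo the total parallelism. The following plain-Java sketch, with no Jet dependency, assumes the input can be described as a fixed number of numbered partitions. `PartitionAssignment` and `partitionsFor` are hypothetical names, and the first two arguments stand for the values a createFn would read from the processor context.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignment {
    private PartitionAssignment() { }

    /**
     * Returns the partitions a given processor should read: every partition p
     * with p % totalParallelism == globalProcessorIndex. Each partition is
     * assigned to exactly one processor, so no item is emitted twice.
     */
    static List<Integer> partitionsFor(int globalProcessorIndex, int totalParallelism, int partitionCount) {
        if (globalProcessorIndex < 0 || globalProcessorIndex >= totalParallelism) {
            throw new IllegalArgumentException("index out of range: " + globalProcessorIndex);
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = globalProcessorIndex; p < partitionCount; p += totalParallelism) {
            owned.add(p);
        }
        return owned;
    }
}
```

With 10 partitions and a total parallelism of 3, the processors with indices 0, 1 and 2 get [0, 3, 6, 9], [1, 4, 7] and [2, 5, 8] respectively.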
preferredLocalParallelism - the requested number of processors on each cluster member
public SourceBuilder.Stream<T> permission(@Nonnull Permission permission)
Sets the permission required to use this source when security is enabled.
The default value is null, which means there is no restriction on using
this source. Security is an enterprise feature.
permission - the required permission to use this source when security is enabled
@Nonnull public <S> SourceBuilder.FaultTolerant<SourceBuilder.Stream<T>,S> createSnapshotFn(@Nonnull FunctionEx<? super C,? extends S> createSnapshotFn)
Sets the function Jet calls when it's creating a snapshot of the
current job state.
When Jet restarts a job, it first initializes your source as if starting
a new job, and then passes the snapshot object you returned here to
restoreSnapshotFn. After that it starts calling fillBufferFn, which must
resume emitting the stream from the same item it was about to emit when
the snapshot was taken.
The object you return must be serializable. Each source processor will call the function once per snapshot.
Here's an example of a fault-tolerant generator of an infinite sequence of integers:
    StreamSource<Integer> source = SourceBuilder
        // the context object is a counter holding the next number to emit
        .stream("name", processorContext -> new AtomicInteger())
        .<Integer>fillBufferFn((numToEmit, buffer) -> {
            for (int i = 0; i < 100; i++) {
                buffer.add(numToEmit.getAndIncrement());
            }
        })
        // save the next number to emit
        .createSnapshotFn(numToEmit -> numToEmit.get())
        // resume from the saved number
        .restoreSnapshotFn((numToEmit, states) -> numToEmit.set(states.get(0)))
        .build();
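To make the resume contract concrete, here is a plain-Java walk-through, with no Jet dependency, of what happens to such a counter across a restart. `ResumeSketch` and its methods are hypothetical; a `List<Integer>` stands in for the buffer, and calling `restore` on a fresh counter models, as an assumption, Jet invoking the restore function after re-creating the context object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ResumeSketch {
    private ResumeSketch() { }

    /** Models the fill function: emits the next `count` integers. */
    static void fill(AtomicInteger numToEmit, List<Integer> buffer, int count) {
        for (int i = 0; i < count; i++) {
            buffer.add(numToEmit.getAndIncrement());
        }
    }

    /** Models the snapshot function: the saved value is the next item to emit. */
    static Integer snapshot(AtomicInteger numToEmit) {
        return numToEmit.get();
    }

    /** Models the restore function; like the example, it reads element 0 of the list. */
    static void restore(AtomicInteger numToEmit, List<Integer> states) {
        numToEmit.set(states.get(0));
    }

    /** Runs emit, snapshot, emit, restart, restore; returns the first item after the restart. */
    static int firstItemAfterRestart() {
        AtomicInteger original = new AtomicInteger();
        List<Integer> emitted = new ArrayList<>();
        fill(original, emitted, 5);          // emits 0..4
        Integer saved = snapshot(original);  // 5 is the next item to emit
        fill(original, emitted, 3);          // emits 5..7 after the snapshot

        AtomicInteger restarted = new AtomicInteger(); // fresh state, as for a new job
        restore(restarted, List.of(saved));
        List<Integer> afterRestart = new ArrayList<>();
        fill(restarted, afterRestart, 1);
        return afterRestart.get(0);
    }
}
```

The first item after the restart is 5, the item that was about to be emitted when the snapshot was taken, so the items emitted after the snapshot (5 to 7) are emitted again.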
S - type of the snapshot object
createSnapshotFn - a function to create an object to store in the state snapshot. It must be stateless.
@Nonnull public StreamSource<T> build()
Builds and returns the unbounded stream source.
Copyright © 2023 Hazelcast, Inc. All rights reserved.