Processor
is the main type whose implementation is up to the user of the Core API:
it contains the code of the computation to be performed by a vertex.
There are a number of Processor building blocks in the Core API which
allow you to just specify the computation logic, while the provided code
handles the processor's cooperative behavior. Please refer to the
AbstractProcessor section.
A processor's work can be conceptually described as follows: "receive
data from zero or more input streams and emit data into zero or more
output streams." Each stream maps to a single DAG edge (either inbound
or outbound). There is no requirement on the correspondence between
input and output items; a processor can emit any data it sees fit,
including none at all. The same Processor
abstraction is used for all
kinds of vertices, including sources and sinks.
The implementation of a processor can be stateful and does not need to be thread-safe because Jet guarantees to use the processor instances from one thread at a time, although not necessarily always the same thread.
Cooperativeness
Processor
instances are cooperative by default. The processor can opt
out of cooperative multithreading by overriding isCooperative()
to
return false
. Jet will then start a dedicated thread for it.
To maintain an overall good throughput, a cooperative processor must take care not to hog the thread for too long (a rule of thumb is up to a millisecond at a time). Jet's design strongly favors cooperative processors and most processors can and should be implemented to fit these requirements. The major exception are sources and sinks because they often have no choice but calling into blocking I/O APIs.
The Outbox
The processor sends its output items to its
Outbox
,
which has a separate bucket for each outbound edge. The buckets have
limited capacity and will refuse an item when full. A cooperative
processor should be implemented such that when the outbox refuses its
item, it saves its processing state and returns from the processing method. The execution engine will then drain the outbox buckets.
Data Processing Callbacks
process(ordinal, inbox)
Jet passes the items received over a given edge to the processor by
calling
process(ordinal, inbox)
.
All items received since the last process()
call are in the inbox, but
also all the items the processor hasn't removed in a previous
process()
call. There is a separate instance of Inbox
for each
inbound edge, so any given process()
call involves items from only one
edge.
The processor must not remove an item from the inbox until it has fully processed it. This is important with respect to the cooperative
behavior: the processor may not be allowed to emit all items
corresponding to a given input item and may need to return from the
process()
call early, saving its state. In such a case the item should
stay in the inbox so Jet knows the processor has more work to do even if
no new items are received.
tryProcess()
If a processor's inbox is empty, Jet will call its
tryProcess()
method instead. This allows the processor to perform work that is not
input data-driven. The method has a boolean
return value and if it
returns false
, it will be called again before any other methods are
called. This way it can retry emitting its output until the outbox
accepts it.
An important use case for this method is the emission of watermark
items. A job that processes an infinite data stream may experience
occasional lulls — periods with no items arriving. On the other
hand, a windowing processor is not allowed to act upon each item
immediately due to event skew; it must wait for a watermark item to
arrive. During a stream lull this becomes problematic because the
watermark itself is primarily data-driven and advances in response to
the observation of event timestamps. The watermark-inserting processor
must be able to advance the watermark even during a stream lull, based
on the passage of wall-clock time, and it can do it inside the
tryProcess()
method.
complete()
Jet calls
complete()
when all the input edges are exhausted. It is the last method to be
invoked on the processor before disposing of it. Typically this is where
a batch processor emits the results of an aggregating operation. If it
can't emit everything in a given call, it should return false
and will
be called again later.
Snapshotting Callbacks
Hazelcast Jet supports fault-tolerant processing jobs by taking distributed snapshots. In regular time intervals each of the source vertices will perform a snapshot of its own state and then emit a special item to its output stream: a barrier. The downstream vertex that receives the barrier item makes its own snapshot and then forwards the barrier to its outbound edges, and so on towards the sinks.
At the level of the Processor
API the barrier items are not visible;
ProcessorTasklet
handles them internally and invokes the snapshotting
callback methods described below.
saveToSnapshot()
Jet will call
saveToSnapshot()
when it determines it's time for the processor to save its state to the
current snapshot. Except for source vertices, this happens when the
processor has received the barrier item from all its inbound streams and
processed all the data items preceding it. The method must emit all its
state to the special snapshotting bucket in the Outbox, by calling
outbox.offerToSnapshot()
. If the outbox doesn't accept all the data,
it must return false
to be called again later, after the outbox has
been flushed.
When this method returns true
, ProcessorTasklet
will forward the
barrier item to all the outbound edges.
restoreFromSnapshot()
When a Jet job is restarting after having been suspended, it will first
reload all the state from the last successful snapshot. Each processor
will get its data through the invocations of
restoreFromSnapshot()
.
Its parameter is the Inbox
filled with a batch of snapshot data. The
method will be called repeatedly until it consumes all the snapshot
data.
finishSnapshotRestore()
After it has delivered all the snapshot data to restoreFromSnapshot()
,
Jet will call
finishSnapshotRestore()
.
The processor may use it to initialize some transient state from the
restored state.
Rules of Watermark Propagation
Jet's internal class
ConcurrentInboundEdgeStream
(CIES for short) manages a processor's input streams belonging to a
single edge. As it receives watermark items from each of them, its duty
is to sort out when to present the watermark item to the processor.
Let's start our analysis from the vertex upstream to CIES. There are N parallel processors determining each its own watermark value and emitting a watermark item when the value advances. All these items travel downstream to M processors of the next vertex, but if they were all let through, the downstream processors would experience a wildly erratic watermark value. This is why CIES contains logic that coalesces the watermark items before letting its processor observe them. These are the rules:
-
The value of the watermark a processor emits must be strictly increasing. CIES will throw an exception if it detects a non-increasing watermark in any input stream.
-
The watermark item is always broadcast, regardless of the edge type. This means that all N upstream processors send their watermark to all M downstream processors.
-
The processor will observe a watermark value only once CIES has received a value at least that high from all upstream processors.
Best Practice: Document At-Least-Once Behavior
As we discuss in the Under the Hood chapter, the behavior of a processor under at-least-once semantics can deviate from correctness in extremely non-trivial and unexpected ways. Therefore the processor should always document its possible behaviors for that case.