This manual is for an old version of Hazelcast Jet. Use the latest stable version.

Processor is the main type whose implementation is up to the user of the Core API: it contains the code of the computation to be performed by a vertex. There are a number of Processor building blocks in the Core API which allow you to just specify the computation logic, while the provided code handles the processor's cooperative behavior. Please refer to the AbstractProcessor section.

A processor's work can be conceptually described as follows: "receive data from zero or more input streams and emit data into zero or more output streams." Each stream maps to a single DAG edge (either inbound or outbound). There is no requirement on the correspondence between input and output items; a processor can emit any data it sees fit, including none at all. The same Processor abstraction is used for all kinds of vertices, including sources and sinks.

The implementation of a processor can be stateful and does not need to be thread-safe, because Jet guarantees that each processor instance is used by only one thread at a time, although not necessarily always the same thread.

Cooperativeness

Processor instances are cooperative by default. The processor can opt out of cooperative multithreading by overriding isCooperative() to return false. Jet will then start a dedicated thread for it.

To maintain good overall throughput, a cooperative processor must take care not to hog the thread for too long (a rule of thumb is up to a millisecond at a time). Jet's design strongly favors cooperative processors, and most processors can and should be implemented to fit these requirements. The major exceptions are sources and sinks, because they often have no choice but to call into blocking I/O APIs.
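The opt-out described above can be sketched as follows. This is a minimal, self-contained model: SketchProcessor, UppercaseSketch, BlockingSourceSketch and CooperativenessDemo are hypothetical names invented for illustration and are not Jet classes. Only the isCooperative() method name and its default come from the text.

```java
// Hypothetical stand-in for the processor contract; not Jet's real interface.
interface SketchProcessor {
    // Cooperative by default, mirroring the behavior described in the text.
    default boolean isCooperative() {
        return true;
    }
}

// A transform that only does short, non-blocking work keeps the default.
class UppercaseSketch implements SketchProcessor {
    String apply(String item) {
        return item.toUpperCase();
    }
}

// A source that has to call a blocking API opts out of cooperative threading.
class BlockingSourceSketch implements SketchProcessor {
    @Override
    public boolean isCooperative() {
        return false;
    }
}

class CooperativenessDemo {
    // Describes where an engine modeled on the text would run the processor.
    static String placement(SketchProcessor processor) {
        return processor.isCooperative() ? "shared cooperative thread" : "dedicated thread";
    }

    public static void main(String[] args) {
        System.out.println(placement(new UppercaseSketch()));
        System.out.println(placement(new BlockingSourceSketch()));
    }
}
```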

The Outbox

The processor sends its output items to its Outbox, which has a separate bucket for each outbound edge. The buckets have limited capacity and will refuse an item when full. A cooperative processor should be implemented such that when the outbox refuses its item, it saves its processing state and returns from the processing method. The execution engine will then drain the outbox buckets.
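The "save state and return" pattern can be illustrated with a small model. The sketch below assumes a single bucket with a fixed capacity; BucketSketch, CountingEmitterSketch and OutboxDemo are hypothetical names, not Jet's Outbox API. The point is only that the emitter keeps its position in a field, so a refused item is retried on the next call rather than lost.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-bucket outbox with limited capacity; not Jet's Outbox.
class BucketSketch {
    private final int capacity;
    private final Queue<Integer> items = new ArrayDeque<>();

    BucketSketch(int capacity) {
        this.capacity = capacity;
    }

    // Refuses the item (returns false) when the bucket is full.
    boolean offer(int item) {
        if (items.size() >= capacity) {
            return false;
        }
        items.add(item);
        return true;
    }

    // Models the execution engine draining the bucket between calls.
    List<Integer> drain() {
        List<Integer> drained = new ArrayList<>(items);
        items.clear();
        return drained;
    }
}

// Emits 0..limit-1 and remembers where it stopped when the outbox refuses.
class CountingEmitterSketch {
    private final BucketSketch outbox;
    private final int limit;
    private int next; // saved processing state

    CountingEmitterSketch(BucketSketch outbox, int limit) {
        this.outbox = outbox;
        this.limit = limit;
    }

    // Returns true when everything was emitted, false if it had to yield.
    boolean emitSome() {
        while (next < limit) {
            if (!outbox.offer(next)) {
                return false; // refused: keep 'next' unchanged and return
            }
            next++;
        }
        return true;
    }
}

class OutboxDemo {
    // Assumes capacity >= 1; otherwise the emitter could never make progress.
    static List<Integer> runToCompletion(int capacity, int limit) {
        BucketSketch outbox = new BucketSketch(capacity);
        CountingEmitterSketch emitter = new CountingEmitterSketch(outbox, limit);
        List<Integer> received = new ArrayList<>();
        boolean done;
        do {
            done = emitter.emitSome();
            received.addAll(outbox.drain());
        } while (!done);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runToCompletion(2, 5)); // prints [0, 1, 2, 3, 4]
    }
}
```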

Data Processing Callbacks

process(ordinal, inbox)

Jet passes the items received over a given edge to the processor by calling process(ordinal, inbox). The inbox contains all items received since the last process() call, along with any items the processor did not remove during a previous process() call. There is a separate instance of Inbox for each inbound edge, so any given process() call involves items from only one edge.

The processor must not remove an item from the inbox until it has fully processed it. This is important with respect to the cooperative behavior: the processor may not be allowed to emit all items corresponding to a given input item and may need to return from the process() call early, saving its state. In such a case the item should stay in the inbox so Jet knows the processor has more work to do even if no new items are received.
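A sketch of the peek-then-remove discipline, under stated assumptions: the inbox is modeled as a plain java.util.Deque, the outbox as a capacity-limited list, and WordSplitterSketch and InboxDemo are hypothetical names rather than Jet types. Each input line produces several output words, so the outbox may fill up partway through a line. The line then stays at the head of the inbox until its last word has been accepted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical processor model: splits each input line into words.
class WordSplitterSketch {
    private final int outboxCapacity;
    private final List<String> outbox = new ArrayList<>();
    private int emittedFromHead; // words of the head item already emitted

    WordSplitterSketch(int outboxCapacity) {
        this.outboxCapacity = outboxCapacity;
    }

    private boolean offer(String word) {
        if (outbox.size() >= outboxCapacity) {
            return false;
        }
        outbox.add(word);
        return true;
    }

    // Models process(ordinal, inbox) for a single inbound edge.
    void process(Deque<String> inbox) {
        String line;
        while ((line = inbox.peek()) != null) {
            String[] words = line.split(" ");
            while (emittedFromHead < words.length) {
                if (!offer(words[emittedFromHead])) {
                    return; // yield; the line stays in the inbox
                }
                emittedFromHead++;
            }
            inbox.remove(); // fully processed, now safe to remove
            emittedFromHead = 0;
        }
    }

    List<String> drainOutbox() {
        List<String> drained = new ArrayList<>(outbox);
        outbox.clear();
        return drained;
    }
}

class InboxDemo {
    // Assumes outboxCapacity >= 1 so that every call makes progress.
    static List<String> run(List<String> lines, int outboxCapacity) {
        Deque<String> inbox = new ArrayDeque<>(lines);
        WordSplitterSketch splitter = new WordSplitterSketch(outboxCapacity);
        List<String> received = new ArrayList<>();
        while (!inbox.isEmpty()) {
            splitter.process(inbox);
            received.addAll(splitter.drainOutbox());
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b c", "d e"), 2)); // prints [a, b, c, d, e]
    }
}
```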

tryProcess()

If a processor's inbox is empty, Jet will call its tryProcess() method instead. This allows the processor to perform work that is not input data-driven. The method has a boolean return value and if it returns false, it will be called again before any other methods are called. This way it can retry emitting its output until the outbox accepts it.

An important use case for this method is the emission of watermark items. A job that processes an infinite data stream may experience occasional lulls, that is, periods with no items arriving. At the same time, a windowing processor is not allowed to act upon each item immediately due to event skew; it must wait for a watermark item to arrive. During a stream lull this becomes problematic because the watermark itself is primarily data-driven and advances in response to the observation of event timestamps. The watermark-inserting processor must be able to advance the watermark even during a stream lull, based on the passage of wall-clock time, and it can do so inside the tryProcess() method.
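One way to picture lull handling is the sketch below. It is an illustration under assumptions, not Jet's watermark policy: the class names, the lag and maxLull parameters, and the rule "watermark = top timestamp - lag + idle time beyond maxLull" are all invented here, and the clock is injected so the example is deterministic. The tryProcess() model returns false when the (simulated) outbox refuses the watermark, so it would be called again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical watermark emitter; the advancement rule is an assumption.
class LullWatermarkSketch {
    private final long lag;
    private final long maxLull;
    private final LongSupplier clock; // wall-clock time in milliseconds
    private final List<Long> emitted = new ArrayList<>();
    private long topTimestamp = Long.MIN_VALUE;
    private long lastEventAt;
    private long lastEmitted = Long.MIN_VALUE;
    boolean outboxFull; // hook to simulate the outbox refusing an item

    LullWatermarkSketch(long lag, long maxLull, LongSupplier clock) {
        this.lag = lag;
        this.maxLull = maxLull;
        this.clock = clock;
    }

    // Data-driven part: track the highest event timestamp seen so far.
    void observe(long eventTimestamp) {
        topTimestamp = Math.max(topTimestamp, eventTimestamp);
        lastEventAt = clock.getAsLong();
    }

    // Time-driven part: advance the watermark during a lull.
    boolean tryProcess() {
        if (topTimestamp == Long.MIN_VALUE) {
            return true; // no events yet, nothing to emit
        }
        long idle = Math.max(0, clock.getAsLong() - lastEventAt - maxLull);
        long watermark = topTimestamp - lag + idle;
        if (watermark <= lastEmitted) {
            return true; // watermark did not advance
        }
        if (outboxFull) {
            return false; // refused: ask to be called again
        }
        emitted.add(watermark);
        lastEmitted = watermark;
        return true;
    }

    List<Long> emitted() {
        return emitted;
    }
}

class LullDemo {
    static List<Long> run() {
        long[] now = {0};
        LullWatermarkSketch wm = new LullWatermarkSketch(10, 100, () -> now[0]);
        wm.observe(1000);
        wm.tryProcess(); // emits 1000 - 10 = 990
        now[0] = 50;
        wm.tryProcess(); // lull shorter than maxLull: nothing new
        now[0] = 250;
        wm.tryProcess(); // 150 ms beyond maxLull: emits 990 + 150 = 1140
        return wm.emitted();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [990, 1140]
    }
}
```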

complete()

Jet calls complete() when all the input edges are exhausted. It is the last method to be invoked on the processor before disposing of it. Typically this is where a batch processor emits the results of an aggregating operation. If it can't emit everything in a given call, it should return false and will be called again later.
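A batch aggregation that emits from complete() might be shaped like the following sketch. CountingAggregatorSketch and CompleteDemo are hypothetical names, and the outbox is modeled as a capacity-limited list rather than Jet's Outbox. The result iterator and the pending entry are kept in fields so that a call returning false can be resumed later without emitting anything twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical word-count aggregator that emits only in complete().
class CountingAggregatorSketch {
    private final Map<String, Integer> counts = new TreeMap<>();
    private final int outboxCapacity;
    private final List<String> outbox = new ArrayList<>();
    private Iterator<Map.Entry<String, Integer>> resultIterator;
    private Map.Entry<String, Integer> pending; // entry the outbox refused

    CountingAggregatorSketch(int outboxCapacity) {
        this.outboxCapacity = outboxCapacity;
    }

    // Called while input is still arriving; emits nothing.
    void accumulate(String word) {
        counts.merge(word, 1, Integer::sum);
    }

    // Returns false if the outbox filled up before all results were emitted.
    boolean complete() {
        if (resultIterator == null) {
            resultIterator = counts.entrySet().iterator();
        }
        while (pending != null || resultIterator.hasNext()) {
            if (pending == null) {
                pending = resultIterator.next();
            }
            if (outbox.size() >= outboxCapacity) {
                return false; // keep 'pending' and resume on the next call
            }
            outbox.add(pending.getKey() + "=" + pending.getValue());
            pending = null;
        }
        return true;
    }

    List<String> drainOutbox() {
        List<String> drained = new ArrayList<>(outbox);
        outbox.clear();
        return drained;
    }
}

class CompleteDemo {
    // Assumes outboxCapacity >= 1.
    static List<String> run(List<String> words, int outboxCapacity) {
        CountingAggregatorSketch aggregator = new CountingAggregatorSketch(outboxCapacity);
        for (String word : words) {
            aggregator.accumulate(word);
        }
        List<String> received = new ArrayList<>();
        boolean done;
        do {
            done = aggregator.complete();
            received.addAll(aggregator.drainOutbox());
        } while (!done);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("b", "a", "b", "c"), 2)); // prints [a=1, b=2, c=1]
    }
}
```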

Snapshotting Callbacks

Hazelcast Jet supports fault-tolerant processing jobs by taking distributed snapshots. At regular intervals, each of the source vertices takes a snapshot of its own state and then emits a special item to its output stream: a barrier. The downstream vertex that receives the barrier item makes its own snapshot and then forwards the barrier to its outbound edges, and so on towards the sinks.

At the level of the Processor API the barrier items are not visible; ProcessorTasklet handles them internally and invokes the snapshotting callback methods described below.

saveToSnapshot()

Jet will call saveToSnapshot() when it determines it's time for the processor to save its state to the current snapshot. Except for source vertices, this happens when the processor has received the barrier item from all its inbound streams and processed all the data items preceding it. The method must emit all its state to the special snapshotting bucket in the Outbox, by calling outbox.offerToSnapshot(). If the outbox doesn't accept all the data, it must return false to be called again later, after the outbox has been flushed.

When this method returns true, ProcessorTasklet will forward the barrier item to all the outbound edges.
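The resumable shape of saveToSnapshot() can be sketched as below. Only the offerToSnapshot name is taken from the text; its (key, value) signature here, along with SnapshotBucketSketch, SummingProcessorSketch and SnapshotDemo, are assumptions made for a self-contained model. The processor state is a map of running sums, and each entry is offered to the snapshot bucket separately.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical snapshot bucket with limited capacity; not Jet's Outbox.
class SnapshotBucketSketch {
    private final int capacity;
    private final List<Map.Entry<String, Long>> entries = new ArrayList<>();

    SnapshotBucketSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean offerToSnapshot(String key, Long value) {
        if (entries.size() >= capacity) {
            return false;
        }
        entries.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        return true;
    }

    // Models the engine flushing the bucket to snapshot storage.
    List<Map.Entry<String, Long>> flush() {
        List<Map.Entry<String, Long>> flushed = new ArrayList<>(entries);
        entries.clear();
        return flushed;
    }
}

// Hypothetical stateful processor keeping a running sum per key.
class SummingProcessorSketch {
    private final Map<String, Long> sums = new TreeMap<>();
    private final SnapshotBucketSketch outbox;
    private Iterator<Map.Entry<String, Long>> snapshotIterator;
    private Map.Entry<String, Long> pending;

    SummingProcessorSketch(SnapshotBucketSketch outbox) {
        this.outbox = outbox;
    }

    void add(String key, long amount) {
        sums.merge(key, amount, Long::sum);
    }

    // Returns false when the bucket refuses an entry; resumes on the next call.
    boolean saveToSnapshot() {
        if (snapshotIterator == null) {
            snapshotIterator = sums.entrySet().iterator();
        }
        while (pending != null || snapshotIterator.hasNext()) {
            if (pending == null) {
                pending = snapshotIterator.next();
            }
            if (!outbox.offerToSnapshot(pending.getKey(), pending.getValue())) {
                return false;
            }
            pending = null;
        }
        snapshotIterator = null; // ready for the next snapshot
        return true;
    }
}

class SnapshotDemo {
    // Assumes capacity >= 1.
    static Map<String, Long> snapshot(int capacity) {
        SnapshotBucketSketch bucket = new SnapshotBucketSketch(capacity);
        SummingProcessorSketch processor = new SummingProcessorSketch(bucket);
        processor.add("a", 1);
        processor.add("a", 2);
        processor.add("b", 5);
        processor.add("c", 1);
        Map<String, Long> stored = new TreeMap<>();
        boolean done;
        do {
            done = processor.saveToSnapshot();
            for (Map.Entry<String, Long> e : bucket.flush()) {
                stored.put(e.getKey(), e.getValue());
            }
        } while (!done);
        return stored;
    }

    public static void main(String[] args) {
        System.out.println(snapshot(2)); // prints {a=3, b=5, c=1}
    }
}
```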

restoreFromSnapshot()

When a Jet job is restarting after having been suspended, it will first reload all the state from the last successful snapshot. Each processor will get its data through the invocations of restoreFromSnapshot(). Its parameter is the Inbox filled with a batch of snapshot data. The method will be called repeatedly until it consumes all the snapshot data.

finishSnapshotRestore()

After it has delivered all the snapshot data to restoreFromSnapshot(), Jet will call finishSnapshotRestore(). The processor may use it to initialize some transient state from the restored state.
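The division of labor between the two restore callbacks can be sketched like this. RestorableSumsSketch and RestoreDemo are hypothetical names, the inbox is modeled as a java.util.Queue of key-value entries, and the "grand total" is an invented example of transient state that is not stored in the snapshot but can be derived from it.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical processor model that restores per-key sums from a snapshot.
class RestorableSumsSketch {
    private final Map<String, Long> sums = new HashMap<>();
    private long grandTotal; // transient: derived from 'sums', never snapshotted

    // Consumes one batch of snapshot entries; may be called many times.
    void restoreFromSnapshot(Queue<Map.Entry<String, Long>> inbox) {
        Map.Entry<String, Long> entry;
        while ((entry = inbox.poll()) != null) {
            sums.put(entry.getKey(), entry.getValue());
        }
    }

    // Called once after all snapshot data was delivered.
    void finishSnapshotRestore() {
        grandTotal = 0;
        for (long value : sums.values()) {
            grandTotal += value;
        }
    }

    long grandTotal() {
        return grandTotal;
    }
}

class RestoreDemo {
    static Map.Entry<String, Long> entry(String key, long value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Delivers the snapshot in batches of 'batchSize' (assumed >= 1).
    static long restore(List<Map.Entry<String, Long>> snapshot, int batchSize) {
        RestorableSumsSketch processor = new RestorableSumsSketch();
        Queue<Map.Entry<String, Long>> inbox = new ArrayDeque<>();
        for (int i = 0; i < snapshot.size(); i += batchSize) {
            inbox.addAll(snapshot.subList(i, Math.min(i + batchSize, snapshot.size())));
            processor.restoreFromSnapshot(inbox);
        }
        processor.finishSnapshotRestore();
        return processor.grandTotal();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> snapshot = new ArrayList<>();
        snapshot.add(entry("a", 3));
        snapshot.add(entry("b", 5));
        snapshot.add(entry("c", 1));
        System.out.println(restore(snapshot, 2)); // prints 9
    }
}
```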

Rules of Watermark Propagation

Jet's internal class ConcurrentInboundEdgeStream (CIES for short) manages a processor's input streams belonging to a single edge. As it receives watermark items from each of them, its duty is to sort out when to present the watermark item to the processor.

Let's start our analysis from the vertex upstream of CIES. There are N parallel processors, each determining its own watermark value and emitting a watermark item when the value advances. All these items travel downstream to the M processors of the next vertex, but if they were all let through, the downstream processors would experience a wildly erratic watermark value. This is why CIES contains logic that coalesces the watermark items before letting its processor observe them. These are the rules:

  • The value of the watermark a processor emits must be strictly increasing. CIES will throw an exception if it detects a non-increasing watermark in any input stream.

  • The watermark item is always broadcast, regardless of the edge type. This means that all N upstream processors send their watermark to all M downstream processors.

  • The processor will observe a watermark value only once CIES has received a value at least that high from all upstream processors.
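The first and third rules amount to forwarding the minimum of the latest watermarks seen from each upstream processor, and rejecting any stream whose watermark fails to increase. The sketch below models just that; WatermarkCoalescerSketch is a hypothetical class and not the actual ConcurrentInboundEdgeStream code, and the use of Long.MIN_VALUE as a "nothing to forward" marker is an assumption of this example.

```java
import java.util.Arrays;

// Hypothetical model of watermark coalescing for one edge.
class WatermarkCoalescerSketch {
    static final long NONE = Long.MIN_VALUE; // marker: nothing to forward yet

    private final long[] latest; // latest watermark per upstream processor
    private long lastForwarded = NONE;

    // Assumes upstreamCount >= 1.
    WatermarkCoalescerSketch(int upstreamCount) {
        latest = new long[upstreamCount];
        Arrays.fill(latest, NONE);
    }

    // Returns the watermark to present to the processor, or NONE.
    long observe(int upstream, long watermark) {
        if (watermark <= latest[upstream]) {
            throw new IllegalStateException("Watermark " + watermark
                    + " from upstream " + upstream + " is not strictly increasing");
        }
        latest[upstream] = watermark;
        // A value is safe to forward once every upstream has reached it.
        long min = Arrays.stream(latest).min().getAsLong();
        if (min > lastForwarded) {
            lastForwarded = min;
            return min;
        }
        return NONE;
    }
}

class CoalescerDemo {
    public static void main(String[] args) {
        WatermarkCoalescerSketch coalescer = new WatermarkCoalescerSketch(2);
        System.out.println(coalescer.observe(0, 10) == WatermarkCoalescerSketch.NONE); // prints true
        System.out.println(coalescer.observe(1, 5));  // prints 5
        System.out.println(coalescer.observe(1, 20)); // prints 10
        System.out.println(coalescer.observe(0, 15)); // prints 15
    }
}
```

In this model, a fast upstream processor cannot push the observed watermark ahead of a slow one: upstream 1 reporting 20 only raises the forwarded value to 10, the latest value from upstream 0.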

Best Practice: Document At-Least-Once Behavior

As we discuss in the Under the Hood chapter, the behavior of a processor under at-least-once semantics can deviate from correctness in extremely non-trivial and unexpected ways. Therefore the processor should always document its possible behaviors for that case.