This manual is for an old version of Hazelcast Jet, use the latest stable version.

Processor is the main type whose implementation is up to the user: it contains the code of the computation to be performed by a vertex. There are a number of Processor building blocks in the Jet API which allow you to just specify the computation logic, while the provided code handles the processor's cooperative behavior. Please refer to the AbstractProcessor section.

A processor's work can be conceptually described as follows: "receive data from zero or more input streams and emit data into zero or more output streams." Each stream maps to a single DAG edge (either inbound or outbound). There is no requirement on the correspondence between input and output items; a processor can emit any data it sees fit, including none at all. The same Processor abstraction is used for all kinds of vertices, including sources and sinks.

Cooperative Multithreading

Cooperative multithreading is one of the core features of Jet and can be roughly compared to green threads. It is purely a library-level feature and does not involve any low-level system or JVM tricks; the Processor API is simply designed in such a way that the processor can do a small amount of work each time it is invoked, then yield back to the Jet engine. The engine manages a thread pool of fixed size and on each thread, the processors take their turn in a round-robin fashion.

The point of cooperative multithreading is better performance. Several factors contribute to this:

  • The overhead of context switching between processors is much lower since the operating system's thread scheduler is not involved.
  • The worker thread driving the processors stays on the same core for longer periods, preserving the CPU cache lines.
  • The worker thread has direct knowledge of the ability of a processor to make progress (by inspecting its input/output buffers).

Processor instances are cooperative by default. The processor can opt out of cooperative multithreading by overriding isCooperative() to return false. Jet will then start a dedicated thread for it.

Requirements

To maintain an overall good throughput, a cooperative processor must take care not to hog the thread for too long (a rule of thumb is up to a millisecond at a time). Jet's design strongly favors cooperative processors and most processors can and should be implemented to fit these requirements. The major exception are sources and sinks because they often have no choice but calling into blocking I/O APIs.

The Outbox

The processor sends its output items to its Outbox, which has a separate bucket for each outbound edge. The buckets have limited capacity and will refuse an item when full. A cooperative processor should be implemented such that when its item is rejected by the outbox, it saves its processing state and returns from the processing method. The execution engine will then drain the outbox buckets.

By contrast, a non-cooperative processor gets an auto-flushing, blocking outbox that never rejects an item. This can be leveraged to simplify the processor's implementation; however the simplification alone should never be the reason to declare a processor non-cooperative.

Data Processing Callbacks

Two callback methods are involved in data processing: process() and complete(). Implementations of these methods can be stateful and do not need to be thread-safe because Jet guarantees to use the processor instances from one thread at a time, although not necessarily always the same thread.

process(ordinal, inbox)

Jet passes the items received over a given edge to the processor by calling process(ordinal, inbox). All items received since the last process() call are in the inbox, but also all the items the processor hasn't removed in a previous process() call. There is a separate instance of Inbox for each inbound edge, so any given process() call involves items from only one edge.

The processor must not remove an item from the inbox until it has fully processed it. This is important with respect to the cooperative behavior: the processor may not be allowed to emit all items corresponding to a given input item and may need to return from the process() call early, saving its state. In such a case the item should stay in the inbox so Jet knows the processor has more work to do even if no new items are received.

tryProcess()

If a processor's inbox is empty, Jet will call its tryProcess() method instead. This allows the processor to perform work that is not input data-driven. The method has a boolean return value and if it returns false, it will be called again before any other methods are called. This way it can retry emitting its output until the outbox accepts it.

An important use case for this method is the emission of watermark items. A job that processes an infinite data stream may experience occasional lulls — periods with no items arriving. On the other hand, a windowing processor is not allowed to act upon each item immediately due to event skew; it must wait for a watermark item to arrive. During a stream lull this becomes problematic because the watermark itself is primarily data-driven and advances in response to the observation of event timestamps. The watermark-inserting processor must be able to advance the watermark even during a stream lull, based on the passage of wall-clock time, and it can do it inside the tryProcess() method.

complete()

Jet calls complete() after all the input edges are exhausted. It is the last method to be invoked on the processor before disposing of it. Typically this is where a batch processor emits the results of an accumulating operation. If it can't emit everything in a given call, it should return false and will be called again later.