
The DAG-building API is centered around the DAG class. This is a pure data class and can be instantiated on its own, without a Jet instance. This makes it simple to separate the job-describing code from the code that manages the lifecycle of Jet instances. To start building a DAG, you just write

DAG dag = new DAG();

A good practice is to structure the DAG-building code into the following sections:

  1. Create all the vertices.
  2. Configure the local parallelism of vertices.
  3. Create the edges.

Example:

DAG dag = new DAG();

// 1. Create vertices
Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex transform = dag.newVertex("transform", Processors.map(
        (String line) -> entry(line, line.length())));
Vertex sink = dag.newVertex("sink", Sinks.writeMap("sinkMap"));

// 2. Configure local parallelism
source.localParallelism(1);

// 3. Create edges
dag.edge(between(source, transform));
dag.edge(between(transform, sink));

Creating a Vertex

The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:

  1. DistributedSupplier<Processor> directly returns processor instances from its get() method. It is expected to be stateless and return equivalent instances on each call. It doesn't provide any initialization or cleanup code.
  2. ProcessorSupplier returns in a single call all the processors that will run on a single cluster member. It may specialize each instance, for example to achieve local data partitioning. It is also in charge of the member-local lifecycle (initialization and destruction).
  3. ProcessorMetaSupplier is given the list of all cluster member addresses and returns, in a single call, a Function<Address, ProcessorSupplier>. Jet then calls that function with each address from the list to retrieve the ProcessorSupplier specialized for the given member.

Usually you don't have to care, or even know, which of these variants is used. You'll call a library-provided factory method that returns one of them, and they all integrate the same way into your newVertex() calls.
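
For illustration, here is the simplest variant in action: a constructor reference serves as the DistributedSupplier<Processor>. MyProcessor is a hypothetical Processor implementation, not part of the library:

// Simplest case: a stateless supplier of processor instances
Vertex myVertex = dag.newVertex("my-vertex", MyProcessor::new);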

Local and Global Parallelism of Vertex

The vertex is implemented by one or more instances of Processor on each member. Each vertex can specify how many of its processors will run per cluster member using the localParallelism property; every member will have the same number of processors. A new Vertex instance has this property set to -1, which requests to use the default value equal to the configured size of the cooperative thread pool. The latter defaults to Runtime.getRuntime().availableProcessors() and is configurable via InstanceConfig.setCooperativeThreadCount().
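
As a sketch, the cooperative thread pool (and hence the default local parallelism) can be sized at instance creation time:

JetConfig config = new JetConfig();
config.getInstanceConfig().setCooperativeThreadCount(4);
JetInstance jet = Jet.newJetInstance(config);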

In most cases the only local parallelism value you'll want to set explicitly is 1, for vertices where parallelism brings no benefit (e.g. a source processor reading from a file).

The global parallelism of the vertex is also an important value, especially in terms of the distribution of partitions among processors. It is equal to local parallelism multiplied by the cluster size: for example, a vertex with local parallelism of 8 running on a 4-member cluster has a global parallelism of 32.

Edge Ordinal

An edge is connected to a vertex with a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows the ordinal of the edge on which the item came in. Things are similar on the outbound side: the processor emits each item to a chosen ordinal, but can also emit the same item to all ordinals at once. The latter is the most typical case and allows easy replication of a data stream across several edges.

When you use the between() edge factory, the edge will be connected at ordinal 0 at both ends. When you need a different ordinal, use the from(a, ord1).to(b, ord2) form. There must be no gaps in ordinal assignment, which means a vertex will have inbound edges with ordinals 0..N and outbound edges with ordinals 0..M.

This example shows the usage of between() and from().to() forms to build a DAG with one source feeding two computational vertices:

DAG dag = new DAG();

Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex v1 = dag.newVertex("v1", ...);
Vertex v2 = dag.newVertex("v2", ...);

dag.edge(between(source, v1));
dag.edge(from(source, 1).to(v2));

Local and Distributed Edge

A major choice to make in terms of data routing is whether the candidate set of target processors is unconstrained, encompassing all processors across the cluster, or constrained to just those running on the same cluster member. This is controlled by the distributed property of the edge. By default the edge is local and calling the distributed() method removes this restriction.

With appropriate DAG design, network traffic can be minimized by employing local edges. Local edges are implemented with the most efficient kind of concurrent queue: a single-producer, single-consumer array-backed queue. It employs wait-free algorithms on both sides and avoids the latency of volatile writes by using lazySet.

The quintessential example of employing the local-distributed edge combo is two-stage aggregation. Here's a review of that setup from the Word Count tutorial:

dag.edge(between(source, tokenizer))
   .edge(between(tokenizer, accumulate)
           .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(DistributedFunctions.entryKey()))
   .edge(between(combine, sink));

Note that only the edge from accumulate to combine is distributed.

Routing Policies

The routing policy decides which of the processors in the candidate set to route each particular item to.

Unicast

This is the default routing policy, the one you get when you write

dag.edge(between(source, tokenizer))

For each item a single destination processor is chosen with no further restrictions on the choice. The only guarantee given by this pattern is that the item will be received by exactly one processor, but typically care will be taken to "spray" the items equally over all the reception candidates.

This choice makes sense when the data does not have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.

Broadcast

A broadcasting edge sends each item to all candidate receivers. This is useful when some small amount of data must be broadcast to all downstream vertices. Usually such vertices will have other inbound edges in addition to the broadcasting one, and will use the broadcast data as context while processing the other edges. In such cases the broadcasting edge will have a raised priority. There are other useful combinations, like a parallelism-one vertex that produces the same result on each member.

Broadcasting is activated with the broadcast() method call on the edge:

dag.edge(between(source, count).broadcast());

Partitioned

A partitioned edge sends each item to the one processor responsible for the item's partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.

Multiple partitions can be assigned to each processor. The global number of partitions is controlled by the number of partitions in the underlying Hazelcast IMDG configuration. Please refer to the Hazelcast Reference Manual for more information about Hazelcast IMDG partitioning.
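
For illustration, a sketch of adjusting the partition count through the underlying IMDG configuration (hazelcast.partition.count is the standard IMDG property; its default is 271):

JetConfig config = new JetConfig();
config.getHazelcastConfig().setProperty("hazelcast.partition.count", "541");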

This is the default algorithm to determine the partition ID of an item:

  1. Apply the key extractor function defined on the edge to retrieve the partitioning key.
  2. Serialize the partitioning key to a byte array using Hazelcast serialization.
  3. Apply Hazelcast's standard MurmurHash3-based algorithm to get the key's hash value.
  4. Partition ID is the hash value modulo the number of partitions.

The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.
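
In code terms, the procedure looks roughly like this. It is a conceptual sketch only: keyExtractor, serializeToBytes() and murmurHash3() stand in for Hazelcast's internal routines and are not public API:

int partitionId(Object item, int partitionCount) {
    Object key = keyExtractor.apply(item);       // step 1: extract the partitioning key
    byte[] keyBytes = serializeToBytes(key);     // step 2: Hazelcast serialization
    int hash = murmurHash3(keyBytes);            // step 3: MurmurHash3-based hashing
    return Math.floorMod(hash, partitionCount);  // step 4: modulo the partition count
}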

Another common choice is to use Java's standard Object.hashCode(). It is often significantly faster. However, it is not a safe strategy in general because hashCode()'s contract does not require repeatable results across JVMs, or even different instances of the same JVM version. If a given class's Javadoc explicitly specifies the hashing function used, then its instances are safe to partition with hashCode().

You can provide your own implementation of Partitioner to gain full control over the partitioning strategy.
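
For example, here is a minimal sketch of a custom Partitioner. The vertices a and b and the key extractor keyFn are hypothetical; the partitioner assumes an int key that can be mapped onto partitions directly:

Partitioner<Integer> byIntKey =
        (key, partitionCount) -> Math.floorMod(key, partitionCount);
dag.edge(between(a, b).partitioned(keyFn, byIntKey));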

We use both partitioning strategies in the Word Count example:

dag.edge(between(tokenizer, accumulate)
           .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()))

The local partitioned edge uses hash-code partitioning and the distributed edge uses default Hazelcast partitioning, which guarantees correctness. A closer inspection of the data types that travel on the distributed edge reveals that, in this particular case, hash-code partitioning would have worked there as well; we use Hazelcast partitioning nevertheless, for demonstration purposes. Since much less data travels toward the combiner than toward the accumulator, the choice hardly affects the performance of the whole job.

All-To-One

The all-to-one routing policy is a special case of the partitioned policy which assigns the same partition ID to all items. The partition ID is randomly chosen at job initialization time. This policy makes sense on a distributed edge when all the items from all the members must be routed to the same member and the same processor instance running on it. Local parallelism of the target vertex should be set to 1, otherwise there will be idle processors that never get any items.

On a local edge this policy doesn't make sense, since simply setting the target vertex's local parallelism to 1 already constrains the choice to a single processor instance.

In the TopNStocks example the stream-processing job must find the stocks with the fastest-changing prices. To achieve this, a single processor must see the complete picture, so an all-to-one edge is employed:

dag.edge(between(topNStage1, topNStage2).distributed().allToOne())
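
Consistent with the advice above, the target vertex would also get local parallelism of 1 (a sketch reusing the vertex name from the snippet):

topNStage2.localParallelism(1);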

Priority

By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a "hash join" which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the "left" one delivers the contents of the lookup table and the "right" one is the main data stream that will be enriched from the lookup table.

The priority property controls the order of consuming the edges. Edges are sorted by their priority number (ascending) and consumed in that order. Edges with the same priority are consumed without particular ordering (as the data arrives).

We can see a prioritized edge in action in the TF-IDF example:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1))

The tokenize vertex performs lookup table-based filtering of words. It must receive the entire lookup table before beginning to process the data.

Buffered Edge

In some special cases, unbounded data buffering must be allowed on an edge. Consider the following scenario:

A vertex sends output to two edges, creating a fork in the DAG. The branches later rejoin at a downstream vertex which assigns different priorities to its two inbound edges. Since the data for both edges is generated simultaneously, and since the lower-priority edge will apply backpressure while waiting for the higher-priority edge to be consumed in full, the upstream vertex will not be allowed to emit its data and a deadlock will occur. The deadlock is resolved by activating the unbounded buffering on the lower-priority edge.
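
As a sketch, assuming a hypothetical fork vertex whose two outbound edges rejoin at join, and assuming the buffered() method of this API generation activates the unbounded buffering:

dag.edge(from(fork, 0).to(join, 0).priority(-1)); // higher priority, consumed in full first
dag.edge(from(fork, 1).to(join, 1).buffered());   // lower priority (default 0), buffered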

Fine-Tuning Edges

Edges have some configuration properties which can be used for tuning how the items are transmitted. The following options are available:

Outbox capacity (default: 2048)
A cooperative processor's outbox will contain a bucket dedicated to this edge. When the bucket reaches the configured capacity, it will refuse further items; at that point the processor must yield control back to its caller.

Queue size (default: 1024)
When data needs to travel between two processors on the same cluster member, it is sent over a concurrent single-producer, single-consumer (SPSC) queue of fixed size. This option controls the size of the queue.
Since there are several processors executing the logic of each vertex, and since the queues are SPSC, there will be a total of senderParallelism * receiverParallelism queues representing the edge on each member. Care should be taken to strike a balance between performance and memory usage.

Packet size limit (default: 16384 bytes)
For a distributed edge, data is sent to a remote member via Hazelcast network packets. Each packet is dedicated to the data of a single edge, but may contain any number of data items. This setting limits the size of the packet in bytes. Packets should be large enough to drown out any fixed overheads, but small enough to allow good interleaving with other packets.
Note that a single item cannot straddle packets; therefore the maximum packet size can exceed the configured value by the size of a single data item.
This setting has no effect on a non-distributed edge.

Receive window multiplier (default: 3)
For each distributed edge, the receiving member regularly sends flow-control ("ack") packets to the sender, preventing it from sending too much data and overflowing the buffers. The sender is allowed to send data up to one receive window beyond the last acknowledged byte, and the receive window is sized in proportion to the rate of processing at the receiver.
Ack packets are sent at regular intervals, and the receive window multiplier sets the factor of the linear relationship between the amount of data processed within one such interval and the size of the receive window.
To put it another way, define an ackworth as the amount of data processed between two consecutive ack packets. The receive window multiplier determines the number of ackworths the sender can be ahead of the last acked byte.
This setting has no effect on a non-distributed edge.
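
These properties are set per edge through an EdgeConfig. A sketch, assuming the setter names of this API generation:

dag.edge(between(accumulate, combine)
        .distributed()
        .partitioned(entryKey())
        .setConfig(new EdgeConfig()
                .setQueueSize(2048)
                .setPacketSizeLimit(32768)));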