This document is for an old version of the former Hazelcast IMDG product.

The DAG-building API is centered around the DAG class. This is a pure data class and can be instantiated on its own, without a Jet instance. This makes it simple to separate the job-describing code from the code that manages the lifecycle of Jet instances. To start building a DAG, write:

DAG dag = new DAG();

A good practice is to structure the DAG-building code into the following sections:

  1. Create all the vertices.
  2. Configure the local parallelism of vertices.
  3. Create the edges.

Example:

DAG dag = new DAG();

// 1. Create vertices
Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex transform = dag.newVertex("transform", Processors.map(
        (String line) -> entry(line, line.length())));
Vertex sink = dag.newVertex("sink", Sinks.writeMap("sinkMap"));

// 2. Configure local parallelism
source.localParallelism(1);

// 3. Create edges
dag.edge(between(source, transform));
dag.edge(between(transform, sink));

Creating a Vertex

The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:

  1. DistributedSupplier<Processor> directly returns processor instances from its get() method. It is expected to be stateless and return equivalent instances on each call. It doesn't provide any initialization or cleanup code.
  2. ProcessorSupplier returns in a single call all the processors that will run on a single cluster member. It may specialize each instance, for example to achieve local data partitioning. It is also in charge of the member-local lifecycle (initialization and destruction).
  3. ProcessorMetaSupplier returns in a single call an object that will be in charge of creating all the processors for a vertex. Given a list of member addresses, the object it returns is a Function<Address, ProcessorSupplier> which will then be called with each of the addresses from the list to retrieve the ProcessorSupplier specialized for the given member.
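The layering of the three variants can be sketched in plain Java. This is only an illustration under stated assumptions: Proc, ProcSupplier, MetaSupplier and the helper methods are hypothetical stand-ins defined here, not the Jet interfaces, and member addresses are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-ins for the three supplier variants; not the Jet API.
final class SupplierLayersSketch {
    interface Proc { String name(); }                      // stands in for Processor
    interface ProcSupplier { List<Proc> get(int count); }  // all processors for one member
    interface MetaSupplier {                               // one object for the whole vertex
        Function<String, ProcSupplier> get(List<String> addresses);
    }

    // Variant 1 lifted to variant 2: call the simple supplier once per processor.
    static ProcSupplier memberLevel(Supplier<Proc> simple) {
        return count -> {
            List<Proc> procs = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                procs.add(simple.get());
            }
            return procs;
        };
    }

    // Variant 2 lifted to variant 3: every member gets the same member-level supplier.
    static MetaSupplier clusterLevel(ProcSupplier memberLevel) {
        return addresses -> address -> memberLevel;
    }

    // Walks the layers the way the text describes and counts the processors created.
    static int totalProcessors(MetaSupplier meta, List<String> addresses, int localParallelism) {
        Function<String, ProcSupplier> perMember = meta.get(addresses);
        int total = 0;
        for (String address : addresses) {
            total += perMember.apply(address).get(localParallelism).size();
        }
        return total;
    }
}
```

A meta-level supplier that needs per-member specialization would return a different ProcSupplier for each address instead of the shared one used in clusterLevel().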

Usually you don't have to care, or even know, which of these variants is used. You'll call a library-provided factory method that returns one of them, and all three integrate the same way into your newVertex() calls.

Local and Global Parallelism of Vertex

A vertex is implemented by one or more instances of Processor on each member. Each vertex can specify how many of its processors will run per cluster member using the localParallelism property; every member will have the same number of processors. A new Vertex instance has this property set to -1, which requests the default value: the configured size of the cooperative thread pool. That size defaults to Runtime.getRuntime().availableProcessors() and is configurable via InstanceConfig.setCooperativeThreadCount().

In most cases the only value of local parallelism that you'll want to explicitly configure is 1 for the cases where no parallelism is desirable (e.g. on a source processor reading from a file).

The global parallelism of the vertex is also an important value, especially in terms of the distribution of partitions among processors. It is equal to local parallelism multiplied by the cluster size.
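The arithmetic behind these two values is simple enough to sketch in plain Java. The class and method names below are hypothetical and exist only for illustration; the code does not touch a Jet instance.

```java
// Illustrative arithmetic only; ParallelismSketch is not part of Jet.
final class ParallelismSketch {
    // -1 requests the default, which equals the cooperative thread count.
    static int resolveLocalParallelism(int configured, int cooperativeThreadCount) {
        return configured == -1 ? cooperativeThreadCount : configured;
    }

    // Every member runs the same number of processors for the vertex.
    static int globalParallelism(int localParallelism, int clusterSize) {
        return localParallelism * clusterSize;
    }

    public static void main(String[] args) {
        int coopThreads = Runtime.getRuntime().availableProcessors();
        int local = resolveLocalParallelism(-1, coopThreads);
        System.out.println("local = " + local
                + ", global on 3 members = " + globalParallelism(local, 3));
    }
}
```

For instance, a vertex left at the default on a three-member cluster with eight cooperative threads per member would have a global parallelism of 24.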

Edge Ordinal

An edge is connected to a vertex with a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows the ordinal of the edge on which the item came in. Things are similar on the outbound side: the processor emits an item to a given ordinal, but also has the option to emit the same item to all ordinals. Emitting to all ordinals is the most typical case and allows easy replication of a data stream across several edges.

When you use the between() edge factory, the edge will be connected at ordinal 0 at both ends. When you need a different ordinal, use the from(a, ord1).to(b, ord2) form. There must be no gaps in ordinal assignment, which means a vertex will have inbound edges with ordinals 0..N and outbound edges with ordinals 0..M.
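The no-gaps rule can be expressed as a small check. The sketch below is plain Java with hypothetical names (OrdinalCheck, isContiguous); it only illustrates the rule and is not how Jet validates a DAG.

```java
import java.util.Arrays;

// Illustration of the "no gaps" rule; not part of the Jet API.
final class OrdinalCheck {
    // True if the ordinals are exactly 0..N, each used once, in any order.
    static boolean isContiguous(int... ordinals) {
        int[] sorted = ordinals.clone();
        Arrays.sort(sorted);
        for (int i = 0; i < sorted.length; i++) {
            if (sorted[i] != i) {
                return false;
            }
        }
        return true;
    }
}
```

Under this check, outbound ordinals {0, 1} pass, while {0, 2} fail because ordinal 1 is missing.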

This example shows the usage of between() and from().to() forms to build a DAG with one source feeding two computational vertices:

DAG dag = new DAG();

Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex v1 = dag.newVertex("v1", ...);
Vertex v2 = dag.newVertex("v2", ...);

dag.edge(between(source, v1));
dag.edge(from(source, 1).to(v2));

Local and Distributed Edge

A major choice to make in terms of data routing is whether the candidate set of target processors is unconstrained, encompassing all processors across the cluster, or constrained to just those running on the same cluster member. This is controlled by the distributed property of the edge. By default the edge is local and calling the distributed() method removes this restriction.

With appropriate DAG design, network traffic can be minimized by employing local edges. They are implemented with the most efficient kind of concurrent queue: a single-producer, single-consumer, array-backed queue. It employs wait-free algorithms on both sides and avoids even the latency of volatile writes by using lazySet.
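To make the queue description concrete, here is a minimal sketch of a single-producer, single-consumer ring buffer that publishes with lazySet. It is a textbook-style illustration written for this page, not Jet's actual queue implementation; the class name and the power-of-two capacity requirement are assumptions of the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch only: safe for exactly one producer thread and one consumer thread.
final class SpscQueueSketch<E> {
    private final AtomicReferenceArray<E> buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // advanced only by the consumer
    private final AtomicLong tail = new AtomicLong(); // advanced only by the producer

    SpscQueueSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        buffer = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    // Producer side: never blocks or spins; returns false when the queue is full.
    boolean offer(E item) {
        long t = tail.get();
        if (t - head.get() == buffer.length()) {
            return false;
        }
        buffer.lazySet((int) (t & mask), item); // ordered store, cheaper than a volatile write
        tail.lazySet(t + 1);                    // publishes the slot to the consumer
        return true;
    }

    // Consumer side: never blocks or spins; returns null when the queue is empty.
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int index = (int) (h & mask);
        E item = buffer.get(index);
        buffer.lazySet(index, null);            // release the reference for garbage collection
        head.lazySet(h + 1);                    // frees the slot for the producer
        return item;
    }
}
```

Because each counter has a single writer, neither side needs a compare-and-set loop, which is what keeps both operations wait-free.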

The quintessential example of combining local and distributed edges is two-stage aggregation. Here's a review of that setup from the Word Count tutorial:

dag.edge(between(source, tokenizer))
   .edge(between(tokenizer, accumulate)
           .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(DistributedFunctions.entryKey()))
   .edge(between(combine, sink));

Note that only the edge from accumulate to combine is distributed.

Routing Policies

The routing policy decides which of the processors in the candidate set to route each particular item to.

Unicast

This is the default routing policy, the one you get when you write

dag.edge(between(source, tokenizer))

For each item it chooses a single destination processor with no further restrictions on the choice. The only guarantee given by this policy is that exactly one processor will receive the item, but typically care will be taken to "spray" the items equally over all the reception candidates.

This choice makes sense when the data does not have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.
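One simple way to spread items evenly is round-robin selection. The sketch below assumes that strategy purely for illustration; the text above only guarantees that exactly one processor receives each item, and RoundRobinRouter is a hypothetical name, not a Jet class.

```java
// Illustration of even "spraying" via round-robin; an assumed strategy, not Jet's code.
final class RoundRobinRouter {
    private final int candidateCount;
    private int next;

    RoundRobinRouter(int candidateCount) {
        if (candidateCount <= 0) {
            throw new IllegalArgumentException("need at least one candidate processor");
        }
        this.candidateCount = candidateCount;
    }

    // Returns the index of the single processor that receives the next item.
    int route() {
        int chosen = next;
        next = (next + 1) % candidateCount;
        return chosen;
    }
}
```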

Isolated

This is a more restricted kind of unicast policy: any given downstream processor receives data from exactly one upstream processor. This is needed in some DAG setups to apply selective backpressure to individual upstream source processors. Activate this policy by calling isolated() on the edge:

dag.edge(between(source, insertWatermarks).isolated());

Broadcast

A broadcasting edge sends each item to all candidate receivers. This is useful when some small amount of data must be broadcast to all downstream processors. Usually the downstream vertex will have other inbound edges in addition to the broadcasting one, and will use the broadcast data as context while processing the other edges. In such cases the broadcasting edge will have a raised priority. There are other useful combinations, like a parallelism-one vertex that produces the same result on each member.

Activate this policy by calling broadcast() on the edge:

dag.edge(between(source, count).broadcast());

Partitioned

A partitioned edge sends each item to the one processor responsible for the item's partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.

Multiple partitions can be assigned to each processor. The global number of partitions is controlled by the number of partitions in the underlying Hazelcast IMDG configuration. Please refer to the Hazelcast Reference Manual for more information about Hazelcast IMDG partitioning.

This is the default algorithm to determine the partition ID of an item:

  1. Apply the key extractor function defined on the edge to retrieve the partitioning key.
  2. Serialize the partitioning key to a byte array using Hazelcast serialization.
  3. Apply Hazelcast's standard MurmurHash3-based algorithm to get the key's hash value.
  4. Partition ID is the hash value modulo the number of partitions.
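The four steps can be sketched in plain Java as follows. Two stand-ins are assumptions of this sketch and are marked in the comments: UTF-8 encoding replaces Hazelcast serialization, and Arrays.hashCode replaces the MurmurHash3-based hash. The resulting partition IDs therefore differ from the ones Hazelcast computes; only the shape of the procedure is the same.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Shape of the default procedure with simplified stand-ins; not Hazelcast's code.
final class PartitionIdSketch {
    static <T> int partitionId(T item, Function<T, String> keyExtractor, int partitionCount) {
        // 1. Extract the partitioning key (restricted to String keys in this sketch).
        String key = keyExtractor.apply(item);
        // 2. Turn the key into bytes (stand-in for Hazelcast serialization).
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // 3. Hash the bytes (stand-in for the MurmurHash3-based algorithm).
        int hash = Arrays.hashCode(bytes);
        // 4. Reduce the hash to a partition ID; floorMod keeps it non-negative.
        return Math.floorMod(hash, partitionCount);
    }
}
```

Because the result depends only on the bytes of the key, every member that runs this code on the same key gets the same partition ID, which is the property the procedure exists to provide. A call such as partitionId(word, w -> w, 271) would use 271, Hazelcast's default partition count.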

The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.

Another common choice is to use Java's standard Object.hashCode(). It is often significantly faster. However, it is not a safe strategy in general because hashCode()'s contract does not require repeatable results across JVMs, or even different instances of the same JVM version. If a given class's Javadoc explicitly specifies the hashing function used, then its instances are safe to partition with hashCode().
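A hash-code-based strategy can be sketched in a few lines. The class below is hypothetical and is not the implementation behind Partitioner.HASH_CODE; the use of Math.floorMod to handle negative hash codes is an assumption of the sketch.

```java
// Sketch of partitioning by Object.hashCode(); not Jet's Partitioner.HASH_CODE.
final class HashCodePartitioning {
    static int partitionByHashCode(Object key, int partitionCount) {
        // floorMod keeps the result in [0, partitionCount) even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

String is an example of a safe key type because its Javadoc specifies the hash function, so the same string maps to the same partition on every JVM. A class that inherits the identity-based Object.hashCode() gives no such guarantee.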

You can provide your own implementation of Partitioner to gain full control over the partitioning strategy.

We use both partitioning strategies in the Word Count example:

dag.edge(between(tokenizer, accumulate)
           .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()))

The local-partitioned edge uses partitioning by hash code and the distributed edge uses the default Hazelcast partitioning, to ensure correctness. A detailed inspection of the data types that travel on the distributed edge reveals that, in this particular case, hashcode-based partitioning would work there as well. We use Hazelcast partitioning nevertheless, for demonstration purposes. Since much less data travels towards the combiner than towards the accumulator, the performance of the whole job is hardly affected by this choice.

All-To-One

The all-to-one routing policy is a special case of the partitioned policy which assigns the same partition ID to all items. The partition ID is randomly chosen at job initialization time. This policy makes sense on a distributed edge when all the items from all the members must be routed to the same member and the same processor instance running on it. Local parallelism of the target vertex should be set to 1, otherwise there will be idle processors that never get any items.

On a local edge this policy doesn't make sense since simply setting the local parallelism of the target vertex to 1 constrains the local choice to just one processor instance.

In the TopNStocks example the stream-processing job must find the stocks with fastest-changing prices. To achieve this a single processor must see the complete picture, so an all-to-one edge is employed:

dag.edge(between(topNStage1, topNStage2).distributed().allToOne())

Priority

By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a "hash join" which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the enriching stream contains the data for the lookup table and must be consumed in full before consuming the stream to be enriched.

The priority property controls the order of consuming the edges. Edges are sorted by their priority number (ascending) and consumed in that order. Edges with the same priority are consumed without particular ordering (as the data arrives).
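The ordering rule can be illustrated with a small plain-Java sketch that groups edges by priority number. The names are hypothetical and edges are modeled as strings; this is not how Jet schedules edge consumption internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration of priority ordering; not part of the Jet API.
final class EdgePrioritySketch {
    // Groups edge names by priority number, lowest number first. Each group
    // must be fully consumed before the next one is touched; edges within a
    // group are consumed in no particular order.
    static List<List<String>> consumptionOrder(Map<String, Integer> priorityByEdge) {
        TreeMap<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> e : priorityByEdge.entrySet()) {
            groups.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return new ArrayList<>(groups.values());
    }
}
```

With one edge at priority -1 and another at priority 0, the first group contains only the -1 edge, so it is exhausted before any item from the other edge is processed.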

We can see a prioritized edge in action in the TF-IDF example:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1))

The tokenize vertex performs lookup table-based filtering of words. It must receive the entire lookup table before beginning to process the data.

A Fault Tolerance Caveat

As explained in the section on the Processor API, Jet takes regular snapshots of processor state when fault tolerance is enabled. A processor will get a special item in its input stream, called a barrier. In the exactly once mode, as soon as the processor receives a barrier on one stream, it must stop pulling data from that stream, wait for the same barrier in all other streams, and then emit its state to the snapshot storage. This is in direct contradiction with the contract of edge prioritization: the processor is not allowed to consume any other streams before having fully exhausted the prioritized ones.

This is why Jet does not initiate a snapshot until all the high-priority edges have been fully consumed.

Although strictly speaking this only applies to the exactly once mode, Jet postpones taking the snapshot in the at least once mode as well. Even though the snapshot could begin early, it would still not be able to complete until the prioritized edges have been consumed. The only effect of starting early would be that many more items are processed twice after a restart.

Fine-Tuning Edges

Edges can be configured with an EdgeConfig instance, which specifies additional fine-tuning parameters. For example:

dag.edge(between(tickerSource, generateTrades)
        .setConfig(new EdgeConfig().setQueueSize(512)));

Please refer to the Javadoc of EdgeConfig for details.