The DAG-building API is centered around the
DAG class. This is a pure
data class and can be instantiated on its own, without a Jet instance.
This makes it simple to separate the job-describing code from the code
that manages the lifecycle of Jet instances. To start building a DAG,
you just write
DAG dag = new DAG();
A good practice is to structure the DAG-building code into the following sections:
- Create all the vertices.
- Configure the local parallelism of vertices.
- Create the edges.
DAG dag = new DAG();
// 1. Create vertices
Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex transform = dag.newVertex("transform", Processors.map(
        // map each line to a (line, length) entry
        (String line) -> entry(line, line.length())));
Vertex sink = dag.newVertex("sink", Sinks.writeMap("sinkMap"));
// 2. Configure local parallelism: a single file-reading processor per member
source.localParallelism(1);
// 3. Create edges
dag.edge(between(source, transform));
dag.edge(between(transform, sink));
Creating a Vertex
The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:
- DistributedSupplier<Processor> directly returns processor instances from its get() method. It is expected to be stateless and return equivalent instances on each call. It doesn't provide any initialization or cleanup code.
- ProcessorSupplier returns in a single call all the processors that will run on a single cluster member. It may specialize each instance, for example to achieve local data partitioning. It is also in charge of the member-local lifecycle (initialization and destruction).
- ProcessorMetaSupplier returns in a single call an object that will be in charge of creating all the processors for a vertex. Given a list of member addresses, the object it returns is a Function<Address, ProcessorSupplier> which will then be called with each of the addresses from the list to retrieve the ProcessorSupplier specialized for the given member.
Usually you don't have to care, or even know, which of these variants is
used. You'll call a library-provided factory method that returns one or
the other, and it will integrate the same way into your newVertex() calls.
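The following stdlib-only Java sketch models the three levels to show how they nest. The interfaces ToyProcessor, ToyProcessorSupplier and ToyMetaSupplier are hypothetical stand-ins, not Jet's API, and member addresses are plain strings rather than Address objects. Under these assumptions, deploy() mimics the order in which an engine would call them: the meta-supplier once, then each member's supplier once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-ins for Processor, ProcessorSupplier and ProcessorMetaSupplier.
// Member addresses are modelled as plain strings.
class SupplierLevels {

    interface ToyProcessor {
        String describe();
    }

    // Level 2: creates all processors for one member in a single call.
    interface ToyProcessorSupplier {
        List<ToyProcessor> get(int localParallelism);
    }

    // Level 3: given the member list, returns a per-member factory.
    interface ToyMetaSupplier {
        Function<String, ToyProcessorSupplier> get(List<String> memberAddresses);
    }

    // Level 1 -> level 2: a stateless supplier is simply called once per processor.
    static ToyProcessorSupplier fromSimpleSupplier(Supplier<ToyProcessor> simple) {
        return count -> {
            List<ToyProcessor> processors = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                processors.add(simple.get());
            }
            return processors;
        };
    }

    // Level 2 -> level 3: the same processor supplier is used on every member.
    static ToyMetaSupplier fromProcessorSupplier(ToyProcessorSupplier supplier) {
        return members -> address -> supplier;
    }

    // A "real" level-3 supplier: processors are specialized with their member's address.
    static ToyMetaSupplier perMemberTagging() {
        return members -> address -> count -> {
            List<ToyProcessor> processors = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                String tag = address + "#" + i;
                processors.add(() -> tag);
            }
            return processors;
        };
    }

    // Ask the meta-supplier once, then each member's processor supplier once.
    static Map<String, List<ToyProcessor>> deploy(ToyMetaSupplier meta, List<String> members, int localParallelism) {
        Function<String, ToyProcessorSupplier> perMember = meta.get(members);
        Map<String, List<ToyProcessor>> result = new LinkedHashMap<>();
        for (String member : members) {
            result.put(member, perMember.apply(member).get(localParallelism));
        }
        return result;
    }

    public static void main(String[] args) {
        deploy(perMemberTagging(), List.of("10.0.0.1", "10.0.0.2"), 2)
                .forEach((member, ps) -> ps.forEach(p -> System.out.println(p.describe())));
    }
}
```

Wrapping a simpler level into a more complex one, as fromSimpleSupplier() and fromProcessorSupplier() do here, is one way to see why the three variants can be used interchangeably.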
Local and Global Parallelism of Vertex
The vertex is implemented by one or more instances of Processor on
each member. Each vertex can specify how many of its processors will run
per cluster member using the
localParallelism property; every member
will have the same number of processors. A new
Vertex instance has
this property set to
-1, which requests the default value: the
configured size of the cooperative thread pool. The pool size defaults to
Runtime.availableProcessors() and is configurable in the Jet instance configuration.
In most cases the only value of local parallelism that you'll want to
explicitly configure is
1 for the cases where no parallelism is
desirable (e.g. on a source processor reading from a file).
The global parallelism of the vertex is also an important value, especially in terms of the distribution of partitions among processors. It is equal to local parallelism multiplied by the cluster size.
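As a small illustration of these two numbers, the sketch below computes the effective local parallelism (resolving -1 to the cooperative thread count) and the resulting global parallelism. The class and method names are hypothetical, and rejecting values below 1 other than -1 is an assumption of this sketch.

```java
// Hypothetical helper: relates local and global parallelism as described above.
class ParallelismMath {
    static final int USE_DEFAULT = -1;

    // -1 resolves to the cooperative thread count; other values are used as-is.
    static int effectiveLocalParallelism(int configured, int cooperativeThreadCount) {
        if (configured == USE_DEFAULT) {
            return cooperativeThreadCount;
        }
        if (configured < 1) {
            throw new IllegalArgumentException("local parallelism must be -1 or positive: " + configured);
        }
        return configured;
    }

    // Every member runs the same number of processors, so this is a plain product.
    static int globalParallelism(int effectiveLocal, int clusterSize) {
        return effectiveLocal * clusterSize;
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors();
        int clusterSize = 3;
        int source = effectiveLocalParallelism(1, threads);
        int transform = effectiveLocalParallelism(USE_DEFAULT, threads);
        System.out.println("source: " + globalParallelism(source, clusterSize) + " processors cluster-wide");
        System.out.println("transform: " + globalParallelism(transform, clusterSize) + " processors cluster-wide");
    }
}
```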
Edge Ordinal
An edge is connected to a vertex with a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows the ordinal of the edge on which the item came in. Things are similar on the outbound side: the processor emits an item to a given ordinal, but also has the option to emit the same item to all ordinals. This is the most typical case and allows easy replication of a data stream across several edges.
When you use the
between() edge factory, the edge will be connected at
ordinal 0 at both ends. When you need a different ordinal, use the
from(a, ord1).to(b, ord2) form. There must be no gaps in ordinal
assignment, which means a vertex will have inbound edges with ordinals
0..N and outbound edges with ordinals 0..M.
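The no-gaps rule can be expressed as a simple check. This is an illustrative helper with a hypothetical name, not the validation Jet itself performs; treating duplicate ordinals as invalid is an assumption of this sketch.

```java
import java.util.Arrays;

// Hypothetical checker for the "no gaps" rule on one side (inbound or outbound) of a vertex.
class OrdinalCheck {
    static boolean isGapFree(int... ordinals) {
        int[] sorted = ordinals.clone();
        Arrays.sort(sorted);
        for (int i = 0; i < sorted.length; i++) {
            if (sorted[i] != i) {
                return false; // a gap, a duplicate or a negative ordinal
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Outbound ordinals of "source" in the example: 0 (to v1) and 1 (to v2).
        System.out.println(isGapFree(0, 1));
        // Connecting the second edge at ordinal 2 instead of 1 would leave a gap.
        System.out.println(isGapFree(0, 2));
    }
}
```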
This example shows the usage of the between() and
from().to() forms to
build a DAG with one source feeding two computational vertices:
DAG dag = new DAG();
Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex v1 = dag.newVertex("v1", ...);
Vertex v2 = dag.newVertex("v2", ...);
// source ordinal 0 -> v1 ordinal 0
dag.edge(between(source, v1));
// source ordinal 1 -> v2 ordinal 0
dag.edge(from(source, 1).to(v2));
Local and Distributed Edge
A major choice to make in terms of data routing is whether the candidate
set of target processors is unconstrained, encompassing all processors
across the cluster, or constrained to just those running on the same
cluster member. This is controlled by the
distributed property of the
edge. By default the edge is local, and calling the distributed()
method removes this restriction.
With appropriate DAG design, network traffic can be minimized by
employing local edges. Local edges are implemented with the most
efficient kind of concurrent queue: a single-producer, single-consumer,
array-backed queue. It employs wait-free algorithms on both sides and
avoids the latency of volatile writes.
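To make the queue description concrete, here is a minimal bounded SPSC array queue in plain Java. It is a sketch, not Jet's implementation: it assumes a power-of-two capacity and publishes index updates with AtomicLong.lazySet(), a common way to get ordered stores without the cost of a full volatile write. Neither offer() nor poll() contains a loop, so each finishes in a bounded number of steps; offer() returning false is the point where backpressure applies.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal bounded single-producer, single-consumer array queue (illustrative only).
// Exactly one thread may call offer() and exactly one other thread may call poll().
class SpscArrayQueue<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read; written only by the consumer
    private final AtomicLong tail = new AtomicLong(); // next slot to write; written only by the producer

    SpscArrayQueue(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    // Producer side: never blocks; returns false when the queue is full.
    boolean offer(E item) {
        long t = tail.get();
        if (t - head.get() == buffer.length) {
            return false;
        }
        buffer[(int) (t & mask)] = item;
        tail.lazySet(t + 1); // ordered store: the item is visible before the new tail
        return true;
    }

    // Consumer side: never blocks; returns null when the queue is empty.
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int index = (int) (h & mask);
        E item = (E) buffer[index];
        buffer[index] = null; // release the slot for garbage collection
        head.lazySet(h + 1);
        return item;
    }
}
```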
The quintessential example of combining local and distributed edges is two-stage aggregation. Here's a review of that setup from the Word Count tutorial:
dag.edge(between(source, tokenizer))
   .edge(between(tokenizer, accumulate)
           .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(DistributedFunctions.entryKey()))
   .edge(between(combine, sink));
Note that only the edge from accumulate to
combine is distributed.
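The same two-stage shape can be simulated in a single JVM to show why it reduces network traffic. Each "member" first collapses its own words into partial counts, and only those partial counts are routed by key to a combiner. This sketch rests on stated assumptions: members are list entries, the network hop is a method call, and String.hashCode() picks the combiner (its formula is fixed by its Javadoc). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-JVM simulation of two-stage aggregation (illustrative, not Jet code).
class TwoStageWordCount {

    // Stage 1 (local edge): each member counts the words it tokenized itself.
    static Map<String, Long> accumulateLocally(List<String> lines) {
        Map<String, Long> partial = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    partial.merge(word, 1L, Long::sum);
                }
            }
        }
        return partial;
    }

    // Stage 2 (distributed, partitioned edge): partial counts with the same key
    // always reach the same combiner, which sums them.
    static List<Map<String, Long>> combine(List<Map<String, Long>> partials, int combinerCount) {
        List<Map<String, Long>> combiners = new ArrayList<>();
        for (int i = 0; i < combinerCount; i++) {
            combiners.add(new HashMap<>());
        }
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                int target = Math.floorMod(e.getKey().hashCode(), combinerCount);
                combiners.get(target).merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return combiners;
    }

    static Map<String, Long> wordCount(List<List<String>> linesPerMember, int combinerCount) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> memberLines : linesPerMember) {
            partials.add(accumulateLocally(memberLines));
        }
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> combinerResult : combine(partials, combinerCount)) {
            result.putAll(combinerResult); // each key lives in exactly one combiner
        }
        return result;
    }
}
```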
Routing Policies
The routing policy decides which of the processors in the candidate set to route each particular item to.
Unicast
This is the default routing policy, the one you get when you create an edge without specifying a routing policy.
For each item a single destination processor is chosen with no further restrictions on the choice. The only guarantee given by this pattern is that the item will be received by exactly one processor, but typically care will be taken to "spray" the items equally over all the reception candidates.
This choice makes sense when the data does not have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.
Broadcast
A broadcasting edge sends each item to all candidate receivers. This is useful when some small amount of data must be broadcast to all processors of the downstream vertex. Usually such a vertex will have other inbound edges in addition to the broadcasting one, and its processors will use the broadcast data as context while processing the other edges. In such cases the broadcasting edge will have a raised priority. There are other useful combinations, like a parallelism-one vertex that produces the same result on each member.
Broadcasting is activated with the
broadcast() method call on the edge.
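The unicast and broadcast contracts can be contrasted with a tiny routing model. The Router interface and both implementations are hypothetical. Round-robin is only one way to "spray" items evenly and is not claimed to be what Jet does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Illustrative model: given an item, which receivers (0..receiverCount-1) get it?
class RoutingSketch {

    interface Router {
        int[] targets(Object item, int receiverCount);
    }

    // Unicast: exactly one receiver per item; round-robin spreads items evenly.
    static Router roundRobinUnicast() {
        int[] next = {0};
        return (item, receiverCount) -> new int[] {Math.floorMod(next[0]++, receiverCount)};
    }

    // Broadcast: every receiver gets the item.
    static Router broadcast() {
        return (item, receiverCount) -> IntStream.range(0, receiverCount).toArray();
    }

    // Delivers all items and returns what each receiver got, in arrival order.
    static List<List<Object>> route(Router router, List<?> items, int receiverCount) {
        List<List<Object>> inboxes = new ArrayList<>();
        for (int i = 0; i < receiverCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (Object item : items) {
            for (int target : router.targets(item, receiverCount)) {
                inboxes.get(target).add(item);
            }
        }
        return inboxes;
    }
}
```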
Partitioned
A partitioned edge sends each item to the one processor responsible for the item's partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.
Multiple partitions can be assigned to each processor. The global number of partitions is controlled by the number of partitions in the underlying Hazelcast IMDG configuration. Please refer to the Hazelcast Reference Manual for more information about Hazelcast IMDG partitioning.
This is the default algorithm to determine the partition ID of an item:
- Apply the key extractor function defined on the edge to retrieve the partitioning key.
- Serialize the partitioning key to a byte array using Hazelcast serialization.
- Apply Hazelcast's standard MurmurHash3-based algorithm to get the key's hash value.
- Partition ID is the hash value modulo the number of partitions.
The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.
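The four steps above can be sketched in plain Java. This is not Hazelcast's code: UTF-8 encoding stands in for Hazelcast serialization, the seed (0) is an arbitrary assumption, and Math.floorMod() stands in for Hazelcast's exact hash-to-index mapping. The hash function follows the public x86 32-bit variant of MurmurHash3. Because the result depends only on the key's bytes, every JVM computes the same partition ID.

```java
import java.nio.charset.StandardCharsets;

// Sketch: key -> bytes -> MurmurHash3 (x86, 32-bit) -> non-negative modulo partition count.
class PartitionIdSketch {

    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int blockEnd = data.length & ~3; // length rounded down to a multiple of 4

        // Body: 4-byte little-endian blocks.
        for (int i = 0; i < blockEnd; i += 4) {
            int k1 = (data[i] & 0xff)
                    | (data[i + 1] & 0xff) << 8
                    | (data[i + 2] & 0xff) << 16
                    | (data[i + 3] & 0xff) << 24;
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }

        // Tail: the remaining 1 to 3 bytes.
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[blockEnd + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (data[blockEnd + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= data[blockEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }

        // Finalization: mix in the length, then avalanche the bits.
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    static int partitionId(String key, int partitionCount) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8); // stand-in for Hazelcast serialization
        int hash = murmur3x86_32(bytes, 0);
        return Math.floorMod(hash, partitionCount);
    }
}
```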
Another common choice is to use Java's standard Object.hashCode(). It
is often significantly faster. However, it is not a safe strategy in
general, because hashCode()'s contract does not require repeatable
results across JVMs, or even across different instances of the same JVM
version. If a given class's Javadoc explicitly specifies the hashing
function used, then its instances are safe to partition with hashCode().
You can provide your own implementation of
Partitioner to gain full
control over the partitioning strategy.
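Here is a sketch of what a custom strategy can look like, using a hypothetical KeyPartitioner interface rather than Jet's Partitioner. The first implementation relies on String.hashCode(), which is safe in the sense described above because its Javadoc specifies the formula. The second keeps every key with the same prefix (the part before ':', a convention invented for this example) in the same partition.

```java
// Hypothetical partitioner interface for illustration; it is not Jet's Partitioner.
interface KeyPartitioner<K> {
    int partitionOf(K key, int partitionCount);
}

class PartitionerSketch {

    // Repeatable across JVMs because String.hashCode() is specified as
    // s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1].
    static final KeyPartitioner<String> STRING_HASH_CODE =
            (key, count) -> Math.floorMod(key.hashCode(), count);

    // Full control: co-locate all keys sharing the prefix before ':'.
    static final KeyPartitioner<String> BY_PREFIX = (key, count) -> {
        int colon = key.indexOf(':');
        String prefix = colon < 0 ? key : key.substring(0, colon);
        return Math.floorMod(prefix.hashCode(), count);
    };
}
```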
We use both partitioning strategies in the Word Count example:
dag.edge(between(tokenizer, accumulate)
           .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()))
The local-partitioned edge uses partitioning by hash code, and the distributed edge uses the default Hazelcast partitioning to ensure correctness. Note that, for this particular job, a closer inspection of the data types traveling on the distributed edge shows that hash-code-based partitioning would work there as well; we use Hazelcast partitioning nevertheless, for demonstration purposes. Since much less data travels towards the combiner than towards the accumulator, the performance of the whole job is hardly affected by this choice.
All-To-One
The all-to-one routing policy is a special case of the
partitioned policy, which assigns the same partition ID to all items. The partition
ID is randomly chosen at job initialization time. This policy makes
sense on a distributed edge when all the items from all the members must
be routed to the same member and the same processor instance running on
it. Local parallelism of the target vertex should be set to 1, otherwise
there will be idle processors that never get any items.
On a local edge this policy doesn't make sense since just setting the local parallelism of the target vertex to 1 constrains the local choice to just one processor instance.
In the TopNStocks example, the stream-processing job must find the
stocks with the fastest-changing prices. To achieve this, a single processor
must see the complete picture, so an all-to-one edge is employed.
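All-to-one can be pictured as partitioned routing with a constant key function. In this sketch, which uses hypothetical names, the partition ID is drawn once in the constructor (standing in for job initialization). Mapping a partition to a processor as partitionId % globalParallelism is an assumption made only for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: all-to-one as partitioned routing where every item gets the same partition ID.
class AllToOneSketch {
    private final int partitionId;

    AllToOneSketch(int partitionCount) {
        // Chosen once, at "job initialization".
        this.partitionId = ThreadLocalRandom.current().nextInt(partitionCount);
    }

    int partitionOf(Object item) {
        return partitionId; // the item's content is ignored on purpose
    }

    // Assumed partition-to-processor mapping, for illustration only.
    int targetProcessor(Object item, int globalParallelism) {
        return partitionOf(item) % globalParallelism;
    }
}
```

With a target local parallelism greater than 1, the processors that do not own this single partition never receive anything, which is why the text recommends setting it to 1.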
Priority
By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a "hash join" which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the "left" one delivers the contents of the lookup table and the "right" one is the main data stream that will be enriched from the lookup table.
The priority property controls the order of consuming the edges. Edges
are sorted by their priority number (ascending) and consumed in that
order. Edges with the same priority are consumed without particular
ordering (as the data arrives).
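The ordering rule can be sketched as follows: group edges by priority, visit the groups in ascending order, and drain each group fully before moving on. Within a group, round-robin interleaving stands in for "as the data arrives". The Edge class here is a hypothetical data holder, not Jet's Edge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of priority-ordered consumption (illustrative, not Jet's scheduler).
class PrioritySketch {

    static final class Edge {
        final String name;
        final int priority;
        final List<String> items;

        Edge(String name, int priority, List<String> items) {
            this.name = name;
            this.priority = priority;
            this.items = items;
        }
    }

    // Returns "edge:item" labels in the order a processor would consume them.
    static List<String> consumptionOrder(List<Edge> edges) {
        TreeMap<Integer, List<Edge>> byPriority = new TreeMap<>(); // ascending priority numbers
        for (Edge e : edges) {
            byPriority.computeIfAbsent(e.priority, p -> new ArrayList<>()).add(e);
        }
        List<String> order = new ArrayList<>();
        for (List<Edge> group : byPriority.values()) {
            int maxLen = group.stream().mapToInt(e -> e.items.size()).max().orElse(0);
            // Drain the whole group, interleaving its edges, before the next group.
            for (int i = 0; i < maxLen; i++) {
                for (Edge e : group) {
                    if (i < e.items.size()) {
                        order.add(e.name + ":" + e.items.get(i));
                    }
                }
            }
        }
        return order;
    }
}
```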
We can see a prioritized edge in action in the TF-IDF example. There the
tokenize vertex performs lookup-table-based filtering of words: it
must receive the entire lookup table before it begins to process the main data stream.
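A processor doing this kind of lookup-based filtering can be modelled as a small state machine. It accumulates the lookup table, is told when that edge is complete, and only then accepts main-stream items. The class and its callback names are hypothetical; in Jet, the edge priority is what guarantees the ordering that this sketch merely checks.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of a hash-join-style consumer that filters words against a stop-word table.
class StopwordFilterSketch {
    private final Set<String> stopwords = new HashSet<>();
    private boolean lookupComplete;

    // Items from the higher-priority (lookup table) edge.
    void onLookupItem(String stopword) {
        if (lookupComplete) {
            throw new IllegalStateException("lookup edge already completed");
        }
        stopwords.add(stopword);
    }

    void onLookupComplete() {
        lookupComplete = true;
    }

    // Items from the main data edge; valid only after the lookup table is complete.
    List<String> onLine(String line) {
        if (!lookupComplete) {
            throw new IllegalStateException("main-stream item arrived before the lookup table was complete");
        }
        List<String> kept = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty() && !stopwords.contains(word)) {
                kept.add(word);
            }
        }
        return kept;
    }
}
```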
Buffered Edge
In some special cases, unbounded data buffering must be allowed on an edge. Consider the following scenario:
A vertex sends output to two edges, creating a fork in the DAG. The branches later rejoin at a downstream vertex which assigns different priorities to its two inbound edges. Since the data for both edges is generated simultaneously, and since the lower-priority edge will apply backpressure while waiting for the higher-priority edge to be consumed in full, the upstream vertex will not be allowed to emit its data and a deadlock will occur. The deadlock is resolved by activating the unbounded buffering on the lower-priority edge.
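The deadlock can be reproduced with a single-threaded simulation. It is deliberately simplified and uses hypothetical names: the producer emits each item to both branches, the high-priority queue is left unbounded because the consumer drains it continuously, and the low-priority queue has the given capacity. Passing Integer.MAX_VALUE models an unbounded (buffered) edge.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified simulation of the fork/rejoin deadlock described above.
class BufferedEdgeSketch {

    // Returns true if all items are consumed, false if nobody can make progress.
    static boolean runCompletes(int itemCount, int lowPriorityCapacity) {
        Deque<Integer> high = new ArrayDeque<>();
        Deque<Integer> low = new ArrayDeque<>();
        int emitted = 0;
        int consumed = 0;
        while (consumed < 2 * itemCount) {
            boolean progress = false;
            // Producer: emits only if both edges accept the item (backpressure).
            if (emitted < itemCount && low.size() < lowPriorityCapacity) {
                high.add(emitted);
                low.add(emitted);
                emitted++;
                progress = true;
            }
            boolean highComplete = emitted == itemCount;
            // Consumer: high-priority edge first; low-priority edge only once the
            // high-priority edge is complete and drained.
            if (!high.isEmpty()) {
                high.poll();
                consumed++;
                progress = true;
            } else if (highComplete && !low.isEmpty()) {
                low.poll();
                consumed++;
                progress = true;
            }
            if (!progress) {
                return false; // deadlock
            }
        }
        return true;
    }
}
```

With 3 items and a low-priority capacity of 2, the producer stalls on the full low-priority queue while the consumer waits for the high-priority edge to complete; removing the bound resolves it.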
Fine-Tuning
Edges have some configuration properties which can be used for tuning how the items are transmitted. The following options are available:
- Outbox capacity (default: 2048): A cooperative processor's outbox will contain a bucket dedicated to this edge. When the bucket reaches the configured capacity, it will refuse further items. At that time the processor must yield control back to its caller.
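- Queue size: When data needs to travel between two processors on the same
  cluster member, it is sent over a concurrent single-producer,
  single-consumer (SPSC) queue of fixed size. This option controls
  the size of the queue.
  Since there are several processors executing the logic of each vertex, and since the queues are SPSC, there will be one queue for each pair of sending and receiving processors: a total of (local parallelism of the sending vertex) multiplied by (local parallelism of the receiving vertex) queues representing the edge on each member.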
- Packet size limit: For a distributed edge, data is sent to a remote member via
  Hazelcast network packets. Each packet is dedicated to the data of
  a single edge, but may contain any number of data items. This
  setting limits the size of the packet in bytes. Packets should be
  large enough to drown out any fixed overheads, but small enough to
  allow good interleaving with other packets.
  Note that a single item cannot straddle packets; therefore the maximum packet size can exceed the value configured here by the size of a single data item.
  This setting has no effect on a non-distributed edge.
- Receive window multiplier: For each distributed edge the receiving member regularly sends
  flow-control ("ack") packets to its sender, which prevent it from
  sending too much data and overflowing the buffers. The sender is
  allowed to send data up to one receive window beyond the last
  acknowledged byte, and the receive window is sized in proportion to
  the rate of processing at the receiver.
  Ack packets are sent at regular intervals, and the receive window multiplier sets the factor of the linear relationship between the amount of data processed within one such interval and the size of the receive window.
  To put it another way, let us define an ackworth to be the amount of data processed between two consecutive ack packets. The receive window multiplier determines the number of ackworths the sender can be ahead of the last acked byte.
  This setting has no effect on a non-distributed edge.
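The three calculations behind these settings can be summarised in a short sketch: the number of local queues per member, the grouping of items into packets when an item may not be split, and the furthest byte the sender may reach under the receive window. The method names are hypothetical. The packet-filling rule (close a packet once it reaches the limit) is an assumed policy consistent with the description above, not Jet's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the arithmetic behind the edge tuning options.
class EdgeTuningSketch {

    // Local edge: one SPSC queue per (sending processor, receiving processor) pair on a member.
    static int queuesPerMember(int senderLocalParallelism, int receiverLocalParallelism) {
        return senderLocalParallelism * receiverLocalParallelism;
    }

    // Distributed edge: items are appended until the packet reaches the limit. Items are
    // never split, so a packet may overshoot the limit by less than one item's size.
    static List<Integer> packetSizes(int[] itemSizes, int packetSizeLimit) {
        List<Integer> packets = new ArrayList<>();
        int current = 0;
        for (int size : itemSizes) {
            current += size;
            if (current >= packetSizeLimit) {
                packets.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            packets.add(current); // last, partially filled packet
        }
        return packets;
    }

    // Flow control: receive window = multiplier * ackworth; the sender may run that far
    // ahead of the last acknowledged byte.
    static long maxSendOffset(long lastAckedByte, long ackworth, int receiveWindowMultiplier) {
        return lastAckedByte + (long) receiveWindowMultiplier * ackworth;
    }
}
```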