The DAG-building API is centered around the `DAG` class. This is a pure
data class and can be instantiated on its own, without a Jet instance.
This makes it simple to separate the job-describing code from the code
that manages the lifecycle of Jet instances. To start building a DAG,
you just write:

```java
DAG dag = new DAG();
```
A good practice is to structure the DAG-building code into the following sections:
- Create all the vertices.
- Configure the local parallelism of vertices.
- Create the edges.
Example:

```java
DAG dag = new DAG();
// 1. Create vertices
Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex transform = dag.newVertex("transform", Processors.map(
        (String line) -> entry(line, line.length())));
Vertex sink = dag.newVertex("sink", Sinks.writeMap("sinkMap"));
// 2. Configure local parallelism
source.localParallelism(1);
// 3. Create edges
dag.edge(between(source, transform));
dag.edge(between(transform, sink));
```
Creating a Vertex
The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:
- `DistributedSupplier<Processor>` directly returns processor instances
  from its `get()` method. It is expected to be stateless and to return
  equivalent instances on each call. It doesn't provide any
  initialization or cleanup code.
- `ProcessorSupplier` returns in a single call all the processors that
  will run on a single cluster member. It may specialize each instance,
  for example to achieve local data partitioning. It is also in charge
  of the member-local lifecycle (initialization and destruction).
- `ProcessorMetaSupplier` returns in a single call an object that will
  be in charge of creating all the processors for a vertex. Given a
  list of member addresses, the object it returns is a
  `Function<Address, ProcessorSupplier>`, which is then called with
  each address from the list to retrieve the `ProcessorSupplier`
  specialized for the given member.
Usually you don't have to care, or even know, which of these variants is
used: you'll call a library-provided factory method that returns one or
another, and they all integrate the same way into your `newVertex()`
calls.
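If you do supply the processors yourself, the simplest variant is often
enough. Here is a minimal sketch, assuming a hypothetical stateless
`Processor` implementation of ours named `TokenizeP`:

```java
// TokenizeP::new serves as a DistributedSupplier<Processor>: a stateless
// factory that returns an equivalent processor instance on each call.
Vertex tokenizer = dag.newVertex("tokenizer", TokenizeP::new);
```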
Local and Global Parallelism of Vertex
The vertex is implemented by one or more instances of `Processor` on
each member. Each vertex can specify how many of its processors will run
per cluster member using the `localParallelism` property; every member
will have the same number of processors. A new `Vertex` instance has
this property set to `-1`, which requests the default value, equal to
the configured size of the cooperative thread pool. The latter defaults
to `Runtime.availableProcessors()` and is configurable via
`InstanceConfig.setCooperativeThreadCount()`.
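As a sketch of how that configuration looks, assuming you create the Jet
instance yourself:

```java
// Size the cooperative thread pool, which also determines the default
// local parallelism of every vertex.
JetConfig config = new JetConfig();
config.getInstanceConfig().setCooperativeThreadCount(4);
JetInstance jet = Jet.newJetInstance(config);
```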
In most cases the only value of local parallelism that you'll want to
explicitly configure is `1`, for the cases where no parallelism is
desirable (e.g., a source processor reading from a file).
The global parallelism of the vertex is also an important value,
especially in terms of the distribution of partitions among processors.
It is equal to the local parallelism multiplied by the cluster size;
for example, a vertex with a local parallelism of 2 running on a
3-member cluster has a global parallelism of 6.
Edge Ordinal
An edge is connected to a vertex with a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows the ordinal of the edge on which the item came in. Things are similar on the outbound side: the processor emits an item to a given ordinal, but also has the option to emit the same item to all ordinals. This is the most typical case and allows easy replication of a data stream across several edges.
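On the processor side this looks roughly like the following sketch,
assuming a processor that extends Jet's `AbstractProcessor` (the class
name is ours):

```java
class ForwardingP extends AbstractProcessor {
    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        // `ordinal` identifies the inbound edge the item arrived on.
        // tryEmit(item) emits to all outbound ordinals; the overload
        // tryEmit(ordinal, item) targets a single outbound edge.
        return tryEmit(item);
    }
}
```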
When you use the `between()` edge factory, the edge will be connected
at ordinal 0 at both ends. When you need a different ordinal, use the
`from(a, ord1).to(b, ord2)` form. There must be no gaps in ordinal
assignment, which means a vertex will have inbound edges with ordinals
0..N and outbound edges with ordinals 0..M.
This example shows the usage of the `between()` and `from().to()` forms
to build a DAG with one source feeding two computational vertices:

```java
DAG dag = new DAG();
Vertex source = dag.newVertex("source", Sources.readFiles("."));
Vertex v1 = dag.newVertex("v1", ...);
Vertex v2 = dag.newVertex("v2", ...);
dag.edge(between(source, v1));
dag.edge(from(source, 1).to(v2));
```
Local and Distributed Edge
A major choice to make in terms of data routing is whether the candidate
set of target processors is unconstrained, encompassing all processors
across the cluster, or constrained to just those running on the same
cluster member. This is controlled by the `distributed` property of the
edge. By default the edge is local; calling the `distributed()` method
removes this restriction.
With appropriate DAG design, network traffic can be minimized by
employing local edges. They are implemented with the most efficient kind
of concurrent queue: a single-producer, single-consumer array-backed
queue. It employs wait-free algorithms on both sides and avoids even the
latency of `volatile` writes by using `lazySet`.
The quintessential example of the local-distributed edge combo is
two-stage aggregation. Here's a review of that setup from the Word Count
tutorial:

```java
dag.edge(between(source, tokenizer))
   .edge(between(tokenizer, accumulate)
           .partitioned(DistributedFunctions.wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(DistributedFunctions.entryKey()))
   .edge(between(combine, sink));
```
Note that only the edge from `accumulate` to `combine` is distributed.
Routing Policies
The routing policy decides which of the processors in the candidate set to route each particular item to.
Unicast
This is the default routing policy, the one you get when you write:

```java
dag.edge(between(source, tokenizer));
```
For each item it chooses a single destination processor with no further restrictions on the choice. The only guarantee given by this pattern is that exactly one processor will receive the item, but typically care will be taken to "spray" the items equally over all the reception candidates.
This choice makes sense when the data does not have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.
Isolated
This is a more restricted kind of unicast policy: any given downstream
processor receives data from exactly one upstream processor. This is
needed in some DAG setups to apply selective backpressure to individual
upstream source processors. Activate this policy by calling `isolated()`
on the edge:

```java
dag.edge(between(source, insertWatermarks).isolated());
```
Broadcast
A broadcasting edge sends each item to all candidate receivers. This is useful when some small amount of data must be broadcast to all downstream vertices. Usually such vertices will have other inbound edges in addition to the broadcasting one, and will use the broadcast data as context while processing the other edges. In such cases the broadcasting edge will have a raised priority. There are other useful combinations, like a parallelism-one vertex that produces the same result on each member.
Activate this policy by calling `broadcast()` on the edge:

```java
dag.edge(between(source, count).broadcast());
```
Partitioned
A partitioned edge sends each item to the one processor responsible for the item's partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.
Multiple partitions can be assigned to each processor. The global number
of partitions is controlled by the partition count configured for the
underlying Hazelcast IMDG. Please refer to the Hazelcast Reference
Manual for more information on Hazelcast IMDG partitioning.
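For example, the partition count can be set through the standard
Hazelcast property (a sketch; 271 is the IMDG default):

```java
JetConfig config = new JetConfig();
// "hazelcast.partition.count" is the standard Hazelcast IMDG property.
config.getHazelcastConfig().setProperty("hazelcast.partition.count", "271");
```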
This is the default algorithm to determine the partition ID of an item:
- Apply the key extractor function defined on the edge to retrieve the
  partitioning key.
- Serialize the partitioning key to a byte array using Hazelcast
  serialization.
- Apply Hazelcast's standard MurmurHash3-based algorithm to get the
  key's hash value.
- Partition ID is the hash value modulo the number of partitions.
The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.
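In code form the procedure looks roughly like this. This is illustrative
only, not Jet's actual internals; the `serialize()` and `murmurHash3()`
helpers are hypothetical stand-ins:

```java
static int defaultPartitionId(Object item, int partitionCount,
                              Function<Object, Object> keyExtractor) {
    Object key = keyExtractor.apply(item);      // 1. extract the partitioning key
    byte[] keyBytes = serialize(key);           // 2. Hazelcast serialization (hypothetical helper)
    int hash = murmurHash3(keyBytes);           // 3. MurmurHash3-based hash (hypothetical helper)
    return Math.floorMod(hash, partitionCount); // 4. modulo the partition count
}
```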
Another common choice is to use Java's standard `Object.hashCode()`. It
is often significantly faster. However, it is not a safe strategy in
general because `hashCode()`'s contract does not require repeatable
results across JVMs, or even across different instances of the same JVM
version. If a given class's Javadoc explicitly specifies the hashing
function it uses (as `String`'s does, for example), then its instances
are safe to partition with `hashCode()`.
You can provide your own implementation of `Partitioner` to gain full
control over the partitioning strategy.
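A minimal sketch of a custom partitioner, assuming `String` items (the
class name and routing logic are ours):

```java
class FirstCharPartitioner implements Partitioner<String> {
    @Override
    public int getPartition(String item, int partitionCount) {
        // Route by the first character so that items starting with the
        // same letter always land in the same partition.
        int keyChar = item.isEmpty() ? 0 : item.charAt(0);
        return Math.floorMod(keyChar, partitionCount);
    }
}
```

You would then attach it to an edge with something like
`between(a, b).partitioned(wholeItem(), new FirstCharPartitioner())`.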
We use both partitioning strategies in the Word Count example:
```java
dag.edge(between(tokenizer, accumulate)
        .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
        .distributed()
        .partitioned(entryKey()))
```
The local-partitioned edge partitions by hash code, while the
distributed edge uses the default Hazelcast partitioning to ensure
correctness. A closer inspection of the data types that travel on the
distributed edge reveals that, in this particular case, hash-code-based
partitioning would work there as well; we use Hazelcast partitioning
nevertheless, for demonstration purposes. Since much less data travels
toward the combiner than toward the accumulator, this choice hardly
affects the performance of the whole job.
All-To-One
The all-to-one routing policy is a special case of the partitioned
policy which assigns the same partition ID to all items. The partition
ID is randomly chosen at job initialization time. This policy makes
sense on a distributed edge when all the items from all the members must
be routed to the same member and the same processor instance running on
it. Local parallelism of the target vertex should be set to 1, otherwise
there will be idle processors that never get any items.
On a local edge this policy doesn't make sense since simply setting the local parallelism of the target vertex to 1 constrains the local choice to just one processor instance.
In the `TopNStocks` example the stream-processing job must find the
stocks with the fastest-changing prices. To achieve this a single
processor must see the complete picture, so an all-to-one edge is
employed:

```java
dag.edge(between(topNStage1, topNStage2).distributed().allToOne());
```
Priority
By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a "hash join" which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the enriching stream contains the data for the lookup table and must be consumed in full before consuming the stream to be enriched.
The `priority` property controls the order of consuming the edges. Edges
are sorted by their priority number (ascending) and consumed in that
order. Edges with the same priority are consumed without particular
ordering (as the data arrives).
We can see a prioritized edge in action in the TF-IDF example:
```java
dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1));
```

The `tokenize` vertex performs lookup-table-based filtering of words. It
must receive the entire lookup table before beginning to process the
data.
A Fault Tolerance Caveat
As explained in the section on the Processor API, Jet takes regular
snapshots of processor state when fault tolerance is enabled. A
processor will get a special item in its input stream, called a barrier.
When working in the exactly-once mode, as soon as the processor receives
a barrier, it must stop pulling data from that stream, wait for the same
barrier in all other streams, and then emit its state to the snapshot
storage. This directly contradicts the contract of edge prioritization:
the processor is not allowed to consume any other streams before having
fully exhausted the prioritized ones.
This is why Jet does not initiate a snapshot until all the high-priority edges have been fully consumed.
Although strictly speaking this applies only to the exactly-once mode,
Jet postpones taking the snapshot in the at-least-once mode as well.
Even though the snapshot could begin early, it would still not be able
to complete until the prioritized edges had been consumed; the only
result would be that many more items get processed twice after a
restart.
Fine-Tuning Edges
Edges can be configured with an `EdgeConfig` instance, which specifies
additional fine-tuning parameters. For example:

```java
dag.edge(between(tickerSource, generateTrades)
        .setConfig(new EdgeConfig().setQueueSize(512)));
```

Please refer to the Javadoc of `EdgeConfig` for details.
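As a sketch of what else is tunable there (method names per the
`EdgeConfig` Javadoc; the values shown are arbitrary):

```java
EdgeConfig config = new EdgeConfig()
        .setQueueSize(1024)             // size of the concurrent queues backing the edge
        .setPacketSizeLimit(16384)      // network packet size limit on a distributed edge
        .setReceiveWindowMultiplier(3); // receive window sizing on a distributed edge
dag.edge(between(tickerSource, generateTrades).setConfig(config));
```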