This manual is for an old version of Hazelcast Jet, use the latest stable version.

Now we will implement this DAG in Jet. The first step is to create a DAG and source vertex:

DAG dag = new DAG();
Vertex source = dag.newVertex("source", Processors.readMap("lines"));

This is a simple vertex which will read the lines from the IMap and emit items of type Map.Entry<Integer, String> to the next vertex. The key of the entry is the line number, and the value is the line itself. We can use the built-in map-reading processor here, which can read a distributed IMap.

The next vertex is the tokenizer. Its responsibility is to take incoming lines and split them into words. This operation can be represented using a flat map processor, which comes built in with Jet:

// (lineNum, line) -> words
Pattern delimiter = Pattern.compile("\\W+");
Vertex tokenizer = dag.newVertex("tokenizer",
    Processors.flatMap((Entry<Integer, String> e) ->
        Traversers.traverseArray(delimiter.split(e.getValue().toLowerCase()))
                  .filter(word -> !word.isEmpty()))
);

This vertex will take an item of type Map.Entry<Integer, String> and split its value part into words. The key is ignored, as the line number is not useful for the purposes of word count. There will be one item emitted for each word, which will be the word itself. The Traverser interface is a convenience designed to be used by the built-in Jet processors.

The next vertex will do the grouping of the words and emit the count for each word. We can use the built-in groupAndAccumulate processor.

// word -> (word, count)
Vertex accumulator = dag.newVertex("accumulator",
    Processors.groupAndAccumulate(() -> 0L, (count, x) -> count + 1)
);

This processor will take items of type String, where the item is the word. The initial value of the count for a word is zero, and the value is incremented by one for each time the word is encountered. The expected output is of the type Entry<String, Long> where the key is the word, and the value is the accumulated count. The processor can only emit the final values after it has exhausted all the data.

The accumulation lambda given to the groupAndAccumulate processor combines the current running count with the count from the new entry.

This vertex will do a local accumulation of word counts on each member. The next step is to do a global accumulation of counts. This is the combining step:

// (word, count) -> (word, count)
Vertex combiner = dag.newVertex("combiner",
    Processors.groupAndAccumulate(
            Entry<String, Long>::getKey,
            () -> 0L,
            (count, wordAndCount) -> count + wordAndCount.getValue())
);

This vertex is very similar to the previous accumulator vertex, except we are combining two accumulated values instead of accumulated one for each word.

The final vertex is the output — we want to store the output in another IMap:

Vertex sink = dag.newVertex("sink", Processors.writeMap("counts"));

Next, we add the vertices we created to our DAG, and connect the vertices together with edges:

dag.edge(between(source, tokenizer))
   .edge(between(tokenizer, accumulator)
           .partitioned(KeyExtractors.wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulator, combiner)
           .distributed()
           .partitioned(KeyExtractors.entryKey()))
   .edge(between(combiner, sink));

Let's take a closer look at some of the connections between the vertices. First, source and tokenizer:

.edge(between(tokenizer, accumulator)
        .partitioned(KeyExtractors.wholeItem(), Partitioner.HASH_CODE))

The edge between the tokenizer and accumulator is partitioned, because all entries with the same word as key need to be processed by the same instance of the vertex. Otherwise the same word would be duplicated across many instances. The partitioning key is the built-in wholeItem() partitioner, and we are using the built-in HASH_CODE as the partitioning function, which uses Object.hashCode().

.edge(between(accumulator, combiner)
        .distributed()
        .partitioned(KeyExtractors.entryKey()))

The edge between the accumulator and combiner is also partitioned, similar to the edge between the generator and accumulator. However, there is a key difference: the edge is also distributed. A distributed edge allows items to be sent to other members. Since this edge is both partitioned and distributed, the partitioning will be across all the members: all entries with the same word as key will be sent to a single processor instance in the whole cluster. This ensures that we get the correct total count for a word.

The partitioning key here is the key part of the Map.Entry<String, Long>, which is the word. We are using the default partitioning function here which uses default Hazelcast partitioning. This partitioning function can be slightly slower than HASH_CODE partitioning, but is guaranteed to return consistent results across all JVM processes, so is a better choice for distributed edges.

To run the DAG and print out the results, we simply do the following:

instance1.newJob(dag).execute().get();
System.out.println(instance1.getMap("counts").entrySet());

The final output should look like the following:

[heaven=1, times=2, of=12, its=2, far=1, light=1, noisiest=1,
the=14, other=1, incredulity=1, worst=1, hope=1, on=1, good=1, going=2,
like=1, we=4, was=11, best=1, nothing=1, degree=1, epoch=2, all=2,
that=1, us=2, winter=1, it=10, present=1, to=1, short=1, period=2,
had=2, wisdom=1, received=1, superlative=1, age=2, darkness=1, direct=2,
only=1, in=2, before=2, were=2, so=1, season=2, evil=1, being=1,
insisted=1, despair=1, belief=1, comparison=1, some=1, foolishness=1,
or=1, everything=1, spring=1, authorities=1, way=1, for=2]

An executable version of this sample can be found at the Hazelcast Jet code samples repository.