
The source vertex reads a Hazelcast IMap, so we just use the processor provided in Jet:

dag.newVertex("doc-source", Processors.readMap(DOCID_NAME));

The stopwords-producing vertex has a custom processor:

dag.newVertex("stopword-source", StopwordsP::new);

The processor's implementation is quite simple:

private static class StopwordsP extends AbstractProcessor {
    @Override
    public boolean complete() {
        // emit the whole stopword set as a single item
        emit(docLines("stopwords.txt").collect(toSet()));
        return true;
    }
}

It emits a single item: the set of stopwords, built directly from the stream of the text file's lines.

The doc-count processor can again be built from the primitives provided in Jet's library:

dag.newVertex("doc-count", Processors.accumulate(() -> 0L, (count, x) -> count + 1));

The doc-lines processor is more of a mouthful, but still built from existing primitives:

dag.newVertex("doc-lines",
    nonCooperative(
        Processors.flatMap((Entry<Long, String> e) ->
            traverseStream(docLines("books/" + e.getValue())
                           .map(line -> entry(e.getKey(), line))))));

Let's break down this expression. Processors.flatMap returns a standard processor that emits an arbitrary number of items for each received item. The user supplies a function of the shape inputItem -> Traverser(outputItems), and the processor takes care of all the logic required to cooperatively emit those items while respecting the output buffer limits.
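Traverser itself is a minimal abstraction: its next() method returns the next item, or null once it is exhausted. For example, using the traverseIterable helper that also appears later in this section:

Traverser<String> t = traverseIterable(Arrays.asList("a", "b"));
// t.next() -> "a", t.next() -> "b", t.next() -> null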

This is the user-supplied expression evaluated for each incoming item:

traverseStream(docLines("books/" + e.getValue())
               .map(line -> entry(e.getKey(), line)))

traverseStream converts a java.util.Stream to a Traverser, so the inner part's job is to build the stream. docLines() simply returns

Files.lines(Paths.get(TfIdf.class.getResource(name).toURI()))

and then the mapping stage is applied, which creates a pair (docId, line). Finally, the whole processor expression is wrapped in a call to nonCooperative(), which declares the processor non-cooperative, as required because it performs blocking file I/O.
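Both Files.lines() and toURI() throw checked exceptions, so the actual docLines() helper presumably wraps them in an unchecked exception; a plausible full definition is:

private static Stream<String> docLines(String name) {
    try {
        return Files.lines(Paths.get(TfIdf.class.getResource(name).toURI()));
    } catch (IOException | URISyntaxException e) {
        throw new RuntimeException(e);
    }
}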

tokenize is another custom vertex:

dag.newVertex("tokenize", TokenizeP::new);

private static class TokenizeP extends AbstractProcessor {
    private Set<String> stopwords;
    private final FlatMapper<Entry<Long, String>, Entry<Long, String>> flatMapper =
        flatMapper(e -> traverseStream(
                   Arrays.stream(DELIMITER.split(e.getValue()))
                         .filter(word -> !stopwords.contains(word))
                         .map(word -> entry(e.getKey(), word))));

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        // edge 0 delivers a single item: the stopword set
        stopwords = (Set<String>) item;
        return true;
    }

    @Override
    protected boolean tryProcess1(@Nonnull Object item) {
        // edge 1 delivers the (docId, line) pairs
        return flatMapper.tryProcess((Entry<Long, String>) item);
    }
}

This processor must deal with two different inbound edges. It receives the stopword set over edge 0 and then performs a flat-mapping operation on the items from edge 1. The logic uses the same approach as the implementation of the provided Processors.flatMap() processor: a single instance of FlatMapper holds the business logic of the transformation, and the tryProcess1() callback delegates directly to it. If the FlatMapper is done emitting the previous items, it will accept the new item, apply the user-provided transformation, and start emitting the output items. If the output buffer prevents it from emitting all the pending items, it returns false, which makes the framework call the same tryProcess1() method again later with the same input item.
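To make that contract concrete, here is a conceptual sketch of the delivery loop, with a hypothetical inbox queue; this is not Jet's actual source, just the behavior described above:

while (!inbox.isEmpty()) {
    Object item = inbox.peek();
    if (!processor.tryProcess1(item)) {
        // output buffer full: keep the item in the inbox and return;
        // tryProcess1 will be called again later with the same item
        return;
    }
    inbox.remove();
}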

Here is the code that creates tokenize's two inbound edges:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1))
   .edge(from(docLines).to(tokenize, 1));

Note especially the .priority(-1) part: it ensures that no data coming from docLines is delivered until all the data from stopwordSource has been delivered. The processor would fail if it had to tokenize a line before its stopword set was in place.

tf is another simple vertex, built purely from the provided primitives:

dag.newVertex("tf", groupAndAccumulate(() -> 0L, (count, x) -> count + 1));

tf-idf is the most complex processor:

dag.newVertex("tf-idf", TfIdfP::new);

private static class TfIdfP extends AbstractProcessor {
    private double logDocCount;

    private final Map<String, List<Entry<Long, Double>>> wordDocTf = new HashMap<>();
    private final Traverser<Entry<String, List<Entry<Long, Double>>>> invertedIndexTraverser =
            lazy(() -> traverseIterable(wordDocTf.entrySet()).map(this::toInvertedIndexEntry));

    @Override
    protected boolean tryProcess0(@Nonnull Object item) throws Exception {
        // edge 0 delivers a single item: the total document count
        logDocCount = Math.log((Long) item);
        return true;
    }

    @Override
    protected boolean tryProcess1(@Nonnull Object item) throws Exception {
        // edge 1 delivers ((docId, word), tf) entries
        final Entry<Entry<Long, String>, Long> e = (Entry<Entry<Long, String>, Long>) item;
        final long docId = e.getKey().getKey();
        final String word = e.getKey().getValue();
        final long tf = e.getValue();
        wordDocTf.computeIfAbsent(word, w -> new ArrayList<>())
                 .add(entry(docId, (double) tf));
        return true;
    }

    @Override
    public boolean complete() {
        return emitCooperatively(invertedIndexTraverser);
    }

    private Entry<String, List<Entry<Long, Double>>> toInvertedIndexEntry(
            Entry<String, List<Entry<Long, Double>>> wordDocTf
    ) {
        final String word = wordDocTf.getKey();
        final List<Entry<Long, Double>> docidTfs = wordDocTf.getValue();
        return entry(word, docScores(docidTfs));
    }

    private List<Entry<Long, Double>> docScores(List<Entry<Long, Double>> docidTfs) {
        final double logDf = Math.log(docidTfs.size());
        return docidTfs.stream()
                       .map(tfe -> tfidfEntry(logDf, tfe))
                       .collect(toList());
    }

    private Entry<Long, Double> tfidfEntry(double logDf, Entry<Long, Double> docidTf) {
        final Long docId = docidTf.getKey();
        final double tf = docidTf.getValue();
        final double idf = logDocCount - logDf;
        return entry(docId, tf * idf);
    }
}

This is quite a lot of code, but none of its three pieces is difficult to follow:

  1. tryProcess0() accepts a single item, the total document count.
  2. tryProcess1() performs a boilerplate groupBy operation, collecting a list of items under each key.
  3. complete() outputs the accumulated results, also applying the final transformation to each one: replacing the TF score with the final TF-IDF score. It relies on a lazy traverser, which holds a Supplier<Traverser> and obtains the inner traverser from it the first time next() is called (see the sketch after this list). This makes it very simple to write code that obtains a traverser from a map only after the map has been populated.
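For illustration, the lazy-traverser idea can be sketched in a few lines (this is not Jet's actual implementation):

static <T> Traverser<T> lazyTraverser(Supplier<Traverser<T>> supplier) {
    return new Traverser<T>() {
        private Traverser<T> inner;

        @Override
        public T next() {
            // obtain the inner traverser on the first call only
            if (inner == null) {
                inner = supplier.get();
            }
            return inner.next();
        }
    };
}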

Finally, our DAG is terminated by a sink vertex:

dag.newVertex("sink", Processors.writeMap(INVERTED_INDEX));