
Hazelcast Jet Reference Manual

Version 0.7

Welcome to the Hazelcast Jet Reference Manual. This manual includes concepts, instructions, and samples to guide you on how to use Hazelcast Jet to build and execute computation jobs.

As the reader of this manual you should be familiar with the Java programming language.

Summary of Contents

  1. Overview presents the basic concepts of Hazelcast Jet and provides high-level orientation.

  2. Get Started leads you through the steps to get Hazelcast Jet up and running on your machine. You can copy-paste the provided code to execute your first Jet job.

  3. Work With Jet shows you how to configure Jet, start a Jet cluster, manage jobs, etc.

  4. The Pipeline API introduces Jet’s Pipeline API. This is the API you use to build the computation you want Jet to execute.

  5. Source and Sink Connectors presents the source and sink connectors that Jet provides.

  6. Configuration explains how to configure Jet. You can tune its performance, level of safety in fault-tolerant jobs, configure the underlying IMDG cluster, etc.

  7. Jet Concepts presents some fundamental concepts that make Jet work. Understanding them will help you get the most out of Hazelcast Jet.

  8. Expert Zone — The Core API presents the material you need to understand if you want to successfully use the Core API. You don’t need this knowledge to use Jet. It is relevant only if you plan to implement custom processors or build the DAG by directly using the Core API.

Preface

Naming

  • Hazelcast Jet or just Jet refers to the distributed data processing engine provided by Hazelcast, Inc.

  • Hazelcast IMDG or just Hazelcast refers to the Hazelcast in-memory data grid middleware. Hazelcast is also the name of the company (Hazelcast, Inc.) providing Hazelcast IMDG and Hazelcast Jet.

Licensing

Hazelcast Jet and Hazelcast Jet Reference Manual are free and provided under the Apache License, Version 2.0.

Trademarks

Hazelcast is a registered trademark of Hazelcast, Inc. All other trademarks in this manual are held by their respective owners.

Getting Help

The Jet team provides support to its community via these channels:

For information on the commercial support for Hazelcast Jet, please see this page on hazelcast.com.

1. Overview of Jet

Jet is a distributed data processing engine that treats the data as a stream. It can process both static datasets (i.e., batch jobs) and live event streams. It can compute aggregate functions over infinite data streams by using the concept of windowing: dividing the stream into a sequence of subsets (windows) and applying the aggregate function over each window.
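The windowing idea can be illustrated without Jet at all. The following is a conceptual sketch in plain Java; it does not use the Jet API, and the class and method names are made up for illustration. It splits a stream of event timestamps into fixed-size, non-overlapping (tumbling) windows and applies an aggregate function, here counting, to each window separately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch in plain Java, not the Jet API: tumbling windows of a
// fixed size. Each event timestamp falls into exactly one window
// [start, start + windowSize), and the aggregate function (here, counting)
// is applied separately to each window.
final class TumblingWindowSketch {

    /** Returns window start -> number of events in that window, ordered by window start. */
    static Map<Long, Long> countPerWindow(List<Long> timestamps, long windowSize) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            // Align the timestamp down to the start of its window.
            long windowStart = ts - Math.floorMod(ts, windowSize);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Five events, windows of 10 time units each
        System.out.println(countPerWindow(Arrays.asList(0L, 3L, 9L, 10L, 25L), 10));
        // prints {0=3, 10=1, 20=1}
    }
}
```

On an infinite stream the same principle applies, except that a window's result can only be emitted once the engine decides no more events for that window will arrive.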

With unbounded event streams, failures in the cluster become an issue: a streaming job never finishes on its own, so it must survive member failures. Jet can tolerate the failure of individual members without loss, offering the exactly-once processing guarantee.

You deploy Jet to a cluster of machines and then submit your processing jobs to it. Jet will automatically make all the cluster members participate in processing your data. It can connect to a distributed data source such as Kafka, Hadoop Distributed File System, or a Hazelcast IMDG cluster. Each member of the Jet cluster will consume a slice of the distributed data, which makes it easy to scale up to any level of throughput.

Jet is built on top of the Hazelcast IMDG technology, so the Jet cluster can also play the role of the data source and sink. If you use Jet this way, you can achieve perfect data locality and top-of-class throughput.

Architecture Overview

The main programming paradigm you’ll use with Jet is that of a processing pipeline, which is a network of interconnected stages. The stages form a directed acyclic graph (a DAG) and the connections between them indicate the path along which the data flows. Data processing happens inside each stage. You describe the pipeline using the Pipeline API.

Internally Jet converts the pipeline into its native representation, the Core API DAG, which describes the layout of the processing units and the rules for routing the data between them. It is possible to use the Core API to directly create the native representation, but it’s much more complex and error-prone.

To run the computation, Jet uses a cooperative multithreading execution engine, which means it can run many Core API Processors on the same JVM thread. A processor corresponds to a standalone single-threaded task that processes a stream of data. The processors are implemented in such a way that they don’t block the thread while waiting for more data, or while waiting for the receiving processor to accept it. By not depending on OS-level thread scheduling, Jet can achieve greater throughput and better saturate the CPU cores. It can also drive far more processors than the number of native threads the OS can manage.
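The core idea of cooperative multithreading can be sketched in a few lines of plain Java. This is not Jet's execution engine and does not use the Core API Processor interface; CooperativeTask and CountdownTask are hypothetical names. Each task does a small, bounded piece of work per call and returns instead of blocking, so one thread can take turns driving all of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual sketch of cooperative multithreading, not Jet's actual engine.
// Each task does a small, bounded piece of work per call and returns instead
// of blocking, so a single thread can drive many tasks in turn.
final class CooperativeSketch {

    interface CooperativeTask {
        /** Does a bounded slice of work and returns true once the task is done. */
        boolean call();
    }

    /** Hypothetical task that needs a fixed number of steps and never blocks. */
    static final class CountdownTask implements CooperativeTask {
        private int remaining;

        CountdownTask(int steps) {
            this.remaining = steps;
        }

        @Override
        public boolean call() {
            if (remaining > 0) {
                remaining--;
            }
            return remaining == 0;
        }
    }

    /** Drives all tasks on the calling thread until each is done; returns the number of rounds. */
    static int runAll(List<? extends CooperativeTask> tasks) {
        List<CooperativeTask> active = new ArrayList<>(tasks);
        int rounds = 0;
        while (!active.isEmpty()) {
            rounds++;
            // Give every task one turn; drop the ones that report completion.
            active.removeIf(CooperativeTask::call);
        }
        return rounds;
    }

    public static void main(String[] args) {
        List<CountdownTask> tasks = Arrays.asList(
                new CountdownTask(3), new CountdownTask(1), new CountdownTask(5));
        System.out.println("Rounds on one thread: " + runAll(tasks)); // 5
    }
}
```

A task that would otherwise wait for input simply returns and gets another turn in the next round, which is why no OS thread is parked per task.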

2. Get Started

In this section we’ll get you started using Hazelcast Jet. We’ll show you how to set up a Java project with the proper dependencies and a quick Hello World example to verify your setup.

2.1. Requirements

In the good tradition of Hazelcast products, Jet is distributed as a JAR with no other dependencies. It requires JRE version 8 or higher to run.

2.2. Install Hazelcast Jet

The easiest way to start using Hazelcast Jet is to add it as a dependency to your project.

Hazelcast Jet is published on the Maven repositories. Add the following lines to your pom.xml:

<dependencies>
  <dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>0.7</version>
  </dependency>
</dependencies>

If you prefer to use Gradle, add the following line to the dependencies block of your build.gradle:

compile 'com.hazelcast.jet:hazelcast-jet:0.7'

Alternatively you can download the latest distribution package of Hazelcast Jet and add the hazelcast-jet-0.7.jar file to your classpath.

2.3. Install Hazelcast Jet Enterprise (Optional)

Hazelcast Jet Enterprise is a commercial edition of Hazelcast Jet. It’s built on top of Hazelcast Jet open source and extends it with the following features:

  • Security Suite

  • Lossless Recovery (in Jet 1.0)

  • Rolling Job Upgrades (in Jet 1.0)

  • Enterprise PaaS Deployment Environment (Pivotal Cloud Foundry, OpenShift Container Platform (in Jet 1.0))

Hazelcast Jet Enterprise is available on a Hazelcast Maven repository. Add the following lines to your pom.xml:

<repositories>
   <repository>
      <id>Hazelcast Private Snapshot Repository</id>
      <url>https://repository-hazelcast-l337.forge.cloudbees.com/snapshot/</url>
   </repository>
   <repository>
      <id>Hazelcast Private Release Repository</id>
      <url>https://repository-hazelcast-l337.forge.cloudbees.com/release/</url>
   </repository>
</repositories>
<dependencies>
   <dependency>
      <groupId>com.hazelcast.jet</groupId>
      <artifactId>hazelcast-jet-enterprise</artifactId>
      <version>0.7</version>
   </dependency>
</dependencies>

You can download the Hazelcast Jet Enterprise package from hazelcast.com.

2.3.1. Set the License Key

To use Hazelcast Jet Enterprise, you must set the license key using one of the configuration methods shown below. You can request a trial license key at hazelcast.com.

Hazelcast Jet Enterprise license keys are required only to run the Jet cluster. A Jet client can access the Enterprise features without the license key.

The license key can be configured using one of the following methods:

Hazelcast Configuration File

Replace the value for the <license-key> tag inside the hazelcast.xml file in the config folder:

<hazelcast ..>
    ...
    <license-key>ENTER LICENSE KEY HERE</license-key>
    ...
</hazelcast>

Programmatic Configuration

The license key can also be set in the Jet config object as follows:

JetConfig config = new JetConfig();
config.getHazelcastConfig().setLicenseKey("Your Enterprise License Key");

System Property

Set the following system property:

-Dhazelcast.enterprise.license.key=Your Enterprise License Key

2.3.2. Hazelcast Jet Management Center

Hazelcast Jet Management Center is a management and monitoring suite providing a live overview of the Hazelcast Jet cluster. It’s a standalone tool with a web console.

Please see the Hazelcast Jet Management Center Reference Manual for the installation instructions.

2.4. Verify Your Setup With a Word Count Example

You can verify your setup by running this simple program. It processes the contents of a Hazelcast IList that contains lines of text, finds the number of occurrences of each word in it, and stores its results in a Hazelcast IMap. In a distributed computation job the input and output cannot be simple in-memory structures like a Java List; they must be accessible to any member of the computing cluster and must persist after a job ends. This is why we use Hazelcast structures.

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import java.util.List;
import java.util.Map;

import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;

public class HelloWorld {
    public static void main(String[] args) {
        // Create the specification of the computation pipeline. Note
        // it's a pure POJO: no instance of Jet needed to create it.
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<String>list("text"))
         .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
         .filter(word -> !word.isEmpty())
         .groupingKey(wholeItem())
         .aggregate(counting())
         .drainTo(Sinks.map("counts"));

        // Start Jet, populate the input list
        JetInstance jet = Jet.newJetInstance();
        try {
            List<String> text = jet.getList("text");
            text.add("hello world hello hello world");
            text.add("world world hello world");

            // Perform the computation
            jet.newJob(p).join();

            // Check the results
            Map<String, Long> counts = jet.getMap("counts");
            System.out.println("Count of hello: "
                    + counts.get("hello"));
            System.out.println("Count of world: "
                    + counts.get("world"));
        } finally {
            Jet.shutdownAll();
        }
    }
}

You should expect to see a lot of logging output from Jet (sent to stderr) and two lines on stdout:

Count of hello: 4
Count of world: 5
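If you want to cross-check these numbers without starting Jet, here's a sketch of the same logic running locally in a single JVM with java.util.stream. The class name LocalWordCount is made up for illustration; it applies the same lower-casing, splitting on \W+, empty-word filter, and per-word counting as the pipeline above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Local, single-JVM equivalent of the HelloWorld pipeline using
// java.util.stream: same tokenization, same filter, same counting.
final class LocalWordCount {

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(Arrays.asList(
                "hello world hello hello world",
                "world world hello world"));
        System.out.println("Count of hello: " + counts.get("hello")); // 4
        System.out.println("Count of world: " + counts.get("world")); // 5
    }
}
```

The difference is where the work happens: this version runs on the calling thread over an in-memory List, while the Jet job reads from and writes to cluster-wide Hazelcast structures.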

2.5. Reuse Your java.util.stream Knowledge

If you’ve already used Java’s Stream API, you’ll find many similarities in Jet’s Pipeline API. Both construct a processing pipeline by adding processing steps (stages), and both are FP-oriented APIs in which lambdas play a key role. Simple transformations like map/filter even look exactly the same. The main concern is knowing where the similarities end. Here are some typical gotchas if the Stream API has set some expectations for you:

  • All lambdas in Jet get serialized so they can be sent to remote cluster members. If your lambda captures a variable from the surrounding scope, that variable’s contents must be serialized as well. If you refer to an instance variable, the entire instance holding it must be serialized. It’s quite easy to accidentally capture and serialize the entire this object and everything it refers to.

  • The pipeline you construct doesn’t execute itself; you must explicitly submit it to a Jet cluster.

  • Since you’re submitting the computation to an external system, you don’t get the result in the return value of a method call. Instead, the pipeline explicitly specifies where to store the results: a data sink.

  • Whereas in the Stream API aggregation is the terminal step, the one that immediately makes your pipeline execute, in Jet it is just another transformation (an intermediate step).

Finally, you’ll notice that Jet’s Pipeline API is much more powerful than the Stream API. Here are a few highlights:

2.5.1. Example: List Transformation

Here’s a simple example of list transformation with the Stream API:

List<String> strings = Arrays.asList("a", "b");

List<String> uppercased = strings
        .stream()
        .map(String::toUpperCase)
        .collect(toList());

uppercased.forEach(System.out::println);

Here’s the equivalent in Jet. Note that we’re transforming Hazelcast ILists:

JetInstance jet = newJetInstance();
IList<String> strings = jet.getList("strings");
strings.addAll(Arrays.asList("a", "b"));
IList<String> uppercased = jet.getList("uppercased");

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.list(strings))
        .map(String::toUpperCase)
        .drainTo(Sinks.list(uppercased));
jet.newJob(pipeline).join();

uppercased.forEach(System.out::println);

2.5.2. Example: Grouping and Aggregation

Here’s an example of grouping and aggregation with the Stream API. We compute a histogram of words by their length:

List<String> strings = Arrays.asList("a", "b", "aa", "bb");
Map<Integer, Long> histogram = strings
        .stream()
        .collect(groupingBy(String::length, Collectors.counting()));

histogram.forEach((length, count) -> System.out.format(
        "%d chars: %d occurrences%n", length, count));

And here’s how to aggregate in Jet:

JetInstance jet = newJetInstance();
IList<String> strings = jet.getList("strings");
strings.addAll(Arrays.asList("a", "b", "aa", "bb"));
IMap<Integer, Long> histogram = jet.getMap("histogram");

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.list(strings))
        .groupingKey(String::length)
        .aggregate(AggregateOperations.counting())
        .drainTo(Sinks.map(histogram));
jet.newJob(pipeline).join();

histogram.forEach((length, count) -> System.out.format(
        "%d chars: %d occurrences%n", length, count));

Note that the result of aggregate is just another pipeline stage; you can apply more transforms to it before draining to the sink.

2.5.3. Example: Collector vs. AggregateOperation

If you have ever written your own Collector for the Stream API, you’ll find that Jet’s AggregateOperation is quite similar and you can transfer your skill to it.

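Here’s a Stream API collector that computes the sum of input items. It uses Jet’s mutable com.hazelcast.jet.accumulator.LongAccumulator, not java.util.concurrent.atomic.LongAccumulator: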

Collector<Long, LongAccumulator, Long> summingLong = Collector.of(
        LongAccumulator::new,
        (LongAccumulator acc, Long t) -> acc.add(t),
        (acc0, acc1) -> acc0.add(acc1),
        LongAccumulator::get
);

And here’s Jet’s aggregate operation doing the same:

AggregateOperation1<Long, LongAccumulator, Long> summingLong = AggregateOperation
        .withCreate(LongAccumulator::new)
        .andAccumulate((LongAccumulator acc, Long t) -> acc.add(t))
        .andCombine((acc0, acc1) -> acc0.add(acc1))
        .andDeduct((acc0, acc1) -> acc0.subtract(acc1))
        .andExportFinish(LongAccumulator::get);

Compared to Collector, AggregateOperation defines two more primitives:

  • deduct reverses a previous combine. It’s an optional primitive and serves to optimize sliding window aggregation.

  • export is similar to finish, the difference being that export must preserve the accumulator’s state and finish doesn’t. Jet uses finish wherever applicable as it can be implemented more optimally. In this example we use the same lambda for both primitives.
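To see why deduct pays off, here's a plain-Java sketch of a sliding-window sum. It is not Jet's implementation: it assumes the stream has already been divided into frames, each reduced to a partial sum, and that a sliding window spans a fixed number of consecutive frames. LongAcc is a hypothetical stand-in loosely modeled on the LongAccumulator used above. When the window slides, the sketch combines the frame that enters and deducts the frame that leaves, instead of recombining every frame in the window. Reading the result is an export, since the accumulator's state must survive for the next window.

```java
import java.util.Arrays;

// Conceptual sketch in plain Java, not Jet's implementation. It assumes the
// stream was already split into frames, each reduced to a partial sum, and
// that a sliding window spans a fixed number of consecutive frames.
final class SlidingSumSketch {

    /** Hypothetical mutable accumulator, loosely modeled on LongAccumulator. */
    static final class LongAcc {
        long value;

        LongAcc add(long v) {            // accumulate one item
            value += v;
            return this;
        }

        LongAcc add(LongAcc that) {      // combine
            value += that.value;
            return this;
        }

        LongAcc subtract(LongAcc that) { // deduct: reverses a previous combine
            value -= that.value;
            return this;
        }
    }

    /** Returns the sum of every run of framesPerWindow consecutive frames. */
    static long[] slidingSums(long[] frameSums, int framesPerWindow) {
        long[] result = new long[Math.max(frameSums.length - framesPerWindow + 1, 0)];
        LongAcc window = new LongAcc();
        for (int i = 0; i < frameSums.length; i++) {
            // The newest frame enters the window: combine it.
            window.add(new LongAcc().add(frameSums[i]));
            if (i >= framesPerWindow) {
                // The oldest frame leaves the window: deduct it instead of
                // recombining all remaining frames from scratch.
                window.subtract(new LongAcc().add(frameSums[i - framesPerWindow]));
            }
            if (i >= framesPerWindow - 1) {
                // Export: read the result but keep the accumulator's state,
                // because the next window continues from it.
                result[i - framesPerWindow + 1] = window.value;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(slidingSums(new long[] {1, 2, 3, 4, 5}, 3)));
        // prints [6, 9, 12]
    }
}
```

Each slide costs one combine and one deduct regardless of how many frames the window spans, which is the optimization deduct exists for.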

3. Work With Jet

3.1. Start Jet

To create a Jet cluster, we simply start some Jet instances. Normally these would be started on separate machines, but for simple practice we can use the same JVM for two instances. Even though they are in the same JVM, they’ll communicate over the network interface.

JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();

These two instances should automatically discover each other using IP multicast and form a cluster. You should see a log output similar to the following:

Members [2] {
  Member [10.0.1.3]:5701 - f1e30062-e87e-4e97-83bc-6b4756ef6ea3
  Member [10.0.1.3]:5702 - d7b66a8c-5bc1-4476-a528-795a8a2d9d97 this
}

This means the members successfully formed a cluster. Since the Jet instances start their own threads, it is important to explicitly shut them down at the end of your program; otherwise the Java process will remain alive after the main() method completes:

public static void main(String[] args) {
    try {
        JetInstance jet = Jet.newJetInstance();
        Jet.newJetInstance();

        // work with Jet

    } finally {
        Jet.shutdownAll();
    }
}

3.2. Build the Computation Pipeline

The general shape of any data processing pipeline is drawFromSource → transform → drainToSink and the natural way to build it is from source to sink. The Pipeline API follows this pattern. For example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("input"))
 .map(String::toUpperCase)
 .drainTo(Sinks.list("result"));

Refer to the chapter on the Pipeline API for full details.

3.3. Watch out for Capturing Lambdas

A typical Jet pipeline involves lambda expressions. Since the whole pipeline definition must be serialized to be sent to the cluster, the lambda expressions must be serializable as well. The Java standard provides an essential building block: if the static type of the lambda is a subtype of Serializable, you will automatically get a lambda instance that can serialize itself.

None of the functional interfaces in the JDK extend Serializable, so we had to mirror the entire java.util.function package in our own com.hazelcast.jet.function package, with all the interfaces subtyped and made Serializable. Each subtype has the name of the original with Distributed prepended. For example, a DistributedFunction is just like Function, but implements Serializable. We use these types everywhere in the Pipeline API.
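The mechanism itself is plain Java and can be tried without Jet. In this sketch, SerializableFunction is a hypothetical stand-in for DistributedFunction (a Function subtype that also extends Serializable), and isSerializable is a helper that simply attempts standard Java serialization. The only difference between the two lambdas is their static type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Plain-JDK sketch of the mechanism described above. SerializableFunction is
// a hypothetical stand-in for DistributedFunction: a Function subtype that
// also extends Serializable.
final class SerializableLambdaSketch {

    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    /** Static type is plain Function: the lambda instance is not Serializable. */
    static Function<String, String> plainFn() {
        return s -> s.toUpperCase();
    }

    /** Static type extends Serializable: the compiler emits a serializable lambda. */
    static SerializableFunction<String, String> serializableFn() {
        return s -> s.toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(plainFn()));        // false
        System.out.println(isSerializable(serializableFn())); // true
    }
}
```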

As always with this kind of magic, auto-serializability of lambdas has its flipside: it is easy to overlook what’s going on.

If the lambda references a variable in the outer scope, the variable is captured and must also be serializable. If it references an instance variable of the enclosing class, it implicitly captures this so the entire class will be serialized. For example, this will fail because JetJob1 doesn’t implement Serializable:

class JetJob1 {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.list("input"))
         .filter(item -> item.equals(instanceVar)); (1)
        return p;
    }
}
1 Refers to instanceVar, capturing this, but JetJob1 is not Serializable so this call will fail.

Just adding implements Serializable to JetJob1 would be a viable workaround here. However, consider something just a bit different:

class JetJob2 implements Serializable {
    private String instanceVar;
    private OutputStream fileOut; (1)

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.list("input"))
         .filter(item -> item.equals(instanceVar)); (2)
        return p;
    }
}
1 A non-serializable field.
2 Refers to instanceVar, capturing this. JetJob2 is declared as Serializable, but has a non-serializable field and this fails.

Even though we never refer to fileOut, we are still capturing the entire JetJob2 instance. We might mark fileOut as transient, but the sane approach is to avoid referring to instance variables of the surrounding class. We can achieve this simply by assigning to a local variable, then referring to that variable inside the lambda:

class JetJob3 {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        String findMe = instanceVar; (1)
        p.drawFrom(Sources.list("input"))
         .filter(item -> item.equals(findMe)); (2)
        return p;
    }
}
1 Declare a local variable that loads the value of the instance field.
2 By referring to the local variable findMe we avoid capturing this and the job runs fine.
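You can observe the difference between JetJob1 and JetJob3 with standard Java serialization alone. This is a plain-JDK sketch, not Jet code; SerializablePredicate and JobDefinition are hypothetical names. Both lambdas have a serializable static type, but only the one that copies the field into a local variable avoids capturing the non-serializable enclosing instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

// Plain-JDK sketch of the capture rules above, not Jet code.
// SerializablePredicate and JobDefinition are hypothetical names.
final class CaptureSketch {

    interface SerializablePredicate<T> extends Predicate<T>, Serializable { }

    /** Like JetJob1: holds state but does not implement Serializable. */
    static final class JobDefinition {
        private final String instanceVar;

        JobDefinition(String instanceVar) {
            this.instanceVar = instanceVar;
        }

        /** Uses the field directly, so the lambda captures `this`. */
        SerializablePredicate<String> capturingThis() {
            return item -> item.equals(instanceVar);
        }

        /** Copies the field to a local first, so only the String is captured. */
        SerializablePredicate<String> capturingLocal() {
            String findMe = instanceVar;
            return item -> item.equals(findMe);
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        JobDefinition job = new JobDefinition("hello");
        System.out.println(isSerializable(job.capturingThis()));  // false
        System.out.println(isSerializable(job.capturingLocal())); // true
    }
}
```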

Another common pitfall is capturing an instance of DateTimeFormatter or a similar non-serializable class:

DateTimeFormatter formatter = DateTimeFormatter
        .ofPattern("HH:mm:ss.SSS")
        .withZone(ZoneId.systemDefault());
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.drawFrom(Sources.list("input"));
src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp))); (1)
1 Captures the non-serializable formatter, so this fails.

Sometimes we can get away with using one of the preconfigured formatters available in the JDK:

src.map((Long tstamp) -> DateTimeFormatter.ISO_LOCAL_TIME (1)
        .format(Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault())));
1 Accesses the static final field ISO_LOCAL_TIME. Static fields are not subject to lambda capture; they are dereferenced when the code runs on the target machine.

This refers to a static final field in the JDK, so the instance is available on any JVM. If no such preconfigured instance exists, you may create a static final field in your own class, or you can use mapUsingContext(). In this case you provide a serializable factory that Jet will ask to create an object on the target member. The object it returns doesn’t have to be serializable. Here’s an example:

Pipeline p = Pipeline.create();
BatchStage<Long> src = p.drawFrom(Sources.list("input"));
ContextFactory<DateTimeFormatter> contextFactory = ContextFactory.withCreateFn( (1)
        x -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                              .withZone(ZoneId.systemDefault()));
src.mapUsingContext(contextFactory, (2)
        (formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp))); (3)
1 Create a ContextFactory.
2 Supply it to mapUsingContext().
3 Your mapping function now gets the object the factory created.
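The principle behind this, shipping a serializable factory instead of the non-serializable object, can be sketched with the JDK alone. This is not how ContextFactory is implemented; SerializableSupplier, SerializableFunction and FactorySketch are hypothetical names, and ZoneOffset.UTC replaces ZoneId.systemDefault() only so that the printed time is the same on every machine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-JDK sketch of the idea behind a context factory, not Jet's
// implementation: ship a serializable factory and let the receiving side
// create the non-serializable object. All names here are hypothetical.
final class FactorySketch {

    interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

    interface SerializableFunction<T, R> extends Function<T, R>, Serializable { }

    /** Captures a DateTimeFormatter, which does not implement Serializable. */
    static SerializableFunction<Long, String> capturingFormatter() {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                                                       .withZone(ZoneOffset.UTC);
        return tstamp -> formatter.format(Instant.ofEpochMilli(tstamp));
    }

    /** Captures nothing; the formatter is created wherever get() is called. */
    static SerializableSupplier<DateTimeFormatter> formatterFactory() {
        return () -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC);
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturingFormatter())); // false
        System.out.println(isSerializable(formatterFactory()));   // true
        // On the receiving side: create the formatter, then use it locally.
        DateTimeFormatter formatter = formatterFactory().get();
        System.out.println(formatter.format(Instant.ofEpochMilli(3_723_004L))); // 01:02:03.004
    }
}
```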

3.4. Submit a Jet Job and Manage its Lifecycle

This is how you submit a Jet pipeline for execution:

Pipeline pipeline = buildPipeline();
jet.newJob(pipeline).join();

If you want to submit a Core API DAG, the syntax is identical:

DAG dag = buildDag();
jet.newJob(dag).join();

Job submission is a fire-and-forget action: once a client submits a job, the job has a life of its own, independent of the submitter. The submitter can disconnect, and any other client or Jet member can obtain its own Job instance that controls the same job.

You can use the same API to submit a job from either a client machine or directly on an instance of a Jet cluster member. The same Pipeline or DAG instance can be submitted for execution many times.

3.4.1. JobConfig

To gain more control over how Jet will run your job, you can pass in a JobConfig instance. For example, you can give your job a human-readable name:

JobConfig cfg = new JobConfig();
cfg.setName("my-job");
jet.newJob(pipeline, cfg);

3.4.2. Remember that a Jet Job is Distributed

The API to submit a job to Jet is in a way deceptively simple: "just call a method." As long as you’re toying around with Jet instances started locally in a single JVM, everything will indeed work. However, as soon as you try to deploy to an actual cluster, you’ll face the consequences of the fact that your job definition must travel over the wire to reach remote members which don’t have your code on their classpath.

Your custom code must be packaged with the Jet job. For simple examples you can have everything in a single class and use code like this:

class JetExample {
    static Job createJob(JetInstance jet) {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(JetExample.class);
        return jet.newJob(buildPipeline(), jobConfig);
    }

    static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // ...
        return p;
    }
}

If you forget to do this, or don’t add all the classes involved, you may get a rather confusing exception:

java.lang.ClassCastException:
cannot assign instance of java.lang.invoke.SerializedLambda
to field com.hazelcast.jet.core.ProcessorMetaSupplier$1.val$addressToSupplier
of type com.hazelcast.jet.function.DistributedFunction
in instance of com.hazelcast.jet.core.ProcessorMetaSupplier$1

SerializedLambda actually declares readResolve(), which would normally transform it into an instance of the correct functional interface type. If this method throws an exception, Java doesn’t report it but keeps the SerializedLambda instance and continues the deserialization. Later in the process it will try to assign it to a field whose type is the target type of the lambda (DistributedFunction in the example above) and at that point it will fail with the ClassCastException. If you see this kind of error, double-check the list of classes you have added to the Jet job.

For more complex jobs it will become more practical to first package the job in a JAR and then use a command-line utility to submit it, as explained next.

3.4.3. Submit a Job from the Command Line

Jet comes with the jet-submit.sh script, which allows you to submit a Jet job packaged in a JAR file. You can find it in the Jet distribution zipfile, in the bin directory. On Windows use jet-submit.bat. To use it, follow these steps:

  • Write your main() method and your Jet code the usual way, but call JetBootstrap.getInstance() instead of Jet.newJetClient() to acquire a Jet client instance.

  • Create a runnable JAR which declares its Main-Class in MANIFEST.MF.

  • Run your JAR, but instead of java -jar jetjob.jar use jet-submit.sh jetjob.jar.

  • The script will create a Jet client and configure it from hazelcast-client.xml located in the config directory of Jet’s distribution. Adjust that file to suit your needs.

For example, write a class like this:

class CustomJetJob {
    public static void main(String[] args) {
        JetInstance jet = JetBootstrap.getInstance();
        jet.newJob(buildPipeline()).join();
    }

    static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // ...
        return p;
    }
}

After building the JAR, submit the job:

$ jet-submit.sh jetjob.jar

3.4.4. Manage a Submitted Job

jet.newJob() and jet.getJob(jobId) return a Job object, which you can use to monitor the job and change its status:

  • Job.suspend: the job will stop running, but its metadata will be kept and it can be resumed later. Use this, for example, if you want to restart the members one by one and don’t want the job to restart multiple times in the meantime

  • Job.resume: resumes a previously suspended job

  • Job.restart: stops and restarts the job to make use of new members added to the cluster (if automatic scaling is disabled)

  • Job.cancel: the job will stop running and will be marked as completed. It cannot be restarted later

You can also get the job’s name, configuration, and submission time via job.getName(), job.getConfig(), and job.getSubmissionTime(). job.getStatus() will give you the current status of the job (running, failed, completed etc.). You can also call job.getFuture() to block until the job completes and then get its final outcome (either success or failure).

Jet does not support canceling the job with future.cancel(); instead, you must call job.cancel(). This is due to the mismatch in semantics between future.cancel() on one side and job.cancel() plus job.getStatus() on the other: the future immediately transitions to “completed by cancellation”, but it takes some time until the actual job in the cluster changes to that state. To avoid confusing users with these differences, we decided to make future.cancel() fail with an exception.

3.4.5. Get a List of All Submitted Jobs

Jet keeps an inventory of all the jobs submitted to it, including those that have already completed. Access the full list with jet.getJobs(). You can use any Job instance from that list to monitor and manage a job, whether it was you or some other client that submitted it.

This example tells you what Jet has been up to in the last five minutes:

int total = 0;
int completed = 0;
int failed = 0;
int inProgress = 0;
long fiveMinutesAgo = System.currentTimeMillis() - MINUTES.toMillis(5);
for (Job job : jet.getJobs()) {
    if (job.getSubmissionTime() < fiveMinutesAgo) {
        continue;
    }
    total++;
    switch (job.getStatus()) {
        case COMPLETED:
            completed++;
            break;
        case FAILED:
            failed++;
            break;
        default:
            inProgress++;
    }
}
System.out.format(
    "In the last five minutes %d jobs were submitted to Jet, of which %d" +
    " already completed, %d jobs failed, and %d jobs are still running.%n",
    total, completed, failed, inProgress);

To get only the jobs submitted with a particular name, call jet.getJobs(name), or call jet.getJob(name) to get just the latest one.

Here’s how you can check the statistics on a job named my-job:

List<Job> myJobs = jet.getJobs("my-job");
long failedCount = myJobs.stream().filter(j -> j.getStatus() == FAILED).count();
System.out.format("Jet ran 'my-job' %d times and it failed %d times.%n",
        myJobs.size(), failedCount);

Note: data about completed jobs is evicted after 7 days.

3.5. Configure Fault Tolerance

Jet has features to allow an unbounded stream processing job to proceed correctly in the face of Jet members failing and leaving the cluster.

Jet takes snapshots of the entire state of the computation at regular intervals. It coordinates the snapshot across the cluster and synchronizes it with a checkpoint on the data source. The source must ensure that, in the case of a restart, it will be able to replay all the data it emitted after the last checkpoint. Each of the other components in the job will restore its processing state to exactly what it was at the time of the last snapshot. If a cluster member goes away, Jet will restart the job on the remaining members, rewind the sources to the last checkpoint, restore the state of processing from the last snapshot, and then seamlessly continue from that point.
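The restore sequence can be illustrated with a small plain-Java simulation. It is not Jet's snapshot mechanism; SnapshotReplaySketch and its methods are made up for illustration, and the "processing state" is just a running sum. The key point is that the snapshot records the source position and the state together, so after a simulated failure the source rewinds to that position, the state is restored from the same snapshot, and every item ends up affecting the result exactly once.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual sketch in plain Java, not Jet's snapshot implementation.
// A snapshot pairs the source position (checkpoint) with the processing
// state taken at the same point. After a failure, the source rewinds to the
// checkpoint and the state is restored, so each item affects the result once.
final class SnapshotReplaySketch {

    /** Source offset and running sum, captured together. */
    static final class Snapshot {
        final int sourceOffset;
        final long sum;

        Snapshot(int sourceOffset, long sum) {
            this.sourceOffset = sourceOffset;
            this.sum = sum;
        }
    }

    /**
     * Sums the input, taking a snapshot after every snapshotEvery items,
     * and simulates one failure right after the item at index failAt.
     */
    static long sumWithOneFailure(List<Long> input, int snapshotEvery, int failAt) {
        Snapshot last = new Snapshot(0, 0);
        long sum = 0;
        int offset = 0;
        boolean failed = false;
        while (offset < input.size()) {
            sum += input.get(offset);
            offset++;
            if (offset % snapshotEvery == 0) {
                last = new Snapshot(offset, sum); // checkpoint and state, taken together
            }
            if (!failed && offset - 1 == failAt) {
                failed = true;
                offset = last.sourceOffset;       // rewind the source to the checkpoint
                sum = last.sum;                   // restore the state from the snapshot
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        System.out.println(sumWithOneFailure(input, 3, 7)); // 55, same as without a failure
    }
}
```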

3.5.1. Exactly-Once

"Exactly-once processing" means the output is consistent with processing each stream item exactly once. This is the ultimate guarantee you can ask for.

As of version 0.6, Hazelcast Jet supports exactly-once processing with the source being either a Hazelcast IMap journal or a Kafka topic, and the sink being a Hazelcast IMap.

If you configure Jet for exactly-once but use Kafka as the sink, after a job restart you may get duplicates in the output. As opposed to doubly processing an input item, this is more benign because it just means getting the exact same result twice.

3.5.2. At-Least-Once

"At-least-once processing" means the output is consistent with processing each stream item at least once. Some items may get processed again after a restart, as if they represented new events. This is a lesser guarantee that Jet can deliver at a higher throughput and lower latency. In some cases it is good enough.

In some other cases, however, duplicate processing of data items can have quite surprising consequences. There is more information about this in our Under the Hood chapter.
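Here's a deliberately simplified plain-Java sketch of how duplicates arise. It is not Jet code and it does not model Jet's actual at-least-once snapshot protocol; it only assumes that, after a restart, the items emitted since the last checkpoint are processed again even though their effect was already applied. An idempotent update such as a maximum is unaffected, while a non-idempotent one such as a sum counts the replayed items twice. ReplaySketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Simplified plain-Java sketch, not Jet code: after a restart, items emitted
// since the last checkpoint are processed again even though their effect has
// already been applied. An idempotent update (max) is unaffected; a
// non-idempotent one (sum) counts the replayed items twice.
final class ReplaySketch {

    /** Applies items [0, failAfter), then replays [checkpoint, end) on top of that state. */
    static long sumWithReplay(List<Long> input, int checkpoint, int failAfter) {
        long sum = 0;
        for (int i = 0; i < failAfter; i++) {
            sum += input.get(i);
        }
        for (int i = checkpoint; i < input.size(); i++) {
            sum += input.get(i);
        }
        return sum;
    }

    /** Same replay pattern, but with an idempotent update. */
    static long maxWithReplay(List<Long> input, int checkpoint, int failAfter) {
        long max = Long.MIN_VALUE;
        for (int i = 0; i < failAfter; i++) {
            max = Math.max(max, input.get(i));
        }
        for (int i = checkpoint; i < input.size(); i++) {
            max = Math.max(max, input.get(i));
        }
        return max;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        // Checkpoint after the first two items, failure after the fourth:
        // the items 3 and 4 are applied twice.
        System.out.println(sumWithReplay(input, 2, 4)); // 22 instead of 15
        System.out.println(maxWithReplay(input, 2, 4)); // 5, same as without replay
    }
}
```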

3.5.3. Enable Fault Tolerance

Fault tolerance is off by default. To activate it for a job, create a JobConfig object and set the processing guarantee. You can also configure the snapshot interval.

JobConfig jobConfig = new JobConfig();
jobConfig.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
jobConfig.setSnapshotIntervalMillis(SECONDS.toMillis(10));

With less frequent snapshots, more data has to be replayed after a failure and the temporary spike in the latency of the output is greater. More frequent snapshots reduce the throughput and introduce more latency variation during regular processing.

3.5.4. Automatic Elasticity

You can configure what happens when members are added to or removed from the cluster.

  • If auto-scaling is enabled and a member is added or removed, the job will automatically restart. In case of member addition, it will restart after a delay.

  • If auto-scaling is disabled and a member is added, Jet takes no action and the job will not use the added member; you have to manually restart it. If a member is removed (after a shutdown or a failure), Jet suspends the job. You have to manually resume it.

By default, auto-scaling is enabled.

3.5.5. Level of Safety

Jet doesn’t delegate its fault tolerance to an outside system; it backs up the state to its own IMap objects. IMap is a replicated in-memory data structure, storing each key-value pair on a configurable number of cluster members. By default it makes a single backup copy, resulting in a system that tolerates the failure of a single member at a time. You can tweak this setting when starting Jet; for example, to increase the backup count to two:

JetConfig config = new JetConfig();
config.getInstanceConfig().setBackupCount(2);
JetInstance instance = Jet.newJetInstance(config);

Note: if more members are lost simultaneously than the backup count allows for, some data from the backing IMaps can be lost. This is not currently checked: the job will restart with some state data from the snapshot missing, or it might fail if classpath resources were added and are missing. We plan to address this in future releases.
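The relation between the backup count and the number of members you can lose at once fits in a tiny calculation. The sketch below is a simplified model, not Hazelcast code: it assumes each IMap entry lives on one owner member plus `backupCount` backup members, all distinct (so the cluster has at least `backupCount + 1` members), and that an entry is lost only when every copy of it fails at the same time.

```java
// Simplified model, not Hazelcast code: each entry has 1 owner + backupCount
// backups on distinct members, so losing k members at once can wipe out an
// entry only if k >= backupCount + 1.
final class BackupTolerance {

    static int copiesPerEntry(int backupCount) {
        return backupCount + 1;
    }

    // True if no entry can lose all of its copies when this many members fail at once.
    static boolean survivesSimultaneousLoss(int backupCount, int membersLost) {
        return membersLost < copiesPerEntry(backupCount);
    }

    public static void main(String[] args) {
        for (int backups = 1; backups <= 2; backups++) {
            for (int lost = 1; lost <= 3; lost++) {
                System.out.println("backupCount=" + backups + ", members lost=" + lost
                        + " -> guaranteed safe: " + survivesSimultaneousLoss(backups, lost));
            }
        }
    }
}
```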

3.5.6. Split-Brain Protection

A specific kind of failure is a so-called "split brain". It happens when a network failure makes one or more members believe that the other members have left the cluster, while in fact those members are still running but can’t see each other over the network. Now we have two or more fully functioning Jet clusters where there was supposed to be one. Each one will recover and restart the same Jet job, making it run multiple times.

Hazelcast Jet offers a mechanism to reduce this hazard: split-brain protection. It works by ensuring that a job can be restarted only in a cluster whose size is more than half of what it was before the job was suspended. Enable split-brain protection like this:

jobConfig.setSplitBrainProtection(true);

If there’s an even number of members in your cluster, this may mean the job will not be able to restart at all if the cluster splits into two equally-sized parts. We recommend having an odd number of members.
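The restart condition is simple majority arithmetic. The plain-Java sketch below only models the "more than half" rule stated above (it is not how Jet implements the check) and shows why an odd-sized cluster always leaves exactly one side able to restart, while an even-sized cluster split down the middle leaves neither.

```java
// Model of the "more than half of the previous size" rule; not Jet's code.
final class SplitBrainQuorum {

    // True if a sub-cluster of currentSize members may restart a job
    // that previously ran on previousSize members.
    static boolean canRestart(int currentSize, int previousSize) {
        return 2 * currentSize > previousSize; // integer form of current > previous / 2
    }

    public static void main(String[] args) {
        // 5 members split into 3 + 2: only the larger side may restart.
        System.out.println(canRestart(3, 5) + " " + canRestart(2, 5)); // true false
        // 4 members split into 2 + 2: neither side may restart.
        System.out.println(canRestart(2, 4) + " " + canRestart(2, 4)); // false false
    }
}
```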

Note also that you should ensure there is no split-brain condition at the moment you are introducing new members to the cluster. If that happens, both sub-clusters may grow to more than half of the previous size. This will defuse the split-brain protection mechanism.

3.6. Performance Considerations

3.6.1. Standard Java Serialization is Slow

When it comes to serializing the description of a Jet job, performance is not critical. However, for the data passing through the pipeline, the cost of the serialize-deserialize cycle can easily dwarf the cost of actual data transfer, especially on high-end LANs typical for data centers. In this context the performance of Java serialization is so poor that it regularly becomes the bottleneck. This is due to its heavy usage of reflection, overheads in the serialized form, etc.

Since Hazelcast IMDG faced the same problem a long time ago, we have mature support for optimized custom serialization and in Jet you can use it for stream data. In essence, you must implement a StreamSerializer for the objects you emit from your processors and register it in Jet configuration:

SerializerConfig serializerConfig = new SerializerConfig()
        .setImplementation(new MyItemSerializer())
        .setTypeClass(MyItem.class);
JetConfig config = new JetConfig();
config.getHazelcastConfig().getSerializationConfig()
      .addSerializerConfig(serializerConfig);
JetInstance jet = Jet.newJetInstance(config);

Consult the chapter on custom serialization in Hazelcast IMDG’s reference manual for more details.

Note the limitation implied here: the serializers must be registered with Jet on startup because this is how it is supported in Hazelcast IMDG. There is a plan to improve this and allow serializers to be registered on individual Jet jobs.
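To get a feel for the overhead in the serialized form, the sketch below compares standard Java serialization with a hand-written encoding of the same object. It uses only java.io; MyItem and its fields are hypothetical, and the write/read pair merely mirrors the shape of a custom serializer. It is not an implementation of Hazelcast's StreamSerializer interface.

```java
import java.io.*;

// Plain java.io comparison. MyItem is a hypothetical stream item; write/read only
// mirror the shape of a custom serializer (this is not Hazelcast's StreamSerializer).
final class SerializationSizeDemo {

    static final class MyItem implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;
        final String name;
        MyItem(int id, String name) { this.id = id; this.name = name; }
    }

    // Custom format: only the field values, no class metadata.
    static void write(DataOutput out, MyItem item) throws IOException {
        out.writeInt(item.id);
        out.writeUTF(item.name);
    }

    static MyItem read(DataInput in) throws IOException {
        return new MyItem(in.readInt(), in.readUTF());
    }

    static byte[] customBytes(MyItem item) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            write(new DataOutputStream(bytes), item);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static MyItem roundTrip(MyItem item) {
        try {
            return read(new DataInputStream(new ByteArrayInputStream(customBytes(item))));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Standard Java serialization: writes a class descriptor in addition to the fields.
    static byte[] javaSerializationBytes(MyItem item) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(item);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        MyItem item = new MyItem(42, "widget");
        System.out.println("custom format:      " + customBytes(item).length + " bytes"); // 12 bytes
        System.out.println("Java serialization: " + javaSerializationBytes(item).length + " bytes");
    }
}
```

The custom form is 4 bytes for the int plus 2 length bytes and 6 characters for the string; the Java-serialized form also carries the class name and field descriptors, and producing it goes through reflection.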

3.6.2. Capacity of the Concurrent Queues

By default, Jet runs each internal DAG vertex, roughly equivalent to each step of the computation (such as map or aggregate), at maximum parallelism (equal to the number of CPU cores). This means that even a single Jet job uses quite a lot of parallel tasks. Since Jet’s cooperative tasklets are very cheap to switch between, there’s almost no overhead from this. However, every pair of tasklets that communicate uses a dedicated 1-to-1 concurrent queue, so the number of queues scales with the square of the number of CPU cores. The default queue capacity is 1024, which translates to 4-8 kilobytes of RAM overhead per tasklet pair (1024 slots at 4 or 8 bytes per object reference) and potentially a lot of data items in flight before the queues fill up.

If you experience RAM shortage on the Jet cluster, consider lowering the queue size. This is how you set the default queue size for the whole Jet cluster:

JetConfig cfg = new JetConfig();
cfg.getDefaultEdgeConfig().setQueueSize(128);
JetInstance jet = Jet.newJetInstance(cfg);
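The quadratic growth is easy to underestimate, so here is the arithmetic as a small plain-Java calculation. It is not a Jet API; it assumes one local edge on one member whose upstream and downstream vertices both run one tasklet per core, and it counts only the queue slots themselves (8 bytes per reference, or 4 with compressed references). The items held in the queues come on top of this.

```java
// Rough estimate of queue-slot memory for one local DAG edge on one member; not a Jet API.
final class EdgeQueueMemory {

    static long queueCount(int cores) {
        // Every upstream tasklet has a dedicated queue to every downstream tasklet.
        return (long) cores * cores;
    }

    static long slotBytes(int cores, int queueSize, int bytesPerReference) {
        return queueCount(cores) * queueSize * bytesPerReference;
    }

    public static void main(String[] args) {
        int cores = 16;
        System.out.println(queueCount(cores) + " queues");                            // 256 queues
        System.out.println("queue size 1024: " + slotBytes(cores, 1024, 8) + " bytes"); // 2097152 bytes
        System.out.println("queue size 128:  " + slotBytes(cores, 128, 8) + " bytes");  // 262144 bytes
    }
}
```

Lowering the queue size from 1024 to 128 shrinks this figure eightfold, and the cap on items in flight shrinks by the same factor.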

You can also set queue sizes individually on each Core API DAG edge. You must first convert your pipeline to the Core DAG, apply the configuration, and then submit the DAG for execution:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("a")).setName("source")
 .map(String::toLowerCase)
 .drainTo(Sinks.list("b"));

DAG dag = p.toDag();
dag.getOutboundEdges("source").get(0)
   .setConfig(new EdgeConfig().setQueueSize(128));

jet.newJob(dag);

3.7. Monitor Execution and Diagnose Problems

3.7.1. Configure Logging

Jet, like Hazelcast IMDG, does not depend on a specific logging framework and has built-in adapters for a variety of logging frameworks. You can also write a new adapter to integrate with loggers Jet doesn’t natively support. To use one of the built-in adapters, set the hazelcast.logging.type property to one of the following:

  • jdk: java.util.logging (default)

  • log4j: Apache Log4j

  • log4j2: Apache Log4j 2

  • slf4j: SLF4J

  • none: Turn off logging

For example, to configure Jet to use Log4j, you can do one of the following:

System.setProperty("hazelcast.logging.type", "log4j");

or

JetConfig config = new JetConfig();
config.getHazelcastConfig()
      .setProperty("hazelcast.logging.type", "log4j");

For more detailed information about how to configure logging, please refer to the IMDG reference manual.

3.7.2. Inspect Output of Individual Stages

While debugging your pipeline you’ll want to see the output of an individual stage. You can achieve it by using the peek() stage. For example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("inputList"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .peek() (1)
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(Sinks.map("counts"));
1 Captures all the word tokens entering the aggregating stage

If you run it like this:

JetInstance jet = Jet.newJetInstance();
try {
    jet.getList("inputList")
       .addAll(asList("The quick brown fox", "jumped over the lazy dog"));
    jet.newJob(p).join();
} finally {
    Jet.shutdownAll();
}

this is how your output may look:

Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: quick
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#2
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: brown
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#0
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: the
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#4
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: dog
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#3
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: lazy
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#0
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: jumped
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#2
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: the
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: over
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#3
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: fox

The logger name of com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1 consists of the following parts:

  • com.hazelcast.jet.impl.processor.PeekWrappedP: the type of the processor writing the log message

  • filter: the name of the vertex the processor belongs to

  • #1: index of the processor within the vertex. The index is unique cluster-wide.

For more information about logging when using the Core API, see the Best Practices section.

3.8. Management Center

Hazelcast Jet Management Center is a management and monitoring suite providing a live overview of the Hazelcast Jet cluster. Management Center includes a tool for diagnosing data flow within a running Hazelcast Jet job. It provides a visualization of the computational stages and allows you to peek into the stats across the dataflow graph, enabling you to diagnose bottlenecks.

Please refer to the Hazelcast Jet Management Center Reference Manual for installation and usage instructions.

Figure: Job Detail

4. The Pipeline API

4.1. The Shape of a Pipeline

The general shape of any data processing pipeline is drawFromSource → transform → drainToSink and the natural way to build it is from source to sink. The Pipeline API follows this pattern. For example,

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("input"))
 .map(String::toUpperCase)
 .drainTo(Sinks.list("result"));

In each step, such as drawFrom or drainTo, you create a pipeline stage. The stage resulting from a drainTo operation is called a sink stage and you can’t attach more stages to it. All others are called compute stages and expect you to attach stages to them.

The API differentiates between batch (bounded) and stream (unbounded) sources and this is reflected in the naming: there is a BatchStage and a StreamStage, each offering the operations appropriate to its kind. In this section we’ll mostly use batch stages, for simplicity, but the API of operations common to both kinds is identical. We’ll explain later on how to apply windowing, which is necessary to aggregate over unbounded streams.

Your pipeline can consist of multiple sources, each starting its own pipeline branch, and you are allowed to mix both kinds of stages in the same pipeline. You can merge the branches with joining transforms. For example, the hash-join transform can join a stream stage with batch stages:

Pipeline p = Pipeline.create();
StreamStage<Trade> trades = p.drawFrom(Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT));
BatchStage<Entry<Integer, Product>> products =
        p.drawFrom(Sources.map("products"));
StreamStage<Tuple2<Trade, Product>> joined = trades.hashJoin(
        products,
        joinMapEntries(Trade::productId),
        Tuple2::tuple2
);

Symmetrically, you can fork the output of a stage and send it to more than one destination:

Pipeline p = Pipeline.create();
BatchStage<String> src = p.drawFrom(Sources.list("src"));
src.map(String::toUpperCase)
   .drainTo(Sinks.list("uppercase"));
src.map(String::toLowerCase)
   .drainTo(Sinks.list("lowercase"));

4.2. Choose Your Data Sources and Sinks

Hazelcast Jet has support for these data sources and sinks:

  • Hazelcast IMap and ICache, both as a batch source of just their contents and their event journal as an infinite source

  • Hazelcast IList (batch)

  • Hadoop Distributed File System (HDFS) (batch)

  • Java Database Connectivity (JDBC) (batch)

  • Java Message Service (JMS) queue and topic (infinite stream)

  • Kafka topic (infinite stream)

  • TCP socket (infinite stream)

  • a directory on the filesystem, both as a batch source of the current file contents and an infinite source of append events to the files

  • Apache Avro files (batch)

  • any source/sink you create on your own by using the SourceBuilder and the SinkBuilder.

You can access most of these via the Sources and Sinks utility classes. The Kafka, HDFS and Avro connectors are in separate modules. The source and sink builder factories are in their respective classes.

There’s a dedicated chapter that discusses the topic of data sources and sinks in more detail.

4.3. Basic Transforms: map, filter, flatMap

The simplest kind of transformation is one that can be done on each item individually and independently of other items. The major examples are map, filter and flatMap. We already saw them in use in the previous examples. map transforms each item to another item; filter discards items that don’t match its predicate; and flatMap transforms each item into zero or more output items. You can refer to their Javadoc for finer detail; here we’ll move on to other kinds of transformations.
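If you know java.util.stream, these three transforms behave like their namesakes there. The sketch below uses only the JDK, not Jet, to show the per-item semantics on a small in-memory list; in a Jet pipeline the same kind of lambdas would run in parallel across the cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only illustration of stateless per-item transforms; not Jet code.
final class StatelessTransforms {

    static List<String> mapped(List<String> lines) {
        // map: exactly one output item per input item
        return lines.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    static List<String> filtered(List<String> lines) {
        // filter: keep only the items that match the predicate
        return lines.stream().filter(line -> line.startsWith("a")).collect(Collectors.toList());
    }

    static List<String> flatMapped(List<String> lines) {
        // flatMap: zero or more output items per input item
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b", "c", "apple pie");
        System.out.println(mapped(lines));     // [A B, C, APPLE PIE]
        System.out.println(filtered(lines));   // [a b, apple pie]
        System.out.println(flatMapped(lines)); // [a, b, c, apple, pie]
    }
}
```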

4.4. Suppress Duplicates

The distinct operation suppresses the duplicates from a stream. If you perform it after adding a grouping key, it emits a single item for every distinct key. The operation works on both batch and stream stages. In the latter case it emits distinct items within a window. Two different windows are processed independently.

In this example we have a batch of Person objects and we choose an arbitrary one from each 5-year age bracket:

Pipeline p = Pipeline.create();
BatchSource<Person> personSource = Sources.list("people");
p.drawFrom(personSource)
 .groupingKey(person -> person.getAge() / 5)
 .distinct()
 .drainTo(Sinks.list("sampleByAgeBracket"));
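Conceptually, distinct on a keyed stage keeps one item per key. The JDK-only sketch below (with a hypothetical minimal Person class) keeps the first item it sees for each 5-year bracket. Jet's distinct makes no promise about which item of a group it emits, so keeping the "first" one is an assumption of this sketch only.

```java
import java.util.*;
import java.util.function.Function;

// JDK-only model of "distinct by key"; Person is a hypothetical minimal class.
final class DistinctByKey {

    static final class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
        int getAge() { return age; }
        @Override public String toString() { return name + "(" + age + ")"; }
    }

    // Keeps the first item seen for every distinct key, preserving encounter order.
    static <T, K> List<T> distinctBy(List<T> items, Function<T, K> keyFn) {
        Map<K, T> firstPerKey = new LinkedHashMap<>();
        for (T item : items) {
            firstPerKey.putIfAbsent(keyFn.apply(item), item);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("Ann", 31), new Person("Bob", 33),
                new Person("Cid", 36), new Person("Dee", 44));
        // Brackets: 31/5=6, 33/5=6, 36/5=7, 44/5=8
        System.out.println(distinctBy(people, p -> p.getAge() / 5)); // [Ann(31), Cid(36), Dee(44)]
    }
}
```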

4.5. Merge Streams

The merge operation combines two pipeline stages in the simplest manner: it just emits all the items from both stages. In this example we merge the trading events from the New York and Tokyo stock exchanges:

Pipeline p = Pipeline.create();
StreamStage<Trade> tradesNewYork = p.drawFrom(Sources.mapJournal(
        "trades-newyork", mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT));
StreamStage<Trade> tradesTokyo = p.drawFrom(Sources.mapJournal(
        "trades-tokyo", mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT));
StreamStage<Trade> merged = tradesNewYork.merge(tradesTokyo);

4.6. Enrich Your Stream

As the data comes in, before you perform any reasoning on it, you must look up and attach to each item all the knowledge you have about it. If the data item represents a trade event, you want the data on the asset being traded, the buyer, seller, etc. We call this step data enrichment.

Jet offers two basic techniques to enrich your data stream:

  1. Hash-join the stream with one or more datasets that you ingest as bounded streams. If your enriching dataset doesn’t change for the duration of the Jet job, this is your best tool.

  2. Directly look up the enriching data for each item by contacting the system that stores it. If you’re enriching an infinite stream and want to observe updates to the enriching dataset, you should use this approach.

4.7. Hash-Join

Hash-join is a kind of join operation optimized for the use case of data enrichment. It is like a many-to-one SQL JOIN that matches a foreign key in the stream-to-be-enriched with the primary key in the enriching dataset. You can perform several such joins in one operation, enriching your stream from arbitrarily many sources.

The stream-to-be-enriched (we’ll call it the primary stream for short) can be an unbounded data stream. The enriching streams must be bounded and Jet will consume them in full before starting to enrich the primary stream. It will store their contents in hash tables for fast lookup, which is why we call this the "hash-join".

For each enriching stream you specify a pair of key-extracting functions: one for the enriching item and one for the primary item. This means that you can define a different join key for each of the enriching streams. The following example shows a three-way hash-join between the primary stream of stock trade events and two enriching streams: products and brokers:

Pipeline p = Pipeline.create();

// The primary stream (stream to be enriched): trades
StreamStage<Trade> trades = p.drawFrom(Sources.mapJournal(
        "trades", mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT));

// The enriching streams: products and brokers
BatchStage<Entry<Integer, Product>> prodEntries = p.drawFrom(Sources.map("products"));
BatchStage<Entry<Integer, Broker>> brokEntries = p.drawFrom(Sources.map("brokers"));

// Join the trade stream with the product and broker streams
StreamStage<Tuple3<Trade, Product, Broker>> joined = trades.hashJoin2(
        prodEntries, joinMapEntries(Trade::productId),
        brokEntries, joinMapEntries(Trade::brokerId),
        Tuple3::tuple3
);

Products are joined on Trade.productId and brokers on Trade.brokerId. joinMapEntries() returns a JoinClause, which is a holder of the three functions that specify how to perform a join:

  1. the key extractor for the primary stream’s item

  2. the key extractor for the enriching stream’s item

  3. the projection function that transforms the enriching stream’s item into the item that will be used for enrichment.

Typically the enriching streams will be Map.Entry objects coming from a key-value store, but you want just the entry value to appear as the enriching item. In that case you’ll specify Map.Entry::getValue as the projection function. This is what joinMapEntries() does for you. It takes just one function, the primary stream’s key extractor, and fills in Entry::getKey and Entry::getValue for the enriching stream key extractor and the projection function, respectively.

In the interest of performance Jet pulls the entire enriching dataset into each cluster member. That’s why this operation is also known as a replicated join. This is something to keep in mind when estimating the RAM requirements for a hash-join operation.
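To make the roles of the three join functions concrete, the plain-Java sketch below performs the same kind of enrichment inside a single JVM. It first loads the enriching entries into a HashMap (using Entry::getKey as the enriching key extractor and Entry::getValue as the projection, the two functions joinMapEntries() fills in) and then looks up each primary item by its extracted key. Trade and the product names are hypothetical, and this models only the semantics, not Jet's distributed implementation.

```java
import java.util.*;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.function.Function;

// Single-JVM model of hash-join semantics; Trade and the product names are hypothetical.
final class HashJoinSketch {

    static final class Trade {
        final String ticker;
        final int productId;
        Trade(String ticker, int productId) { this.ticker = ticker; this.productId = productId; }
        int productId() { return productId; }
    }

    static <T, K, E, R> List<R> hashJoin(
            List<T> primary,
            Function<T, K> primaryKeyFn,
            List<? extends Entry<K, E>> enriching,
            BiFunction<T, E, R> mapToOutputFn) {
        // 1. Consume the whole bounded enriching side into a hash table.
        //    Entry::getKey acts as the enriching key extractor, Entry::getValue as the projection.
        Map<K, E> table = new HashMap<>();
        for (Entry<K, E> e : enriching) {
            table.put(e.getKey(), e.getValue());
        }
        // 2. Stream the primary items and look each one up by its extracted key.
        //    In this sketch a missing key simply yields null.
        List<R> out = new ArrayList<>();
        for (T item : primary) {
            out.add(mapToOutputFn.apply(item, table.get(primaryKeyFn.apply(item))));
        }
        return out;
    }

    static List<String> demo() {
        List<Entry<Integer, String>> products = Arrays.asList(
                new SimpleEntry<Integer, String>(1, "Bond"),
                new SimpleEntry<Integer, String>(2, "Stock"));
        List<Trade> trades = Arrays.asList(new Trade("ACME", 2), new Trade("XYZ", 1));
        return hashJoin(trades, Trade::productId, products,
                (trade, product) -> trade.ticker + "/" + product);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ACME/Stock, XYZ/Bond]
    }
}
```

The hash table in step 1 is what Jet replicates to every member, which is why its size matters when estimating RAM.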

4.7.1. Hash-Join With Four or More Streams Using the Builder

You can hash-join a stream with up to two enriching streams using the API we demonstrated above. If you have more than two enriching streams, you’ll use the hash-join builder. For example, you may want to enrich a trade with its associated product, broker, and market:

Pipeline p = Pipeline.create();

// The stream to be enriched: trades
StreamStage<Trade> trades = p.drawFrom(Sources.mapJournal(
        "trades", mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT));

// The enriching streams: products, brokers and markets
BatchStage<Entry<Integer, Product>> prodEntries =
        p.drawFrom(Sources.map("products"));
BatchStage<Entry<Integer, Broker>> brokEntries =
        p.drawFrom(Sources.map("brokers"));
BatchStage<Entry<Integer, Market>> marketEntries =
        p.drawFrom(Sources.map("markets"));

// Obtain a hash-join builder object from the stream to be enriched
StreamHashJoinBuilder<Trade> builder = trades.hashJoinBuilder();

// Add enriching streams to the builder
Tag<Product> productTag = builder.add(prodEntries,
        joinMapEntries(Trade::productId));
Tag<Broker> brokerTag = builder.add(brokEntries,
        joinMapEntries(Trade::brokerId));
Tag<Market> marketTag = builder.add(marketEntries,
        joinMapEntries(Trade::marketId));

// Build the hash join pipeline
StreamStage<Tuple2<Trade, ItemsByTag>> joined = builder.build(Tuple2::tuple2);

The data type on the hash-joined stage is Tuple2<Trade, ItemsByTag>. The next snippet shows how to use it to access the primary and enriching items:

StreamStage<String> mapped = joined.map((Tuple2<Trade, ItemsByTag> tuple) -> {
    Trade trade = tuple.f0();
    ItemsByTag ibt = tuple.f1();
    Product product = ibt.get(productTag);
    Broker broker = ibt.get(brokerTag);
    Market market = ibt.get(marketTag);
    return trade + ": " + product + ", " + broker + ", " + market;
});
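The tags work like the keys of a type-safe heterogeneous container: each tag carries the static type of the item it retrieves, so a call like ibt.get(productTag) needs no cast at the call site. The JDK-only sketch below reproduces that idea with hypothetical TypedTag and TaggedItems classes of its own; it is not Jet's Tag or ItemsByTag implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical classes illustrating the typed-tag idea; not Jet's Tag/ItemsByTag.
final class TaggedItemsSketch {

    // A tag is an identity object that remembers the item type T.
    static final class TypedTag<T> {
        private final String name;
        TypedTag(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Maps tags to items; the generic get() returns the tag's item type.
    static final class TaggedItems {
        private final Map<TypedTag<?>, Object> items = new HashMap<>();

        <T> void put(TypedTag<T> tag, T item) { items.put(tag, item); }

        @SuppressWarnings("unchecked")
        <T> T get(TypedTag<T> tag) { return (T) items.get(tag); }
    }

    static String demo() {
        TypedTag<String> productTag = new TypedTag<>("product");
        TypedTag<Integer> brokerTag = new TypedTag<>("broker");
        TaggedItems ibt = new TaggedItems();
        ibt.put(productTag, "Stock");
        ibt.put(brokerTag, 7);
        String product = ibt.get(productTag); // statically a String
        int broker = ibt.get(brokerTag);      // statically an Integer, unboxed
        return product + ", broker #" + broker;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Stock, broker #7
    }
}
```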

4.8. Enrich Using Direct Lookup

If you’re enriching an infinite stream, you most likely need to observe the changes that happen to the enriching dataset over the long timespan of the Jet job. In this case you can’t use the hash-join, which basically takes a snapshot of the enriching dataset at the beginning of the job. You may also encounter RAM shortage if your enriching dataset is very large.

The xUsingY transforms (such as mapUsingIMap or filterUsingContext) can enrich a stream by looking up from the original data source each time. There’s direct support for Hazelcast maps and Jet exposes the underlying machinery as well so you can write your own code to join with an arbitrary external dataset.

4.8.1. Look Up from Hazelcast Map

Hazelcast Jet allows you to enrich your stream directly from a Hazelcast IMap or ReplicatedMap. Since it must look up the data again for each item, performance is lower than with a hash-join, but the data is kept fresh this way. This matters especially for unbounded streaming jobs, where a hash-join would use data frozen in time at the beginning of the job.

If you enrich from a Hazelcast map (IMap or ReplicatedMap) that is stored inside the Jet cluster, you can achieve data locality. For ReplicatedMap that’s trivial because its entire contents are present on every cluster member. IMap, on the other hand, is partitioned so a given member holds only a part of the data. You must give Jet the key-extracting function so it can do the following for each item in your stream:

  1. extract the lookup key

  2. find its partition ID

  3. send the item to the member holding the IMap data with that partition ID

  4. on the target member, use the lookup key again to fetch the enriching data

We didn’t completely avoid the network here, but we replaced the request-response cycle with just a one-way send. We eliminated the cost of the request-response latency and pay just the overhead of network transfer.
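The four steps amount to routing each item by the partition of its lookup key. The JDK-only sketch below models that routing under stated assumptions: 271 partitions (Hazelcast's default partition count), a toy round-robin partition-to-member table, and String.hashCode() in place of the hash Hazelcast actually computes over the serialized key. The concrete partition IDs will therefore differ from a real cluster; only the principle (same key, same partition, same member) carries over. Member names and tickers are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of key-based routing; not Hazelcast's partitioning code.
final class PartitionRouting {

    static final int PARTITION_COUNT = 271;

    // Steps 1-2: extract the key and map it to a partition ID (simplified hash).
    static int partitionId(String lookupKey) {
        return Math.floorMod(lookupKey.hashCode(), PARTITION_COUNT);
    }

    // Step 3: find the member that owns the partition (toy round-robin ownership).
    static String ownerOf(int partitionId, List<String> members) {
        return members.get(partitionId % members.size());
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("member-A", "member-B", "member-C");
        for (String ticker : Arrays.asList("ACME", "XYZ", "ACME")) {
            int pid = partitionId(ticker);
            // Step 4 would run on the owner: a local IMap lookup by the same key.
            System.out.println(ticker + " -> partition " + pid + " -> " + ownerOf(pid, members));
        }
    }
}
```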

In this example we enrich a stream of trades with detailed stock info. The stock info (the enriching dataset) is stored in a Hazelcast IMap so we use groupingKey() to let Jet partition our stream and use local IMap lookups:

IMap<String, StockInfo> stockMap = jet.getMap("stock-info"); (1)
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);

Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
 .groupingKey(Trade::ticker) (2)
 .mapUsingIMap(stockMap, Trade::setStockInfo) (3)
 .drainTo(Sinks.list("result"));
1 Obtain the IMap from a Hazelcast Jet instance
2 Set the lookup key, this enables data locality
3 Enrich the stream by setting the looked-up StockInfo on Trade items. This syntax works if the setter is fluent, that is, if it returns this.

In the example above the syntax looks very simple, but it hides a layer of complexity: you can’t just fetch a Hazelcast map instance and add it to the pipeline. It’s a proxy object that’s valid only in the JVM process where you obtained it. mapUsingIMap actually remembers just the name of the map and will obtain a map with the same name from the Jet cluster where the job runs.

4.8.2. Look Up From an External System

Hazelcast Jet exposes the facility to look up from an external system. In this case you must define a factory object that creates a client instance and fetches the data from the remote system of record.

Jet will use the factory to create a context object for each Processor in the cluster that executes your transforming step. You can also specify a function that extracts the key from your items. Even though Jet won’t do any lookup or grouping by that key, it will set up the job so that all the items with the same key get paired with the same context object.

For example, you may be fetching data from a remote Hazelcast cluster. To optimize performance, you can enable the near-cache on the client and you can save a lot of memory by specifying the key function: since Jet executes your enriching code in parallel, each worker has its own Hazelcast client instance, with its own near-cache. With the key function, each near-cache will hold a non-overlapping subset of the keys:

ContextFactory<IMap<String, StockInfo>> ctxFac = ContextFactory
        .<IMap<String, StockInfo>>withCreateFn(x -> {
            ClientConfig cc = new ClientConfig();
            cc.getNearCacheConfigMap().put("stock-info", new NearCacheConfig());
            return Jet.newJetClient(cc).getMap("stock-info");
        })
        .shareLocally()
        .nonCooperative();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);

Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
 .groupingKey(Trade::ticker)
 .mapUsingContext(ctxFac, (map, key, trade) -> trade.setStockInfo(map.get(key)))
 .drainTo(Sinks.list("result"));

In a similar fashion you can integrate other external systems with a Jet pipeline.

4.8.3. Weak Consistency of Direct Lookup

When you use an xUsingY transform to enrich an infinite stream, your output will reflect the fact that the enriching dataset is changing. Two items with the same key may be enriched with different data. This is expected. However, the output may also behave in a surprising way. Say you update the enriching data for some key from A to B. You expect to see a sequence …A-A-B-B… in the output, but when you sort it by timestamp, you can observe any order, such as A-B-A-B or B-B-A-A. This is because Jet doesn’t force the processing order to strictly follow the timestamp order of events.

In effect, your lookups will be eventually consistent, but won’t have monotonic read consistency. Currently there is no feature in Jet that would achieve monotonic reads while enriching an event stream from a changing dataset.
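A deterministic toy simulation makes the effect visible. It is plain Java and a simplified model, not Jet: four events with the same key carry timestamps 1 to 4, they are processed in an order that differs from their timestamp order (as can happen when parallel processors run at different speeds), and the enriching value changes from A to B after the first two lookups.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Toy simulation of lookups against a changing dataset; not Jet code.
final class LookupOrderSimulation {

    // Enriches events in processing order; the looked-up value is "A" for the
    // first switchAfter lookups and "B" afterwards. Returns the values sorted by timestamp.
    static String enrichedByTimestamp(long[] processingOrder, int switchAfter) {
        SortedMap<Long, String> byTimestamp = new TreeMap<>();
        for (int i = 0; i < processingOrder.length; i++) {
            String looked = i < switchAfter ? "A" : "B"; // current value in the enriching dataset
            byTimestamp.put(processingOrder[i], looked);
        }
        return String.join("-", byTimestamp.values());
    }

    public static void main(String[] args) {
        // Processing reorders the events, so timestamp order shows a non-monotonic sequence.
        System.out.println(enrichedByTimestamp(new long[] {2, 3, 1, 4}, 2)); // B-A-A-B
        // If processing happens to follow timestamp order, the sequence looks monotonic.
        System.out.println(enrichedByTimestamp(new long[] {1, 2, 3, 4}, 2)); // A-A-B-B
    }
}
```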

4.9. Group and Aggregate

Data aggregation is the cornerstone of distributed stream processing. It computes an aggregate function (simple examples: sum or average) over the data items. Typically you don’t want to aggregate all the items together, but classify them by some key and then aggregate over each group separately. This is the group-and-aggregate transformation. You can join several streams on a key and simultaneously group-aggregate them on the same key. This is the cogroup-and-aggregate transformation. If you’re processing an unbounded event stream, you must define a bounded window over the stream within which Jet will aggregate the data.

This is how a very simple batch aggregation without grouping may look (the list named text contains lines of text):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("text"))
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

We count all the lines and push the count to a list named “result”. Add this code to try it out:

JetInstance jet = Jet.newJetInstance();
jet.getList("text").addAll(asList("foo foo bar", "foo bar foo"));
jet.newJob(p).join();
jet.getList("result").forEach(System.out::println);
Jet.shutdownAll();

The program will print 2 because we added two items to the text list.

To get cleaner output, without Jet’s logging, add System.setProperty("hazelcast.logging.type", "none"); to the top.

Let’s count all the words instead:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

We split up the lines into words using a regular expression. The program prints 6 now because there are six words in the input.

Now let’s turn this into something more insightful: a word frequency histogram, giving us for each distinct word the number of its occurrences:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

We added a grouping key; in this case it is the word itself (DistributedFunctions.wholeItem() returns the identity function). Now the program will perform the counting aggregation on each group of equal words. It will print

bar=2
foo=4

What we’ve just built up is the classical Word Count task.
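For comparison, here is the same word count over the same two lines, computed in a single JVM with java.util.stream instead of Jet. It only shows what the pipeline computes; Jet distributes the grouping and counting across the cluster. The filter on empty tokens is an addition of this sketch, guarding against the empty string split() produces when a line starts with punctuation.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

// Single-JVM word count with java.util.stream; not Jet code.
final class LocalWordCount {

    static Map<String, Long> wordCounts(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty()) // drop empty tokens
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(wordCounts(Arrays.asList("foo foo bar", "foo bar foo"))); // {bar=2, foo=4}
    }
}
```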

The definition of the aggregate operation hides behind the counting() method call. This is a static method in our AggregateOperations utility class, which provides you with some predefined aggregate operations. You can also implement your own aggregate operations; refer to the section dedicated to this.

4.10. Correlate and Join Streams

In Jet you can join any number of streams on a common key, which can be anything you can calculate from the data item. This works great for correlating the data from two or more streams. In the same step you apply an aggregate function to all the grouped items, separated by the originating stream. The aggregate function can produce a summary value like an average or linear trend, but it can also keep a list of all the items, effectively avoiding actual aggregation. You can achieve any kind of join: inner, left outer, right outer and full outer. The constraint, as already noted, is that the join condition cannot be fully general, but must amount to matching a computed key.

Here’s a simple example where we calculate the ratio of add-to-cart actions to page visits for each user. We accumulate the event counts and then do the maths on them in the end:

(1)
JetInstance jet = Jet.newJetInstance();
IList<PageVisit> pageVisits = jet.getList("pageVisit");
IList<AddToCart> addToCarts = jet.getList("addToCart");
IList<Entry<Integer, Double>> results = jet.getList("result");

(2)
pageVisits.add(new PageVisit(1));
pageVisits.add(new PageVisit(1));
addToCarts.add(new AddToCart(1));

pageVisits.add(new PageVisit(2));
addToCarts.add(new AddToCart(2));

(3)
Pipeline p = Pipeline.create();
BatchStageWithKey<PageVisit, Integer> pageVisit =
        p.drawFrom(Sources.list(pageVisits))
         .groupingKey(pv -> pv.userId());
BatchStageWithKey<AddToCart, Integer> addToCart =
        p.drawFrom(Sources.list(addToCarts))
         .groupingKey(atc -> atc.userId());
BatchStage<Entry<Integer, Double>> coAggregated =
        pageVisit.aggregate2(counting(),            (4)
            addToCart, counting(),                  (5)
            (userId, visitCount, addCount) ->
                    entry(userId, (double) addCount / visitCount));

(6)
coAggregated.drainTo(Sinks.list(results));
jet.newJob(p).join();
results.forEach(System.out::println);
Jet.shutdownAll();
1 Start Jet, acquire source and sink lists from it
2 Fill the source lists (the numbers 1 and 2 are user IDs)
3 Construct the pipeline
4 Supply the aggregate operation for pageVisits
5 Supply the aggregate operation for addToCarts
6 Run the job, print the results

The way we prepared the data, user 1 made two visits and added one item to the shopping cart; user 2 made one visit and added one item. Given that, we should expect to see the following output:

1=0.5
2=1.0

Note how we supplied a separate aggregate operation for each input stage. We calculate the overall result based on the results obtained for each input stage in isolation. We have a full code sample at our code samples repository.
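The same "aggregate each input in isolation, then combine per key" structure can be spelled out in plain Java. The sketch below is a single-JVM model of what aggregate2 computes, not how Jet computes it; it assumes the inputs are already reduced to lists of user IDs and takes the union of user IDs from both inputs, treating a missing count as zero.

```java
import java.util.*;

// Single-JVM model of co-aggregation; inputs are reduced to lists of user IDs.
final class CoAggregateSketch {

    static Map<Integer, Long> countPerUser(List<Integer> userIds) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (Integer id : userIds) {
            counts.merge(id, 1L, Long::sum);
        }
        return counts;
    }

    static Map<Integer, Double> addToCartPerVisit(List<Integer> visitUserIds, List<Integer> addUserIds) {
        // Aggregate each input in isolation...
        Map<Integer, Long> visits = countPerUser(visitUserIds);
        Map<Integer, Long> adds = countPerUser(addUserIds);
        // ...then combine the two per-key results.
        Set<Integer> users = new TreeSet<>(visits.keySet());
        users.addAll(adds.keySet());
        Map<Integer, Double> result = new TreeMap<>();
        for (Integer user : users) {
            long visitCount = visits.getOrDefault(user, 0L);
            long addCount = adds.getOrDefault(user, 0L);
            result.put(user, (double) addCount / visitCount);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same data as above: user 1 visited twice and added once, user 2 visited once and added once.
        System.out.println(addToCartPerVisit(Arrays.asList(1, 1, 2), Arrays.asList(1, 2))); // {1=0.5, 2=1.0}
    }
}
```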

You also have the option of constructing a two-input aggregate operation that has immediate access to all the items. Refer to the section on AggregateOperation.

4.10.1. Join Without Aggregating

Let’s assume you have these input stages:

BatchStageWithKey<PageVisit, Integer> pageVisit =
        p.drawFrom(Sources.<PageVisit>list("pageVisit"))
         .groupingKey(PageVisit::userId);
BatchStageWithKey<AddToCart, Integer> addToCart =
        p.drawFrom(Sources.<AddToCart>list("addToCart"))
         .groupingKey(AddToCart::userId);

You may have expected to find a joining transform in the Pipeline API that outputs a stream of all matching pairs of items:

BatchStage<Tuple2<PageVisit, AddToCart>> joined = pageVisit.join(addToCart);

This would more closely match the semantics of an SQL JOIN, but in the context of a Java API it doesn’t work well. For M page visits joined with N add-to-carts, Jet would have to materialize M * N tuples and feed them into the next stage. It would also be forced to buffer all the data from one stream before receiving the other.

To give you the best performance, Jet strongly couples joining with aggregation and encourages you to frame your solution in these terms.

This doesn’t stop you from getting the "exploded" output with all the joined data; it just makes it a less straightforward option. If your use case can’t be solved any other way than by keeping all individual items, you can specify toList() as the aggregate operation and get all the items in lists:

BatchStage<Tuple2<List<PageVisit>, List<AddToCart>>> joinedLists =
    pageVisit.aggregate2(toList(), addToCart, toList(),
        (userId, pageVisits, addToCarts) -> tuple2(pageVisits, addToCarts));

If you need something other than the full join, you can filter out some pairs of lists. In this example we create a LEFT OUTER JOIN by removing the pairs with an empty left-hand list:

BatchStage<Tuple2<List<PageVisit>, List<AddToCart>>> leftOuterJoined =
        joinedLists.filter(pair -> !pair.f0().isEmpty());

If you specifically need to get a stream of all the combinations of matched items, you can add a flatmapping stage:

BatchStage<Tuple2<PageVisit, AddToCart>> fullJoined = joinedLists
    .flatMap(pair -> traverseStream(
            nonEmptyStream(pair.f0())
                .flatMap(pVisit -> nonEmptyStream(pair.f1())
                        .map(addCart -> tuple2(pVisit, addCart)))));

We used this helper method:

static <T> Stream<T> nonEmptyStream(List<T> input) {
    return input.isEmpty() ? Stream.of((T) null) : input.stream();
}
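You can see the shape of the exploded output without a cluster in the following minimal plain-Java sketch. It uses java.util.stream only, with no Jet, and applies the same nonEmptyStream trick to the two lists collected for one key. The class and method names are illustrative. Strings of the form left/right stand in for Tuple2 values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ExplodedJoinSketch {

    // Same idea as the helper above: an empty list becomes a single null element,
    // so the items on the other side still appear in the output (outer-join style).
    static <T> Stream<T> nonEmptyStream(List<T> input) {
        return input.isEmpty() ? Stream.of((T) null) : input.stream();
    }

    // Expands the two lists collected for one key into every (left, right)
    // combination; "a/b" stands in for tuple2(a, b).
    static <A, B> List<String> explode(List<A> left, List<B> right) {
        return nonEmptyStream(left)
                .flatMap(a -> nonEmptyStream(right).map(b -> a + "/" + b))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 3 page visits x 2 add-to-carts for the same user: 3 * 2 = 6 pairs
        System.out.println(explode(Arrays.asList("v1", "v2", "v3"), Arrays.asList("c1", "c2")));
        // No add-to-carts for this user: the visit is paired with null
        System.out.println(explode(Arrays.asList("v1"), Collections.emptyList()));
    }
}
```

The first call shows why Jet avoids this output by default. Three visits and two add-to-carts for a single user already produce six pairs, and the count grows as M * N.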

4.10.2. Join Four or More Streams Using the Aggregate Builder

If you need to join more than three streams, you’ll have to use the builder object. For example, your goal may be correlating events coming from different systems, where all the systems serve the same user base. In an online store you may have separate event streams for product page visits, adding-to-cart events, payments, and deliveries. You want to correlate all the events associated with the same user. The example below counts, for each user, the events of each of these four kinds:

Pipeline p = Pipeline.create();

(1)
BatchStageWithKey<PageVisit, Integer> pageVisits =
        p.drawFrom(Sources.<PageVisit>list("pageVisit"))
         .groupingKey(PageVisit::userId);
BatchStageWithKey<AddToCart, Integer> addToCarts =
        p.drawFrom(Sources.<AddToCart>list("addToCart"))
         .groupingKey(AddToCart::userId);
BatchStageWithKey<Payment, Integer> payments =
        p.drawFrom(Sources.<Payment>list("payment"))
         .groupingKey(Payment::userId);
BatchStageWithKey<Delivery, Integer> deliveries =
        p.drawFrom(Sources.<Delivery>list("delivery"))
         .groupingKey(Delivery::userId);

(2)
GroupAggregateBuilder<Integer, List<PageVisit>> builder =
        pageVisits.aggregateBuilder(toList());

(3)
Tag<List<PageVisit>> visitTag = builder.tag0();
Tag<List<AddToCart>> cartTag = builder.add(addToCarts, toList());
Tag<List<Payment>> payTag = builder.add(payments, toList());
Tag<List<Delivery>> deliveryTag = builder.add(deliveries, toList());

(4)
BatchStage<String> coGrouped = builder.build((key, ibt) ->
        String.format("User ID %d: %d visits, %d add-to-carts," +
                        " %d payments, %d deliveries",
                key, ibt.get(visitTag).size(), ibt.get(cartTag).size(),
                ibt.get(payTag).size(), ibt.get(deliveryTag).size()));
1 Create four source streams
2 Obtain a builder object for the co-group transform, specify the aggregate operation for PageVisits
3 Add the co-grouped streams to the builder, specifying the aggregate operation to perform on each
4 Build the co-group transform, retrieve the individual aggregation results using the tags you got in step 3 (ibt is an ItemsByTag)
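The tags act as typed keys into the combined result. Each tag carries the type of the value it retrieves, so the code above can call .size() on each result without a cast. The following plain-Java sketch shows that general pattern, often called a typesafe heterogeneous container. TypedKey and ResultsByKey are hypothetical names. The sketch illustrates the idea and is not Jet's implementation of Tag or ItemsByTag.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical typed key: the type parameter records what the key retrieves.
final class TypedKey<T> {
    private final String name;

    TypedKey(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

// Hypothetical container mapping each typed key to a value of the key's type.
final class ResultsByKey {
    private final Map<TypedKey<?>, Object> values = new HashMap<>();

    <T> void put(TypedKey<T> key, T value) {
        values.put(key, value);
    }

    @SuppressWarnings("unchecked")
    <T> T get(TypedKey<T> key) {
        // Safe because put() only accepts a value of the key's type.
        return (T) values.get(key);
    }
}

class TypedKeyDemo {
    public static void main(String[] args) {
        TypedKey<List<String>> visitKey = new TypedKey<>("visits");
        TypedKey<Long> paymentTotalKey = new TypedKey<>("paymentTotal");

        ResultsByKey results = new ResultsByKey();
        results.put(visitKey, Arrays.asList("home", "product"));
        results.put(paymentTotalKey, 42L);

        // No casts at the call site: each key carries its value's type.
        int visits = results.get(visitKey).size();
        long total = results.get(paymentTotalKey);
        System.out.println(visits + " visits, " + total + " paid");
    }
}
```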

4.11. Windowed Aggregation

The process of data aggregation takes a finite batch of data and produces a result. We can make it work with an infinite stream if we break up the stream into finite chunks. This is called windowing.

Let’s demonstrate windowing using the Word Count task as an example. If you have a finite batch of tweets you want to analyze for word frequencies, this is how the pipeline can look:

BatchStage<String> tweets = p.drawFrom(Sources.list("tweets"));

tweets.flatMap(tweet -> traverseArray(tweet.toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.map("counts"));
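For reference, the following minimal plain-Java sketch computes the same result as the batch pipeline. It uses java.util.stream on an in-memory list instead of Jet sources and sinks. The tokenizing steps match the pipeline above, and the class name is illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class WordCountSketch {

    // Same steps as the pipeline: lowercase, split on non-word characters,
    // drop empty tokens, group by the whole word and count.
    static Map<String, Long> countWords(List<String> tweets) {
        return tweets.stream()
                .flatMap(tweet -> Arrays.stream(tweet.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // The leading space in the second tweet yields an empty token,
        // which the filter removes.
        List<String> tweets = Arrays.asList("Hello, Jet!", " hello world");
        System.out.println(countWords(tweets));
    }
}
```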

To make it work for an infinite stream, you have to add two things — event timestamps and the specification of the window:

StreamStage<String> tweets = p.drawFrom(Sources.mapJournal("tweets",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT));

tweets.flatMap(tweet -> traverseArray(tweet.toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .addTimestamps()
      .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.list("result"));

Now we’ve got a Jet job that does live tracking of words currently trending in tweets. The sliding window definition tells Jet to aggregate the events that occurred within the last minute and update this result every second.

We employed the simplest way of dealing with the notion of time: we ignored the time when the event actually happened and just slapped the current time on it. Usually the event contains its own timestamp and Jet must honor it. To achieve that you must pass a function that extracts the timestamp from the event. When you do that, you must also deal with the fact that events can arrive in an order different from their timestamp order. We discuss these concerns in the Jet Concepts chapter.

Here’s an example with Tweet objects that carry their own timestamp:

Pipeline p = Pipeline.create();
p.<Tweet>drawFrom(Sources.mapJournal("tweets",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT))
 .flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+"))
         .map(word -> new TweetWord(tweet.timestamp(), word)))
 .filter(tweetWord -> !tweetWord.word().isEmpty())
 .addTimestamps(TweetWord::timestamp, SECONDS.toMillis(5))
 .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
 .groupingKey(TweetWord::word)
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

In the line .addTimestamps(TweetWord::timestamp, SECONDS.toMillis(5)) we specified two things: how to extract the timestamp and how much event lag we want to tolerate. We said that the timestamp of an event we receive can be at most five seconds behind the highest timestamp we already received. This also means that Jet will have to delay emitting a windowed result until it receives an event that’s five seconds past the window’s end, so the latency will typically be five seconds or more.
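The following simplified plain-Java model makes the lag rule concrete. In this model, an event is dropped as late if it is more than the allowed lag behind the highest timestamp seen so far. A window can be emitted once that highest timestamp minus the lag reaches the window's end. This is a sketch of the rule described above under those assumptions, not Jet's watermark implementation. The class and method names are hypothetical.

```java
class LagWatermarkSketch {

    private final long allowedLag;
    private long highestTimestamp = Long.MIN_VALUE;

    LagWatermarkSketch(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    // Events with a timestamp below this value count as late.
    long watermark() {
        return highestTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : highestTimestamp - allowedLag;
    }

    // Returns false (event dropped as late) if the event is more than
    // allowedLag behind the highest timestamp received so far.
    boolean accept(long timestamp) {
        if (timestamp < watermark()) {
            return false;
        }
        highestTimestamp = Math.max(highestTimestamp, timestamp);
        return true;
    }

    // A window [start, end) is complete once no event with timestamp < end
    // can still be accepted, i.e. once the watermark reaches the window's end.
    boolean canEmitWindowEndingAt(long windowEnd) {
        return watermark() >= windowEnd;
    }

    public static void main(String[] args) {
        LagWatermarkSketch wm = new LagWatermarkSketch(5_000);
        System.out.println(wm.accept(10_000));               // first event
        System.out.println(wm.accept(6_000));                // 4 s behind: accepted
        System.out.println(wm.accept(4_000));                // 6 s behind: late
        System.out.println(wm.canEmitWindowEndingAt(6_000)); // watermark is only 5_000
        wm.accept(11_000);                                   // watermark moves to 6_000
        System.out.println(wm.canEmitWindowEndingAt(6_000));
    }
}
```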

Notice how we had to struggle a bit to hold on to the timestamp through the flatmapping transformation. We needed two classes: Tweet and TweetWord. We can avoid this need if we assign timestamps earlier on:

Pipeline p = Pipeline.create();
p.<Tweet>drawFrom(Sources.mapJournal("tweets",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT))
 .addTimestamps(Tweet::timestamp, SECONDS.toMillis(5))
 .flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

Now the flatMap stage emits just the words as strings. This will work correctly because Jet keeps the event timestamps internally and correlates the input item with all the output items of a transformation such as flatMap and filter, propagating the timestamp value.

When you set the timestamps as the first thing after a source stage, Jet is able to push the timestamping logic into the source. Some sources, especially partitioned ones such as Kafka, have advanced logic that keeps track of the timestamps on each partition and knows how to reconcile the temporary differences in timestamps that occur between them. This often results in lower latency and fewer events dropped as late.

4.12. Kinds of Windows

Jet supports these kinds of windows:

  • tumbling window: a window of constant size that "tumbles" along the time axis: the consecutive positions of the window don’t overlap. If you use a window size of 1 second, Jet will group together all events that occur within the same second and you’ll get window results for intervals [0-1) seconds, then [1-2) seconds, and so on.

  • sliding window: a window of constant size that slides along the time axis. It slides in discrete steps that are a fraction of the window’s length. A typical setting is to slide by 1% of the window size. Jet outputs the aggregation result each time the window moves on. If you use a window of size 1 second sliding by 10 milliseconds, Jet will output window results for intervals [0.00-1.00) seconds, then [0.01-1.01) seconds, and so on.

  • session window: it captures a burst of events separated by periods of quiescence. You define the "session timeout", i.e., the length of the quiet period that causes the window to close. If you define a grouping key, there is a separate, independent session window for each key.
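The following plain-Java sketch shows which window or windows a timestamp falls into for each kind. It is a simplified model that rests on three assumptions. First, window starts are aligned to multiples of the size or slide. Second, the sliding window size is a multiple of the slide. Third, a gap of at least the timeout splits sessions. The names are hypothetical, and the code is not taken from Jet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WindowAssignmentSketch {

    // Tumbling: windows [k * size, (k + 1) * size) never overlap,
    // so each timestamp belongs to exactly one window.
    static long tumblingWindowStart(long timestamp, long size) {
        return Math.floorDiv(timestamp, size) * size;
    }

    // Sliding: windows [start, start + size) start at every multiple of slide,
    // so each timestamp belongs to size / slide windows. Returns their starts,
    // latest first.
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = Math.floorDiv(timestamp, slide) * slide;
             start > timestamp - size;
             start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Session: ascending timestamps are split wherever the gap to the
    // previous event is at least the session timeout.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long timeout) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0;
        for (long t : sortedTimestamps) {
            if (current == null || t - previous >= timeout) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(t);
            previous = t;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingWindowStart(1_500, 1_000));             // window [1000, 2000)
        System.out.println(slidingWindowStarts(1_005, 1_000, 10).size()); // 1 s window sliding by 10 ms: 100 windows
        System.out.println(sessions(Arrays.asList(1L, 2L, 3L, 10L, 11L, 30L), 5));
    }
}
```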

4.13. Rolling Aggregation

Jet supports a way to aggregate an unbounded stream without windowing: for each input item you get the current aggregation value as output, as if this item was the last one to be aggregated. You use the same AggregateOperation implementations that work with Jet’s aggregate API. Note that Jet doesn’t enforce processing in the order of event time; what you get accounts for the items that Jet happens to have processed so far.

This kind of aggregation is useful in jobs that monitor a stream for extremes or totals. For example, if you have a stream of web server monitoring metrics, you can keep track of the worst latency ever experienced, the highest throughput seen, total number of transactions processed, and so on.

In the following example we process a stream of trading events and get the "largest" trade seen so far (with the highest worth in dollars). Note that the rolling aggregation outputs an item every time, not just after a new record-breaking trade.

Pipeline p = Pipeline.create();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
StreamStage<Trade> currLargestTrade =
        p.drawFrom(tradesSource)
         .rollingAggregate(maxBy(DistributedComparator.comparing(Trade::worth)));
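The rolling behavior fits in a few lines of plain Java. There is one output per input, and each output is the largest item seen so far. This single-threaded sketch uses a hypothetical helper name. Jet itself runs the maxBy aggregate operation and, as noted above, doesn't enforce event-time order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class RollingMaxSketch {

    // Emits one item per input: the maximum, according to the comparator,
    // among all items seen so far (on ties the earlier item is kept).
    static <T> List<T> rollingMax(List<T> items, Comparator<? super T> comparator) {
        List<T> output = new ArrayList<>();
        T currentMax = null;
        for (T item : items) {
            if (currentMax == null || comparator.compare(item, currentMax) > 0) {
                currentMax = item;
            }
            output.add(currentMax);
        }
        return output;
    }

    public static void main(String[] args) {
        // Trade worths in arrival order
        List<Long> worths = Arrays.asList(500L, 300L, 800L, 200L);
        System.out.println(rollingMax(worths, Comparator.naturalOrder()));
    }
}
```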

4.14. Pipeline API Cheatsheet

Transform

Sample

Map

Apply a mapping function to each input item independently. You can also do filtering by mapping to null.

This example converts the input lines of text to lowercase:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<String> lowercased = lines.map(String::toLowerCase);

Filter

Apply a filtering function to each input item to decide whether to pass it to the output.

This example removes all empty strings from the stream:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<String> nonEmpty = lines.filter(string -> !string.isEmpty());

Flatmap

Map each item to arbitrarily many items using a function that returns a Traverser over the result items.

This example splits the lines of text into individual words:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<String> words = lines.flatMap(
        line -> traverseArray(line.split("\\W+")));

Distinct

Suppress duplicate items from a stream. If you apply a grouping key, two items mapping to the same key will be duplicates. This operation applies primarily to batch streams, but also works on a windowed unbounded stream.

This example takes a batch stage with strings and creates a stage with distinct strings and another where the four-character prefix of the strings is unique:

BatchStage<String> strings = someStrings();
BatchStage<String> distinctStrings = strings.distinct();
BatchStage<String> distinctByPrefix =
        strings.groupingKey(s -> s.substring(0, 4)).distinct();

Merge

Merge the contents of two streams into one. The item type in the right-hand stage must be the same or a subtype of the one in the left-hand stage.

This example merges the streams from New York and Tokyo stock exchanges:

StreamStage<Trade> tradesNewYork = trades("new-york");
StreamStage<Trade> tradesTokyo = trades("tokyo");
StreamStage<Trade> tradesNyAndTokyo =
        tradesNewYork.merge(tradesTokyo);

Enrich by Many-to-One Join

Perform a many-to-one join with arbitrarily many enriching streams. The stream on which you invoke hashJoin holds foreign keys for the items in the enriching streams.

This example enriches a stream of stock trades with detailed info on the stock involved:

BatchStage<Trade> trades = p.drawFrom(list("trades"));
BatchStage<Entry<String, StockInfo>> stockInfo =
        p.drawFrom(list("stockInfo"));
BatchStage<Trade> joined = trades.hashJoin(stockInfo,
        joinMapEntries(Trade::ticker), Trade::setStockInfo);
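Conceptually, a many-to-one hash join builds a hash table from the enriching stream, keyed by the join key. It then looks up each item of the main stream in that table. The following plain-Java sketch illustrates the idea. The Trade class, its fields, and the string output are hypothetical stand-ins. The sketch does not show how Jet distributes the join across the cluster.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

class HashJoinSketch {

    // Hypothetical minimal trade: only the join key and a quantity.
    static final class Trade {
        final String ticker;
        final long quantity;

        Trade(String ticker, long quantity) {
            this.ticker = ticker;
            this.quantity = quantity;
        }
    }

    static List<String> enrich(List<Trade> trades, List<Entry<String, String>> stockInfo) {
        // Build side: hash the enriching stream by its key.
        Map<String, String> infoByTicker = new HashMap<>();
        for (Entry<String, String> e : stockInfo) {
            infoByTicker.put(e.getKey(), e.getValue());
        }
        // Probe side: each trade looks up its foreign key. Many trades may
        // match the same entry, and a missing entry yields null.
        return trades.stream()
                .map(t -> t.ticker + " x" + t.quantity + ": " + infoByTicker.get(t.ticker))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Entry<String, String>> stockInfo = Arrays.asList(
                new SimpleEntry<>("AAA", "Alpha Corp"),
                new SimpleEntry<>("BBB", "Beta Ltd"));
        List<Trade> trades = Arrays.asList(
                new Trade("AAA", 10), new Trade("AAA", 5), new Trade("ZZZ", 1));
        enrich(trades, stockInfo).forEach(System.out::println);
    }
}
```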

Enrich by Map Lookup

For each stream item, look up a value from a Hazelcast map and transform (enrich) the item using it. Similar to a hash-join with the map’s entry set, but values don’t get stale (at the expense of throughput).

This example enriches a stream of stock trades with detailed info on the stock involved:

StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
IMap<String, StockInfo> stockMap = jet.getMap("stock-info");

Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
 .groupingKey(Trade::ticker)
 .mapUsingIMap(stockMap, Trade::setStockInfo)
 .drainTo(Sinks.list("result"));

Aggregate

Aggregate all the stream items with the AggregateOperation you supply.

This example counts the stream items:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<Long> count = lines.aggregate(counting());

Group and Aggregate

Group the items by key and perform an AggregateOperation on each group.

This example calculates the number of occurrences of each word in the stream:

BatchStage<String> words = p.drawFrom(list("words"));
BatchStage<Entry<String, Long>> wordsAndCounts =
        words.groupingKey(wholeItem())
             .aggregate(counting());

Windowed Group and Aggregate

Perform grouping and aggregation on an unbounded stream by splitting it into bounded windows.

This example calculates the number of occurrences of each word in a stream of tweets within the last second:

StreamStage<Entry<Long, String>> tweetWords = p.drawFrom(
        Sources.mapJournal("tweet-words", START_FROM_OLDEST));
StreamStage<TimestampedEntry<String, Long>> wordFreqs =
        tweetWords.addTimestamps(e -> e.getKey(), 1000)
                  .window(sliding(1000, 10))
                  .groupingKey(entryValue())
                  .aggregate(counting());

Join on Common Key

Perform a many-to-many join of several streams on a common key. Apply an AggregateOperation on each group. As a special case, you can specify an aggregate operation that gives you all the joined items without transformation.

This example joins a "page visits" stream with a "payments" stream in a Web Shop application. For each user it gives you all the recorded page views and payments:

BatchStageWithKey<PageVisit, Integer> pageVisits =
        p.drawFrom(Sources.<PageVisit>list("pageVisit"))
         .groupingKey(pageVisit -> pageVisit.userId());
BatchStageWithKey<Payment, Integer> payments =
        p.drawFrom(Sources.<Payment>list("payment"))
         .groupingKey(payment -> payment.userId());
BatchStage<Entry<Integer, Tuple2<List<PageVisit>, List<Payment>>>>
    joined = pageVisits.aggregate2(toList(), payments, toList());

Streaming Join on a Common Key

Like the above, but also apply a window to the unbounded stream. It joins all the items belonging to the same window.

This example joins two unbounded streams, "page visits" and "payments". For each user it gives you all the page views and payments that they performed within the last minute and updates the result every second:

StreamStageWithKey<PageVisit, Integer> pageVisits =
        p.<PageVisit>drawFrom(Sources.mapJournal("pageVisits",
                mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
                .addTimestamps(PageVisit::timestamp, 1000)
                .groupingKey(PageVisit::userId);
StreamStageWithKey<Payment, Integer> payments =
        p.<Payment>drawFrom(Sources.mapJournal("payments",
                mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
                .addTimestamps(Payment::timestamp, 1000)
                .groupingKey(Payment::userId);
StreamStage<TimestampedEntry<Integer,
                            Tuple2<List<PageVisit>, List<Payment>>>>
    joined = pageVisits.window(sliding(60_000, 1000))
                       .aggregate2(toList(), payments, toList());

Rolling Aggregation

Keep performing the same aggregate operation forever, getting the current result after each item.

This example tracks the largest trade observed in a stream:

Pipeline p = Pipeline.create();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
StreamStage<Trade> currLargestTrade =
        p.drawFrom(tradesSource)
         .rollingAggregate(maxBy(comparing(Trade::worth)));

4.15. Implement Your Aggregate Operation

The single most important kind of processing Jet does is aggregation. In general it is a transformation of a set of input values into a single output value. The function that does this transformation is called the “aggregate function”. A basic example is sum applied to a set of integer numbers, but the result can also be a complex value, for example a list of all the input items.

Jet’s library contains a range of predefined aggregate functions, but it also exposes an abstraction, called AggregateOperation, that allows you to plug in your own. Since Jet does the aggregation in a parallelized and distributed way, you can’t simply supply a piece of Java code that implements the aggregate function; we need you to break it down into several smaller pieces that fit into Jet’s processing engine.

The ability to compute the aggregate function in parallel comes at a cost: Jet must be able to give a slice of the total data set to each processing unit and then combine the partial results from all the units. The combining step is crucial: it will only make sense if we’re combining the partial results of a commutative associative function (CA for short). For sum this is trivial: we know from elementary school that + is a CA operation. If you have a stream of numbers {17, 37, 5, 11, 42}, you can sum up {17, 5} separately from {42, 11, 37} and then combine the partial sums (also note the reordering of the elements).
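A short plain-Java sketch can check the arithmetic in this paragraph. It also contrasts sum with left-to-right subtraction, an operation that is neither commutative nor associative. For subtraction, combining the partial results of the same two slices gives a different answer. The class name is illustrative.

```java
import java.util.Arrays;
import java.util.List;

class PartialResultsSketch {

    // Sum is commutative and associative: partial sums combine in any order.
    static long sum(List<Long> numbers) {
        long total = 0;
        for (long n : numbers) {
            total += n;
        }
        return total;
    }

    // Left-to-right subtraction is neither commutative nor associative.
    static long subtractAll(List<Long> numbers) {
        long result = numbers.get(0);
        for (int i = 1; i < numbers.size(); i++) {
            result -= numbers.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> all = Arrays.asList(17L, 37L, 5L, 11L, 42L);
        List<Long> sliceA = Arrays.asList(17L, 5L);
        List<Long> sliceB = Arrays.asList(42L, 11L, 37L);

        // 22 + 90 == 112, the same as summing everything at once
        System.out.println(sum(sliceA) + sum(sliceB) == sum(all));
        // 12 - (-6) == 18, but 17 - 37 - 5 - 11 - 42 == -78
        System.out.println(subtractAll(sliceA) - subtractAll(sliceB) == subtractAll(all));
    }
}
```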

Something more complex, like average, doesn’t have this property by itself; however, if you add one more ingredient, the finish function, you can express it easily. Jet allows you to first compute some CA function, whose partial results can be combined, and then at the very end apply the finish function on the fully combined result. To compute the average, your CA function will output the pair (sum, count). Two such pairs are trivial to combine by summing each component. The finish function will be sum / count.

In addition to the mathematical side, there is also the practical one: you have to provide Jet with a specific mutable object, called the accumulator, which will keep the “running score” of the operation in progress. For the average example, it would be something like this:

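public class AvgAccumulator {
    private long sum;
    private long count;

    // Add one input value to the running totals
    public void accumulate(long value) {
        sum += value;
        count++;
    }

    // Merge another accumulator's partial totals into this one
    public void combine(AvgAccumulator that) {
        this.sum += that.sum;
        this.count += that.count;
    }

    // Turn the fully combined totals into the average
    public double finish() {
        return (double) sum / count;
    }
}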
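The following self-contained copy of the accumulator, with a small driver, checks why combine must merge both fields, the sum and the count. Two accumulators each take a slice of the input. One is then combined into the other, and finish runs once on the merged state. The driver class and helper method are illustrative, not part of Jet.

```java
import java.util.Arrays;
import java.util.List;

class AvgAccumulatorDemo {

    // Standalone copy of the accumulator; combine() merges both sum and count.
    static final class AvgAccumulator {
        private long sum;
        private long count;

        void accumulate(long value) {
            sum += value;
            count++;
        }

        void combine(AvgAccumulator that) {
            this.sum += that.sum;
            this.count += that.count;
        }

        double finish() {
            return (double) sum / count;
        }
    }

    static AvgAccumulator accumulateAll(List<Long> values) {
        AvgAccumulator acc = new AvgAccumulator();
        for (long v : values) {
            acc.accumulate(v);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Two "processing units" each aggregate a slice of the data...
        AvgAccumulator left = accumulateAll(Arrays.asList(1L, 2L, 3L));
        AvgAccumulator right = accumulateAll(Arrays.asList(10L));
        // ...then the partial results are combined and finished once.
        left.combine(right);
        System.out.println(left.finish());
    }
}
```

Splitting the input this way yields the same average, 16 / 4 = 4.0, as one accumulator over all four values.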

This object must also be serializable, preferably with Hazelcast’s serialization rather than Java’s, because in a group-and-aggregate operation there’s one accumulator per key and all of them have to be sent across the network to be combined and finished.

Instead of requiring you to write a complete class from scratch, Jet separates the concern of holding the accumulated state from that of the computation performed on it. This means that you just need one accumulator class for each kind of structure that holds the accumulated data, as opposed to one for each aggregate operation. Jet’s library offers in the com.hazelcast.jet.accumulator package several such classes, one of them being LongLongAccumulator, which is a match for our average function. You’ll just have to supply the logic on top of it.

Specifically, you have to provide a set of six functions (we call them “primitives”):

  • create a new accumulator object.

  • accumulate the data of an item by mutating the accumulator’s state.

  • combine the contents of the right-hand accumulator into the left-hand one.

  • deduct the contents of the right-hand accumulator from the left-hand one (undo the effects of combine).

  • finish accumulation by transforming the accumulator object into the final result.

  • export the result of aggregation in a way that’s not destructive for the accumulator (used in rolling aggregations).

We already mentioned most of these above. The deduct primitive is optional and Jet can manage without it, but if you are computing a sliding window over an infinite stream, this primitive can give a significant performance boost because it allows Jet to reuse the results of the previous calculations.
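The benefit of deduct shows up when a sliding window is assembled from per-slide partial results (frames). Without deduct, every frame in the window must be recombined at each step. With deduct, you combine the frame that enters and deduct the frame that leaves. The following plain-Java sketch assumes that frame-based model. SumCount is a hypothetical stand-in for LongLongAccumulator, and the code illustrates the idea rather than reproducing Jet's windowing code. Deduct only works for invertible operations such as sum and count, not for an operation like max.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SlidingDeductSketch {

    // Hypothetical (sum, count) accumulator standing in for LongLongAccumulator.
    static final class SumCount {
        long sum;
        long count;

        void accumulate(long value) { sum += value; count++; }
        void combine(SumCount that) { sum += that.sum; count += that.count; }
        void deduct(SumCount that) { sum -= that.sum; count -= that.count; }
        double export() { return (double) sum / count; }
    }

    // frames.get(i) holds the values that fell into the i-th slide-sized frame.
    // Returns the average of every window covering framesPerWindow consecutive
    // frames, updating one running accumulator instead of recombining all frames.
    static List<Double> slidingAverages(List<List<Long>> frames, int framesPerWindow) {
        List<Double> results = new ArrayList<>();
        Deque<SumCount> framesInWindow = new ArrayDeque<>();
        SumCount window = new SumCount();
        for (List<Long> frameValues : frames) {
            SumCount frame = new SumCount();
            for (long v : frameValues) {
                frame.accumulate(v);
            }
            window.combine(frame);                           // frame entering the window
            framesInWindow.addLast(frame);
            if (framesInWindow.size() > framesPerWindow) {
                window.deduct(framesInWindow.removeFirst()); // frame leaving the window
            }
            if (framesInWindow.size() == framesPerWindow) {
                results.add(window.export());                // window stays usable after export
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Windows of two frames: {1, 2, 3}, then {3, 4, 5}, then {4, 5, 6}
        List<List<Long>> frames = Arrays.asList(
                Arrays.asList(1L, 2L), Arrays.asList(3L), Arrays.asList(4L, 5L), Arrays.asList(6L));
        System.out.println(slidingAverages(frames, 2));
    }
}
```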

In a similar fashion, Jet distinguishes between the export and finish primitives for optimization purposes. Every function that works as the export primitive will also work as finish, but you can specify a different finish that reuses the state already allocated in the accumulator. Jet applies finish only if it will never again use that accumulator.

If you are familiar with the JDK’s java.util.stream API, you’ll find AggregateOperation quite similar to Collector, which is also a holder of several functional primitives. Jet’s definitions are slightly different, though, and there are the additional optimizing primitives we just mentioned.
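For comparison, the following example expresses the same averaging logic as a JDK Collector built with Collector.of. That method takes a supplier (like create), an accumulator, a combiner, and a finisher. One difference is visible in the code. The Collector combiner returns the merged container, whereas Jet's combine primitive updates the left-hand accumulator in place. Collector has no counterpart to deduct or export. The long[] container layout is an illustrative choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class AveragingCollectorSketch {

    // Container layout: [0] = sum, [1] = count.
    static Collector<Long, long[], Double> averaging() {
        return Collector.of(
                () -> new long[2],                  // like create
                (acc, n) -> {                       // like accumulate
                    acc[0] += n;
                    acc[1]++;
                },
                (left, right) -> {                  // like combine, but returns the result
                    left[0] += right[0];
                    left[1] += right[1];
                    return left;
                },
                acc -> (double) acc[0] / acc[1]);   // like finish
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L);
        // A parallel stream splits the input and merges partial containers with the combiner.
        System.out.println(values.parallelStream().collect(averaging()));
    }
}
```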

Let’s see how this works with our average function. Using LongLongAccumulator we can express our accumulate primitive as

(acc, n) -> {
    acc.set1(acc.get1() + n);
    acc.set2(acc.get2() + 1);
}

The export/finish primitive will be

acc -> (double) acc.get1() / acc.get2()

Now we have to define the other three primitives to match our main logic. For create we just refer to the constructor: LongLongAccumulator::new. The combine primitive expects you to update the left-hand accumulator with the contents of the right-hand one, so:

(left, right) -> {
    left.set1(left.get1() + right.get1());
    left.set2(left.get2() + right.get2());
}

Deducting must undo the effect of a previous combine:

(left, right) -> {
    left.set1(left.get1() - right.get1());
    left.set2(left.get2() - right.get2());
}

All put together, we can define our averaging operation as follows:

AggregateOperation1<Long, LongLongAccumulator, Double> aggrOp = AggregateOperation
        .withCreate(LongLongAccumulator::new)
        .<Long>andAccumulate((acc, n) -> {
            acc.set1(acc.get1() + n);
            acc.set2(acc.get2() + 1);
        })
        .andCombine((left, right) -> {
            left.set1(left.get1() + right.get1());
            left.set2(left.get2() + right.get2());
        })
        .andDeduct((left, right) -> {
            left.set1(left.get1() - right.get1());
            left.set2(left.get2() - right.get2());
        })
        .andExportFinish(acc -> (double) acc.get1() / acc.get2());

Let’s stop for a second to look at the type we got: AggregateOperation1<Long, LongLongAccumulator, Double>. Its type parameters are:

  1. Long: the type of the input item

  2. LongLongAccumulator: the type of the accumulator

  3. Double: the type of the result

Specifically note the 1 at the end of the type’s name: it signifies that it’s the specialization of the general AggregateOperation to exactly one input stream. In Hazelcast Jet you can also perform a co-aggregating operation, aggregating several input streams together. Since the number of input types is variable, the general AggregateOperation type cannot statically capture them and we need separate subtypes. We decided to statically support up to three input types; if you need more, you’ll have to resort to the less type-safe, general AggregateOperation.

4.15.1. Aggregating over multiple inputs

Hazelcast Jet can join several streams and simultaneously perform aggregation on all of them. You specify a separate aggregate operation for each input stream and have the opportunity to combine their results when done. You can use aggregate operations provided in the library (see the section on co-aggregation for an example).

If you cannot express your aggregation logic using this approach, you can also specify a custom multi-input aggregate operation that can combine the items into the accumulator immediately as it receives them.

We’ll present a simple example on how to build a custom multi-input aggregate operation. Note that the same logic can also be expressed using separate single-input operations; the point of the example is introducing the API.

Say we are interested in the behavior of users in an online shop application and want to gather the following statistics for each user:

  1. total load time of the visited product pages

  2. quantity of items added to the shopping cart

  3. amount paid for bought items

This data is dispersed among separate datasets: PageVisit, AddToCart and Payment. Note that in each case we’re dealing with a simple sum applied to a field in the input item. We can perform a cogroup-and-aggregate transform with the following aggregate operation:

Pipeline p = Pipeline.create();
BatchStage<PageVisit> pageVisit = p.drawFrom(Sources.list("pageVisit"));
BatchStage<AddToCart> addToCart = p.drawFrom(Sources.list("addToCart"));
BatchStage<Payment> payment = p.drawFrom(Sources.list("payment"));

AggregateOperation3<PageVisit, AddToCart, Payment, LongAccumulator[], long[]>
    aggrOp = AggregateOperation
        .withCreate(() -> new LongAccumulator[] {
                new LongAccumulator(),
                new LongAccumulator(),
                new LongAccumulator()
        })
        .<PageVisit>andAccumulate0((accs, pv) -> accs[0].add(pv.loadTime()))
        .<AddToCart>andAccumulate1((accs, atc) -> accs[1].add(atc.quantity()))
        .<Payment>andAccumulate2((accs, pm) -> accs[2].add(pm.amount()))
        .andCombine((accs1, accs2) -> {
            accs1[0].add(accs2[0]);
            accs1[1].add(accs2[1]);
            accs1[2].add(accs2[2]);
        })
        .andExportFinish(accs -> new long[] {
                accs[0].get(),
                accs[1].get(),
                accs[2].get()
        });

BatchStage<Entry<Integer, long[]>> coGrouped =
        pageVisit.groupingKey(PageVisit::userId)
                 .aggregate3(
                         addToCart.groupingKey(AddToCart::userId),
                         payment.groupingKey(Payment::userId),
                         aggrOp);
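Without the distribution, this operation keeps one three-slot accumulator per user. Each input updates only its own slot, which is what andAccumulate0, andAccumulate1, and andAccumulate2 express. The following single-threaded plain-Java sketch shows that grouping. The minimal PageVisit, AddToCart, and Payment classes are hypothetical stand-ins that carry only the fields used above. Plain long[] slots replace LongAccumulator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ThreeInputAggregationSketch {

    // Hypothetical minimal event types with only the fields the example uses.
    static final class PageVisit {
        final int userId;
        final long loadTime;
        PageVisit(int userId, long loadTime) { this.userId = userId; this.loadTime = loadTime; }
    }

    static final class AddToCart {
        final int userId;
        final long quantity;
        AddToCart(int userId, long quantity) { this.userId = userId; this.quantity = quantity; }
    }

    static final class Payment {
        final int userId;
        final long amount;
        Payment(int userId, long amount) { this.userId = userId; this.amount = amount; }
    }

    // Result per user: [total load time, total quantity, total amount paid].
    static Map<Integer, long[]> coAggregate(
            List<PageVisit> visits, List<AddToCart> carts, List<Payment> payments) {
        Map<Integer, long[]> accs = new HashMap<>();
        // Each input updates only its own slot, like andAccumulate0/1/2.
        for (PageVisit pv : visits) {
            accs.computeIfAbsent(pv.userId, k -> new long[3])[0] += pv.loadTime;
        }
        for (AddToCart atc : carts) {
            accs.computeIfAbsent(atc.userId, k -> new long[3])[1] += atc.quantity;
        }
        for (Payment pm : payments) {
            accs.computeIfAbsent(pm.userId, k -> new long[3])[2] += pm.amount;
        }
        return accs;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> stats = coAggregate(
                Arrays.asList(new PageVisit(1, 120), new PageVisit(1, 80), new PageVisit(2, 50)),
                Arrays.asList(new AddToCart(1, 2)),
                Arrays.asList(new Payment(2, 30)));
        stats.forEach((user, s) -> System.out.println(user + " -> " + Arrays.toString(s)));
    }
}
```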

Note how we got an AggregateOperation3 and how it captured each input type. When we use it as an argument to a cogroup-and-aggregate transform, the compiler will ensure that the stages we attach it to have the correct types and are in the correct order.

On the other hand, if you use the co-aggregation builder object, you’ll construct the aggregate operation by calling andAccumulate(tag, accFn) with all the tags you got from the co-aggregation builder, and the static type will be just AggregateOperation. The compiler won’t be able to match up the inputs to their treatment in the aggregate operation.

5. Source and Sink Connectors

Jet accesses data sources and sinks via its connectors. They are a computation job’s point of contact with the outside world.

5.1. Concerns

Although the connectors do their best to unify the various kinds of resources under the same “data stream” paradigm, there are still many concerns that need your attention.

5.1.1. Is it Unbounded?

The first decision when building a Jet computation job is whether it will deal with bounded or unbounded data. A typical example of a bounded resource is a persistent storage system, whereas an unbounded one is usually like a FIFO queue, discarding old data. This is true both for sources and sinks.

Bounded data is handled in batch jobs, and there are fewer concerns to deal with. Examples of finite resources are the Hazelcast IMap/ICache and the Hadoop Distributed File System (HDFS). In the unbounded category the most popular choice is Kafka, but a Hazelcast IMap/ICache can also be used as an infinite source of update events (via the Event Journal feature). You can also set up an IMap/ICache as a sink for an infinite amount of data, either by ensuring that the size of the keyset will be finite or by allowing the eviction of old entries.

5.1.2. Is it Replayable?

Most finite data sources are replayable because they come from persistent storage. You can easily replay the whole dataset. However, an infinite data source may be of such nature that it can be consumed only once. An example is the TCP socket connector. Such sources are bad at fault tolerance: if anything goes wrong during the computation, it cannot be retried.

Does it Support Checkpointing?

You cannot reprocess an infinite data stream from the very beginning. Instead, you must save the complete state at regular intervals, then replay the input stream from the last saved position (checkpoint). Jet can create snapshots of its internal processing state, but for this to be useful the data source must have the ability to replay its data from the chosen point, discarding everything before it. Both Kafka and the Hazelcast Event Journal support this.

5.1.3. Is it Distributed?

A distributed computation engine prefers to work with distributed data resources. If the resource is not distributed, all Jet members will have to contend for access to a single endpoint. Kafka, HDFS, IMap and ICache are all distributed. On the other hand, an IList is not: it resides on a single member. When used as a source, only one Jet member pulls its data. When used as a sink, all Jet members send their data to the one that holds it.

A file source/sink operating in local mode is a sort of "manually distributed" resource, with each member accessing its own local filesystem. You have to manually arrange the files so that each member holds a subset of the full dataset. When used as a sink, you have to manually gather all the pieces that Jet created.

The file source/sink can also operate in shared mode, accessing a shared filesystem mounted as a local directory.

5.1.4. What about Data Locality?

If you’re looking to achieve record-breaking throughput for your application, you’ll have to think carefully about how close you can deliver your data to the location where Jet will consume and process it. For example, if your source is HDFS, you should align the topologies of the Hadoop and Jet clusters so that each machine that hosts an HDFS member also hosts a Jet member. Jet will automatically figure this out and arrange for each member to consume only the slice of data stored locally.

If you’re using IMap/ICache as data sources, you have two basic choices: have Jet connect to a Hazelcast IMDG cluster, or use Jet itself to host the data (since a Jet cluster is at the same time a Hazelcast IMDG cluster). In the second case Jet will automatically ensure a data-local access pattern, but there’s a caveat: if the Jet job causes an error of unrestricted scope, such as OutOfMemoryError or StackOverflowError, it will have unpredictable consequences for the state of the whole Jet member, jeopardizing the integrity of the data stored on it.

5.2. Overview of Sources and Sinks

The table below gives you a high-level overview of all the source and sink connectors we deliver with Jet. There are links to Javadoc and code samples. The sections following this one present each connector in more detail.

Table 1. Sources and Sinks
Resource | Javadoc | Sample | Unbounded? | Replayable? | Checkpointing? | Distributed? | Data Locality
IMap | Source, Sink | Sample | | | | | Src ✅, Sink ❌
ICache | Source, Sink | Sample | | | | | Src ✅, Sink ❌
IMap in another cluster | Source, Sink | Sample | | | | |
ICache in another cluster | Source, Sink | | | | | |
IMap’s Event Journal | Source | Sample | | | | |
ICache’s Event Journal | Source | | | | | |
Event Journal of IMap in another cluster | Source | Sample | | | | |
Event Journal of ICache in another cluster | Source | | | | | |
IList | Source, Sink | Sample | | | | |
IList in another cluster | Source, Sink | Sample | | | | |
HDFS | Source, Sink | Sample | | | | |
Kafka | Source, Sink | Source | | | | |
Files | Source, Sink | Sample | | | | | Local FS ✅, Shared FS ❌
File Watcher | Source | Sample | | | | | Local FS ✅, Shared FS ❌
Avro | Source, Sink | Source Sample, Sink Sample | | | | | Local FS ✅, Shared FS ❌
TCP Socket | Source, Sink | Source, Sink | | | | |
JMS | Queue Source, Topic Source, Queue Sink, Topic Sink | Queue Sample, Topic Sample | | | | Queue Source ✅, Queue Sink ✅, Topic Source ❌, Topic Sink ✅ |
JDBC | Source, Sink | Source Sample, Sink Sample | | | | |
Application Log | Sink | Sink | N/A | N/A | | |

5.3. Hazelcast IMap and ICache

Hazelcast IMDG’s IMap and ICache are very similar in the way Jet uses them and are largely interchangeable; IMap has somewhat more features. The simplest way to use them is as finite sources of their contents, but if you enable the Event Journal on a map or cache, you can also use it as a source of an infinite stream of update events (see below).

The most basic usage is very simple. Here are snippets that use IMap and ICache as a source and a sink:

Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> stage = p.drawFrom(Sources.map("myMap"));
stage.drainTo(Sinks.map("myMap"));

Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> stage = p.drawFrom(Sources.cache("inCache"));
stage.drainTo(Sinks.cache("outCache"));

In these snippets we draw from and drain to the same kind of structure, but you can use any combination.

5.3.1. Access an External Cluster

To access a Hazelcast IMDG cluster separate from the Jet cluster, you have to provide Hazelcast client configuration for the connection. In this simple example we use programmatic configuration to draw from and drain to remote IMap and ICache. Just for variety, we funnel the data from IMap to ICache and vice versa:

ClientConfig cfg = new ClientConfig();
cfg.getGroupConfig().setName("myGroup").setPassword("pAssw0rd");
cfg.getNetworkConfig().addAddress("node1.mydomain.com", "node2.mydomain.com");

Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> fromMap =
        p.drawFrom(Sources.remoteMap("inputMap", cfg));
BatchStage<Entry<String, Long>> fromCache =
        p.drawFrom(Sources.remoteCache("inputCache", cfg));
fromMap.drainTo(Sinks.remoteCache("outputCache", cfg));
fromCache.drainTo(Sinks.remoteMap("outputMap", cfg));

For a full discussion on how to configure your client connection, refer to the Hazelcast IMDG documentation on this topic.

5.3.2. Optimize Data Traffic at the Source

If your use case calls for some filtering and/or transformation of the data you retrieve, you can optimize the traffic volume by providing a filtering predicate and an arbitrary transformation function to the source connector itself and they’ll get applied on the remote side, before sending:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, String, Person>remoteMap(
        "inputMap", clientConfig,
        e -> e.getValue().getAge() > 21,
        e -> e.getValue().getAge()));

The same optimization works on a local IMap, too, but has less impact. However, Hazelcast IMDG goes a step further in optimizing your filtering and mapping to a degree that matters even locally. If you don’t need fully general functions, but can express your predicate via Predicates or PredicateBuilder, they will create a specialized predicate instance that can test the object without deserializing it. Similarly, if the mapping you need is of a constrained kind where you just extract one or more object fields (attributes), you can specify a projection instead of a general mapping lambda: Projections.singleAttribute() or Projections.multiAttribute(). These will extract the listed attributes without deserializing the whole object. For these optimizations to work, however, your objects must employ Hazelcast’s portable serialization. They are especially relevant if the volume of data you need in the Jet job is significantly less than the volume of the stored data.

Note that the above feature is not available on ICache. It is, however, available on ICache’s Event Journal, which we introduce next.

5.3.3. Receive an Infinite Stream of Update Events

You can use IMap/ICache as sources of infinite event streams. For this to work you have to enable the Event Journal on your data structure. This is a feature you set in the Jet/IMDG instance configuration, which means you cannot change it while the cluster is running.

This is how you enable the Event Journal on an IMap:

JetConfig cfg = new JetConfig();
cfg.getHazelcastConfig()
   .getMapEventJournalConfig("inputMap")
   .setEnabled(true)
   .setCapacity(1000)         // how many events to keep before evicting
   .setTimeToLiveSeconds(10); // evict events older than this
JetInstance jet = Jet.newJetInstance(cfg);

The default journal capacity is 10,000 and the default time-to-live is 0 (which means “unlimited”). Since the entire event journal is kept in RAM, you should take care to adjust these values to match your use case.
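The effect of the two limits can be pictured with a plain-Java model of a bounded journal. This is an illustration under simplifying assumptions, not how IMDG implements it. The real Event Journal lives inside the cluster, whereas this hypothetical JournalModel class is a single ArrayDeque and takes the current time as a parameter.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of an event journal bounded by capacity and time-to-live. */
class JournalModel {

    private static final class Event {
        final String payload;
        final long timestampSeconds;

        Event(String payload, long timestampSeconds) {
            this.payload = payload;
            this.timestampSeconds = timestampSeconds;
        }
    }

    private final int capacity;
    private final long ttlSeconds; // 0 stands for "unlimited", like the default setting
    private final Deque<Event> events = new ArrayDeque<>();

    JournalModel(int capacity, long ttlSeconds) {
        this.capacity = capacity;
        this.ttlSeconds = ttlSeconds;
    }

    void append(String payload, long nowSeconds) {
        if (events.size() == capacity) {
            events.removeFirst(); // journal full: the oldest event is evicted
        }
        events.addLast(new Event(payload, nowSeconds));
    }

    /** Returns the events still on record at the given time, oldest first. */
    List<String> retained(long nowSeconds) {
        if (ttlSeconds > 0) {
            // evict events that are older than the time-to-live
            events.removeIf(e -> nowSeconds - e.timestampSeconds > ttlSeconds);
        }
        List<String> result = new ArrayList<>();
        for (Event e : events) {
            result.add(e.payload);
        }
        return result;
    }

    public static void main(String[] args) {
        JournalModel journal = new JournalModel(3, 10);
        for (int i = 1; i <= 4; i++) {
            journal.append("e" + i, i); // e1 is evicted when e4 arrives
        }
        System.out.println(journal.retained(5));  // [e2, e3, e4]
        System.out.println(journal.retained(13)); // [e3, e4]
    }
}
```

In this model, an event is gone once `capacity` newer events have arrived or once it is older than the time-to-live, whichever happens first. A reader that falls further behind than that can no longer see it.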

The configuration API for ICache is identical:

cfg.getHazelcastConfig()
   .getCacheEventJournalConfig("inputCache")
   .setEnabled(true)
   .setCapacity(1000)
   .setTimeToLiveSeconds(10);

Once properly configured, you use Event Journal sources like this:

Pipeline p = Pipeline.create();
StreamStage<Entry<String, Long>> fromMap = p.drawFrom(
        Sources.mapJournal("inputMap", START_FROM_CURRENT));
StreamStage<Entry<String, Long>> fromCache = p.drawFrom(
        Sources.cacheJournal("inputCache", START_FROM_CURRENT));

IMap and ICache are on an equal footing here. The second argument, START_FROM_CURRENT here, means "start receiving from events that occur after the processing starts". If you specify START_FROM_OLDEST, you’ll get all the events still on record.
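The difference between the two initial positions can be sketched with a plain-Java model. The Start enum here is a hypothetical stand-in for the START_FROM_OLDEST / START_FROM_CURRENT choice, and a list stands in for the events still on record.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the two initial positions for reading an event journal. */
class StartPositionModel {

    /** Stand-in for the START_FROM_OLDEST / START_FROM_CURRENT choice. */
    enum Start { OLDEST, CURRENT }

    private final List<String> journal = new ArrayList<>(); // events still on record

    void append(String event) {
        journal.add(event);
    }

    /** Sequence number at which a reader created now would begin. */
    int initialSequence(Start start) {
        // OLDEST: first event still on record; CURRENT: the next event to be appended
        return start == Start.OLDEST ? 0 : journal.size();
    }

    List<String> readFrom(int sequence) {
        return new ArrayList<>(journal.subList(sequence, journal.size()));
    }

    public static void main(String[] args) {
        StartPositionModel model = new StartPositionModel();
        model.append("put k1");
        model.append("put k2");
        int oldest = model.initialSequence(Start.OLDEST);
        int current = model.initialSequence(Start.CURRENT);
        model.append("put k3"); // happens after the job has started
        System.out.println(model.readFrom(oldest));  // [put k1, put k2, put k3]
        System.out.println(model.readFrom(current)); // [put k3]
    }
}
```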

This version of the methods emits only the ADDED and UPDATED event types. It also maps the event object to a simple Map.Entry holding the key and the new value. To make a different choice, use the overloads that let you specify your own filtering and projection. For example, you can request all event types and full event objects:

Pipeline p = Pipeline.create();
StreamStage<EventJournalMapEvent<String, Long>> allFromMap = p.drawFrom(
    Sources.mapJournal("inputMap",
            alwaysTrue(), identity(), START_FROM_CURRENT));
StreamStage<EventJournalCacheEvent<String, Long>> allFromCache = p.drawFrom(
    Sources.cacheJournal("inputCache",
            alwaysTrue(), identity(), START_FROM_CURRENT));

Note the type of the stream element: EventJournalMapEvent and EventJournalCacheEvent. These are almost the same and have these methods:

  • getKey()

  • getOldValue()

  • getNewValue()

  • getType()

The only difference is the return type of getType() which is specific to each kind of structure and gives detailed insight into what kind of event it reports. Add, remove and update are the basic ones, but there are also evict, clear, expire and some others.
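To show what the default overloads do, compared with passing alwaysTrue() and identity(), here is a plain-Java model. The EventType enum and Event class are simplified, hypothetical stand-ins for the real event classes and their structure-specific types, which have more values than shown here.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-Java model of the journal source's default filter and projection; not Jet code. */
class JournalProjectionModel {

    /** Simplified stand-in for the structure-specific event type. */
    enum EventType { ADDED, UPDATED, REMOVED, EVICTED }

    static final class Event {
        final EventType type;
        final String key;
        final Long oldValue;
        final Long newValue;

        Event(EventType type, String key, Long oldValue, Long newValue) {
            this.type = type;
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    /** Default filter: only ADDED and UPDATED events pass. */
    static final Predicate<Event> DEFAULT_FILTER =
            e -> e.type == EventType.ADDED || e.type == EventType.UPDATED;

    /** Default projection: a (key, new value) entry. */
    static final Function<Event, Map.Entry<String, Long>> DEFAULT_PROJECTION =
            e -> new SimpleImmutableEntry<>(e.key, e.newValue);

    /** Applies the filter, then the projection, to each event in order. */
    static <T> List<T> apply(List<Event> events, Predicate<Event> filter, Function<Event, T> projection) {
        List<T> out = new ArrayList<>();
        for (Event e : events) {
            if (filter.test(e)) {
                out.add(projection.apply(e));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(EventType.ADDED, "k1", null, 1L),
                new Event(EventType.UPDATED, "k1", 1L, 2L),
                new Event(EventType.REMOVED, "k1", 2L, null));
        System.out.println(apply(events, DEFAULT_FILTER, DEFAULT_PROJECTION)); // [k1=1, k1=2]
        System.out.println(apply(events, e -> true, e -> e.type));            // [ADDED, UPDATED, REMOVED]
    }
}
```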

Finally, you can get all of the above from a map or cache in another cluster. Use the remote variants of the source methods (remoteMapJournal and remoteCacheJournal) and pass a ClientConfig, for example:

Pipeline p = Pipeline.create();
StreamStage<Entry<String, Long>> fromRemoteMap = p.drawFrom(
    Sources.remoteMapJournal("inputMap",
            someClientConfig, START_FROM_CURRENT));
StreamStage<Entry<String, Long>> fromRemoteCache = p.drawFrom(
    Sources.remoteCacheJournal("inputCache",
            someClientConfig, START_FROM_CURRENT));

5.3.4. Update Entries in IMap

When you use an IMap as a sink, instead of just pushing the data into it you may have to merge the new data with the existing data, or delete the existing data. Hazelcast Jet supports this with map-updating sinks, which rely on Hazelcast IMDG’s Entry Processor feature. An entry processor allows you to atomically execute a piece of code against a map entry, in a data-local manner.

The updating sinks come in three variants:

  1. mapWithMerging, where you provide a function that computes the map value from the stream item and a merging function that gets called if a value already exists in the map. Here’s an example that adds up the Long values:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<String, Long>map("inMap"))
            .drainTo(Sinks.mapWithMerging("outMap",
                    Entry::getKey,
                    Entry::getValue,
                    (oldValue, newValue) -> oldValue + newValue)
            );
    This operation is NOT lock-aware: it processes the entries regardless of whether they are locked. We significantly boost performance by applying the update function in batches, but as a result this operation doesn’t respect locks. If you use this method on locked entries, it will break the mutual exclusion contract. Use mapWithEntryProcessor if you need proper locking behavior.
  2. mapWithUpdating, where you provide a single updating function that combines the roles of the two functions in mapWithMerging. It is called with the existing value (null if there is none) and the stream item. Here’s an example that adds up the Long values:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<String, Long>map("inMap"))
        .drainTo(Sinks.<Entry<String, Long>, String, Long>mapWithUpdating(
            "outMap", Entry::getKey,
            (oldV, item) -> (oldV != null ? oldV : 0L) + item.getValue())
        );
    This operation is NOT lock-aware: it processes the entries regardless of whether they are locked. We significantly boost performance by applying the update function in batches, but as a result this operation doesn’t respect locks. If you use this method on locked entries, it will break the mutual exclusion contract. Use mapWithEntryProcessor if you need proper locking behavior.
  3. mapWithEntryProcessor, where you provide a function that returns a full-blown EntryProcessor instance that will be submitted to the map. This is the most general variant, but it can’t use the batching the other variants use, so it has a higher cost per item. Use it only if you need a specialized entry processor that can’t be expressed in terms of the other variants. This example takes the values of the map and submits an entry processor that increments each value by 5:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<String, Integer>map("mymap"))
            .drainTo(Sinks.mapWithEntryProcessor("mymap",
                    Entry::getKey,
                    entry -> new IncrementEntryProcessor(5)
            ));
    
    class IncrementEntryProcessor implements EntryProcessor<String, Integer> {
    
        private int incrementBy;
    
        public IncrementEntryProcessor(int incrementBy) {
            this.incrementBy = incrementBy;
        }
    
        @Override
        public Object process(Entry<String, Integer> entry) {
            return entry.setValue(entry.getValue() + incrementBy);
        }
    
        @Override
        public EntryBackupProcessor<String, Integer> getBackupProcessor() {
            return null;
        }
    }
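The semantics of the two batching variants map closely onto two methods of java.util.Map, which can help when you decide which one to pick. The sketch below is only an analogy on a local map. Jet applies these functions through entry processors on the members that own the keys, not in a local loop, and the MapSinkSemantics class is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java analogy for the semantics of mapWithMerging and mapWithUpdating; not Jet code. */
class MapSinkSemantics {

    /** Like mapWithMerging: the value comes from the item; the merge function runs only if the key exists. */
    static Map<String, Long> withMerging(Map<String, Long> target, List<Map.Entry<String, Long>> items) {
        for (Map.Entry<String, Long> item : items) {
            // Map.merge inserts the value if the key is absent, otherwise calls the merge function
            target.merge(item.getKey(), item.getValue(), (oldValue, newValue) -> oldValue + newValue);
        }
        return target;
    }

    /** Like mapWithUpdating: one function receives the old value (null if absent) and the item. */
    static Map<String, Long> withUpdating(Map<String, Long> target, List<Map.Entry<String, Long>> items) {
        for (Map.Entry<String, Long> item : items) {
            target.compute(item.getKey(),
                    (key, oldV) -> (oldV != null ? oldV : 0L) + item.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> items = List.of(Map.entry("a", 2L), Map.entry("b", 5L));
        System.out.println(withMerging(new TreeMap<>(Map.of("a", 1L)), items));  // {a=3, b=5}
        System.out.println(withUpdating(new TreeMap<>(Map.of("a", 1L)), items)); // {a=3, b=5}
    }
}
```

Both produce the same result here. The difference shows in withUpdating: its function is also called for a missing key, with a null old value, which is why the lambda has to handle null.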

5.4. Hazelcast IList

Whereas IMap and ICache are the recommended choice of data sources and sinks in Jet jobs, Jet supports IList purely for convenience during prototyping, unit testing and similar non-production situations. It is not a partitioned data structure and only one cluster member has all the contents. In a distributed Jet job all the members will compete for access to the single member holding it.

With that said, IList is very simple to use. Here’s an example of how to fill it with test data, consume it in a Jet job, dump its results into another list, and fetch the results (we assume you already have a Jet instance in the variable jet):

IList<Integer> inputList = jet.getList("inputList");
for (int i = 0; i < 10; i++) {
    inputList.add(i);
}

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer>list("inputList"))
 .map(i -> "item" + i)
 .drainTo(Sinks.list("resultList"));

jet.newJob(p).join();

IList<String> resultList = jet.getList("resultList");
System.out.println("Results: " + new ArrayList<>(resultList));

You can access a list in an external cluster as well, by providing a ClientConfig object:

ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig()
            .setName("myGroup").setPassword("pAssw0rd");
clientConfig.getNetworkConfig()
            .addAddress("node1.mydomain.com", "node2.mydomain.com");

Pipeline p = Pipeline.create();
p.drawFrom(Sources.remoteList("inputList", clientConfig))
 .drainTo(Sinks.remoteList("outputList", clientConfig));

5.5. Kafka

Apache Kafka is a production-worthy choice of both source and sink for infinite stream processing jobs. It supports fault tolerance and snapshotting. The basic paradigm is that of a distributed publish/subscribe message queue. Jet’s Kafka Source subscribes to a Kafka topic and the sink publishes events to a Kafka topic.

The following code will consume from topics t1 and t2 and then write to t3:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.serializer", IntegerSerializer.class.getCanonicalName());
props.setProperty("value.deserializer", IntegerDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");

Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(props, "t1", "t2"))
 .drainTo(KafkaSinks.kafka(props, "t3"));

5.5.1. Using Kafka as a Source

The Kafka source emits entries of type Map.Entry<Key,Value> which can be transformed using an optional mapping function. It never completes. The job will end only if explicitly cancelled or aborted due to an error.

Internally Jet creates one KafkaConsumer per Processor instance using the supplied properties. Jet uses manual partition assignment to arrange the available Kafka partitions among the available processors and will ignore the group.id property.

Currently, the global parallelism of the Kafka source must be at most the number of partitions you are subscribing to. The local parallelism of the Kafka source is 2, so if your Jet cluster has 4 members, the global parallelism is 2 × 4 = 8 and at least 8 Kafka partitions must be available.

If any new partitions are added while the job is running, Jet will automatically assign them to the existing processors and consume them from the beginning.
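The arithmetic behind the partition requirement is simple enough to check in a few lines. The sketch below also spreads partitions over processors round-robin. That assignment scheme is an assumption made for illustration, not a description of how Jet actually distributes Kafka partitions.

```java
import java.util.ArrayList;
import java.util.List;

/** Back-of-the-envelope check for the Kafka source parallelism requirement. */
class KafkaParallelismCheck {

    /** Global parallelism is local parallelism times the number of members. */
    static int minimumPartitions(int localParallelism, int memberCount) {
        return localParallelism * memberCount;
    }

    /**
     * Hypothetical round-robin spread of partitions over processors; with fewer
     * partitions than processors, some processors would get nothing to read.
     */
    static List<List<Integer>> assign(int partitionCount, int globalParallelism) {
        List<List<Integer>> perProcessor = new ArrayList<>();
        for (int i = 0; i < globalParallelism; i++) {
            perProcessor.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            perProcessor.get(p % globalParallelism).add(p);
        }
        return perProcessor;
    }

    public static void main(String[] args) {
        int needed = minimumPartitions(2, 4);
        System.out.println("minimum partitions: " + needed); // minimum partitions: 8
        System.out.println(assign(10, needed)); // [[0, 8], [1, 9], [2], [3], [4], [5], [6], [7]]
    }
}
```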

5.5.2. Processing Guarantees

The Kafka source supports snapshots. Upon each snapshot it saves the current offset for each partition. When the job is restarted from a snapshot, the source can continue reading from the saved offset.

If snapshots are disabled, the source will commit the offset of the last record it read to the Kafka cluster. Since the fact that the source read an item doesn’t mean that the whole Jet pipeline processed it, this doesn’t guarantee against data loss.

5.5.3. Using Kafka as a Sink

The Kafka sink creates one KafkaProducer per cluster member and shares it among all the sink processors on that member. You can provide a mapping function that transforms the items the sink receives into ProducerRecord objects.

5.6. HDFS

The Hadoop Distributed File System is a production-worthy choice for both a data source and a data sink in a batch computation job. It is a distributed, replicated storage system that handles these concerns automatically, exposing a simple unified view to the client.

The HDFS source and sink require a configuration object of type JobConf, which supplies the input and output paths and formats. They don’t actually create a MapReduce job; the config simply describes the required inputs and outputs. You can share the same JobConf instance among several source/sink instances.

Here’s a configuration sample:

JobConf jobConfig = new JobConf();
jobConfig.setInputFormat(TextInputFormat.class);
jobConfig.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(jobConfig, new Path("input-path"));
TextOutputFormat.setOutputPath(jobConfig, new Path("output-path"));

The word count pipeline can then be expressed using HDFS as follows:

Pipeline p = Pipeline.create();
p.drawFrom(HdfsSources.hdfs(jobConfig, (k, v) -> v.toString()))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+"))
                       .filter(w -> !w.isEmpty()))
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(HdfsSinks.hdfs(jobConfig));
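To check the expected output on a small input without a Hadoop setup, the same logic can be expressed with plain java.util.stream. This is only a local equivalent of the pipeline's stages, not a replacement for the HDFS job, and the LocalWordCount class is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Local, single-JVM equivalent of the word count stages above; not Jet code. */
class LocalWordCount {

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                // same tokenization as the flatMap stage: lowercase, split on non-word characters
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // split can yield a leading empty string, e.g. for " hello"
                .filter(word -> !word.isEmpty())
                // the equivalent of groupingKey(wholeItem()) followed by aggregate(counting())
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("The cat", "the dog."))); // {cat=1, dog=1, the=2}
    }
}
```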

5.6.1. Data Locality When Reading

Jet splits the input data across the cluster, with each processor instance reading a part of the input. If the Jet members run alongside the HDFS datanodes, Jet can take advantage of data locality by reading blocks locally where possible. This can significantly increase read speed.

5.6.2. Output

Each processor writes to its own file in the output folder, identified by the processor’s unique ID. The files stay in a temporary state until the job completes, and are committed at that point. For streaming jobs, they are committed when the job is cancelled. We plan to introduce a rolling sink for HDFS in the future to improve streaming support.

5.6.3. Dealing with Writables

Hadoop types implement their own serialization mechanism through the use of Writable. Jet provides an adapter to register a Writable for Hazelcast serialization without having to write additional serialization code. To use this adapter, you can register your own Writable types by extending WritableSerializerHook and registering the hook.

5.6.4. Hadoop JARs and Classpath

When submitting JARs along with a job, avoid sending the Hadoop JARs; instead, make sure they are on the classpath of the running members. Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.

5.7. Files

Hazelcast Jet provides file and TCP/IP socket connectors that have limited production use, but are simple and can be very useful in an early rapid prototyping phase. They assume the data is in the form of plain text and emit/receive data items which represent individual lines of text.

These connectors are not fault-tolerant. On job restart they behave as if you started a new job. The sources don’t do snapshotting. The sinks don’t suppress duplicate data.

5.7.1. File Sources

The file sources are designed to work with both local and shared file systems. With a local file system, the sources expect to see on each member just the files that member should read. You can achieve the effect of a distributed source if you manually prepare a different set of files on each member. With a shared file system, the sources split the work so that each member reads a part of the files.

There are two flavors of the file source: bounded and unbounded.

Here’s an example with the bounded source:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.files("/home/jet/input"))
 .drainTo(Sinks.logger());

This will log on each Jet member the contents of all the files in the specified directory. When the source reads all the files, the job completes. If the files change while the job is running, the behavior is undefined.

Here’s an example with the unbounded source:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.fileWatcher("/home/jet/input"))
 .drainTo(Sinks.logger());

It will watch the directory for changes. It will emit only new contents added after startup: both new files and new content appended to existing ones. Files must be updated in an append-only fashion; if the existing content changes, the behavior is undefined.

If you delete the watched directory, the job will complete.

5.7.2. File Sink

The file sink can work with either a local or a shared network file system. Each member will write to different filenames. You can achieve the effect of a distributed sink if you manually collect all the output files on all members and combine their contents.

Here’s a small example of usage:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.files("/home/jet/output"));

5.7.3. Avro

Hazelcast Jet provides an Apache Avro file source and sink for batch processing jobs. They are a specialized version of the file connector, and they assume the data is in the Avro Object Container File format.

Avro File Source

The Avro file source reads the files in the specified directory using the given datum reader supplier and emits the records downstream.

Here’s an example with a reflect datum reader:

Pipeline p = Pipeline.create();
p.drawFrom(AvroSources.files("/home/jet/input", Person.class))
 .drainTo(Sinks.logger());

This will log on each Jet member the records of all the files in the specified directory. When the source reads all the files, the job completes. If the files change while the job is running, the behavior is undefined.

Avro File Sink

The Avro file sink writes the records to files using the given datum writer and schema suppliers. The sink always overwrites existing files; it does not append to them.

Here is an example with a generic datum writer:

String schemaString = "{\"type\":\"record\",\"name\":\"Person\"," +
        "\"namespace\":\"datamodel\",\"fields\":[{" +
        "\"name\":\"id\",\"type\":\"int\"}]}";
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<GenericRecord>list("inputList"))
 .drainTo(AvroSinks.files("/home/jet/output",
         () -> new Schema.Parser().parse(schemaString)));

5.8. TCP/IP Socket

5.8.1. Socket Source

The socket source opens a blocking client TCP/IP socket and receives data over it. The data must be lines of plain text.

Each underlying worker of the Socket Source connector opens its own client socket and asks for data from it. The user supplies the host:port connection details. The server side should ensure a meaningful dispersion of data among all the connected clients, but how it does it is outside of Jet’s control.

Here’s a simple example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.socket("localhost", 8080, StandardCharsets.UTF_8))
 .drainTo(Sinks.logger());

You can study a comprehensive code sample including a sample socket server using Netty.

5.8.2. Socket Sink

The socket sink opens a blocking client TCP/IP socket and sends data over it. The data must be in the form of lines of plain text. To get meaningful behavior, the server side must collect and combine the data from all the concurrently connected clients.

Here’s a simple example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.socket("localhost", 8080));

5.9. JDBC

Jet contains both a JDBC source and a JDBC sink. You can use them to read from or write to relational databases or any other data store that supports the standard JDBC API.

The source is only a batch-style source: it reads records that are produced by one SQL query and once all records are emitted, the source completes. The sink can work in both batch and streaming jobs.

The connector is not fault-tolerant. On job restart it behaves as if you have started a new job. The source does not do snapshotting, the sink does not suppress duplicate data.

To use the JDBC connector, you must include a JDBC driver on the classpath.

5.9.1. Using the JDBC Source

The JDBC source comes in two versions:

  1. Parallel: You will need to provide a ToResultSetFunction that will create one query for each parallel worker to query part of the data.

  2. Non-parallel: There will be only one parallel worker that will read all the records.

Example of parallel source

Here we use the modulo operator to select a part of the keys, but any query returning non-overlapping subsets will work. Note that this approach might not actually perform better than the non-parallel version unless the underlying table is physically partitioned by this key.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jdbc(
    () -> DriverManager.getConnection(DB_CONNECTION_URL),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement("SELECT * FROM PERSON WHERE MOD(id, ?) = ?");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet ->
        new Person(resultSet.getInt(1), resultSet.getString(2))
)).drainTo(Sinks.logger());
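The parallel query only works if, for a given parallelism, every row matches exactly one worker index. The plain-Java check below, which is not part of Jet, simulates the WHERE clause over a few ids. It assumes the database's MOD takes the sign of the dividend, as Java's % operator does. That behavior is common, but check your database's documentation.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java check of how "MOD(id, ?) = ?" splits rows among parallel workers. */
class ModPartitioning {

    /**
     * Ids selected by the worker with the given index. Java's % keeps the sign of
     * the dividend, which we assume matches the database's MOD function here.
     */
    static List<Integer> idsForWorker(List<Integer> ids, int parallelism, int index) {
        List<Integer> selected = new ArrayList<>();
        for (int id : ids) {
            if (id % parallelism == index) {
                selected.add(id);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(0, 1, 2, 3, 4, 5, 6, 7, -4);
        for (int index = 0; index < 3; index++) {
            System.out.println(index + " -> " + idsForWorker(ids, 3, index));
        }
        // 0 -> [0, 3, 6]
        // 1 -> [1, 4, 7]
        // 2 -> [2, 5]
        // -4 % 3 == -1, so id -4 is selected by no worker
    }
}
```

With non-negative keys the split is complete and non-overlapping. Under the stated assumption, a negative key such as -4 would be skipped by every worker, so a table with negative ids would need a different expression.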
Example of non-parallel source

A single worker of the source will fetch the whole result set with a single query. You need to provide just one SQL query.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jdbc(
    DB_CONNECTION_URL,
    "select * from PERSON",
    resultSet ->
            new Person(resultSet.getInt(1), resultSet.getString(2))
)).drainTo(Sinks.logger());

The output function receives the ResultSet and creates the desired output object. It is called once for each row of the result set, so you should not call ResultSet#next() or any other cursor-navigating method.

The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

Any SQLException will cause the job to fail. The default local parallelism for this processor is 1.

5.9.2. Using the JDBC Sink

The JDBC sink connects to the specified database using the given connection supplier, prepares a statement using the given update query and inserts/updates the items.

The update query should be a parameterized query. The bind function receives a PreparedStatement created for this query and should bind the parameters to it. It should not execute the query, call commit, or call any other method.

Records are committed after each batch, and JDBC batch mode is used if the driver supports it. Auto-commit is disabled on the connection.

In case of an SQLException, the processor automatically tries to reconnect and the job doesn’t fail, except when the exception is an SQLNonTransientException. The default local parallelism for this sink is 1.

No state is saved to the snapshot for this sink. After the job is restarted, items will likely be duplicated, providing an at-least-once guarantee. For this reason you should not use a plain INSERT statement, which can fail on a duplicate primary key. Instead, use an insert-or-update statement that can tolerate duplicate writes.

The following code snippet shows writing the Person objects to a database table:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Person>list("inputList"))
 .drainTo(Sinks.jdbc(
         "REPLACE INTO PERSON (id, name) values(?, ?)",
         DB_CONNECTION_URL,
         (stmt, item) -> {
             stmt.setInt(1, item.id);
             stmt.setString(2, item.name);
         }));
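The REPLACE statement above matters because of what a restart does: the sink sends rows it had already written. A minimal plain-Java model shows the difference between a plain insert and an insert-or-update. A HashMap stands in for the PERSON table keyed by id, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of replayed writes against a table keyed by primary key. */
class ReplaySafeWrites {

    /** Like a plain INSERT: a duplicate primary key is an error. */
    static void insert(Map<Integer, String> table, int id, String name) {
        if (table.putIfAbsent(id, name) != null) {
            throw new IllegalStateException("duplicate primary key " + id);
        }
    }

    /** Like REPLACE / insert-or-update: writing the same row again is harmless. */
    static void upsert(Map<Integer, String> table, int id, String name) {
        table.put(id, name);
    }

    /** The first run plus a restart that writes every row a second time. */
    static Map<Integer, String> runTwiceWithUpsert(Map<Integer, String> rows) {
        Map<Integer, String> table = new HashMap<>();
        for (int run = 0; run < 2; run++) {
            rows.forEach((id, name) -> upsert(table, id, name));
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(runTwiceWithUpsert(Map.of(1, "Alice", 2, "Bob"))); // {1=Alice, 2=Bob}
        Map<Integer, String> table = new HashMap<>();
        insert(table, 1, "Alice");
        try {
            insert(table, 1, "Alice"); // the replayed row
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // duplicate primary key 1
        }
    }
}
```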

5.10. JMS

The JMS (Java Message Service) connector can be used both as a source and as a sink for infinite stream processing. The connector is not fault-tolerant. On job restart it behaves as if you have started a new job. The source does not do snapshotting, and the sink does not suppress duplicate data.

To use the JMS connector, you must include a JMS client on the classpath. IO failures are generally handled by the JMS client and do not cause the connector to fail. Most clients offer a configuration parameter to enable auto-reconnection; refer to your client’s documentation for details.

5.10.1. Using JMS as a Source

The JMS source opens a connection to the JMS server for each member. Then each underlying worker of the source creates a session and a message consumer using that connection. The user supplies necessary functions to create the connection, session and message consumer.

The JMS source uses the non-blocking API to receive messages and transforms each message into the desired output object using the supplied projection function. The source flushes the session after receiving each message, using the supplied flush function.

The following code snippets show streaming messages from a JMS queue and a JMS topic using ActiveMQ JMS Client.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jmsQueue(() -> new ActiveMQConnectionFactory(
        "tcp://localhost:61616"), "queue"))
 .drainTo(Sinks.logger());

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jmsTopic(() -> new ActiveMQConnectionFactory(
        "tcp://localhost:61616"), "topic"))
 .drainTo(Sinks.logger());

The JMS topic is a non-distributed source: if messages are consumed by multiple consumers, all of them get the same messages. Therefore the source operates on a single member with a local parallelism of 1. Setting the local parallelism to a value other than 1 causes an IllegalArgumentException.
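The difference between the two JMS delivery models can be modeled in plain Java. The model also shows why a topic source with several consumers would duplicate data. The round-robin spread for the queue is an assumption made for the example: an actual JMS broker decides how to distribute queue messages among consumers. The class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of point-to-point (queue) vs publish/subscribe (topic) delivery. */
class QueueVsTopic {

    /** Queue: each message goes to exactly one consumer (round-robin, as an assumption). */
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    /** Topic: every subscribed consumer gets every message. */
    static List<List<String>> deliverTopic(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (List<String> inbox : received) {
            inbox.addAll(messages);
        }
        return received;
    }

    private static List<List<String>> emptyInboxes(int n) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3");
        System.out.println(deliverQueue(messages, 2)); // [[m1, m3], [m2]]
        System.out.println(deliverTopic(messages, 2)); // [[m1, m2, m3], [m1, m2, m3]]
    }
}
```

With two topic consumers, every message would enter the pipeline twice. A single consumer avoids that, which is the reason the topic source is restricted to one.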

5.10.2. Using JMS as a Sink

The JMS sink opens a connection to the JMS server for each member. Then each underlying worker of the sink creates a session and a message producer using that connection. The user supplies necessary functions and parameters to create the connection, session and message producer.

The JMS sink uses the supplied function to create a Message object for each input item and sends this message using the supplied send function. After a batch of messages is sent, the sink flushes the session using the supplied flush function.

The following code snippets show writing to a JMS queue and a JMS topic using ActiveMQ JMS Client.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.jmsQueue(() -> new ActiveMQConnectionFactory(
         "tcp://localhost:61616"), "queue"));

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.jmsTopic(() -> new ActiveMQConnectionFactory(
         "tcp://localhost:61616"), "topic"));

5.11. Source and Sink Builders

If Jet doesn’t natively support the data source/sink you need, you can build a connector for it yourself by using the SourceBuilder and SinkBuilder.

5.11.1. Source Builder

To make your custom source connector you need two basic ingredients:

  1. an object that will hold all the state you need to keep track of

  2. a stateless function, fillBufferFn, taking two parameters: the state object and a buffer object provided by Jet

Jet repeatedly calls fillBufferFn whenever it needs more data items. Optimally, the function will fill the buffer with the items it can acquire without blocking. A hundred items at a time is enough to eliminate any per-call overheads within Jet. The function may block as well (it’s running inside a non-cooperative processor), but taking longer than a second to complete can have negative effects on the overall performance of the processing pipeline.

Build a Bounded (Batch) Source

In this example we build a source that emits the lines of a file:

BatchSource<String> fileSource = SourceBuilder
    .batch("file-source", x ->                               (1)
            new BufferedReader(new FileReader("input.txt")))
    .<String>fillBufferFn((in, buf) -> {                          (2)
        String line = in.readLine();
        if (line != null) {
            buf.add(line);
        } else {
            buf.close();                                     (3)
        }
    })
    .destroyFn(BufferedReader::close)
    .build();
Pipeline p = Pipeline.create();
BatchStage<String> srcStage = p.drawFrom(fileSource);
1 here we pass createFn, the factory of state objects
2 the main logic goes to fillBufferFn
3 call buf.close() to mark the end of the bounded data stream

The file must be available on all the members of the Jet cluster for this to work. Only one member will actually read it, but you can’t choose or predict which one.

The code above emits a single item per fillBufferFn call. To ensure we get the best performance, we can improve it to emit the data in chunks:

BatchSource<String> fileSource = SourceBuilder
    .batch("file-source", x ->
            new BufferedReader(new FileReader("input.txt")))
    .<String>fillBufferFn((in, buf) -> {
        for (int i = 0; i < 128; i++) {
            String line = in.readLine();
            if (line == null) {
                buf.close();
                return;
            }
            buf.add(line);
        }
    })
    .destroyFn(BufferedReader::close)
    .build();
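The chunked fill step can be exercised outside Jet with a small simulation. In this sketch, ChunkBuffer is a hypothetical stand-in for Jet’s source buffer (it collects items and remembers whether close() was called), a StringReader replaces input.txt, and the drainAll loop plays Jet’s role by calling the fill step until the buffer is closed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Jet's source buffer: collects items, remembers close().
class ChunkBuffer {
    final List<String> items = new ArrayList<>();
    boolean closed;

    void add(String item) { items.add(item); }
    void close() { closed = true; }
}

class ChunkedFileSource {
    static final int CHUNK_SIZE = 128;

    // Same logic as the chunked fillBufferFn above: emit up to CHUNK_SIZE lines
    // per call and close the buffer once the reader is exhausted.
    static void fillBuffer(BufferedReader in, ChunkBuffer buf) throws IOException {
        for (int i = 0; i < CHUNK_SIZE; i++) {
            String line = in.readLine();
            if (line == null) {
                buf.close();
                return;
            }
            buf.add(line);
        }
    }

    // Plays Jet's role: calls fillBuffer until the source reports completion.
    // Returns the number of calls it took.
    static int drainAll(BufferedReader in, ChunkBuffer buf) throws IOException {
        int calls = 0;
        while (!buf.closed) {
            fillBuffer(in, buf);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) throws IOException {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 300; i++) {
            text.append("line ").append(i).append('\n');
        }
        ChunkBuffer buf = new ChunkBuffer();
        try (BufferedReader in = new BufferedReader(new StringReader(text.toString()))) {
            int calls = drainAll(in, buf);
            System.out.println(buf.items.size() + " lines in " + calls + " calls"); // 300 lines in 3 calls
        }
    }
}
```

With 300 input lines the driver needs three calls: two full chunks of 128 lines and a final call that emits the remaining 44 lines and closes the buffer.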
Build an Unbounded (Stream) Source

Here’s how you can build a simple source that keeps polling a URL, emitting all the lines it gets in the response:

StreamSource<String> httpSource = SourceBuilder
    .stream("http-source", ctx -> HttpClients.createDefault())
    .<String>fillBufferFn((httpc, buf) ->
        new BufferedReader(new InputStreamReader(
            httpc.execute(new HttpGet("http://localhost:8008"))
                 .getEntity().getContent()))
            .lines()
            .forEach(buf::add)
        )
    .destroyFn(CloseableHttpClient::close)
    .build();
Pipeline p = Pipeline.create();
StreamStage<String> srcStage = p.drawFrom(httpSource);

Our state object is an instance of the Apache HTTP Client. It maintains a connection pool so it will reuse the same connection for all our requests. Note that we’re making a blocking call here, but it’s expected to respond quickly. In a more serious production setting we could configure timeouts on the HTTP client to limit the blocking time. We’d also have to add error handling so we just retry on failure instead of causing the whole Jet job to fail.
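Here is a minimal sketch of the error handling mentioned above, assuming that a failed poll should simply emit nothing so that the next call retries. LineFetcher is a hypothetical interface standing in for the HTTP request and a List stands in for Jet’s buffer; none of this is Jet or Apache HTTP Client API. Catching the exception inside the fill step keeps a transient failure from propagating out of fillBufferFn and failing the job.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical abstraction of "fetch the current batch of lines from the server".
interface LineFetcher {
    List<String> fetch() throws IOException;
}

class RetryingFill {
    int failures; // number of failed polls so far, useful for monitoring

    // One fillBufferFn-style call: on failure, emit nothing and let the
    // next call retry instead of propagating the exception.
    void fill(LineFetcher fetcher, List<String> buf) {
        try {
            buf.addAll(fetcher.fetch());
        } catch (IOException e) {
            failures++;
            System.err.println("poll failed, will retry: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        int[] attempt = {0};
        // Fails on the first call, succeeds afterwards.
        LineFetcher flaky = () -> {
            if (attempt[0]++ == 0) {
                throw new IOException("connection reset");
            }
            return Arrays.asList("a", "b");
        };
        RetryingFill source = new RetryingFill();
        List<String> buf = new ArrayList<>();
        source.fill(flaky, buf); // fails, emits nothing
        source.fill(flaky, buf); // succeeds
        System.out.println(buf + ", failures=" + source.failures); // [a, b], failures=1
    }
}
```

A production version would probably also limit how often it retries, for example by backing off after several consecutive failures.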

Build a Stream Source with Event Timestamps

To make the data usable for windowed aggregation, each event item must be timestamped. If the source doesn’t emit timestamped events, you’ll have to add them while building the processing pipeline. It’s more convenient and efficient to add the timestamps right at the source. Jet offers you an API variant specifically tailored to this need. Let’s say that in the above example the first 9 characters denote the event’s timestamp. We can extract them as follows:

StreamSource<String> httpSource = SourceBuilder
    .timestampedStream("http-source", ctx -> HttpClients.createDefault())
    .<String>fillBufferFn((httpc, buf) ->
        new BufferedReader(new InputStreamReader(
            httpc.execute(new HttpGet("http://localhost:8008"))
                 .getEntity().getContent()))
            .lines()
            .forEach(item -> {
                long timestamp = Long.parseLong(item.substring(0, 9));
                buf.add(item.substring(9), timestamp);
            })
        )
    .destroyFn(CloseableHttpClient::close)
    .allowedLateness(2000)
    .build();

Note the line allowedLateness(2000). The events aren’t necessarily ordered by timestamp, for example they may come from many users over different connections and distances. On the other hand, to complete a windowed computation Jet must know when it has received all the events from a given time range. The "allowed lateness" parameter specifies how much (in milliseconds) an event’s timestamp can lag behind the highest timestamp received so far. With the parameter set as above, Jet will wait to receive an item with t = 01:00:02 or higher in order to complete the processing of events up to t = 01:00:00.
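The arithmetic behind allowed lateness can be sketched in a few lines. This is a simplified model, not Jet’s implementation: the watermark is assumed to be the highest timestamp seen so far minus the allowed lateness, an event older than the current watermark counts as too late, and a window ending at windowEnd can be completed once the watermark reaches windowEnd. Timestamps are in milliseconds, and the class name is hypothetical.

```java
// Simplified model of allowed lateness; an illustration, not Jet's watermark code.
class LatenessTracker {
    private final long allowedLateness;
    private long topTimestamp = Long.MIN_VALUE; // highest timestamp seen so far

    LatenessTracker(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    // Current watermark: no event older than this is expected any more.
    long watermark() {
        return topTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : topTimestamp - allowedLateness;
    }

    // Registers an event; returns false if it is behind the watermark (too late to process).
    boolean observe(long timestamp) {
        if (timestamp < watermark()) {
            return false;
        }
        topTimestamp = Math.max(topTimestamp, timestamp);
        return true;
    }

    // A window covering timestamps below windowEnd can be completed once the watermark reaches it.
    boolean canComplete(long windowEnd) {
        return watermark() >= windowEnd;
    }

    public static void main(String[] args) {
        long oneAm = 3_600_000L; // 01:00:00 as milliseconds since midnight
        LatenessTracker tracker = new LatenessTracker(2000);
        tracker.observe(oneAm + 1500);                  // 01:00:01.5
        System.out.println(tracker.canComplete(oneAm)); // false: watermark is 00:59:59.5
        tracker.observe(oneAm - 300);                   // out of order, but within the allowed lateness
        tracker.observe(oneAm + 2000);                  // 01:00:02
        System.out.println(tracker.canComplete(oneAm)); // true: watermark is 01:00:00
        System.out.println(tracker.observe(oneAm - 1)); // false: too late
    }
}
```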

Distributed Stream Source

In the examples we showed so far the source was non-distributed: Jet will create just a single processor in the whole cluster to serve all the data. This is an easy and obvious way to create a source connector.

If you want to create a distributed source, the challenge is coordinating all the parallel instances to appear as a single, unified source. Each instance must emit its unique slice of the whole data stream. Jet passes the Processor.Context to your createFn and you can use it to identify each state object you create. Consult the properties totalParallelism and globalProcessorIndex: Jet will call createFn exactly once with each globalProcessorIndex from 0 to totalParallelism - 1 and you can use this to slice up the data.

Here’s a rudimentary example that shows how such a scheme could work:

class SourceState {
    final CloseableHttpClient client = HttpClients.createDefault();
    final int myIndex;
    final int numProcessors;

    SourceState(Processor.Context ctx) {
        this.myIndex = ctx.globalProcessorIndex();
        this.numProcessors = ctx.totalParallelism();
    }
}
StreamSource<String> httpSource = SourceBuilder
    .stream("http-source", SourceState::new)
    .<String>fillBufferFn((st, buf) ->
        new BufferedReader(new InputStreamReader(
            st.client.execute(new HttpGet(String.format(
                    "http://localhost:8008?index=%d&count=%d", st.myIndex, st.numProcessors)))
                 .getEntity().getContent()))
            .lines()
            .forEach(buf::add)
        )
    .destroyFn(st -> st.client.close())
    .distributed(2)  (1)
    .build();
1 we request two parallel processors on each Jet member

We have presented a toy example of accessing a partitioned, distributed data source. In the real world there are some fundamental differences. The data source is partitioned in advance and you cannot request the number of partitions that perfectly matches your topology. Instead you must write some logic that distributes, more or less evenly, M partitions over N processors.
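One simple way to spread M pre-existing partitions over N processors is round-robin by index: the processor with global index i takes every partition p for which p mod N equals i. The sketch below assumes the partitions are numbered 0 to M - 1 and that each processor knows its own global index and the total parallelism (in Jet these would come from Processor.Context, as in the example above); the class and method names are hypothetical. Each processor gets either floor(M/N) or ceil(M/N) partitions, no partition is assigned twice, and if there are more processors than partitions some processors get none.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which of numPartitions partitions a given processor should read.
class PartitionAssignment {
    static List<Integer> partitionsFor(int processorIndex, int totalParallelism, int numPartitions) {
        List<Integer> mine = new ArrayList<>();
        // Round-robin: partition p belongs to processor p % totalParallelism.
        for (int p = processorIndex; p < numPartitions; p += totalParallelism) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        int totalParallelism = 4;
        for (int i = 0; i < totalParallelism; i++) {
            // processor 0 -> [0, 4, 8], 1 -> [1, 5, 9], 2 -> [2, 6], 3 -> [3, 7]
            System.out.println("processor " + i + " -> "
                    + partitionsFor(i, totalParallelism, numPartitions));
        }
    }
}
```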

There’s a particularly nasty problem hiding in this scheme: when we assign several partitions to a single processor and consume the data from each partition in chunks, the difference between the timestamp of the last event we fetch from partition i and that of the first event from partition j can be very large, much larger than the allowed lateness we configured. By the time Jet receives these events, it has already completed the processing of their window and moved on, so the events must be dropped as "too late to process".

Jet has a mechanism that mitigates this issue by individually tracking the highest timestamp from each partition and emitting the appropriate watermark items that account for the lagging partitions. It is available if you create a custom source processor using the Core API, but it’s not currently exposed through the source builder. Refer to the section on custom source vertices and to the Javadoc of WatermarkSourceUtil.
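The idea behind such a mechanism can be illustrated with a simplified model. This sketch is an assumption-laden illustration, not the code of WatermarkSourceUtil, and its class and method names are hypothetical: the processor keeps the highest timestamp seen in each of its partitions and derives its watermark from the partition that lags furthest behind, so an event from a slow partition isn’t declared late just because another partition has raced ahead. It ignores partitions that stay idle, which a real implementation has to handle.

```java
import java.util.Arrays;

// Simplified model of per-partition watermark tracking; not Jet's WatermarkSourceUtil.
class PerPartitionWatermark {
    private final long allowedLateness;
    private final long[] topTimestamps; // highest timestamp seen in each partition

    PerPartitionWatermark(int numPartitions, long allowedLateness) {
        this.allowedLateness = allowedLateness;
        this.topTimestamps = new long[numPartitions];
        Arrays.fill(topTimestamps, Long.MIN_VALUE);
    }

    void observe(int partition, long timestamp) {
        topTimestamps[partition] = Math.max(topTimestamps[partition], timestamp);
    }

    // Follows the most lagging partition. Until every partition has produced
    // an event there is no safe watermark, signalled here by Long.MIN_VALUE.
    long watermark() {
        long min = Long.MAX_VALUE;
        for (long top : topTimestamps) {
            if (top == Long.MIN_VALUE) {
                return Long.MIN_VALUE;
            }
            min = Math.min(min, top);
        }
        return min - allowedLateness;
    }

    // Naive alternative for comparison: a single watermark from the global maximum.
    long naiveWatermark() {
        long max = Arrays.stream(topTimestamps).max().getAsLong();
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - allowedLateness;
    }

    public static void main(String[] args) {
        PerPartitionWatermark wm = new PerPartitionWatermark(2, 2000);
        // A chunk from partition 0 runs far ahead of partition 1.
        wm.observe(0, 60_000);
        wm.observe(1, 10_000);
        long nextFromPartition1 = 10_500;
        System.out.println(nextFromPartition1 < wm.naiveWatermark()); // true: naive scheme drops it
        System.out.println(nextFromPartition1 < wm.watermark());      // false: event is kept
    }
}
```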

Fault Tolerance

The source you get from the builder doesn’t participate in Jet’s fault tolerance protocol. You can’t save its state to a snapshot, so after a restart it can’t resume from a saved position and replay the items it emitted since then. That means you can’t use these sources in jobs with the at-least-once or exactly-once processing guarantee.

5.11.2. Sink Builder

To make your custom sink connector you need two basic ingredients:

  1. an object that will hold all the state you need to keep track of

  2. a stateless function, receiveFn, taking two parameters: the state object and a data item sent to the sink

This is a simple example with a file sink:

Sink<Object> sink = sinkBuilder(
        "file-sink", x -> new PrintWriter(new FileWriter("output.txt")))
    .receiveFn((out, item) -> out.println(item.toString()))
    .destroyFn(PrintWriter::close)
    .build();
Pipeline p = Pipeline.create();
p.drawFrom(list("input"))
 .drainTo(sink);

This will create the file output.txt on each member, so the overall job output consists of the contents of all these files put together.

Note that we’re using blocking IO here. Jet will use non-cooperative processors for the sink, so we’re allowed to do that. For good overall job performance we should still ensure that a call to receiveFn doesn’t take more than a second to complete.

In the above example our state object is a PrintWriter, which has internal buffering. Jet allows us to make buffering a first-class concern and deal with it explicitly by taking an optional flushFn which it will call at regular intervals. Here’s our example modified to do explicit buffering and flushing:

Sink<Object> sink = sinkBuilder("file-sink", x -> new StringBuilder())
    .receiveFn((buf, item) -> buf.append(item).append('\n'))
    .flushFn(buf -> {
        try (Writer out = new FileWriter("output.txt", true)) {
            out.write(buf.toString());
            buf.setLength(0);
        }
    })
    .build();

In this case we don’t need a destroyFn because we keep the file closed while we’re not appending to it. Buffering amortizes the overhead of opening and closing the file over many items, and we get an overall more robust solution.
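To see the receive/flush split in isolation, here is a plain-Java model of the sink above. It is a sketch, not Jet code: BufferedSink and WriterOpener are hypothetical names, receive and flush play the roles of receiveFn and flushFn, and the target is opened through WriterOpener so the example can write to a StringWriter instead of output.txt. The writer is opened only inside flush and closed right away, mirroring why the example above needs no destroyFn.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical factory for the sink's target; may throw, like new FileWriter(...).
interface WriterOpener {
    Writer open() throws IOException;
}

// Plain-Java model of a buffering sink: receive ~ receiveFn, flush ~ flushFn.
class BufferedSink {
    private final StringBuilder buf = new StringBuilder();
    private final WriterOpener opener;

    BufferedSink(WriterOpener opener) {
        this.opener = opener;
    }

    // Cheap in-memory append for every item.
    void receive(Object item) {
        buf.append(item).append('\n');
    }

    // Opens the target once per flush, writes the whole batch, closes it again.
    void flush() {
        if (buf.length() == 0) {
            return; // unlike the example above, don't open the target when there's nothing to write
        }
        try (Writer out = opener.open()) {
            out.write(buf.toString());
            buf.setLength(0); // cleared only after write() returns normally
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int pendingChars() {
        return buf.length();
    }

    public static void main(String[] args) {
        StringWriter target = new StringWriter();
        // For a real file this could be: new BufferedSink(() -> new FileWriter("output.txt", true))
        BufferedSink sink = new BufferedSink(() -> target); // closing a StringWriter has no effect
        sink.receive("a");
        sink.receive("b");
        sink.flush();
        sink.receive("c");
        sink.flush();
        System.out.print(target); // prints a, b and c on separate lines
    }
}
```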

Sink Parallelism

Jet builds a sink that is distributed: each member of the Jet cluster has a processor running it. You can configure how many parallel processors there are on each member (the local parallelism) by calling SinkBuilder.preferredLocalParallelism(). By default there will be one processor per member.

Fault Tolerance

The sink you get from the sink builder doesn’t participate in the fault tolerance protocol. You can’t preserve any internal state if a job fails and gets restarted. In a job with snapshotting enabled your sink will still receive every item at least once. If the system you’re storing the data into is idempotent, ignoring duplicate items, this will have the effect of the exactly-once guarantee.
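Here is a small illustration of why an idempotent target turns at-least-once delivery into effectively exactly-once results. The sketch assumes every item carries a unique key (the hypothetical Reading class with an id) and that the target performs upserts keyed by that id, modelled here with a HashMap. Items replayed after a restart overwrite existing entries with identical values instead of adding new ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical item type: a measurement with a unique id.
final class Reading {
    final String id;
    final double value;

    Reading(String id, double value) {
        this.id = id;
        this.value = value;
    }
}

// Idempotent target: writes are upserts keyed by id, so duplicates are harmless.
class IdempotentStore {
    final Map<String, Double> byId = new HashMap<>();

    void write(Reading r) {
        byId.put(r.id, r.value);
    }

    public static void main(String[] args) {
        List<Reading> firstRun = Arrays.asList(new Reading("r1", 1.0), new Reading("r2", 2.0));
        // After a restart, items emitted since the last snapshot are delivered again.
        List<Reading> delivered = new ArrayList<>(firstRun);
        delivered.add(new Reading("r2", 2.0)); // duplicate of an already written item
        delivered.add(new Reading("r3", 3.0));

        IdempotentStore store = new IdempotentStore();
        delivered.forEach(store::write);
        System.out.println(delivered.size() + " deliveries, " + store.byId.size() + " stored"); // 4 deliveries, 3 stored
    }
}
```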

6. Configuration

You can configure Hazelcast Jet either programmatically or declaratively (XML).

6.1. Programmatic Configuration

Programmatic configuration is the simplest way to configure Jet. You instantiate a JetConfig and set the desired properties. For example, the following will configure Jet to use only two threads in its cooperative multithreading pool:

JetConfig config = new JetConfig();
config.getInstanceConfig().setCooperativeThreadCount(2);
JetInstance jet = Jet.newJetInstance(config);

6.2. Declarative Configuration

If you don’t pass an explicit JetConfig object when constructing a Jet instance, it will perform the following lookup procedure:

  1. Read the system property hazelcast.jet.config. If its value starts with classpath:, treat it as a classpath resource; otherwise treat it as a file pathname. If the property is defined but Jet can’t find the file it specifies, startup fails.

  2. Look for hazelcast-jet.xml in the working directory.

  3. Look for hazelcast-jet.xml in the classpath.

  4. Load the default XML configuration packaged in Jet’s JAR.
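The lookup order above can be summarized as a small decision function. This is a plain-Java sketch for illustration only, not Jet’s implementation: ConfigLookup and its parameters are hypothetical, the file-system and classpath checks are passed in as predicates so the logic can be exercised without real files, and the returned strings merely describe which source would be used.

```java
import java.util.function.Predicate;

// Hypothetical model of the declarative-config lookup order described above.
class ConfigLookup {
    static final String FILE_NAME = "hazelcast-jet.xml";

    static String resolve(String systemProperty,
                          Predicate<String> fileExists,
                          Predicate<String> classpathHas) {
        // 1. An explicit system property wins; if it points nowhere, startup fails.
        if (systemProperty != null) {
            if (systemProperty.startsWith("classpath:")) {
                String resource = systemProperty.substring("classpath:".length());
                if (classpathHas.test(resource)) {
                    return "classpath:" + resource;
                }
            } else if (fileExists.test(systemProperty)) {
                return "file:" + systemProperty;
            }
            throw new IllegalStateException("Config not found: " + systemProperty);
        }
        // 2. hazelcast-jet.xml in the working directory.
        if (fileExists.test(FILE_NAME)) {
            return "file:" + FILE_NAME;
        }
        // 3. hazelcast-jet.xml on the classpath.
        if (classpathHas.test(FILE_NAME)) {
            return "classpath:" + FILE_NAME;
        }
        // 4. The default configuration packaged in Jet's JAR.
        return "default";
    }

    public static void main(String[] args) {
        Predicate<String> none = name -> false;
        System.out.println(resolve(null, none, none));                                // default
        System.out.println(resolve(null, none, FILE_NAME::equals));                   // classpath:hazelcast-jet.xml
        System.out.println(resolve("classpath:my-jet.xml", none, "my-jet.xml"::equals)); // classpath:my-jet.xml
    }
}
```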

An example configuration looks like the following:

<hazelcast-jet xsi:schemaLocation="http://www.hazelcast.com/schema/jet-config hazelcast-jet-config-0.6.xsd"
               xmlns="http://www.hazelcast.com/schema/jet-config"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <instance>
        <!-- number of threads in the cooperative thread pool -->
        <cooperative-thread-count>8</cooperative-thread-count>
        <!-- period between flow control packets in milliseconds -->
        <flow-control-period>100</flow-control-period>
        <!-- directory for temporary files -->
        <temp-dir>/var/tmp/jet</temp-dir>
        <!-- number of backup copies to configure for Hazelcast IMaps used internally in a Jet job -->
        <backup-count>1</backup-count>
    </instance>
    <!-- custom properties which can be read within a ProcessorSupplier -->
    <properties>
        <property name="custom.property">custom property</property>
    </properties>
    <edge-defaults>
        <!-- capacity of the concurrent SPSC queue between each two processors -->
        <queue-size>1024</queue-size>

        <!-- maximum packet size in bytes, only applies to distributed edges -->
        <packet-size-limit>16384</packet-size-limit>

        <!-- receive window size multiplier, only applies to distributed edges -->
        <receive-window-multiplier>3</receive-window-multiplier>
    </edge-defaults>
</hazelcast-jet>

The following are the configuration elements for Hazelcast Jet:

  • Cooperative Thread Count: number of threads Jet will create in its cooperative multithreading pool. The default value is Runtime.getRuntime().availableProcessors().

  • Temp Directory: directory where Jet can create temporary files such as JAR files submitted by clients. Jet will create a temp directory and delete it on exit.

  • Flow Control Period: Jet uses a flow control mechanism between cluster members to prevent a slower vertex from getting overwhelmed with data from a faster upstream vertex. Each receiver regularly reports to each sender how much more data it may send over a given DAG edge. This option sets the duration (in milliseconds) of the interval between flow-control packets. Its default value is 100ms.

  • Backup Count: number of synchronous backups to configure on the IMaps that Jet needs internally to store job metadata and snapshots. The maximum allowed value is 6. The default value is 1.

  • Edge Defaults: The default values to be used for all edges. See the section on Tuning Edges.

6.3. Configure the Underlying Hazelcast Instance

Each Jet member or client has its underlying Hazelcast member or client. Please refer to the Hazelcast Reference Manual for specific configuration options for Hazelcast IMDG.

6.3.1. Programmatic

You can configure the underlying Hazelcast IMDG member as follows:

JetConfig jetConfig = new JetConfig();
jetConfig.getHazelcastConfig().getGroupConfig().setName("test");
JetInstance jet = Jet.newJetInstance(jetConfig);

Similarly, you can also configure the underlying Hazelcast client:

ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig().setName("test");
JetInstance jet = Jet.newJetClient(clientConfig);

6.3.2. Declarative

Hazelcast IMDG can also be configured declaratively. Please refer to the Hazelcast Reference Manual for information on how to do this.

6.4. Spring Integration

You can configure and start a Hazelcast Jet instance as a component in the Spring Application Context. You can use the plain bean element and define individual properties on a JetConfig instance, but Jet provides its own schema-based configuration which will make this much less verbose.

6.4.1. Approach 1: Use the Plain bean Element

You can declare Hazelcast Jet objects using the default Spring beans namespace. Here is an example of a Hazelcast Jet instance declaration:

<bean id="instance" class="com.hazelcast.jet.Jet" factory-method="newJetInstance">
    <constructor-arg>
        <bean class="com.hazelcast.jet.config.JetConfig">
            <property name="hazelcastConfig">
                <bean class="com.hazelcast.config.Config">
                    <!-- ... -->
                </bean>
            </property>
            <property name="instanceConfig">
                <bean class="com.hazelcast.jet.config.InstanceConfig">
                    <property name="cooperativeThreadCount" value="2"/>
                </bean>
            </property>
            <property name="defaultEdgeConfig">
                <bean class="com.hazelcast.jet.config.EdgeConfig">
                    <property name="queueSize" value="2048"/>
                </bean>
            </property>
            <property name="properties">
                <props>
                    <prop key="foo">bar</prop>
                </props>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="my-map"/>
</bean>

6.4.2. Approach 2: Use jet:instance

Hazelcast Jet provides its own Spring config schema. Add the namespace declaration xmlns:jet="http://www.hazelcast.com/schema/jet-spring" to the beans element and then use the jet namespace prefix. Make sure you have added hazelcast-jet-spring.jar to the classpath.

Here’s how your namespace and schema instance declarations may look:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jet="http://www.hazelcast.com/schema/jet-spring"
       xmlns:hz="http://www.hazelcast.com/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.hazelcast.com/schema/spring
        http://www.hazelcast.com/schema/spring/hazelcast-spring-3.10.xsd
        http://www.hazelcast.com/schema/jet-spring
        http://www.hazelcast.com/schema/jet-spring/hazelcast-jet-spring-0.6.xsd">
    <!-- ... -->
</beans>
Configuring the Hazelcast Jet Instance
<jet:instance id="instance">
    <hz:config>
        <hz:spring-aware/>
        <hz:group name="jet"/>
        <hz:network port="5701" port-auto-increment="false">
            <hz:join>
                <hz:multicast enabled="false"/>
                <hz:tcp-ip enabled="true">
                    <hz:member>127.0.0.1:5701</hz:member>
                </hz:tcp-ip>
            </hz:join>
        </hz:network>
        <hz:map name="map" backup-count="3">
        </hz:map>
    </hz:config>
    <jet:instance-config cooperative-thread-count="2"/>
    <jet:default-edge-config queue-size="2048"/>
    <jet:properties>
        <hz:property name="foo">bar</hz:property>
    </jet:properties>
</jet:instance>
Configure a Jet Client
<jet:client id="jet-client">
    <jet:group name="jet"/>
    <jet:network>
        <hz:member>127.0.0.1:5701</hz:member>
    </jet:network>
    <jet:spring-aware/>
</jet:client>
Additional Bean Types Supported by the Jet Namespace

You can obtain the underlying HazelcastInstance from the Jet instance as a bean and use it to obtain these Hazelcast IMDG beans:

  • map

  • list

  • multiMap

  • replicatedMap

  • queue

  • topic

  • set

  • executorService

  • idGenerator

  • atomicLong

  • atomicReference

  • semaphore

  • countDownLatch

  • lock

Here are short examples for each of them:

<jet:hazelcast jet-instance-ref="jet-instance" id="hazelcast-instance"/>

<jet:map instance-ref="jet-instance" name="my-map" id="my-map-bean"/>

<jet:list instance-ref="jet-client" name="my-list" id="my-list-bean"/>

<hz:multiMap id="multiMap" instance-ref="hazelcast-instance" name="my-multiMap"/>

<hz:replicatedMap id="replicatedMap" instance-ref="hazelcast-instance" name="my-replicatedMap"/>

<hz:queue id="queue" instance-ref="hazelcast-instance" name="my-queue"/>

<hz:topic id="topic" instance-ref="hazelcast-instance" name="my-topic"/>

<hz:set id="set" instance-ref="hazelcast-instance" name="my-set"/>

<hz:executorService id="executorService" instance-ref="hazelcast-instance" name="my-executorService"/>

<hz:idGenerator id="idGenerator" instance-ref="hazelcast-instance" name="my-idGenerator"/>

<hz:atomicLong id="atomicLong" instance-ref="hazelcast-instance" name="my-atomicLong"/>

<hz:atomicReference id="atomicReference" instance-ref="hazelcast-instance" name="my-atomicReference"/>

<hz:semaphore id="semaphore" instance-ref="hazelcast-instance" name="my-semaphore"/>

<hz:countDownLatch id="countDownLatch" instance-ref="hazelcast-instance" name="my-countDownLatch"/>

<hz:lock id="lock" instance-ref="hazelcast-instance" name="my-lock"/>

Hazelcast Jet also supports lazy-init, scope and depends-on bean attributes.

<jet:instance id="instance" lazy-init="true" scope="singleton">
<!-- ... -->
</jet:instance>
<jet:client id="client" scope="prototype" depends-on="instance">
<!-- ... -->
</jet:client>

6.4.3. Annotation-Based Configuration

Annotation-Based Configuration does not require any XML definition. Simply create a configuration class annotated with @Configuration and provide a JetInstance as a bean by annotating the method with @Bean.

@Configuration
public class AppConfig {

    @Bean
    public JetInstance instance() {
        return Jet.newJetInstance();
    }
}

6.4.4. Enabling SpringAware Objects

Hazelcast IMDG has a special annotation, @SpringAware, which enables you to initialize an object with the Spring context.

When a job is submitted to the cluster, Hazelcast Jet creates processors on each member. By marking your processor with @SpringAware, you make the Spring context accessible to it, which gives you the ability to:

  • apply bean properties

  • apply factory callbacks such as ApplicationContextAware and BeanNameAware

  • apply bean post-processing such as InitializingBean and @PostConstruct

You need to configure Hazelcast Jet with the <hz:spring-aware/> tag or set SpringManagedContext programmatically to enable Spring-aware objects. Code samples for declarative and annotation-based configurations are available at our Code Samples repo.

6.5. Configuring TLS (Enterprise only)

It is possible to configure Jet to use TLS for all member-to-member and member-to-client communications. TLS requires Hazelcast Jet Enterprise.

Once TLS is enabled on a member, all other members in the same cluster and all clients connecting to the cluster must also have TLS enabled.

TLS is configured the same way as in Hazelcast IMDG Enterprise. You need a server certificate and a matching private key, and you must add the necessary configuration on the member side.

The following is an example configuration snippet for hazelcast.xml. The configuration must be present on all the members.

<network>
    <ssl enabled="true">
        <factory-class-name>com.hazelcast.nio.ssl.BasicSSLContextFactory</factory-class-name>
        <properties>
            <property name="protocol">TLS</property>
            <property name="keyStore">/opt/hazelcast.keystore</property>
            <property name="keyStorePassword">password12345</property>
            <property name="trustStore">/opt/hazelcast.truststore</property>
            <property name="trustStorePassword">password12345</property>
            <property name="keyManagerAlgorithm">SunX509</property>
            <property name="trustManagerAlgorithm">SunX509</property>
            <property name="mutualAuthentication">REQUIRED</property>
        </properties>
    </ssl>
</network>

And similarly, a matching snippet must be present on the clients inside the hazelcast-client.xml file:

<network>
    <ssl enabled="true">
        <factory-class-name>com.hazelcast.nio.ssl.BasicSSLContextFactory</factory-class-name>
        <properties>
            <property name="protocol">TLS</property>

            <property name="trustStore">/opt/hazelcast-client.truststore</property>
            <property name="trustStorePassword">secret.123456</property>
            <property name="trustStoreType">JKS</property>

            <!-- Following properties are only needed when the mutual authentication is used. -->
            <property name="keyStore">/opt/hazelcast-client.keystore</property>
            <property name="keyStorePassword">keystorePassword123</property>
            <property name="keyStoreType">JKS</property>
            <property name="mutualAuthentication">REQUIRED</property>
        </properties>
    </ssl>
</network>

A detailed list of TLS configuration options can be found in the Hazelcast IMDG Reference Manual.

6.5.1. Mutual Authentication

When a client connects to a member, the member can authenticate the client by using mutual authentication. In this mode, the client also has to present a certificate to the member upon connection. This requires some additional configuration. See the Mutual Authentication section in the IMDG Reference Manual for further details.

6.5.2. OpenSSL Configuration

By default Jet uses the Java implementation of SSL. For better performance, it is also possible to use Jet with OpenSSL instead. For configuring OpenSSL please refer to the Hazelcast IMDG Reference Manual.

6.5.3. TLS Impact on Performance

TLS can have some effect on performance when running jobs, as all member-to-member communication needs to be encrypted. During an aggregation or another partitioned operation in a job, large amounts of data might be transferred from one member to another.

Jet uses several techniques that reduce the need to transfer data across members; however, such transfers are sometimes unavoidable. When using Java SSL, the performance impact can be between 5% and 30%, depending on how heavily the network is utilized. When using OpenSSL, we measured the impact to be minimal regardless of the amount of network traffic. Detailed benchmarks are available upon request.

6.5.4. Remote Hazelcast Sources and Sinks

Currently it’s not possible to connect to remote Hazelcast sources and sinks using TLS. This issue will be resolved in a future patch release.

7. Jet Concepts

In this chapter we will take a deep dive into the fundamentals of distributed computing and Jet’s specific take on it. You’ll find that having some intuition and insight into how distributed computing actually works in Jet makes a big difference when diagnosing your pipeline and improving its performance.

Jet performs high-performance in-memory data processing by modeling the computation as a Directed Acyclic Graph (DAG), where vertices represent computation and edges represent data flows. A vertex receives data from its inbound edges, performs a step in the computation, and emits data to its outbound edges. Both the edge and the vertex are distributed entities: there are many parallel instances of the Processor type that perform a single vertex’s computation work on each cluster member. An edge between two vertices is implemented with many data connections, both within a member (concurrent SPSC queues) and between members (Hazelcast network connections).

One of the major reasons to divide the full computation task into several vertices is data partitioning: the ability to split the data stream into slices which can be processed in parallel, independently of each other. This is how Jet can parallelize and distribute the group-and-aggregate stream transformation, the major workhorse in distributed computing. To make this work, there must be a function which computes the partitioning key for each item and makes all related items map to the same key. Jet can then route all such items to the same processor instance, but has the freedom to route items with different keys to different processors.

Typically your computation job consists of a mapping vertex, where you pre-process the input data into a form that’s ready to be partitioned, followed by the grouping-and-aggregating vertex. The edge between them contains the partitioning logic.

7.1. Modeling the Computation as a DAG

We’ll take one specific problem, the Word Count, dissect it and explain how it gets computed in a Jet cluster. Let us first see the definition of the computation in the Pipeline API:

Pattern delimiter = Pattern.compile("\\W+");
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Long, String>map("book-lines"))
 .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())
 .aggregate(AggregateOperations.counting())
 .drainTo(Sinks.map("counts"));

Now let’s step back from this and start from the single-threaded Java code that solves the problem for a basic data structure such as an ArrayList. If you have some familiarity with the java.util.stream API, this is how you’d express it:

Map<String, Long> counts =
        lines.stream()
             .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
             .filter(word -> !word.isEmpty())
             .collect(Collectors.groupingBy(word -> word, Collectors.counting()));

You can notice a strong similarity with the Pipeline API formulation, but the way it’s executed is radically different. Java will compute it in a single thread, basically running this code:

List<String> lines = someExistingList();
Map<String, Long> counts = new HashMap<>();
for (String line : lines) {
    for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
            counts.merge(word, 1L, (count, one) -> count + one);
        }
    }
}

The java.util.stream formulation helps us see the steps taken to process each data item:

  1. lines.stream(): read the items (lines of text) from the data source (we’ll call this the “source” step).

  2. flatMap()+filter(): split each line into lowercase words, avoiding empty strings (the tokenizing step).

  3. collect(): group equal words together and count them (the accumulating step).

Our next move is to express these steps as a DAG. We’ll start with a single-threaded model and then make several transformations to reach a parallelized, distributed one, discussing at each step the concerns that arise and how to meet them.

We can represent the steps outlined above in a directed acyclic graph (DAG):

Word-counting DAG

The simplest, single-threaded code (shown above) deals with each item as it is produced: the outer loop reads the lines, the inner loop that runs for each line deals with the words on that line, and inside the inner loop we populate the result map with running counts.

However, just by modeling the computation as a DAG, we’ve split the work into isolated steps with clear data interfaces between them. We can perform the same computation by running a separate thread for each step. Roughly speaking, these are the snippets the threads would be executing:

// Source thread
for (String line : readLines()) {
    emit(line);
}
// Tokenizer thread
for (String line : receive()) {
    for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
            emit(word);
        }
    }
}
// Accumulator thread
Map<String, Long> counts = new HashMap<>();
for (String word : receive()) {
    counts.merge(word, 1L, (count, one) -> count + one);
}
// finally, when done receiving:
for (Entry<String, Long> wordAndCount : counts.entrySet()) {
    emit(wordAndCount);
}

The source loop feeds the tokenizer loop over a concurrent queue, the tokenizer feeds the accumulator loop, and after the accumulator is done receiving, it emits its results to the sink. Diagrammatically it looks like this:

Word-counting DAG with concurrent queues shown

This transformation brought us a pipelined architecture: while the tokenizer is busy with the regex work, the accumulator is updating the map using the data the tokenizer is done with; and the source and sink stages are pumping the data from/to the environment. Our design is now able to engage more than one CPU core and will complete that much sooner; however, we’re still limited by the number of vertices. We’ll be able to utilize two or three cores regardless of how many are available. To move forward we must try to parallelize the work of each individual vertex.
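The three threads and the queues between them can be turned into a runnable plain-Java sketch. It uses ArrayBlockingQueue for the concurrent queues and a sentinel string compared by identity to signal the end of the data; these are choices made for this illustration, not how Jet implements its queues or signals completion, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Word count as three threads connected by bounded queues (illustration only).
class PipelinedWordCount {
    // End-of-stream sentinel; a distinct object, so it is compared by identity.
    private static final String DONE = new String("<done>");

    static Map<String, Long> count(List<String> lines) throws InterruptedException {
        BlockingQueue<String> linesQ = new ArrayBlockingQueue<>(1024);
        BlockingQueue<String> wordsQ = new ArrayBlockingQueue<>(1024);
        Map<String, Long> counts = new HashMap<>(); // touched only by the accumulator thread

        Thread source = new Thread(() -> {
            try {
                for (String line : lines) {
                    linesQ.put(line);
                }
                linesQ.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread tokenizer = new Thread(() -> {
            try {
                String line;
                while ((line = linesQ.take()) != DONE) {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        if (!word.isEmpty()) {
                            wordsQ.put(word);
                        }
                    }
                }
                wordsQ.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread accumulator = new Thread(() -> {
            try {
                String word;
                while ((word = wordsQ.take()) != DONE) {
                    counts.merge(word, 1L, Long::sum);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        source.start();
        tokenizer.start();
        accumulator.start();
        source.join();
        tokenizer.join();
        accumulator.join(); // join() makes the accumulator's map updates visible here
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> counts = count(Arrays.asList("The quick fox", "the lazy dog", "THE END"));
        System.out.println(counts.get("the")); // 3
    }
}
```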

Given that our input is an in-memory list of lines, the bottleneck occurs in the processing stages (tokenizing and accumulating). Let’s first attack the tokenizing stage: it is a so-called "embarrassingly parallel" task because the processing of each line is completely self-contained. At this point we have to make a clear distinction between the notions of vertex and processor: there can be several processors doing the work of a single vertex. Let’s add another tokenizing processor:

Word-counting DAG with tokenizer vertex parallelized

The input processor can now use all the available tokenizers as a pool and submit to any one whose queue has some room.

The next step is parallelizing the accumulator vertex, but this is trickier: accumulators count word occurrences so using them as a pool will result in each processor observing almost all distinct words (entries taking space in its hashtable), but the counts will be partial and will need combining. The common strategy to reduce memory usage is to ensure that all occurrences of the same word go to the same processor. This is called “data partitioning” and in Jet we’ll use a partitioned edge between the tokenizer and the accumulator:

Word-counting DAG with tokenizer and accumulator parallelized

As a word is emitted from the tokenizer, it goes through a “switchboard” stage where it’s routed to the correct downstream processor. To determine where a word should be routed, we can calculate its hashcode and use the lowest bit to address either accumulator 0 or accumulator 1.
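The switchboard rule above can be sketched in plain Java. This is a stdlib-only simulation, not Jet's partitioner; the names WordSwitchboard, route and countPartitioned are hypothetical. Every word is routed by the lowest bit of its hash code, so all occurrences of a word end up in the same accumulator and each accumulator holds only complete counts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical "switchboard" between tokenizer and accumulators (not Jet's partitioner).
final class WordSwitchboard {

    // The lowest bit of the hash code picks accumulator 0 or 1.
    // The bitwise AND yields 0 or 1 even for negative hash codes.
    static int route(String word) {
        return word.hashCode() & 1;
    }

    // Simulates two accumulators, each counting only the words routed to it.
    static List<Map<String, Long>> countPartitioned(List<String> words) {
        List<Map<String, Long>> accumulators = new ArrayList<>();
        accumulators.add(new HashMap<>());
        accumulators.add(new HashMap<>());
        for (String word : words) {
            accumulators.get(route(word)).merge(word, 1L, Long::sum);
        }
        return accumulators;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> accs =
                countPartitioned(List.of("to", "be", "or", "not", "to", "be"));
        System.out.println("accumulator 0: " + accs.get(0));
        System.out.println("accumulator 1: " + accs.get(1));
    }
}
```

With more than two accumulators the same idea generalizes to Math.floorMod(word.hashCode(), n).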

At this point we have a blueprint for a fully functional parallelized computation job which can max out all the CPU cores given enough instances of tokenizing and accumulating processors. The next challenge is making this work across machines.

For starters, our input can no longer be a simple in-memory list because that would mean each machine processes the same data. To exploit the cluster as a unified computation device, each cluster member must observe only a slice of the dataset. Given that a Jet instance is also a fully functional Hazelcast IMDG instance and a Jet cluster is also a Hazelcast IMDG cluster, the natural choice is to pre-load our data into an IMap, which will be automatically partitioned and distributed across the members. Now each Jet member can just read the slice of data that was stored locally on it.

When run in a cluster, Jet will instantiate a replica of the whole DAG on each member. On a two-member cluster there will be two source processors, four tokenizers, and so on. The trickiest part is the partitioned edge between tokenizer and accumulator: each accumulator is supposed to receive its own subset of words. That means that, for example, a word emitted from tokenizer 0 will have to travel across the network to reach accumulator 3, if that’s the one that happens to own it. On average we can expect every other word to need network transport, causing both serious network traffic and serialization/deserialization CPU load.

There is a simple trick we can employ to avoid most of this traffic, closely related to what we pointed out above as a source of problems when parallelizing locally: members of the cluster can be used as a pool, each computing its own partial word counts and then sending them to a combining vertex. Note that this means sending only one item per distinct word. Here’s the rough equivalent of the code the combining vertex executes:

// Combining vertex (receive() and emit() stand for the vertex's input and output)
Map<String, Long> combined = new HashMap<>();
for (Map.Entry<String, Long> wordAndCount : receive()) {
    // add this member's partial count to the running total for the word
    combined.merge(wordAndCount.getKey(), wordAndCount.getValue(), Long::sum);
}
// finally, when done receiving:
for (Map.Entry<String, Long> wordAndCount : combined.entrySet()) {
    emit(wordAndCount);
}

As noted above, such a scheme takes more memory due to more hashtable entries on each member, but it saves network traffic (an issue we didn’t have within a member). Given that memory costs scale with the number of distinct keys (English words in our case), the memory cost is more-or-less constant regardless of how much book material we process. On the other hand, network traffic scales with the total data size so the more material we process, the more we save on network traffic.
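To make the traffic argument concrete, here is a stdlib-only Java simulation in which each "member" is just a list of words. The names are hypothetical and this is not Jet API: stage one counts locally, stage two merges the partial maps, and itemsSent shows how many entries would cross the network compared with sending every word occurrence.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only simulation of two-stage word counting (hypothetical names, not Jet API).
final class TwoStageWordCount {

    // Stage 1: each member counts the words in its own local slice of the data.
    static Map<String, Long> accumulateLocally(List<String> localWords) {
        Map<String, Long> partial = new HashMap<>();
        for (String word : localWords) {
            partial.merge(word, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: the combining vertex merges the partial counts from all members.
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> combined = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, count) -> combined.merge(word, count, Long::sum));
        }
        return combined;
    }

    // Items sent to the combiner: one entry per distinct word per member.
    static int itemsSent(List<Map<String, Long>> partials) {
        return partials.stream().mapToInt(Map::size).sum();
    }

    public static void main(String[] args) {
        List<Map<String, Long>> partials = List.of(
                accumulateLocally(List.of("a", "b", "a", "a")),
                accumulateLocally(List.of("b", "b", "c", "a")));
        System.out.println(combine(partials));   // combined counts for a, b and c
        System.out.println(itemsSent(partials)); // 5 entries instead of 8 word occurrences
    }
}
```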

Jet distinguishes between local and distributed edges, so we’ll use a local partitioned edge for tokenize→accumulate and a distributed partitioned edge for accumulate→combine. With this move we’ve finalized our DAG design, which can be illustrated by the following diagram:

Word-counting DAG parallelized and distributed

7.2. Unbounded Stream Processing

So far we’ve worked with a bounded (finite) stream processing task. In general, you provide Jet with one or more pre-existing datasets and order it to mine them for interesting information. The most important workhorse in this area is the "join, group and aggregate" operation: you define a classifying function that computes a grouping key for each of the datasets and an aggregate operation that will be performed on all the items in each group, yielding one result item per distinct key. Jet can apply the same operation on unbounded data streams as well.

7.2.1. The Importance of “Right Now”

In batch jobs the data we process represents a point-in-time snapshot of our state of knowledge (for example, warehouse inventory where individual data items represent items in stock). We can recapitulate each business day by setting up regular snapshots and batch jobs. However, there is more value hiding in the freshest data — our business can win by reacting to minute-old or even second-old updates. To get there we must make a shift from the finite to the infinite: from the snapshot to a continuous influx of events that update our state of knowledge. For example, an event could pop up in our stream every time an item is checked in or out of the warehouse.

A single word that captures the above story is latency: we want our system to minimize the latency from observing an event to acting upon it.

7.2.2. Windowing

In an unbounded stream, the dimension of time is always there. Consider a batch job: it may process a dataset labeled “Wednesday”, but the computation itself doesn’t have to know this. Its results will be understood from the outside to be “about Wednesday”. An endless stream, on the other hand, delivers information about the reality as it is unfolding, in near-real time, and the computation itself must deal with time explicitly.

Another point: in a batch it is obvious when to stop aggregating and emit the results: when we have exhausted the whole dataset. However, with unbounded streams we need a policy on how to select bounded chunks whose aggregate results we are interested in. This is called windowing. We imagine the window as a time interval laid over the time axis. A given window contains only the events that belong to that interval.

A very basic type of window is the tumbling window, which can be imagined to advance by tumbling over each time. There is no overlap between the successive positions of the window. In other words, it splits the time-series data into batches delimited by points on the time axis. The result of this is very similar to running a sequence of batch jobs, one per time interval.
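Assigning an event to its tumbling window is simple arithmetic on the timestamp. The helper below is a hypothetical plain-Java sketch, not Jet API; timestamps are assumed to be milliseconds. Math.floorMod keeps the result correct for negative timestamps.

```java
import java.util.TreeMap;

// Hypothetical helpers (not Jet API) for tumbling windows over millisecond timestamps.
final class TumblingWindows {

    // Start of the window containing the timestamp (inclusive).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    // End of the window containing the timestamp (exclusive).
    static long windowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize;
    }

    // Counts events per window: like one small batch job per time interval.
    static TreeMap<Long, Long> countPerWindow(long[] timestamps, long windowSize) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, windowSize), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {3_000, 9_999, 10_000, 27_500};
        System.out.println(countPerWindow(events, 10_000)); // window start -> event count
    }
}
```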

A more useful and powerful policy is the sliding window: instead of splitting the data at fixed boundaries, it lets it roll in incrementally, new data gradually displacing the old. The window (pseudo)continuously slides along the time axis.

Another popular policy is called the session window and it’s used to detect bursts of activity by correlating events bunched together on the time axis. In an analogy to a user’s session with a web application, the session window “closes” when the specified session timeout elapses with no further events.

7.2.3. Time Ordering and the Watermark

Usually the time of observing an event is explicitly written in a field of the stream item. There is no guarantee that items will occur in the stream ordered by the value of that field; in fact in many cases it is certain that they won’t. Consider events gathered from users of a mobile app: for all kinds of reasons the items will arrive at our datacenter out of order, even with significant delays due to connectivity issues.

This disorder in the event stream makes it more difficult to formally specify a rule that tells us at which point all the data for a given window has been gathered, allowing us to emit the aggregated result.

To approach these challenges we use the concept of the watermark. It is a timestamped item Jet inserts into the stream that says "from this point on there will be no more items with timestamp less than this". Unfortunately, we almost never know for sure when such a statement becomes true and there is always a chance some events will arrive even later. If we do observe such an offending item, we must categorize it as “too late” and just filter it out.

Note the tension in defining the “perfect” watermark for a given use case: both waiting longer and waiting less before emitting a given watermark have a cost. The more we wait, the higher the latency of getting the results of the computation; the less we wait, the worse their accuracy due to missed events.

For these reasons Jet cannot determine the watermark on its own; you must decide how much disorder to accept (and expect).
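One simple policy lets the watermark trail the highest timestamp seen so far by a fixed allowed lag. The class below is a hypothetical plain-Java sketch of that idea, not Jet's API. A larger lag drops fewer late events but delays results; a smaller one does the opposite.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "allowed lag" watermark policy (plain Java, not Jet's implementation).
final class LagWatermark {
    private final long allowedLag;
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    private final List<Long> dropped = new ArrayList<>();

    LagWatermark(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    // Returns true if the event is accepted, false if it is "too late" and filtered out.
    boolean onEvent(long timestamp) {
        if (timestamp < watermark) {
            dropped.add(timestamp);
            return false;
        }
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            // The watermark trails the highest timestamp seen by the allowed lag,
            // so it only ever increases.
            watermark = maxTimestamp - allowedLag;
        }
        return true;
    }

    long watermark() {
        return watermark;
    }

    List<Long> dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        LagWatermark policy = new LagWatermark(2_000);
        for (long ts : new long[] {1_000, 3_000, 1_500, 5_000, 2_500}) {
            System.out.println(ts + " accepted=" + policy.onEvent(ts) + " wm=" + policy.watermark());
        }
    }
}
```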

7.3. Sliding and Tumbling Window

Many quantities, like “the current rate of change of a price”, require you to aggregate your data over some time period. This is what makes the sliding window so important: it tracks the value of such a quantity in real time.

Calculating a single sliding window result can be quite computationally intensive, but we also expect it to slide smoothly and give a new result often, even many times per second. This is why we gave special attention to optimizing this computation.

We optimize especially heavily for those aggregate operations that have a cheap way of combining partial results and even more so for those which can cheaply undo the combining. For cheap combining you have to express your operation in terms of a commutative and associative (CA for short) function; to undo a combine you need the notion of “negating” an argument to the function. A great many operations can be expressed through CA functions: average, variance, standard deviation and linear regression are some examples. All of these also support the undoing (which we call deduct). The computation of extreme values (min/max) is an example of a CA function that has no good notion of negation and thus doesn’t support deducting.
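To illustrate accumulate, combine and deduct, here is a hypothetical accumulator for the average, written in plain Java (it is not Jet's AggregateOperation API). Because it stores only a count and a sum, combining two partial results is addition, which is commutative and associative, and deducting is subtraction.

```java
// Hypothetical accumulator for "average" (plain Java, not Jet's AggregateOperation API).
final class AvgAccumulator {
    long count;
    long sum;

    // Folds one input item into this accumulator.
    void accumulate(long value) {
        count++;
        sum += value;
    }

    // Commutative and associative: partial results can be combined in any order.
    void combine(AvgAccumulator other) {
        count += other.count;
        sum += other.sum;
    }

    // Undoes a previous combine with the same accumulator ("negating" it).
    void deduct(AvgAccumulator other) {
        count -= other.count;
        sum -= other.sum;
    }

    double finish() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    public static void main(String[] args) {
        AvgAccumulator a = new AvgAccumulator();
        a.accumulate(2);
        a.accumulate(4);
        AvgAccumulator b = new AvgAccumulator();
        b.accumulate(6);
        AvgAccumulator total = new AvgAccumulator();
        total.combine(a);
        total.combine(b);
        System.out.println(total.finish()); // 4.0
        total.deduct(a);
        System.out.println(total.finish()); // 6.0
    }
}
```

A min/max accumulator could implement combine with Math.min or Math.max, but it has no deduct: once the current minimum leaves the window, the previous minimum cannot be recovered from the combined value alone.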

This is the way we leverage the above properties: our sliding window actually “hops” in fixed-size steps. The length of the window is an integer multiple of the step size. Under such a definition, the tumbling window becomes just a special case with one step per window.

This allows us to divide the timestamp axis into frames of equal length and assign each event to its frame. Instead of keeping the event object, we immediately pass it to the aggregate operation’s accumulate primitive. To compute a sliding window, we take all the frames covered by it and combine them. Finally, to compute the next window, we just deduct the trailing frame and combine the leading frame into the existing result.

Even without deduct the above process is much cheaper than the most naïve approach where you’d keep all data and recompute everything from scratch each time. After accumulating an item just once, the rest of the process has fixed cost regardless of input size. With deduct, the fixed cost approaches zero.

7.3.1. Example: 30-second Window Sliding by 10 Seconds

We’ll now illustrate the above story with a specific example: we’ll construct a 30-second window which slides by 10 seconds (i.e., three steps per window). The aggregate operation is to simply count the number of events. In the diagrams we label the events as minutes:seconds. This is the outline of the process:

  1. Throw each event into its “bucket” (the frame whose time interval it belongs to).

  2. Instead of keeping the items in the frame, just keep the item count.

  3. Combine the frames into three different positions of the sliding window, yielding the final result: the number of events that occurred within the window’s timespan.

Grouping disordered events by frame and then to sliding window

This would be a useful interpretation of the results: "At the time 1:30, the 30-second running average was 8/30 = 0.27 events per second. Over the next 20 seconds it increased to 10/30 = 0.33 events per second."

Keep in mind that the whole diagram represents what happens on just one cluster member and for just one grouping key. The same process is going on simultaneously for all the keys on all the members.
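The steps of this example can be sketched in plain Java for the count aggregation, with 10-second frames and three frames per window. This is a stdlib-only model with hypothetical names, not Jet's implementation, and the event timestamps used below are made up rather than taken from the diagram. Each window result is derived from the previous one by combining the leading frame and deducting the trailing one.

```java
import java.util.TreeMap;

// Hypothetical frame-based sliding window for "count" (stdlib only, not Jet's code).
final class FrameSlidingCount {

    // Steps 1 and 2: put each event into its frame and keep only a count per frame.
    static TreeMap<Long, Long> countByFrame(long[] timestamps, long frameSize) {
        TreeMap<Long, Long> frames = new TreeMap<>();
        for (long ts : timestamps) {
            long frameStart = ts - Math.floorMod(ts, frameSize);
            frames.merge(frameStart, 1L, Long::sum);
        }
        return frames;
    }

    // Step 3: slide a window of framesPerWindow frames, one frame at a time.
    // Keys of the result are window end times (exclusive).
    static TreeMap<Long, Long> slidingCounts(TreeMap<Long, Long> frames, long frameSize,
                                             int framesPerWindow) {
        TreeMap<Long, Long> windows = new TreeMap<>();
        if (frames.isEmpty()) {
            return windows;
        }
        long lastFrame = frames.lastKey() + (framesPerWindow - 1) * frameSize;
        long running = 0;
        for (long frame = frames.firstKey(); frame <= lastFrame; frame += frameSize) {
            running += frames.getOrDefault(frame, 0L);                              // combine leading frame
            running -= frames.getOrDefault(frame - framesPerWindow * frameSize, 0L); // deduct trailing frame
            windows.put(frame + frameSize, running);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] events = {1_000, 4_000, 12_000, 13_000, 25_000, 31_000, 33_000, 35_000};
        TreeMap<Long, Long> frames = countByFrame(events, 10_000);
        System.out.println("frames:  " + frames);
        System.out.println("windows: " + slidingCounts(frames, 10_000, 3));
    }
}
```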

7.3.2. Two-stage aggregation

The concept of frame combining helps us implement two-stage aggregation as well. In the first stage the individual members come up with their partial results by frame and send them over a distributed edge to the second stage, which combines the frames with the same timestamp. After having combined all the partial frames from members, it combines the results along the event time axis into the sliding window.

Combining partial frames in two-stage aggregation
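The second stage can be sketched as merging per-frame partial counts keyed by frame timestamp. This is a hypothetical plain-Java model, not Jet's implementation; each input map stands for the partial frames sent by one member. After combining, windowTotal sums the frames covered by one window, as in the single-member case.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical second stage of two-stage windowed aggregation (plain Java, not Jet's code).
final class FrameCombiner {

    // Each map holds one member's partial counts: frame start -> count.
    static TreeMap<Long, Long> combineFrames(List<Map<Long, Long>> partialsPerMember) {
        TreeMap<Long, Long> combined = new TreeMap<>();
        for (Map<Long, Long> partial : partialsPerMember) {
            // Frames with the same timestamp are combined; the order of members doesn't matter.
            partial.forEach((frame, count) -> combined.merge(frame, count, Long::sum));
        }
        return combined;
    }

    // Sums the combined frames in the window [windowEnd - windowSize, windowEnd).
    static long windowTotal(TreeMap<Long, Long> combined, long windowEnd, long windowSize) {
        return combined.subMap(windowEnd - windowSize, true, windowEnd, false)
                .values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> combined = combineFrames(List.of(
                Map.of(0L, 2L, 10_000L, 1L),     // partial frames from member 1
                Map.of(0L, 1L, 20_000L, 4L)));   // partial frames from member 2
        System.out.println(combined);
        System.out.println(windowTotal(combined, 30_000, 30_000)); // 8
    }
}
```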

7.4. Session Window

In the abstract sense, the session window is quite an intuitive concept: it simply captures a burst of events. If no new events occur within the configured session timeout, the window closes. However, because the Jet processor encounters events out of their original order, this kind of window becomes quite tricky to compute.

The way Jet computes the session windows is easiest to explain in terms of the event interval: the range [eventTimestamp, eventTimestamp + sessionTimeout]. Initially an event causes a new session window to be created, covering exactly the event interval.

Session window: single event

A subsequent event under the same key belongs to this window if and only if its interval overlaps it. The window is then extended to cover the entire interval of the new event.

Session window: extend with another event

If the event intervals don’t overlap, Jet creates a new session window for the new event.

Session window: create a new window after session timeout

An event may happen to belong to two existing windows if its interval bridges the gap between them; in that case they are combined into one.

Session window: an event may merge two existing windows

Once the watermark has passed the closing time of a session window, Jet can close it and emit the result of its aggregation.
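The rules above can be sketched for a single key in plain Java. This is a hypothetical model, not Jet's implementation; intervals are treated as closed, so touching intervals count as overlapping. Because existing windows never overlap each other, a single pass that absorbs every window overlapping the new event interval is enough, and an event that bridges two windows merges them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical session windows for one key (plain Java, not Jet's implementation).
final class SessionWindows {

    static final class Window {
        long start;
        long end;
        long count;   // aggregated result: number of events in the session

        Window(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "] count=" + count;
        }
    }

    private final long sessionTimeout;
    private final List<Window> windows = new ArrayList<>();

    SessionWindows(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    // Events may arrive in any order.
    void onEvent(long timestamp) {
        // Start from the event interval [timestamp, timestamp + sessionTimeout].
        Window merged = new Window(timestamp, timestamp + sessionTimeout, 1);
        for (Iterator<Window> it = windows.iterator(); it.hasNext(); ) {
            Window w = it.next();
            if (w.start <= merged.end && merged.start <= w.end) {
                // Overlap: absorb the existing window into the merged one.
                merged.start = Math.min(merged.start, w.start);
                merged.end = Math.max(merged.end, w.end);
                merged.count += w.count;
                it.remove();
            }
        }
        windows.add(merged);
    }

    // Emits and removes the windows whose end the watermark has passed.
    List<Window> closeUpTo(long watermark) {
        List<Window> closed = new ArrayList<>();
        for (Iterator<Window> it = windows.iterator(); it.hasNext(); ) {
            Window w = it.next();
            if (w.end < watermark) {
                closed.add(w);
                it.remove();
            }
        }
        return closed;
    }

    List<Window> openWindows() {
        return windows;
    }

    public static void main(String[] args) {
        SessionWindows sessions = new SessionWindows(10);
        sessions.onEvent(0);
        sessions.onEvent(15);
        sessions.onEvent(8);   // bridges [0, 10] and [15, 25]
        System.out.println(sessions.openWindows()); // [[0, 25] count=3]
    }
}
```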

7.5. Fault Tolerance and Processing Guarantees

One less-than-obvious consequence of stepping up from finite to infinite streams is the difficulty of forever maintaining the continuity of the output, even in the face of changing cluster topology. A Jet node may leave the cluster due to an internal error, loss of networking, or deliberate shutdown for maintenance. This will cause the computation job to be suspended. Besides the obvious problem of new data pouring in while we’re down, we have a much more fiddly issue of restarting the computation in a differently laid-out cluster exactly where it left off, neither missing anything nor processing it twice. The technical term for this is the "exactly-once processing guarantee".

Jet achieves fault tolerance in streaming jobs by making a snapshot of the internal processing state at regular intervals. If a member of the cluster fails while a job is running, Jet will detect this and restart the job on the new cluster topology. It will restore its internal state from the snapshot and tell the source to start sending data from the last “committed” position (where the snapshot was taken). The data source must have built-in support to replay the data from the given checkpoint. The sink must either support transactions or be idempotent, tolerating duplicate submission of data.
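The restart logic can be modeled in a few lines of plain Java. This is a hypothetical single-processor simulation, not Jet code: the source is a replayable list, the state is a running sum, a snapshot records (source offset, state), and the sink is a map keyed by offset, which makes it idempotent. A simulated failure restores the last snapshot and replays from its offset, and the final sink contents match a failure-free run.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of snapshot + replay + idempotent sink (plain Java, not Jet code).
final class SnapshotReplayDemo {

    static final class Snapshot {
        final int offset;   // position in the source to resume from
        final long sum;     // processor state at that position

        Snapshot(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    // Processes the events, taking a snapshot every snapshotEvery items and failing once
    // when reaching offset crashAt (use -1 for no failure). Returns offset -> running sum.
    static Map<Integer, Long> run(List<Long> events, int snapshotEvery, int crashAt) {
        Map<Integer, Long> sink = new TreeMap<>(); // idempotent: re-putting a key is harmless
        Snapshot snapshot = new Snapshot(0, 0);
        boolean crashed = false;
        int offset = snapshot.offset;
        long sum = snapshot.sum;
        while (offset < events.size()) {
            if (!crashed && offset == crashAt) {
                crashed = true;
                // Restart: restore the state and ask the source to replay from the snapshot.
                offset = snapshot.offset;
                sum = snapshot.sum;
                continue;
            }
            sum += events.get(offset);
            sink.put(offset, sum);
            offset++;
            if (offset % snapshotEvery == 0) {
                snapshot = new Snapshot(offset, sum);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        List<Long> events = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        System.out.println(run(events, 3, 7).equals(run(events, 3, -1))); // true
    }
}
```

With a non-idempotent sink (say, appending to a list), the item at offset 6 would be written twice after the replay.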

In a Jet cluster, one member is the coordinator. It tells other members what to do and they report to it any status changes. The coordinator may fail and the cluster will automatically re-elect another one. If any other member fails, the coordinator restarts the job on the remaining members.

7.6. Jet’s Execution Model

At the heart of Jet is the TaskletExecutionService. It manages the threads that perform all the computation in a Jet job. Although this class is not formally a part of Jet’s public API, understanding how it schedules code for execution is essential if you want to implement a cooperative processor.

7.6.1. Cooperative Multithreading

Cooperative multithreading is one of the core features of Jet and can be roughly compared to green threads. It is purely a library-level feature and does not involve any low-level system or JVM tricks; the Processor API is simply designed in such a way that the processor can do a small amount of work each time it is invoked, then yield back to the Jet engine. The engine manages a thread pool of fixed size and on each thread, the processors take their turn in a round-robin fashion.

The point of cooperative multithreading is better performance. Several factors contribute to this:

  • The overhead of context switching between processors is much lower since the operating system’s thread scheduler is not involved.

  • The worker thread driving the processors stays on the same core for longer periods, preserving the CPU cache lines.

  • The worker thread has direct knowledge of the ability of a processor to make progress (by inspecting its input/output buffers).

7.6.2. Tasklet

The execution service doesn’t deal with processors directly; instead it deals with tasklets. Tasklet is a very simple functional interface derived from the standard Java Callable<ProgressState>. The execution service manages a pool of worker threads, each being responsible for a list of tasklets. The worker thread simply invokes the call() methods on its tasklets in a round-robin fashion. The method’s return value tells whether the tasklet made progress and whether it is now done.
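A single cooperative worker can be sketched as a round-robin loop over Callable objects. This is a hypothetical plain-Java model, not Jet's TaskletExecutionService; the ProgressState enum here is a simplified stand-in and countdown is a made-up toy tasklet that does one unit of work per call and then yields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical model of one cooperative worker thread (not Jet's TaskletExecutionService).
final class CooperativeWorker {

    // Simplified stand-in for the state a tasklet's call() reports.
    // NO_PROGRESS would drive the idle backoff; this sketch doesn't back off.
    enum ProgressState { NO_PROGRESS, MADE_PROGRESS, DONE }

    // Calls the tasklets round-robin on the current thread until all are done.
    // Returns the number of call() invocations.
    static int runAll(List<Callable<ProgressState>> tasklets) {
        List<Callable<ProgressState>> active = new ArrayList<>(tasklets);
        int calls = 0;
        while (!active.isEmpty()) {
            int i = 0;
            while (i < active.size()) {
                ProgressState state;
                try {
                    state = active.get(i).call();
                } catch (Exception e) {
                    throw new IllegalStateException("tasklet failed", e);
                }
                calls++;
                if (state == ProgressState.DONE) {
                    active.remove(i);  // the next tasklet shifts into slot i
                } else {
                    i++;
                }
            }
        }
        return calls;
    }

    // A toy tasklet: one small unit of work per call, then it yields.
    static Callable<ProgressState> countdown(int units, List<String> log, String name) {
        int[] remaining = {units};
        return () -> {
            if (remaining[0] == 0) {
                return ProgressState.DONE;
            }
            remaining[0]--;
            log.add(name);
            return remaining[0] == 0 ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        int calls = runAll(List.of(countdown(2, log, "a"), countdown(3, log, "b")));
        System.out.println(log + " in " + calls + " calls"); // [a, b, a, b, b] in 5 calls
    }
}
```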

The most important tasklet is the one driving a processor (ProcessorTasklet); there are a few others that deal with network sending/receiving and taking snapshots.

7.6.3. Work Stealing

When a tasklet is done, its worker will inspect all the other workers' tasklet lists to see if any of them has a longer tasklet list than its own. If it finds such a worker, it will “steal” one of its tasklets to even out the load per thread.
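The stealing rule can be sketched as follows. This is a hypothetical single-threaded model, not Jet's code: each worker's tasklet list is a Deque, and a real implementation would also have to coordinate concurrent access to those lists.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the work-stealing rule (single-threaded, not Jet's code).
final class WorkStealing {

    // Called by worker `self` after one of its tasklets is done. If another worker has a
    // longer tasklet list than `self`, move one tasklet from the longest such list to `self`.
    static <T> boolean trySteal(List<Deque<T>> workers, int self) {
        int victim = -1;
        int longest = workers.get(self).size();
        for (int w = 0; w < workers.size(); w++) {
            if (w != self && workers.get(w).size() > longest) {
                longest = workers.get(w).size();
                victim = w;
            }
        }
        if (victim < 0) {
            return false;
        }
        workers.get(self).addLast(workers.get(victim).pollLast());
        return true;
    }

    public static void main(String[] args) {
        List<Deque<String>> workers = List.of(
                new ArrayDeque<String>(),
                new ArrayDeque<String>(List.of("t1", "t2", "t3")),
                new ArrayDeque<String>(List.of("t4")));
        trySteal(workers, 0);
        System.out.println(workers); // [[t3], [t1, t2], [t4]]
    }
}
```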

7.6.4. Exponential Backoff

If none of the worker’s tasklets report having made progress, the worker will sleep briefly. If this happens again after it wakes up, it will sleep for twice as long. Once it reaches 1 ms sleep time, it will continue retrying once per millisecond to see if any tasklets can make progress.
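The backoff schedule can be sketched in plain Java. The starting sleep is not specified above, so it is an assumed constructor parameter here, and resetting the backoff when progress resumes is also an assumption; only the doubling and the 1 ms cap come from the text. The class is hypothetical, not Jet's code.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical exponential backoff for an idle worker (not Jet's code).
// The initial sleep is an assumed parameter; the 1 ms cap comes from the text above.
final class IdleBackoff {
    static final long MAX_SLEEP_NANOS = 1_000_000; // 1 ms

    private final long initialSleepNanos;
    private long nextSleepNanos;

    IdleBackoff(long initialSleepNanos) {
        this.initialSleepNanos = initialSleepNanos;
        this.nextSleepNanos = initialSleepNanos;
    }

    // Called after a round in which no tasklet made progress: returns how long to sleep
    // and doubles the next sleep, never exceeding 1 ms.
    long onNoProgress() {
        long sleep = nextSleepNanos;
        nextSleepNanos = Math.min(nextSleepNanos * 2, MAX_SLEEP_NANOS);
        return sleep;
    }

    // Assumption: any progress resets the backoff to the shortest sleep.
    void onProgress() {
        nextSleepNanos = initialSleepNanos;
    }

    public static void main(String[] args) {
        IdleBackoff backoff = new IdleBackoff(100_000); // assumed 0.1 ms starting sleep
        for (int round = 0; round < 6; round++) {
            long sleep = backoff.onNoProgress();
            System.out.println("idle round " + round + ": sleeping " + sleep + " ns");
            LockSupport.parkNanos(sleep);
        }
    }
}
```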

7.6.5. ProcessorTasklet

ProcessorTasklet is the one that drives a processor. It manages its inbox, outbox, inbound/outbound concurrent queues, and tracks the current processor state so it knows which of its callback methods to call.

During each tasklet.call(), ProcessorTasklet makes one call into one of its processor’s callbacks. It determines the processor’s progress status and reports it to the execution service.

7.6.6. Non-Cooperative Processor

If a processor declares itself as non-cooperative, the execution service will start a dedicated Java thread for its tasklet to run on.

Even if it’s non-cooperative, the processor’s callback methods must still make sure they don’t run for longer than a second or so at a time. Otherwise the tasklet will never be able to initiate a snapshot on the processor.

7.7. What Happens When you Submit a Job

When you submit a Job, Jet replicates the DAG to the whole Jet cluster and executes a copy of it on each member.

DAG Distribution

Jet executes the job on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread. Each worker thread has a list of tasklets it is in charge of and as tasklets complete at different rates, the remaining ones are moved between workers to keep the load balanced.

Each instance of a Processor is wrapped in one tasklet which the execution service repeatedly executes until it is done. A vertex with a local parallelism of 8 running on 4 members would have a total of 32 tasklets running at the same time. Each member has the same number of tasklets running.

Tasklet execution model

When you make a request to execute a Job, the corresponding DAG and additional resources are deployed to the Jet cluster. Jet builds an execution plan for the DAG on each member, which creates the associated tasklets for each Vertex and connects them to their inputs and outputs.

Jet uses Single Producer/Single Consumer ringbuffers to transfer the data between processors on the same member. They are data-type agnostic, so any data type can be used to transfer the data between vertices.

Ringbuffers, being bounded queues, introduce natural backpressure into the system; if a consumer’s ringbuffer is full, the producer will have to back off until it can enqueue the next item. When data is sent to another member over the network, there is no natural backpressure, so Jet uses explicit signaling in the form of adaptive receive windows.

7.8. Distributed Snapshot

The technique Jet uses to achieve fault tolerance is called a “distributed snapshot”, described in a paper by Chandy and Lamport. At regular intervals, Jet raises a global flag that says "it’s time for another snapshot". All processors belonging to source vertices observe the flag, create a checkpoint on their source, emit a barrier item to the downstream processors, and resume processing.

As the barrier item reaches a processor, it stops what it’s doing and emits its state to the snapshot storage. Once complete, it forwards the barrier item to its downstream processors.

Due to parallelism, in most cases a processor receives data from more than one upstream processor. It will receive the barrier item from each of them at separate times, but it must start taking a snapshot at a single point in time. There are two approaches it can take, as explained below.

7.8.1. Exactly-Once Snapshotting

With exactly-once configured, as soon as the processor gets a barrier item in any input stream (from any upstream processor), it must stop consuming that stream until it gets the same barrier item in all the streams:

Exactly-once processing: received one barrier

  1. At the barrier in stream X, but not Y. Must not accept any more X items.

Exactly-once processing: received both barriers

  2. At the barrier in both streams, taking a snapshot.

Exactly-once processing: forward the barrier

  3. Snapshot done, barrier forwarded. Can resume consuming all streams.

7.8.2. At-Least-Once Snapshotting

With at-least-once configured, the processor can keep consuming all the streams until it gets all the barriers, at which point it will stop to take the snapshot:

At-least-once processing: received one barrier

  1. At the barrier in stream X, but not Y. Carry on consuming all streams.

At-least-once processing: received both barriers

  2. At the barrier in both streams, already consumed x1 and x2. Taking a snapshot.

At-least-once processing: forward the barrier

  3. Snapshot done, barrier forwarded.

Even though x1 and x2 occur after the barrier, the processor consumed and processed them, updating its state accordingly. If the computation job stops and restarts, this state will be restored from the snapshot and then the source will replay x1 and x2. The processor will think it got two new items.
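The difference between the two modes can be reproduced with a small plain-Java simulation. This is a hypothetical model, not Jet's code: two input streams are consumed round-robin, and the method returns the items that are part of the processor's state when the snapshot is taken. With exactly-once, a stream that has delivered its barrier is not read again until the other stream delivers its own.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of barrier handling (plain Java, not Jet's code).
final class BarrierAlignment {
    static final String BARRIER = "BARRIER";

    // Consumes the inputs round-robin and returns the items consumed before the snapshot,
    // i.e. the items whose effect the snapshot captures.
    static List<String> consumeUntilSnapshot(List<Deque<String>> inputs, boolean exactlyOnce) {
        boolean[] atBarrier = new boolean[inputs.size()];
        int barriersSeen = 0;
        List<String> consumed = new ArrayList<>();
        while (true) {
            boolean progressed = false;
            for (int i = 0; i < inputs.size(); i++) {
                if (exactlyOnce && atBarrier[i]) {
                    continue; // blocked until all other inputs reach the barrier
                }
                String item = inputs.get(i).poll();
                if (item == null) {
                    continue;
                }
                progressed = true;
                if (item.equals(BARRIER)) {
                    atBarrier[i] = true;
                    if (++barriersSeen == inputs.size()) {
                        return consumed; // all barriers received: take the snapshot now
                    }
                } else {
                    consumed.add(item);
                }
            }
            if (!progressed) {
                throw new IllegalStateException("an input ended without a barrier");
            }
        }
    }
}
```

In the at-least-once run below, x1 and x2 end up in the snapshotted state even though they follow X's barrier; after a restart the source replays them, so their effect is applied twice.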

7.9. Stream Skew

We explained how we use the concept of watermark to impose order onto a disordered data stream. However, items arriving out of order aren’t our only challenge; modern stream sources like Kafka are partitioned and distributed so “the stream” is actually a set of independent substreams, moving on in parallel. Substantial time difference may arise between events being processed on each one, but our system must produce coherent output as if there was only one stream. We meet this challenge by coalescing watermarks: as the data travels over a partitioned/distributed edge, we make sure the downstream processor observes the correct watermark value, which is the lowest of the watermarks received from the contributing substreams.

7.9.1. Rules of Watermark Propagation

Watermark objects are sent interleaved with other stream items, but are handled specially:

  • The value of the watermark a processor emits must be strictly increasing. Jet will throw an exception if it detects a non-increasing watermark.

  • When a processor receives and handles a watermark, it is automatically emitted to the outbox. Therefore there should be only one processor emitting watermarks in the pipeline.

  • The watermark item is always broadcast, regardless of the edge type. This means that all N upstream processors send their watermark to all M downstream processors.

  • The processor observes a watermark value only after it has received it from all upstream processors on all upstream edges: the coalesced value is the lowest of the latest watermarks from each input. This is called watermark coalescing.

Jet’s internal class WatermarkCoalescer manages watermarks received from multiple inputs. As it receives watermark items from them, its duty is to decide when to forward the watermark downstream. This happens at two levels:

  • between the multiple queues backing a single edge

  • between the multiple input edges to a single processor

7.9.2. Idle inputs

A special object called the idle message can be emitted by a source processor when it sees no events for the configured idle timeout. This can happen in real life when some external partitions have no events while others do.

When an idle message is received from an input, that input is excluded from watermark coalescing. This means that we will not wait to receive a watermark from the idle input, so the other, active inputs can be processed without any delay. When the idle timeout is disabled and some processor doesn’t emit any watermarks (because it sees no events), the processing will stall indefinitely (unless maximum retention is configured).
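Coalescing across inputs, including the idle-input exclusion, can be sketched in plain Java. This is a hypothetical model, not Jet's WatermarkCoalescer; Long.MIN_VALUE is used here to mean "nothing to forward", and when every input is idle this sketch simply forwards nothing.

```java
import java.util.Arrays;

// Hypothetical watermark coalescing over N inputs with idle exclusion (not Jet's code).
final class WatermarkCoalescerSketch {
    static final long NO_WM = Long.MIN_VALUE;

    private final long[] lastWm;    // latest watermark received on each input
    private final boolean[] idle;   // inputs that reported being idle
    private long emittedWm = NO_WM; // last watermark forwarded downstream

    WatermarkCoalescerSketch(int inputCount) {
        lastWm = new long[inputCount];
        Arrays.fill(lastWm, NO_WM);
        idle = new boolean[inputCount];
    }

    // Input i delivered a watermark. Returns the watermark to forward, or NO_WM.
    long onWatermark(int i, long wm) {
        if (wm <= lastWm[i]) {
            throw new IllegalArgumentException("watermarks must strictly increase");
        }
        lastWm[i] = wm;
        idle[i] = false; // a watermark means the input is active again
        return maybeAdvance();
    }

    // Input i reported idleness: it no longer holds back the others.
    long onIdle(int i) {
        idle[i] = true;
        return maybeAdvance();
    }

    // The coalesced watermark is the minimum over the active inputs.
    private long maybeAdvance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < lastWm.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, lastWm[i]);
            }
        }
        if (!anyActive || min == NO_WM || min <= emittedWm) {
            return NO_WM;
        }
        emittedWm = min;
        return min;
    }
}
```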

7.10. The Pitfalls of At-Least-Once Processing

In some cases at-least-once semantics can have consequences of quite an unexpected magnitude, as we discuss next.

7.10.1. Apparent Data Loss

Imagine a very simple kind of processor: it matches up the items that belong to a pair based on some rule. If it receives item A first, it remembers it. Later on, when it receives item B, it emits that fact to its outbound edge and forgets about the two items. It may also first receive B and wait for A.

Now imagine this sequence: A → BARRIER → B. In at-least-once the processor may observe both A and B, emit its output, and forget about them, all before taking the snapshot. After the restart, item B will be replayed because it occurred after the last barrier, but item A won’t. Now the processor is stuck forever in a state where it’s expecting A and has no idea it already got it and emitted that fact.

Problems similar to this may happen with any state the processor keeps until it has got enough information to emit the results and then forgets it. By the time it takes a snapshot, the post-barrier items will have caused it to forget facts about some pre-barrier items. After a restart it will behave as though it has never observed those pre-barrier items, resulting in behavior equivalent to data loss.
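The A → BARRIER → B scenario can be replayed with a tiny pair-matching processor in plain Java. This is a hypothetical model, not Jet API; the snapshot is just a copy of the set of pending items. Under at-least-once both items are processed before the snapshot, so the restored processor waits for A forever; under exactly-once the snapshot captures A and the pair is emitted after the replay.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pair-matching processor from the example above (plain Java, not Jet API).
final class PairMatcher {
    final Set<String> pending = new HashSet<>();  // items still waiting for their partner
    final List<String> emitted = new ArrayList<>();

    void process(String item) {
        String partner = item.equals("A") ? "B" : "A";
        if (pending.remove(partner)) {
            emitted.add("A+B");   // pair complete: emit it and forget both items
        } else {
            pending.add(item);
        }
    }

    Set<String> snapshot() {
        return new HashSet<>(pending);
    }

    static PairMatcher restore(Set<String> snapshot) {
        PairMatcher matcher = new PairMatcher();
        matcher.pending.addAll(snapshot);
        return matcher;
    }
}
```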

7.10.2. Non-Monotonic Watermark

One special case of the above story concerns watermark items. Thanks to watermark coalescing, processors are typically implemented against the invariant that the watermark value always increases. However, in at-least-once the post-barrier watermark items will advance the processor’s watermark value. After the job restarts and the state gets restored to the snapshotted point, the watermark will appear to have gone back, breaking the invariant. This can again lead to apparent data loss.

8. Expert Zone — The Core API

This section covers the Core API, Jet’s low-level API that directly exposes the computation engine’s raw features. If you are looking for the API to build your computation pipeline, please refer to the Pipeline API section.

Creating a Core API DAG requires expert-level familiarity with concepts like partitioning schemes, vertex parallelism, distributed vs. local edges, etc. Furthermore, this API offers no static type safety and it is very easy to create a DAG that fails with a ClassCastException when executed. Even though it is possible, this API is not intended for creating DAGs by hand; it offers the infrastructure on top of which to build high-level DSLs and APIs that describe computation jobs.

Implementing a Core API Processor requires even greater expertise than building a DAG. Among other things, you have to be acquainted in detail with Jet’s concept of cooperative multithreading. While we provide as much convenience as we can for extending Jet with your custom processors, we cannot remove the dangers of using these facilities improperly.

8.1. DAG

The DAG-building API is centered around the DAG class. This is a pure data class and can be instantiated on its own, without a Jet instance. This makes it simple to separate the job-describing code from the code that manages the lifecycle of Jet instances.

You can "compile" a Jet pipeline into a DAG:

DAG dag = pipeline.toDag();

Studying these DAGs may be a useful aid while learning the Core API.

To start building a DAG from scratch, write

DAG dag = new DAG();

A good practice is to structure the DAG-building code into the following sections:

  1. Create all the vertices.

  2. Configure the local parallelism of vertices.

  3. Create the edges.

Example:

// (1) Create the vertices
Vertex source = dag.newVertex("source",
        SourceProcessors.readFilesP(".", UTF_8, "*", false, (file, line) -> line)
);
Vertex transform = dag.newVertex("transform", mapP(
        (String line) -> entry(line, line.length())
));
Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("sinkMap"));

// (2) Configure local parallelism
source.localParallelism(1);

// (3) Create the edges
dag.edge(between(source, transform));
dag.edge(between(transform, sink));

8.1.1. Creating a Vertex

The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:

  1. DistributedSupplier<Processor> directly returns processor instances from its get() method. It is expected to be stateless and return equivalent instances on each call. It doesn’t provide any initialization or cleanup code.

  2. ProcessorSupplier returns in a single call all the processors that will run on a single cluster member. It may specialize each instance, for example to achieve local data partitioning. It is also in charge of the member-local lifecycle (initialization and destruction).

  3. ProcessorMetaSupplier returns in a single call an object that will be in charge of creating all the processors for a vertex. Given a list of member addresses, the object it returns is a Function<Address, ProcessorSupplier> which will then be called with each of the addresses from the list to retrieve the ProcessorSupplier specialized for the given member.

ProcessorMetaSupplier is the most fundamental facility. It is a factory of ProcessorSuppliers. DistributedSupplier<Processor> exists purely as a convenience over ProcessorSupplier for the simplest kinds of vertices.

You choose which of the three to use for a particular vertex when you implement it. When you build a DAG from already implemented vertices, you don’t have to care, or even know, which one it’s using. You’ll call a factory method that returns one of them, and they all integrate the same way into your newVertex() calls.

8.1.2. Local and Global Parallelism of Vertex

The vertex is implemented at runtime by one or more instances of Processor on each member. Each vertex can specify how many of its processors will run per cluster member using the localParallelism property; every member will have the same number of processors. A new Vertex instance has this property set to -1, which requests the default value equal to the configured size of the cooperative thread pool. The latter defaults to Runtime.availableProcessors() and is configurable via InstanceConfig.setCooperativeThreadCount().

In most cases the only level of local parallelism that you’ll want to explicitly configure is 1 for the cases where no parallelism is desirable (e.g. on a source processor reading from a file).

The global parallelism of a vertex is the total number of its processors running in the whole cluster, equal to local parallelism multiplied by cluster size. It is especially relevant to the distribution of partitions among processors. You should watch out for and avoid the possibility that the global parallelism exceeds the number of partitions because that would mean some processors get no input/produce no output. This has severe consequences in a streaming job with watermarks: the event skew grows without bound.
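The parallelism rules above reduce to simple arithmetic. The helper below is a hypothetical plain-Java sketch, not part of Jet's API; the partition count is a parameter (Hazelcast IMDG's default is 271 partitions).

```java
// Hypothetical helper mirroring the parallelism rules above (not part of Jet's API).
final class ParallelismCheck {

    // -1 requests the default: the cooperative thread count
    // (which itself defaults to the number of available processors).
    static int effectiveLocalParallelism(int localParallelism, int cooperativeThreadCount) {
        return localParallelism == -1 ? cooperativeThreadCount : localParallelism;
    }

    // Global parallelism = local parallelism x cluster size.
    static int globalParallelism(int localParallelism, int cooperativeThreadCount, int clusterSize) {
        return effectiveLocalParallelism(localParallelism, cooperativeThreadCount) * clusterSize;
    }

    // True if some processors would get no partitions and therefore no input.
    static boolean hasIdleProcessors(int globalParallelism, int partitionCount) {
        return globalParallelism > partitionCount;
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors();
        int global = globalParallelism(-1, threads, 3);
        System.out.println("global parallelism on 3 members: " + global
                + ", idle processors with 271 partitions: " + hasIdleProcessors(global, 271));
    }
}
```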

8.1.3. Edge Ordinal

An edge is connected to a vertex at a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows which ordinal it came from. Things are similar on the outbound side: the processor emits an item to a given ordinal, but also has the option to emit the same item to all ordinals. This is the most typical case and allows easy replication of a data stream across several edges.

When you use the between(a, b) edge factory, the edge will be connected at ordinal 0 at both ends. When you need a different ordinal, use the from(a, ord1).to(b, ord2) form. There must be no gaps in ordinal assignment, which means a vertex will have inbound edges with ordinals 0..N and outbound edges with ordinals 0..M.

This example shows the usage of between() and from().to() forms to build a DAG with one source feeding two computational vertices:

Vertex source = dag.newVertex("source",
        SourceProcessors.readFilesP(".", UTF_8, "*", false, (file, line) -> line)
);
Vertex toUpper = dag.newVertex("toUpper", mapP((String in) -> in.toUpperCase()));
Vertex toLower = dag.newVertex("toLower", mapP((String in) -> in.toLowerCase()));

dag.edge(between(source, toUpper));
dag.edge(from(source, 1).to(toLower));
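The no-gaps rule can be expressed as a small check: collect the ordinals of a vertex's inbound (or outbound) edges and verify that they form the sequence 0, 1, ..., N. OrdinalCheck is a hypothetical helper for illustration only, not part of Jet's API.

```java
import java.util.Arrays;

class OrdinalCheck {
    // Returns true if the given edge ordinals are exactly 0..N, with no gaps or duplicates.
    static boolean hasNoGaps(int... ordinals) {
        int[] sorted = ordinals.clone();
        Arrays.sort(sorted);
        for (int i = 0; i < sorted.length; i++) {
            if (sorted[i] != i) {
                return false;
            }
        }
        return true;
    }
}
```

In the example above, source has outbound ordinals 0 and 1, so it passes. Had toLower been attached with from(source, 2), the set {0, 2} would fail the check.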

8.1.4. Local and Distributed Edge

A major choice to make in terms of routing the data coming out of a processor is whether the candidate set of the target vertex’s processors is unconstrained, encompassing all its processors across the cluster, or constrained to just those running on the same cluster member. You control this with the distributed property of the edge. By default the edge is local and calling the distributed() method removes this restriction.

You can minimize network traffic by employing local edges. They are implemented with the most efficient kind of concurrent queue: single-producer, single-consumer array-backed queue. It employs wait-free algorithms on both sides and avoids even the latency of volatile writes by using lazySet.
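To illustrate the idea, here is a minimal single-producer, single-consumer ring buffer. It is a simplified sketch, not Jet's actual queue class. The producer publishes its tail index with AtomicLong.lazySet (an ordered store that avoids the full fence of a volatile write), and the consumer does the same with its head index. It is only correct if exactly one thread calls offer() and exactly one thread calls poll().

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

class SpscQueue<E> {
    private final AtomicReferenceArray<E> buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read; written only by the consumer
    private final AtomicLong tail = new AtomicLong(); // next slot to write; written only by the producer

    SpscQueue(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        buffer = new AtomicReferenceArray<>(capacityPowerOfTwo);
        mask = capacityPowerOfTwo - 1;
    }

    // Producer thread only. Returns false if the queue is full; never blocks.
    boolean offer(E item) {
        long t = tail.get();
        if (t - head.get() == buffer.length()) {
            return false;
        }
        buffer.lazySet((int) (t & mask), item);
        tail.lazySet(t + 1); // ordered store: the item becomes visible no later than the new tail
        return true;
    }

    // Consumer thread only. Returns null if the queue is empty; never blocks.
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int index = (int) (h & mask);
        E item = buffer.get(index);
        buffer.lazySet(index, null); // release the slot for the producer
        head.lazySet(h + 1);
        return item;
    }
}
```

Both operations complete in a bounded number of steps regardless of what the other thread does, which is what makes them wait-free.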

A good example of employing a local-distributed edge combo is two-stage aggregation. Here’s how it looks on our Word Count example:

dag.edge(between(tokenize, accumulate)
           .partitioned(wholeItem(), HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()));

Note that only the edge from accumulate to combine is distributed.

8.1.5. Routing Policies

The routing policy decides which of the processors in the candidate set to route each particular item to.

Unicast

This is the default routing policy, the one you get when you write

dag.edge(between(input, output));

For each item it chooses a single destination processor with no further restrictions on the choice. The only guarantee given by this policy is that exactly one processor will receive the item, but Jet also takes care to “spray” the items equally over all the reception candidates.

This choice makes sense when the data doesn’t have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.
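A simple way to spray items evenly is round-robin. The sketch below assumes round-robin selection, which only approximates Jet's unicast behavior; the real routing logic may differ, for example under backpressure. UnicastSprayer is a hypothetical name. Each item goes to exactly one receiver, and the load stays balanced.

```java
import java.util.ArrayList;
import java.util.List;

class UnicastSprayer<T> {
    private final List<List<T>> receivers = new ArrayList<>();
    private int next;

    UnicastSprayer(int receiverCount) {
        for (int i = 0; i < receiverCount; i++) {
            receivers.add(new ArrayList<>());
        }
    }

    // Sends the item to exactly one receiver, rotating through all of them.
    void send(T item) {
        receivers.get(next).add(item);
        next = (next + 1) % receivers.size();
    }

    List<T> receivedBy(int receiver) {
        return receivers.get(receiver);
    }
}
```

Sending items 0 to 9 to three receivers yields 4, 3 and 3 items respectively; receiver 1 gets 1, 4 and 7.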

Isolated

This is a more restricted kind of unicast policy: any given downstream processor receives data from exactly one upstream processor. In some DAG setups you may need to apply selective backpressure to individual upstream processors, and this policy lets you achieve it. If the connected vertices have equal parallelism, an isolated edge creates a one-to-one mapping between their processors, preserving the encounter order and partitioning.

Activate this policy by calling isolated() on the edge:

dag.edge(between(input, output).isolated());

Broadcast

A broadcasting edge sends each item to all candidate receivers. Due to the redundancy it creates, this kind of edge isn’t appropriate for the main data stream. Its purpose is dispatching a small amount of data to all the processors of a vertex, typically as a part of setting up before processing the data stream. For this reason a broadcast edge is often high-priority as well. For example, in a hash-join one vertex creates the lookup hashtable and sends the same instance to all the processors of the next vertex, the one that processes the main data stream.

Activate this policy by calling broadcast() on the edge:

dag.edge(between(input, output).broadcast());

Partitioned

A partitioned edge sends each item to the one processor responsible for the item’s partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.

Jet automatically assigns partitions to processors during job initialization. The number of partitions is fixed for the lifetime of a cluster and you configure it on the IMDG level with the system property hazelcast.partition.count. The number of partitions must not be less than the highest global parallelism of any vertex, otherwise some processors will get no partitions.

You can also refer to the Hazelcast Reference Manual for more details on partitioning in Hazelcast IMDG.

This is the default algorithm to determine the partition ID of an item:

  1. Apply the key extractor function defined on the edge to retrieve the partitioning key.

  2. Serialize the partitioning key to a byte array using Hazelcast serialization.

  3. Apply Hazelcast’s standard MurmurHash3-based algorithm to get the key’s hash value.

  4. Partition ID is the hash value modulo the number of partitions.

The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.
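The four steps can be sketched in plain Java. Two substitutions are assumptions of this sketch: the key is "serialized" as its UTF-8 bytes instead of with Hazelcast serialization, and the hash is the textbook MurmurHash3 x86_32 with seed 0, which is not guaranteed to match Hazelcast's exact variant or seed. The partition ID uses a non-negative modulo. What the sketch does show is the essential property: the result depends only on the key's bytes, so every JVM computes the same partition.

```java
import java.nio.charset.StandardCharsets;

class PartitionIdSketch {
    // Textbook MurmurHash3 x86_32 with little-endian block reads.
    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3; // bytes covered by whole 4-byte blocks

        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | (data[i + 1] & 0xff) << 8
                    | (data[i + 2] & 0xff) << 16 | (data[i + 3] & 0xff) << 24;
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }

        // Tail: the remaining 0 to 3 bytes.
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[roundedEnd + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (data[roundedEnd + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }

        // Finalization mix.
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Steps 1 to 4 for a String key; the key extractor is the identity here.
    static int partitionId(String key, int partitionCount) {
        byte[] serialized = key.getBytes(StandardCharsets.UTF_8); // stand-in for Hazelcast serialization
        int hash = murmur3x86_32(serialized, 0);                  // seed 0 is an assumption
        return Math.floorMod(hash, partitionCount);                // non-negative modulo
    }
}
```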

Another common choice is to use Java’s standard Object.hashCode(). It is often significantly faster and it’s safe to use on a local edge. On a distributed edge it is not a safe strategy in general, because the contract of hashCode() does not require repeatable results across JVMs or even across different instances of the same JVM version. If a class’s Javadoc explicitly specifies the hashing function it uses, then its instances are safe to partition with hashCode().
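String is one such class. Its Javadoc specifies the hash as s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1] in int arithmetic, so recomputing it by hand gives the same value on any JVM. The helper below is illustrative: HashCodePartitioning is a hypothetical name, and the non-negative modulo used to turn the hash into a partition ID is an assumption, not necessarily what Jet's HASH_CODE partitioner does internally.

```java
class HashCodePartitioning {
    // Recomputes String.hashCode() from the formula in its Javadoc (Horner's scheme).
    static int specifiedStringHash(String s) {
        int h = 0;
        for (int i = 0; i < s.length(); i++) {
            h = 31 * h + s.charAt(i);
        }
        return h;
    }

    // Partition ID derived from hashCode(); floorMod keeps it non-negative.
    static int partitionByHashCode(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

For "abc" the formula gives 97*31² + 98*31 + 99 = 96354, and 96354 mod 271 = 149.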

You can provide your own implementation of Partitioner to gain full control over the partitioning strategy.

We use both partitioning strategies in the Word Count example:

dag.edge(between(tokenize, accumulate)
        .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()));

Note that the local-partitioned edge uses partitioning by hash code and the distributed edge uses the default Hazelcast partitioning. It turns out that in this particular example we could safely use the hash code both times, but we leave it like this for didactic purposes. Since much less data travels towards the combiner than towards the accumulator, the performance of the whole job is hardly affected by this choice.

All-To-One

The all-to-one routing policy is a special case of the partitioned policy which assigns the same partition ID to all items. Jet randomly chooses the partition while initializing the job. This policy makes sense on a distributed edge when all the items from all the members must go to the same member and the same processor instance running on it. You should always set the local parallelism of the target vertex to 1, otherwise there will be idle processors that never get any items.

On a local edge this policy doesn’t make sense since simply setting the local parallelism of the target vertex to 1 constrains the local choice to just one processor instance.

One case where the all-to-one routing is appropriate is global aggregation (without grouping). A single processor must see all the data. Here’s how we would set it between the first-stage and the second-stage aggregating vertex:

dag.edge(between(stage1, stage2).distributed().allToOne());

8.1.6. Priority

By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a “hash join” which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the enriching stream contains the data for the lookup table and must be consumed in full before consuming the stream to be enriched.

The priority property controls the order of consuming the edges. Edges are sorted by their priority number (ascending) and consumed in that order. Edges with the same priority are consumed without particular ordering (as the data arrives).
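The consumption order can be sketched with plain Java collections. Each inbound edge is modeled as a hypothetical InboundEdge holding a finite queue of items and a priority. Edges are grouped by ascending priority, and a group is drained completely before the next group is touched. Within a group, the sketch alternates between edges as a stand-in for "as the data arrives".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

class InboundEdge {
    final int priority;
    final Deque<String> items;

    InboundEdge(int priority, String... items) {
        this.priority = priority;
        this.items = new ArrayDeque<>(Arrays.asList(items));
    }
}

class PriorityConsumption {
    // Returns the order in which a processor would consume items from its inbound edges.
    static List<String> consume(List<InboundEdge> edges) {
        TreeMap<Integer, List<InboundEdge>> byPriority = new TreeMap<>(); // sorted by priority, ascending
        for (InboundEdge e : edges) {
            byPriority.computeIfAbsent(e.priority, p -> new ArrayList<>()).add(e);
        }
        List<String> order = new ArrayList<>();
        for (List<InboundEdge> group : byPriority.values()) {
            boolean progress = true;
            while (progress) { // drain the whole group before moving on
                progress = false;
                for (InboundEdge e : group) {
                    String item = e.items.poll();
                    if (item != null) {
                        order.add(item);
                        progress = true;
                    }
                }
            }
        }
        return order;
    }
}
```

An edge with priority -1 carrying a stopword set is therefore fully consumed before any text line arrives over the priority-0 edges.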

We can see a prioritized edge in action in the TF-IDF example:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1));

The tokenize vertex receives a set of stopwords and filters out their occurrences from the input text. It must receive the entire set before beginning to process the text.

A Fault Tolerance Caveat

As explained in the section on the Processor API, Jet takes regular snapshots of processor state when fault tolerance is enabled. A processor will get a special item in its input stream, called a barrier. In the exactly-once mode, as soon as the processor receives the barrier on one input stream, it must stop pulling data from that stream and continue pulling from all other streams until it receives the same barrier in all of them, and only then emit its state to the snapshot storage. This directly contradicts the contract of edge prioritization: the processor is not allowed to consume any other streams before having fully exhausted the prioritized ones.

For this reason Jet does not initiate a snapshot until all the high-priority edges have been fully consumed.

Although strictly speaking this only applies to the exactly-once mode, Jet postpones taking the snapshot in the at-least-once mode as well. Even though the snapshot could begin early, it still could not complete until the processor has consumed all the prioritized edges, started consuming the non-prioritized ones, and received the barrier in all of them. The result would be many more items processed twice after a restart.
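The alignment rule of the exactly-once mode can be sketched as a small state machine. This is a simplified illustration, not Jet's implementation: a hypothetical BarrierAligner tracks which inputs have delivered the barrier for the current snapshot, blocks further pulls from those inputs, and reports when every input has delivered it.

```java
import java.util.HashSet;
import java.util.Set;

class BarrierAligner {
    private final int inputCount;
    private final Set<Integer> barrierReceived = new HashSet<>();

    BarrierAligner(int inputCount) {
        this.inputCount = inputCount;
    }

    // Whether the processor may keep pulling data items from the given input.
    boolean canPull(int input) {
        return !barrierReceived.contains(input);
    }

    // Records a barrier from the given input. Returns true once all inputs have
    // delivered it, meaning the processor can now save its state to the snapshot.
    boolean onBarrier(int input) {
        barrierReceived.add(input);
        if (barrierReceived.size() == inputCount) {
            barrierReceived.clear(); // all inputs unblocked for the next snapshot
            return true;
        }
        return false;
    }
}
```

If input 0 were a prioritized edge that is not yet exhausted, this rule would force the processor to pull from input 1 before finishing input 0, which is exactly the conflict described above.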

8.1.7. Fine-Tuning Edges

Edges can be configured with an EdgeConfig instance, which specifies additional fine-tuning parameters. For example,

dag.edge(between(tickerSource, generateTrades)
        .setConfig(new EdgeConfig().setQueueSize(512)));

Please refer to the Javadoc of EdgeConfig for details.

8.2. How to Build a DAG

8.2.1. Bounded Stream (Batch) DAG

Let’s use the Core API to build the Word Count DAG, already described in an earlier section:

Word Count DAG

We start by instantiating the DAG class and adding the source vertex:

DAG dag = new DAG();
Vertex source = dag.newVertex("source", SourceProcessors.readMapP("lines"));

Note how we can build the DAG outside the context of any running Jet instances: it is a pure POJO.

The source vertex will read the lines from the IMap and emit items of type Map.Entry<Integer, String> to the next vertex. The key of the entry is the line number, and the value is the line itself. The built-in map-reading processor will do just what we want: on each member it will read only the data local to that member.

The next vertex is the tokenizer, which does a simple "flat-mapping" operation (transforms one input item into zero or more output items). The low-level support for such a processor is a part of Jet’s library; we just need to provide the mapping function:

// (lineNum, line) -> words
Pattern delimiter = Pattern.compile("\\W+");
Vertex tokenize = dag.newVertex("tokenize",
        Processors.flatMapP((Entry<Integer, String> e) ->
                traverseArray(delimiter.split(e.getValue().toLowerCase()))
                        .filter(word -> !word.isEmpty()))
);

This creates a processor that applies the given function to each incoming item, obtaining zero or more output items, and emits them. Specifically, our processor accepts items of type Entry<Integer, String>, splits the entry value into lowercase words, and emits all non-empty words. The function must return a Traverser, which is a functional interface used to traverse a sequence of non-null items. Its purpose is equivalent to the standard Java Iterator, but avoids the cumbersome two-method API. Since a lot of support for cooperative multithreading in Hazelcast Jet deals with sequence traversal, this abstraction simplifies many of its aspects.
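The contract is small enough to sketch. The hypothetical MiniTraverser below mirrors the idea described here: a single next() method that returns the next item, or null when the sequence is exhausted, plus default map and filter helpers. It is meant only to show the null-terminated design and is not Jet's Traverser interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

@FunctionalInterface
interface MiniTraverser<T> {
    // Returns the next item, or null when there are no more items.
    T next();

    // fn must not return null, since null would be read as "end of sequence".
    default <R> MiniTraverser<R> map(Function<? super T, ? extends R> fn) {
        return () -> {
            T t = next();
            return t == null ? null : fn.apply(t);
        };
    }

    // Skips items that don't match the predicate.
    default MiniTraverser<T> filter(Predicate<? super T> pred) {
        return () -> {
            T t;
            while ((t = next()) != null) {
                if (pred.test(t)) {
                    return t;
                }
            }
            return null;
        };
    }

    // Traverses an array; a null element would end the traversal early.
    static <T> MiniTraverser<T> ofArray(T[] array) {
        int[] index = {0}; // mutable cursor captured by the lambda
        return () -> index[0] < array.length ? array[index[0]++] : null;
    }

    // Collects the remaining items into a list.
    default List<T> toList() {
        List<T> out = new ArrayList<>();
        T t;
        while ((t = next()) != null) {
            out.add(t);
        }
        return out;
    }
}
```

Splitting ",The quick, brown fox" on \W+ yields a leading empty string; filter(w -> !w.isEmpty()) removes it, just as the tokenizer above does.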

The next vertex will do the actual word count. We can use the built-in accumulateByKey processor for this:

// word -> (word, count)
Vertex accumulate = dag.newVertex("accumulate",
        Processors.accumulateByKeyP(singletonList(wholeItem()), counting())
);

This processor maintains a hashtable that maps each distinct key to its accumulated value. We specify wholeItem() as the key extractor function: our input item is just the word, which is also the grouping key. The second argument is the kind of aggregate operation we want to perform: counting. We are relying on Jet’s out-of-the-box definitions here, but it is easy to define your own aggregate operations and key extractors. The processor emits nothing until it has received all the input, and at that point it emits the hashtable as a stream of Entry<String, Long>.

Next is the combining step which computes the grand totals from individual members' contributions. This is the code:

// (word, count) -> (word, count)
Vertex combine = dag.newVertex("combine",
        Processors.combineByKeyP(counting(), Util::entry)
);

combineByKey is designed to be used downstream of accumulateByKey, which is why it doesn’t need an explicit key extractor. The aggregate operation must be the same as on accumulateByKey.
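Stripped of distribution, the accumulate and combine stages amount to the plain-Java sketch below. Each simulated member counts the words it sees locally (the role of accumulateByKeyP with counting()), and the combining step merges the per-member partial counts into grand totals (the role of combineByKeyP). TwoStageCount and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoStageCount {
    // Stage 1: a local hashtable from each distinct word to its partial count.
    static Map<String, Long> accumulate(List<String> words) {
        Map<String, Long> partial = new HashMap<>();
        for (String word : words) {
            partial.merge(word, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: merge the partial counts emitted by all members into grand totals.
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, count) -> totals.merge(word, count, Long::sum));
        }
        return totals;
    }
}
```

Combining adds up counts, which only makes sense if both stages use the same aggregate operation, mirroring the requirement stated above.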

The final vertex is the sink; we want to store the output in another IMap:

Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("counts"));

Now that we have all the vertices, we must connect them into a graph and specify the edge type as discussed in the previous section. Here’s the code:

dag.edge(between(source, tokenize))
   .edge(between(tokenize, accumulate)                            (1)
           .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)                             (2)
           .distributed()
           .partitioned(entryKey()))
   .edge(between(combine, sink));
1 Here we chose a local partitioned edge. For each word, there will be a processor responsible for it on each member so that no items must travel across the network. In the partitioned() call we specify two things: the function that extracts the partitioning key (wholeItem(), which is also the grouping key extractor), and the policy object that decides how to compute the partition ID from the key. Here we use the built-in HASH_CODE, which will derive the ID from Object.hashCode(). As long as the definitions of equals()/hashCode() on the key object match our expected notion of key equality, this policy is always safe to use on a local edge.
2 This is a distributed partitioned edge: for each word there is a single combiner processor in the whole cluster responsible for it, and items will be sent over the network if needed. The partitioning key is again the word, but here it is the key part of the Map.Entry<String, Long>. We are using the default partitioning policy here (Hazelcast’s own partitioning scheme). It is the slower-but-safe choice on a distributed edge. Detailed inspection shows that hashcode-based partitioning would be safe as well, because all of String, Long, and Map.Entry have the hash function specified in their Javadoc.

You can access a full, self-contained Java program with the above DAG code at the Hazelcast Jet Reference Manual repository.

8.2.2. Unbounded Stream DAG

For this example we’ll build a simple Jet job that monitors trading events on a stock market, categorizes the events by stock ticker, and reports the number of trades per time unit (the time window). In terms of DAG design, not much changes going from batch to streaming. This is how it looks:

Trade monitoring DAG

We have the same cascade of source, two-stage aggregation, and sink. The source is the event journal of a Hazelcast IMap (we assume some other process continuously updates this map with trade events). On the sink side there’s another mapping vertex, format-output, that transforms the window result items into lines of text. The sink vertex writes these lines to a file.

Here’s the DAG-building code in full:

DistributedToLongFunction<? super Trade> timestampFn = Trade::timestamp;
DistributedFunction<? super Trade, ?> keyFn = Trade::productId;
SlidingWindowPolicy winPolicy = slidingWinPolicy(
        SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS);

DAG dag = new DAG();
Vertex tradeSource = dag.newVertex("trade-source",
        SourceProcessors.<Trade, Long, Trade>streamMapP(
                TRADES_MAP_NAME,
                alwaysTrue(),                              (1)
                EventJournalMapEvent::getNewValue,         (1)
                JournalInitialPosition.START_FROM_OLDEST,  (2)
                wmGenParams(
                        timestampFn,                       (3)
                        limitingLag(SECONDS.toMillis(3)),  (4)
                        emitByFrame(winPolicy),            (5)
                        SECONDS.toMillis(3)                (6)
                )));
Vertex slidingStage1 = dag.newVertex("sliding-stage-1",
        Processors.accumulateByFrameP(
                singletonList(keyFn),
                singletonList(timestampFn),
                TimestampKind.EVENT,
                winPolicy, counting()
        ));
Vertex slidingStage2 = dag.newVertex("sliding-stage-2",
    Processors.combineToSlidingWindowP(winPolicy, counting(),
            TimestampedEntry::fromWindowResult));
Vertex formatOutput = dag.newVertex("format-output", mapUsingContextP(    (7)
    ContextFactory.withCreateFn(x -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")),
    (DateTimeFormatter timeFormat, TimestampedEntry<String, Long> tse) ->
        String.format("%s %5s %4d",
            timeFormat.format(Instant.ofEpochMilli(tse.getTimestamp())
                                     .atZone(ZoneId.systemDefault())),
            tse.getKey(), tse.getValue())
));
Vertex sink = dag.newVertex("sink", SinkProcessors.writeFileP(
        OUTPUT_DIR_NAME, Object::toString, UTF_8, false));

tradeSource.localParallelism(1);

return dag
        .edge(between(tradeSource, slidingStage1)
                .partitioned(keyFn, HASH_CODE))
        .edge(between(slidingStage1, slidingStage2)
                .partitioned(entryKey(), HASH_CODE)
                .distributed())
        .edge(between(slidingStage2, formatOutput)
                .isolated())
        .edge(between(formatOutput, sink));

You can see quite a lot of code going into the setup of the streaming source. Let’s zoom in on it:

1 filtering and mapping functions we supply directly to the source. Hazelcast IMDG will apply them before serializing and sending to Jet so this saves network traffic and CPU.
2 where to start from in map’s event journal: the oldest entry still available.
3 function to apply to the event object to get its timestamp.
4 watermark policy. Here we use the simplest kind, limitingLag, which will make the watermark lag behind the top observed event timestamp by the fixed amount we specified (3 seconds).
5 watermark emission policy that tells Jet when to actually send a watermark event. Since the sliding window processor ignores all watermark events that belong to the same frame, we configure a matching policy that emits only one watermark per frame.
6 partition idle timeout. This is a countermeasure to stalling problems that occur when some of the IMap partitions don’t receive any updates. Due to watermark coalescing this could stall the entire stream, but with this setting a partition will be marked as idle after 3 seconds of inactivity and then the rest of the system behaves as if it didn’t exist.
7 Here we use mapUsingContextP which allows us to create an object available to the processor at a late point, after all the job serialization-deserialization is done. In this case we need it because the Java 8 DateTimeFormatter isn’t serializable.
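The watermark settings in callouts 4 to 6 can be illustrated with a small plain-Java model. Everything here is a simplified assumption, not Jet's implementation. The watermark trails the highest timestamp seen by a fixed lag (the limitingLag idea). A new watermark is emitted only when its frame start lies beyond the last emitted watermark (the emitByFrame idea, with the frame length equal to the slide step). The watermark seen downstream is the minimum over partitions that are not idle. WatermarkModel is a hypothetical name.

```java
import java.util.Map;
import java.util.Set;

class WatermarkModel {
    private final long lag;       // watermark trails the top timestamp by this amount
    private final long frameSize; // frame length, equal to the slide step
    private long topTimestamp = Long.MIN_VALUE;
    private long lastEmittedWm = Long.MIN_VALUE;

    WatermarkModel(long lag, long frameSize) {
        this.lag = lag;
        this.frameSize = frameSize;
    }

    private long floorFrame(long ts) {
        return Math.floorDiv(ts, frameSize) * frameSize;
    }

    // Observes an event timestamp; returns the watermark to emit, or null if
    // the new watermark does not reach a new frame.
    Long observe(long timestamp) {
        topTimestamp = Math.max(topTimestamp, timestamp);
        long wm = topTimestamp - lag;
        if (floorFrame(wm) > lastEmittedWm) {
            lastEmittedWm = wm;
            return wm;
        }
        return null;
    }

    // Coalesces per-partition watermarks: the minimum over non-idle partitions,
    // or null if every partition is idle.
    static Long coalesce(Map<Integer, Long> wmByPartition, Set<Integer> idlePartitions) {
        Long min = null;
        for (Map.Entry<Integer, Long> e : wmByPartition.entrySet()) {
            if (idlePartitions.contains(e.getKey())) {
                continue;
            }
            if (min == null || e.getValue() < min) {
                min = e.getValue();
            }
        }
        return min;
    }
}
```

With a 3-second lag and 1-second frames, an event at 10,000 ms emits watermark 7,000; an event at 10,500 ms produces 7,500, which is still in the same frame, so nothing is emitted. When coalescing, a partition that never receives events would hold the minimum back indefinitely unless it is marked idle.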

The full code of this sample is in StockExchangeCoreApi.java. Running it produces an endless stream of data accumulating on disk, so to spare your filesystem we’ve limited the execution time to 10 seconds.

8.2.3. Advanced Batch DAG — Inverted TF-IDF Index

In this tutorial we’ll explore what the Core API DAG model offers beyond the capabilities of the Pipeline API. Our DAG will feature splits, joins, broadcast, and prioritized edges. We’ll access data from the file system and show a simple technique to distribute file reading across Jet members. Several vertices we use can’t be implemented in terms of out-of-the-box processors, so we’ll also show you how to implement your own with minimum boilerplate.

The full code is available in the hazelcast-jet-code-samples repository.

Let us first introduce the problem. The inverted index is a basic data structure in the domain of full-text search. First used in the 1950s, it is still at the core of modern information retrieval systems such as Lucene. The goal is to be able to quickly find the documents that contain a given set of search terms, and to sort them by relevance. To understand it we’ll need to throw in some terminology.

  • A document is treated as a list of words that has a unique ID. It is useful to define the notion of a document index which maps each document ID to the list of words it contains. We won’t build this index; it’s just for the sake of explanation.

  • The inverted index is the inverse of the document index: it maps each word to the list of documents that contain it. This is the fundamental building block in our search algorithm: it will allow us to find in O(1) time all documents relevant to a search term.

  • In the inverted index, each entry in the list is assigned a TF-IDF score which quantifies how relevant the document is to the search request.

  • Let DF (document frequency) be the length of the list: the number of documents that contain the word.

  • Let D be the total number of documents that were indexed.

  • IDF (inverse document frequency) is equal to log(D/DF).

  • TF (term frequency) is the number of occurrences of the word in the document.

  • TF-IDF score is simply the product of TF * IDF.
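The definitions translate directly into code. This sketch uses the natural logarithm, like the Math.log calls in the Java Streams code later in this section; any other base would only scale all scores by a constant. TfIdfMath and its methods are hypothetical names.

```java
class TfIdfMath {
    // IDF = log(D / DF): D is the number of documents, DF the number containing the word.
    static double idf(int docCount, int docFrequency) {
        return Math.log((double) docCount / docFrequency);
    }

    // TF = number of occurrences of the word in one document.
    static long tf(String[] docWords, String word) {
        long count = 0;
        for (String w : docWords) {
            if (w.equals(word)) {
                count++;
            }
        }
        return count;
    }

    // TF-IDF = TF * IDF.
    static double tfIdf(long tf, double idf) {
        return tf * idf;
    }
}
```

A word that occurs in all 4 of 4 documents gets idf(4, 4) = log 1 = 0, so its TF-IDF score is zero no matter how often it occurs.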

Note that IDF is a property of the word itself: it quantifies the relevance of each entered word to the search request as a whole. The list of entered words can be perceived as a list of filtering functions that we apply to the full set of documents. A more relevant word will apply a stronger filter. Specifically, common words like “the”, “it”, “on” act as pure “pass-through” filters and consequently have an IDF of zero, making them completely irrelevant to the search.

TF, on the other hand, is the property of the combination of word and document, and tells us how relevant the document is to the word, regardless of the relevance of the word itself.

When the user enters a search phrase:

  1. each individual term from the phrase is looked up in the inverted index;

  2. an intersection is found of all the lists, resulting in the list of documents that contain all the words;

  3. each document is scored by summing the TF-IDF contributions of each word;

  4. the result list is sorted by score (descending) and presented to the user.
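The four search steps can be sketched over an in-memory inverted index that maps each word to a list of (docId, TF-IDF score) pairs. InvertedIndexSearch and its methods are hypothetical; the SearchGui class mentioned later contains the sample's own implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

class InvertedIndexSearch {
    static Entry<Long, Double> pair(long docId, double score) {
        return new SimpleEntry<>(docId, score);
    }

    // Returns the IDs of documents containing all terms, by descending summed TF-IDF score.
    static List<Long> search(Map<String, List<Entry<Long, Double>>> index, List<String> terms) {
        Map<Long, Double> scores = null;
        for (String term : terms) {
            // 1. look up the term in the inverted index
            Map<Long, Double> termScores = new HashMap<>();
            for (Entry<Long, Double> e : index.getOrDefault(term, new ArrayList<>())) {
                termScores.put(e.getKey(), e.getValue());
            }
            if (scores == null) {
                scores = termScores;
            } else {
                // 2. intersect with the documents found so far; 3. add up the scores
                Map<Long, Double> next = new HashMap<>();
                for (Entry<Long, Double> e : scores.entrySet()) {
                    Double s = termScores.get(e.getKey());
                    if (s != null) {
                        next.put(e.getKey(), e.getValue() + s);
                    }
                }
                scores = next;
            }
        }
        List<Long> result = new ArrayList<>();
        if (scores == null) {
            return result;
        }
        // 4. sort by score, descending
        List<Entry<Long, Double>> ranked = new ArrayList<>(scores.entrySet());
        ranked.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
        for (Entry<Long, Double> e : ranked) {
            result.add(e.getKey());
        }
        return result;
    }
}
```

If "murdered" scores 2.0 in document 1 and 0.5 in document 2, and "king" scores 1.0 in both, the search returns document 1 (3.0) ahead of document 2 (1.5); a document containing only "king" is dropped by the intersection.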

Let’s have a look at a specific search phrase:

the man in the black suit murdered the king

The list of documents that contain all the above words is quite long… how do we decide which are the most relevant? The TF-IDF logic makes documents stand out if they have an above-average occurrence of words that are generally rare across all documents. For example, “murdered” occurs in far fewer documents than “black”… so given two documents where one has the same number of “murdered” as the other one has of “black”, the one with “murdered” wins because its word is more salient in general. On the other hand, “suit” and “king” might have a similar IDF, so the document that simply contains more of both wins.

Also note the limitation of this technique: a phrase is treated as just the sum of its parts; a document may contain the exact phrase and this will not affect its score.

Building the Inverted Index with Java Streams

To warm us up, let’s see what it takes to build the inverted index with just thread parallelism and without the ability to scale out across many machines. It is expressible in Java Streams API without too much work. The full code is here.

We’ll start by preparing a Stream<Entry<Long, String>> docWords: a stream of all the words found in all the documents. We use Map.Entry as a holder of a pair of values (a 2-tuple) and here we have a pair of Long docId and String word:

Stream<Entry<Long, String>> docWords = docId2Name
        .entrySet()
        .parallelStream()
        .flatMap(TfIdfJdkStreams::docLines)
        .flatMap(this::tokenize);

We know the number of all documents so we can compute double logDocCount, the logarithm of the document count:

final double logDocCount = Math.log(docId2Name.size());

Calculating TF is very easy, just count the number of occurrences of each distinct pair and save the result in a Map<Entry<Long, String>, Long>:

Map<Entry<Long, String>, Long> tfMap = docWords
        .parallel()
        .collect(groupingBy(identity(), counting()));

And now we build the inverted index. We start from tfMap, group by word, and the list under each word already matches our final product: the list of all the documents containing the word. We finish off by applying a transformation to the list: currently it’s just the raw entries from the tf map, but we need pairs (docId, tfIdfScore).

invertedIndex = tfMap
    .entrySet()
    .parallelStream()
    .collect(groupingBy(
        e -> e.getKey().getValue(),
        collectingAndThen(
            toList(),
            entries -> {
                double idf = logDocCount - Math.log(entries.size());
                return entries.stream().map(e ->
                        tfidfEntry(e, idf)).collect(toList());
            }
        )
    ));

The search function can be implemented with another Streams expression, which you can review in the SearchGui class. You can also run the TfIdfJdkStreams class and take the inverted index for a spin, making actual searches.

There is one last concept in this model that we haven’t mentioned yet: the stopword set. It contains those words that are known in advance to be common enough to occur in every document. Without treatment, these words are the worst case for the inverted index: the document list under each such word is the longest possible, and the score of all documents is zero due to zero IDF. They raise the index’s memory footprint without providing any value. The cure is to prepare a file, stopwords.txt, which is read in advance into a Set<String> and used to filter out the words in the tokenization phase. The same set is used to cross out words from the user’s search phrase, as if they weren’t entered. We’ll add this feature to our DAG based model in the following section.
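Stopword filtering during tokenization can be sketched as below. Like the tokenize steps elsewhere in this section, it lowercases the text and splits it on non-word characters. StopwordTokenizer is a hypothetical name, and the tiny stopword set in the usage example stands in for the contents of stopwords.txt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class StopwordTokenizer {
    private static final Pattern DELIMITER = Pattern.compile("\\W+");

    // Splits text into lowercase words, dropping empty tokens and stopwords.
    static List<String> tokenize(String text, Set<String> stopwords) {
        return Arrays.stream(DELIMITER.split(text.toLowerCase()))
                .filter(word -> !word.isEmpty())
                .filter(word -> !stopwords.contains(word))
                .collect(Collectors.toList());
    }
}
```

Applied to the search phrase "the man in the black suit murdered the king" with the stopwords {"the", "in"}, it leaves man, black, suit, murdered, king.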

Translating to Jet DAG

Our DAG as a whole will look relatively complex, but it can be understood as a “backbone” (cascade of vertices) starting from a source and ending in a sink with several more vertices attached on the side. This is just the backbone:

Backbone of the TF-IDF DAG
  1. The data source is a Hazelcast IMap which holds a mapping from document ID to its filename. The source vertex will emit all the map’s entries, but only a subset on each cluster member.

  2. doc-lines opens each file named by the map entry and emits all its lines in the (docId, line) format.

  3. tokenize transforms each line into a sequence of its words, again paired with the document ID, so it emits (docId, word).

  4. tf builds a set of all distinct pairs emitted from tokenize and maintains the count of each pair’s occurrences (its TF score).

  5. tf-idf takes that set, groups the pairs by word, and calculates the TF-IDF scores. It emits the results to the sink, which saves them to a distributed IMap.

Edge types follow the same pattern as in the word-counting job: after flatmapping there is first a local, then a distributed partitioned edge. The logic behind it is not the same, though: TF can actually compute the final TF scores by observing just the local data. This is because it treats each document separately (document ID is a part of the grouping key) and the source data is already partitioned by document ID. The TF-IDF vertex does something similar to word count’s combining, but there’s again a twist: it will group the TF entries by word, but instead of just merging them into a single result per word, it will keep them all in lists.

To this cascade we add a stopword-source which reads the stopwords file, parses it into a HashSet, and sends the whole set as a single item to the tokenize vertex. We also add a vertex that takes the data from doc-source and simply counts its items; this is the total document count used in the TF-IDF formula. We end up with this DAG:

The TF-IDF DAG

The choice of edge types into and out of doc-count may look surprising, so let’s examine it. We start with the doc-source vertex, which emits one item per document, but its output is distributed across the cluster. To get the full document count on each member, each doc-count processor must get all the items, and that’s just what the distributed broadcast edge will achieve. We’ll configure doc-count with local parallelism of 1, so there will be one processor on every member, each observing all the doc-source items. The output of doc-count must reach all tf-idf processors on the same member, so we use the local broadcast edge.

Another thing to note is the pair of flat-mapping vertices, doc-lines and tokenize. From a purely semantic standpoint, composing flatmap with flatmap yields just another flatmap. As we’ll see below, we’re using custom code for these two processors… so why did we choose to separate the logic this way? There are actually two good reasons. The first one has to do with Jet’s cooperative multithreading model: doc-lines makes blocking file IO calls, so it must be declared non-cooperative; tokenization is pure computation, so it can be in a cooperative processor. The second one is more general: the workload of doc-lines is very uneven. It consists of waiting, then suddenly coming up with a whole block of data. If we left tokenization there, performance would suffer because first the CPU would be forced to sit idle, then we’d be late in making the next IO call while tokenizing the input. The separate vertex can proceed at full speed all the time.

Implementation Code

As we announced, some of the processors in our DAG will need custom implementation code. Let’s start from the source vertex. It is easy, just the standard IMap reader:

dag.newVertex("doc-source", readMapP(DOCID_NAME));

The stopwords-producing processor has custom code, but it’s quite simple:

dag.newVertex("stopword-source", StopwordsP::new);
private static class StopwordsP extends AbstractProcessor {
    @Override
    public boolean complete() {
        return tryEmit(docLines("stopwords.txt").collect(toSet()));
    }
}

Since this is a source processor, all its action happens in complete(). It emits a single item: the HashSet built directly from the text file’s lines.

The doc-count processor can be built from the primitives provided in Jet’s library:

dag.newVertex("doc-count", Processors.aggregateP(counting()));

The doc-lines processor is more of a mouthful, but still built from existing primitives:

Vertex docLines = dag.newVertex("doc-lines", flatMapUsingContextP(
        // no context object is needed; nonCooperative() because of blocking file IO
        ContextFactory.withCreateFn(jet -> null).nonCooperative(),
        (Object ctx, Entry<Long, String> e) ->
                traverseStream(docLines("books/" + e.getValue())
                        .map(line -> entry(e.getKey(), line)))));

Let’s break down this expression. Processors.flatMapUsingContextP() returns a standard processor that emits an arbitrary number of items for each received item. We already saw this kind of processor in the introductory Word Count example. There we created a traverser from an array; here we create it from a Java stream. This processor doesn’t need a context object, so the context factory simply returns null. Calling nonCooperative() on the context factory declares all the processors created with it non-cooperative. We already explained why we do this: this processor will make blocking I/O calls.

tokenize is another custom vertex:

dag.newVertex("tokenize", TokenizeP::new);
private static class TokenizeP extends AbstractProcessor {
    private Set<String> stopwords;
    private final FlatMapper<Entry<Long, String>, Entry<Long, String>>
            flatMapper = flatMapper(e -> traverseStream(
            Arrays.stream(DELIMITER.split(e.getValue().toLowerCase()))
                  .filter(word -> !stopwords.contains(word))
                  .map(word -> entry(e.getKey(), word))));

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess0(@Nonnull Object item) {
        stopwords = (Set<String>) item;
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess1(@Nonnull Object item) {
        return flatMapper.tryProcess((Entry<Long, String>) item);
    }
}

This processor must deal with two different inbound edges. It receives the stopword set over edge 0 and then performs a flatmapping operation on the items arriving over edge 1. The logic presented here uses the same approach as the implementation of the provided Processors.flatMapP() processor: a single FlatMapper instance holds the business logic of the transformation, and tryProcess1() directly delegates to it. If FlatMapper is done emitting the output of the previous item, it accepts the new item, applies the user-provided transformation, and starts emitting the output items. If the outbox refuses a pending item, FlatMapper.tryProcess() returns false, which makes the framework call the same tryProcess1() method later, with the same input item.

Here is the code that creates the two inbound edges of tokenize:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1))
   .edge(from(docLines).to(tokenize, 1));

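Note especially the .priority(-1) part: edges with a lower priority value are drained first (the default is 0), so Jet will not attempt to deliver any data coming from docLines before all the data from stopwordSource has been delivered. The processor would fail if it had to tokenize a line before its stopword set is in place. The broadcast() part ensures that every tokenize processor receives its own copy of the stopword set.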
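To make the priority rule concrete, here is a plain-Java simulation; it is a sketch, not Jet’s implementation. The hypothetical PriorityDrainSketch class groups inbound edges by their priority value, drains the groups in ascending order (so -1 comes before the default 0), and moves on to the next group only once every edge in the current one is exhausted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical sketch: models how a processor's inbound edges are drained by priority.
// A lower priority value is drained first; all edges in one priority group must be
// exhausted before any item from the next group is delivered.
class PriorityDrainSketch {

    // edgesByPriority: priority value -> the edges (finite item queues) with that priority
    static List<String> deliveryOrder(Map<Integer, List<Queue<String>>> edgesByPriority) {
        List<String> delivered = new ArrayList<>();
        // TreeMap iterates its keys in ascending order, e.g. -1 before 0
        for (List<Queue<String>> group : new TreeMap<>(edgesByPriority).values()) {
            boolean anyLeft = true;
            while (anyLeft) {
                anyLeft = false;
                // round-robin over the edges of the same priority
                for (Queue<String> edge : group) {
                    String item = edge.poll();
                    if (item != null) {
                        delivered.add(item);
                        anyLeft = true;
                    }
                }
            }
        }
        return delivered;
    }

    // Convenience factory for an edge carrying the given items
    static Queue<String> edge(String... items) {
        return new ArrayDeque<>(Arrays.asList(items));
    }
}
```

In this model, with the stopword edge at priority -1 and the lines edge at the default 0, the stopword set is always delivered before any line.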

tf is another simple vertex, built purely from the provided primitives:

dag.newVertex("tf", aggregateByKeyP(
        singletonList(wholeItem()), counting(), Util::entry));

tf-idf is the most complex processor:

dag.newVertex("tf-idf", TfIdfP::new);
private static class TfIdfP extends AbstractProcessor {
    private double logDocCount;

    private final Map<String, List<Entry<Long, Double>>> wordDocTf =
            new HashMap<>();
    private final Traverser<Entry<String, List<Entry<Long, Double>>>>
            invertedIndexTraverser = lazy(() ->
            traverseIterable(wordDocTf.entrySet())
                    .map(this::toInvertedIndexEntry));

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        logDocCount = Math.log((Long) item);
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess1(@Nonnull Object item) {
        Entry<Entry<Long, String>, Long> e =
                (Entry<Entry<Long, String>, Long>) item;
        long docId = e.getKey().getKey();
        String word = e.getKey().getValue();
        long tf = e.getValue();
        wordDocTf.computeIfAbsent(word, w -> new ArrayList<>())
                 .add(entry(docId, (double) tf));
        return true;
    }

    @Override
    public boolean complete() {
        return emitFromTraverser(invertedIndexTraverser);
    }

    private Entry<String, List<Entry<Long, Double>>> toInvertedIndexEntry(
            Entry<String, List<Entry<Long, Double>>> wordDocTf
    ) {
        String word = wordDocTf.getKey();
        List<Entry<Long, Double>> docidTfs = wordDocTf.getValue();
        return entry(word, docScores(docidTfs));
    }

    private List<Entry<Long, Double>> docScores(
            List<Entry<Long, Double>> docidTfs
    ) {
        double logDf = Math.log(docidTfs.size());
        return docidTfs.stream()
                       .map(tfe -> tfidfEntry(logDf, tfe))
                       .collect(toList());
    }

    private Entry<Long, Double> tfidfEntry(
            double logDf, Entry<Long, Double> docidTf
    ) {
        Long docId = docidTf.getKey();
        double tf = docidTf.getValue();
        double idf = logDocCount - logDf;
        return entry(docId, tf * idf);
    }
}

This is quite a lot of code, but each of the three pieces is not too difficult to follow:

  1. tryProcess0() accepts a single item, the total document count.

  2. tryProcess1() performs a boilerplate group-and-aggregate operation, collecting a list of items under each key.

  3. complete() outputs the accumulated results, also applying the final transformation on each one: replacing the TF score with the final TF-IDF score. It relies on a lazy traverser, which holds a Supplier<Traverser> and will obtain the inner traverser from it the first time next() is called. This makes it very simple to write code that obtains a traverser from a map after it has been populated.
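The arithmetic performed by the tf and tf-idf vertices can be checked on a tiny in-memory corpus. This is a minimal sketch that assumes the documents are already tokenized and stopword-filtered; the class TfIdfSketch and its method are hypothetical. As in TfIdfP, the inverse document frequency is computed as log(docCount) - log(df), which equals log(docCount / df).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the scoring done by the tf and tf-idf vertices,
// run on an in-memory corpus instead of a distributed DAG.
class TfIdfSketch {

    // docs: docId -> tokenized, stopword-filtered words.
    // Returns the inverted index: word -> (docId -> tf-idf score).
    static Map<String, Map<Long, Double>> invertedIndex(Map<Long, List<String>> docs) {
        double logDocCount = Math.log(docs.size());

        // "tf" step: count the occurrences of each (word, docId) pair
        Map<String, Map<Long, Long>> wordDocTf = new HashMap<>();
        docs.forEach((docId, words) -> {
            for (String word : words) {
                wordDocTf.computeIfAbsent(word, w -> new HashMap<>())
                         .merge(docId, 1L, Long::sum);
            }
        });

        // "tf-idf" step: df is the number of documents containing the word
        Map<String, Map<Long, Double>> index = new HashMap<>();
        wordDocTf.forEach((word, docTfs) -> {
            double logDf = Math.log(docTfs.size());
            double idf = logDocCount - logDf;   // same as log(docCount / df)
            Map<Long, Double> scores = new HashMap<>();
            docTfs.forEach((docId, tf) -> scores.put(docId, tf * idf));
            index.put(word, scores);
        });
        return index;
    }
}
```

A word that occurs in every document gets an IDF of log(1) = 0, so its score is zero everywhere; rare words that occur often in one document score highest.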

Finally, our DAG is terminated by a sink vertex:

dag.newVertex("sink", SinkProcessors.writeMapP(INVERTED_INDEX));

8.3. Processor

Processor is the main type whose implementation is up to the user of the Core API: it contains the code of the computation to be performed by a vertex. There are a number of Processor building blocks in the Core API which allow you to just specify the computation logic, while the provided code handles the processor’s cooperative behavior. Please refer to the AbstractProcessor section.

A processor’s work can be conceptually described as follows: "receive data from zero or more input streams and emit data into zero or more output streams." Each stream maps to a single DAG edge (either inbound or outbound). There is no requirement on the correspondence between input and output items; a processor can emit any data it sees fit, including none at all. The same Processor abstraction is used for all kinds of vertices, including sources and sinks.

The implementation of a processor can be stateful and does not need to be thread-safe because Jet guarantees to use the processor instances from one thread at a time, although not necessarily always the same thread.

8.3.1. Cooperativeness

Processor instances are cooperative by default. The processor can opt out of cooperative multithreading by overriding isCooperative() to return false. Jet will then start a dedicated thread for it.

To maintain an overall good throughput, a cooperative processor must take care not to hog the thread for too long (a rule of thumb is up to a millisecond at a time). Jet’s design strongly favors cooperative processors and most processors can and should be implemented to fit these requirements. The major exceptions are sources and sinks because they often have no choice but to call into blocking I/O APIs.

8.3.2. The Outbox

The processor sends its output items to its Outbox which has a separate bucket for each outbound edge. The buckets have limited capacity and will refuse an item when full. A cooperative processor should be implemented such that when the outbox refuses its item, it saves its processing state and returns from the processing method. The execution engine will then drain the outbox buckets.
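The following sketch models this contract in plain Java, under the assumption that one bucket behaves like a bounded list. BoundedBucket and ResumableEmitter are hypothetical names, not Jet classes: the emitter stores its position in a field, returns false as soon as the bucket refuses an item, and resumes from that position on the next call, after the engine has drained the bucket.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one outbox bucket: limited capacity, refuses items when full.
class BoundedBucket {
    private final int capacity;
    final List<Object> items = new ArrayList<>();

    BoundedBucket(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(Object item) {
        if (items.size() >= capacity) {
            return false;            // full: the caller must save its state and return
        }
        items.add(item);
        return true;
    }

    // What the execution engine does between calls: move the items downstream.
    List<Object> drain() {
        List<Object> drained = new ArrayList<>(items);
        items.clear();
        return drained;
    }
}

// Hypothetical cooperative emitter: emits 0..count-1 and resumes where it stopped.
class ResumableEmitter {
    private final int count;
    private int next;                // the saved processing state

    ResumableEmitter(int count) {
        this.count = count;
    }

    // Returns true when everything was emitted, false if it had to yield.
    boolean emitInto(BoundedBucket bucket) {
        while (next < count) {
            if (!bucket.offer(next)) {
                return false;        // `next` is kept, so the refused item is retried
            }
            next++;
        }
        return true;
    }
}
```

With a bucket of capacity 2 and five items, the emitter needs three calls, and the drained output is still 0, 1, 2, 3, 4 in order.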

8.3.3. Data Processing Callbacks

process(ordinal, inbox)

Jet passes the items received over a given edge to the processor by calling process(ordinal, inbox). The inbox contains all the items received since the last process() call, as well as any items the processor didn’t remove in a previous process() call. There is a separate instance of Inbox for each inbound edge, so any given process() call involves items from only one edge.

The processor must not remove an item from the inbox until it has fully processed it. This is important with respect to the cooperative behavior: the processor may not be allowed to emit all items corresponding to a given input item and may need to return from the process() call early, saving its state. In such a case the item should stay in the inbox so Jet knows the processor has more work to do even if no new items are received.
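Here is the peek-then-remove pattern as a plain-Java sketch. A java.util.Queue stands in for the inbox and an integer parameter stands in for the free capacity of the outbox; both are assumptions for illustration, as is the hypothetical InboxDrainSketch class. Each input word produces two output items, and the word is removed from the queue only after both have been emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of a process(ordinal, inbox) implementation: a Queue stands in
// for Jet's Inbox and `room` stands in for the free capacity of the outbox bucket.
class InboxDrainSketch {
    final List<String> emitted = new ArrayList<>();
    private int emittedForHead;   // outputs of the head item already emitted (saved state)

    // Each input word w produces two outputs: w and w.toUpperCase().
    void process(Queue<String> inbox, int room) {
        while (!inbox.isEmpty()) {
            String item = inbox.peek();          // look at the item, don't remove it yet
            String[] outputs = {item, item.toUpperCase()};
            while (emittedForHead < outputs.length) {
                if (room == 0) {
                    return;                      // outbox full: the item stays in the inbox
                }
                emitted.add(outputs[emittedForHead++]);
                room--;
            }
            inbox.remove();                      // fully processed: now it may be removed
            emittedForHead = 0;
        }
    }
}
```

If the outbox has room for only three items, the first call emits a, A and b and returns with b still in the inbox; the next call emits B and only then removes b.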

tryProcessWatermark(watermark)

When a new highest watermark has been received from all input edges and all upstream processor instances, Jet calls the tryProcessWatermark(watermark) method. The watermark value is always greater than in the previous call.

The implementation may choose to process the watermark only partially and return false, in which case it will be called again later with the same watermark before any other processing method is called. When the method returns true, the watermark is forwarded to the downstream processors.

tryProcess()

If a processor’s inbox is empty, Jet will call its tryProcess() method instead. This allows the processor to perform work that is not input data-driven. The method has a boolean return value and if it returns false, it will be called again before any other methods are called. This way it can retry emitting its output until the outbox accepts it.

An important use case for this method is the emission of watermark items. A job that processes an infinite data stream may experience occasional lulls: periods with no items arriving. A windowing processor, on the other hand, is not allowed to act upon each item immediately due to event skew; it must wait for a watermark item to arrive. During a stream lull this becomes problematic because the watermark itself is primarily data-driven and advances in response to the observation of event timestamps. The watermark-inserting processor must be able to advance the watermark even during a stream lull, based on the passage of wall-clock time, and it can do so inside the tryProcess() method.

complete()

Jet calls complete() when all the input edges are exhausted. It is the last method to be invoked on the processor before disposing of it. Typically this is where a batch processor emits the results of an aggregating operation. If it can’t emit everything in a given call, it should return false and will be called again later.

8.3.4. Snapshotting Callbacks

Hazelcast Jet supports fault-tolerant processing jobs by taking distributed snapshots. At regular intervals each source vertex takes a snapshot of its own state and then emits a special item to its output stream: a barrier. The downstream vertex that receives the barrier item makes its own snapshot and then forwards the barrier to its outbound edges, and so on towards the sinks.

At the level of the Processor API the barrier items are not visible; ProcessorTasklet handles them internally and invokes the snapshotting callback methods described below.

saveToSnapshot()

Jet will call saveToSnapshot() when it determines it’s time for the processor to save its state to the current snapshot. Except for source vertices, this happens when the processor has received the barrier item from all its inbound streams and processed all the data items preceding it. The method must emit all its state to the special snapshotting bucket in the Outbox, by calling outbox.offerToSnapshot(). If the outbox doesn’t accept all the data, it must return false to be called again later, after the outbox has been flushed.

When this method returns true, ProcessorTasklet will forward the barrier item to all the outbound edges.

restoreFromSnapshot()

When a Jet job is restarting after having been suspended, it will first reload all the state from the last successful snapshot. Each processor will get its data through the invocations of restoreFromSnapshot(). Its parameter is the Inbox filled with a batch of snapshot data. The method will be called repeatedly until it consumes all the snapshot data.

finishSnapshotRestore()

After it has delivered all the snapshot data to restoreFromSnapshot(), Jet will call finishSnapshotRestore(). The processor may use it to initialize some transient state from the restored state.
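The three callbacks can be illustrated with a hypothetical keyed counter, sketched in plain Java rather than against the Processor interface. The list passed to saveToSnapshot() stands in for the snapshot bucket of the outbox and is assumed to have a fixed capacity, so the method keeps an iterator between calls and returns false when the bucket fills up. The transient total is not saved at all; finishSnapshotRestore() recomputes it from the restored counts.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

// Hypothetical sketch of the three snapshotting callbacks for a keyed counting processor.
class CountingStateSketch {
    final Map<String, Long> counts = new HashMap<>();   // state that must survive a restart
    long total;                                          // transient, derived from `counts`
    private Iterator<Entry<String, Long>> snapshotIt;    // progress of an ongoing save

    // Like saveToSnapshot(): writes state into a bucket with room for `capacity` entries;
    // returns false if it must be called again after the bucket has been flushed.
    boolean saveToSnapshot(List<Entry<String, Long>> bucket, int capacity) {
        if (snapshotIt == null) {
            snapshotIt = counts.entrySet().iterator();
        }
        while (snapshotIt.hasNext()) {
            if (bucket.size() >= capacity) {
                return false;            // bucket full: resume from the iterator later
            }
            Entry<String, Long> e = snapshotIt.next();
            bucket.add(new SimpleEntry<>(e.getKey(), e.getValue()));
        }
        snapshotIt = null;               // done; the next snapshot starts from scratch
        return true;
    }

    // Like restoreFromSnapshot(): called once per batch of restored entries.
    void restoreFromSnapshot(List<Entry<String, Long>> batch) {
        for (Entry<String, Long> e : batch) {
            counts.put(e.getKey(), e.getValue());
        }
    }

    // Like finishSnapshotRestore(): rebuilds transient state from the restored state.
    void finishSnapshotRestore() {
        total = counts.values().stream().mapToLong(Long::longValue).sum();
    }
}
```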

8.3.5. Best Practice: Document At-Least-Once Behavior

As we discuss in the Under the Hood chapter, the behavior of a processor under at-least-once semantics can deviate from correctness in extremely non-trivial and unexpected ways. Therefore the processor should always document its possible behaviors for that case.

8.4. AbstractProcessor

AbstractProcessor is a convenience class designed to deal with most of the boilerplate in implementing the full Processor API.

8.4.1. Receiving items

On the reception side, the first line of convenience is the tryProcessN() methods. Watermark and data items are interleaved in the inbox; these methods take care of the boilerplate needed to filter out the watermarks. Additionally, they receive one item at a time, eliminating the need to write a suspendable loop over the input items.

There is a separate method specialized for each edge from 0 to 4 (tryProcess0..tryProcess4) and a catch-all method tryProcess(ordinal, item). If the processor doesn’t need to distinguish between the inbound edges, the latter method is a good match; otherwise, it is simpler to implement one or more of the ordinal-specific methods. The catch-all method is also the only way to access inbound edges beyond ordinal 4, but such cases are very rare in practice.

Paralleling the above there are tryProcessWm(ordinal, wm) and tryProcessWmN(wm) methods that get just the watermark items.

8.4.2. Emitting items

AbstractProcessor has a private reference to its outbox and lets you access all its functionality indirectly. The tryEmit() methods offer your items to the outbox. If you get a false return value, you must stop emitting items and return from the current callback method of the processor. For example, if you called tryEmit() from tryProcess0(), you should return false so Jet will call tryProcess0() again later, when there’s more room in the outbox. Similar to these methods there are tryEmitToSnapshot() and emitFromTraverserToSnapshot(), to be used from the saveToSnapshot() callback.

Implementing a processor that respects the above rule is tricky and error-prone, especially when there are several items to emit, such as:

  • when a single input item maps to many output items

  • when the processor performs a group-by-key operation and emits each group as a separate item

You can avoid most of the complexity if you wrap all your output in a Traverser. Then you can simply say return emitFromTraverser(myTraverser). It will:

  1. try to emit as many items as possible

  2. return false if the outbox refuses an item

  3. hold on to the refused item and continue from it when it’s called again with the same traverser.

There is one more layer of convenience relying on emitFromTraverser(): the nested class FlatMapper, which makes it easy to implement a flatmapping kind of transformation. It automatically handles the concern of creating a new traverser when the previous one is exhausted and reusing the previous one until exhausted.
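To show what emitFromTraverser() and FlatMapper take care of, here is a simplified plain-Java model. All names (EmitSketch, Trav, MiniFlatMapper) are hypothetical and the outbox is reduced to a bounded list; the real classes are more general. The key points are the pending item, which is retried before asking the traverser for more, and the flatmapper’s current traverser, which is replaced only once it is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of emitFromTraverser() and FlatMapper; not Jet's actual classes.
class EmitSketch {

    // Minimal traverser: next() returns items, then null when exhausted.
    interface Trav<T> {
        T next();

        static <T> Trav<T> over(List<T> list) {
            Iterator<T> it = list.iterator();
            return () -> it.hasNext() ? it.next() : null;
        }
    }

    final List<Object> outbox = new ArrayList<>();
    int outboxCapacity;
    private Object pending;          // item refused by the outbox, retried first

    private boolean tryEmit(Object item) {
        if (outbox.size() >= outboxCapacity) {
            return false;
        }
        outbox.add(item);
        return true;
    }

    // Emits as much as possible; false means "call me again with the same traverser".
    <T> boolean emitFromTraverser(Trav<T> traverser) {
        Object item = pending != null ? pending : traverser.next();
        for (; item != null; item = traverser.next()) {
            if (!tryEmit(item)) {
                pending = item;      // hold on to the refused item
                return false;
            }
        }
        pending = null;
        return true;
    }

    // Like FlatMapper: creates a traverser per input item and reuses it until exhausted.
    class MiniFlatMapper<I, O> {
        private final Function<I, Trav<O>> mapFn;
        private Trav<O> current;

        MiniFlatMapper(Function<I, Trav<O>> mapFn) {
            this.mapFn = mapFn;
        }

        boolean tryProcess(I item) {
            if (current == null) {
                current = mapFn.apply(item);
            }
            if (emitFromTraverser(current)) {
                current = null;      // exhausted: the next input gets a fresh traverser
                return true;
            }
            return false;            // the caller must pass the same item again
        }
    }
}
```

Driving MiniFlatMapper with an outbox of capacity 3 and the inputs 1 and 5 (mapping i to i and i + 1) yields 1, 2, 5, 6: the item 6 is refused once, held as pending, and emitted on the retry with the same input.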

8.4.3. Traverser

Traverser is a very simple functional interface whose shape matches that of a Supplier, but with a contract specialized for the traversal over a sequence of non-null items: each call to its next() method returns another item of the sequence until exhausted, then keeps returning null. A traverser may also represent an infinite, non-blocking stream of items: it may return null when no item is currently available, then later return more items.

The point of this type is the ability to implement traversal over any kind of dataset or lazy sequence with minimum hassle: often just by providing a one-liner lambda expression. This makes it very easy to integrate with Jet’s convenience APIs for cooperative processors.

Traverser also supports some default methods that facilitate building a simple transformation layer over the underlying sequence: map, filter, flatMap, etc.
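A simplified version of the interface, written as a plain-Java sketch, shows how little is needed to build such a transformation layer. SimpleTraverser is a hypothetical stand-in for Jet’s Traverser; it covers only map, filter and a lazy factory similar to the lazy traverser used in TfIdfP, and it follows the same contract of returning null once there are no more items.

```java
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, simplified Traverser: next() returns items, then null once exhausted.
@FunctionalInterface
interface SimpleTraverser<T> {
    T next();

    // Transforms each item; a null (no item) passes through unchanged.
    default <R> SimpleTraverser<R> map(Function<? super T, ? extends R> fn) {
        return () -> {
            T t = next();
            return t != null ? fn.apply(t) : null;
        };
    }

    // Skips non-matching items; returns null when the underlying traverser has none left.
    default SimpleTraverser<T> filter(Predicate<? super T> pred) {
        return () -> {
            for (T t; (t = next()) != null; ) {
                if (pred.test(t)) {
                    return t;
                }
            }
            return null;
        };
    }

    static <T> SimpleTraverser<T> over(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Like a lazy traverser: the supplier runs only on the first next() call.
    static <T> SimpleTraverser<T> lazy(Supplier<SimpleTraverser<T>> supplier) {
        return new SimpleTraverser<T>() {
            private SimpleTraverser<T> inner;

            @Override
            public T next() {
                if (inner == null) {
                    inner = supplier.get();
                }
                return inner.next();
            }
        };
    }
}
```

The lazy variant matters when the source collection is filled after the traverser is created: the underlying iterator is obtained only when the first item is requested, so it sees the populated map.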

The following example shows how you can implement a simple flatmapping processor:

class ItemAndSuccessorP extends AbstractProcessor {
    private final FlatMapper<Integer, Integer> flatMapper =
            flatMapper(i -> traverseIterable(asList(i, i + 1)));

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        return flatMapper.tryProcess((int) item);
    }
}

For each received Integer this processor emits the number and its successor. If the outbox refuses an item, flatMapper.tryProcess() returns false and stays ready to resume the next time it is invoked. The fact that it returned false signals Jet to invoke ItemAndSuccessorP.tryProcess() again with the same arguments.

8.5. WatermarkPolicy

As mentioned in the Work With Jet chapter, determining the watermark is somewhat of a black art; it’s about superimposing order over a disordered stream of events. We must decide at which point it stops making sense to wait even longer for data about past events to arrive. There’s a tension between two opposing forces here:

  • wait as long as possible to account for all the data;

  • get results as soon as possible.

While there are ways to (kind of) achieve both, there’s a significant associated cost in terms of complexity and overall performance. Hazelcast Jet takes a simple approach and strictly triages stream items into “still on time” and “late”, discarding the latter.

WatermarkPolicy is the abstraction that computes the value of the watermark for a (sub)stream of disordered data items. It takes as input the timestamp of each observed item and outputs the current watermark value.

8.5.1. Predefined watermark policies

We provide some general, data-agnostic watermark policies in the WatermarkPolicies class. They vary in how well they deal with advancing the watermark during a stream lull. The better they deal with it, the more assumptions they must make on the nature of the events' timestamp values and on the relationship between the timestamps and the locally observed wall-clock time.

“Limiting Lag”

The limitingLag() policy will maintain a watermark that lags behind the highest observed event timestamp by a configured amount. In other words, each time an event with the highest timestamp so far is encountered, this policy advances the watermark to eventTimestamp - lag. This puts a limit on the spread between timestamps in the stream: all events whose timestamp is more than the configured lag behind the highest timestamp are considered late.
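The rule fits in a few lines. This is a hypothetical plain-Java sketch of the idea, not Jet’s WatermarkPolicy implementation; it tracks the highest timestamp seen so far and derives both the watermark and the late/on-time decision from it.

```java
// Hypothetical sketch of the limitingLag() idea; not Jet's WatermarkPolicy class.
class LimitingLagSketch {
    private final long lag;
    private long topTimestamp = Long.MIN_VALUE;   // highest event timestamp seen so far

    LimitingLagSketch(long lag) {
        this.lag = lag;
    }

    // Called for each observed event; returns the current watermark.
    long reportEvent(long timestamp) {
        topTimestamp = Math.max(topTimestamp, timestamp);
        return currentWatermark();
    }

    long currentWatermark() {
        // avoid underflow before any event has been seen
        return topTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : topTimestamp - lag;
    }

    // Late: more than `lag` behind the highest timestamp, i.e. behind the watermark.
    boolean isLate(long timestamp) {
        return timestamp < currentWatermark();
    }
}
```

Note that during a lull nothing calls reportEvent(), so this watermark stays where it is; the policies below address exactly that.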

“Limiting Lag and Delay”

The limitingLagAndDelay() policy applies the same fixed-lag logic as above and adds another limit: maximum delay from observing an item and advancing the watermark to at least that item’s timestamp. A stream may experience a lull (no items arriving) and this added limit will ensure that the watermark doesn’t stay behind the highest timestamp observed before the onset of the lull. However, the skew between substreams may still cause the watermark that reaches the downstream vertex to stay behind some timestamps. This is because the downstream will only get the lowest of all substream watermarks.

The advantage of this policy is that it doesn’t assume anything about the unit of measurement used for event timestamps.

“Limiting Lag and Lull”

The limitingLagAndLull() policy is similar to limitingLagAndDelay in addressing the stream lull problem and goes a step further by addressing the issues of lull combined with skew. To achieve this it must introduce an assumption, though: that the time unit used for event timestamps is milliseconds. After a given period passes with the watermark not being advanced by the arriving data (i.e., a lull happens), it will start advancing it in lockstep with the passage of the local system time. The watermark isn’t adjusted towards the local time; the policy just ensures the difference between local time and the watermark stays the same during a lull. Since the system time advances equally on all substream processors, the watermark propagated to downstream is now guaranteed to advance regardless of the lull.
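Here is a sketch of the lull handling, assuming (like the policy itself) that both event timestamps and local time are in milliseconds. LagAndLullSketch is hypothetical and only approximates the behavior described above; it takes the local time as a parameter to stay deterministic. Once more than maxLullMs passes without a data-driven advance, every further millisecond of local time moves the watermark forward by one millisecond, and the watermark never moves back when late data arrives. It also exhibits the caveat described next: an instance that never sees an event has no watermark to advance.

```java
// Hypothetical sketch of the limitingLagAndLull() idea; not Jet's implementation.
// Event timestamps and nowMillis are both assumed to be in milliseconds.
class LagAndLullSketch {
    private final long lag;
    private final long maxLullMs;
    private long watermark = Long.MIN_VALUE;
    private long lastAdvanceAt;          // local time of the last watermark advance

    LagAndLullSketch(long lag, long maxLullMs) {
        this.lag = lag;
        this.maxLullMs = maxLullMs;
    }

    long reportEvent(long timestamp, long nowMillis) {
        long candidate = timestamp - lag;
        if (candidate > watermark) {     // data-driven advance
            watermark = candidate;
            lastAdvanceAt = nowMillis;
        }
        return watermark;
    }

    long currentWatermark(long nowMillis) {
        if (watermark == Long.MIN_VALUE) {
            return watermark;            // no event seen yet: nothing to advance
        }
        long idle = nowMillis - lastAdvanceAt;
        if (idle > maxLullMs) {
            // lull: advance in lockstep with the local time elapsed beyond maxLullMs
            watermark += idle - maxLullMs;
            lastAdvanceAt = nowMillis - maxLullMs;
        }
        return watermark;
    }
}
```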

There is, however, a subtle issue with limitingLagAndLull(): if there is any substream that never observes an item, that substream’s policy instance won’t be able to initialize its “last seen timestamp” and will cause the watermark sent to the downstream to forever lag behind all the actual data.

“Limiting Timestamp and Wall-Clock Lag”

The limitingTimestampAndWallClockLag() policy makes a stronger assumption: that the event timestamps are in milliseconds since the Unix epoch and that they are synchronized with the local time on the processing machine. It puts a limit on how much the watermark can lag behind the local time. As long as its assumption holds, this policy gives straightforward results. It also doesn’t suffer from the subtle issue with limitingLagAndLull().

8.5.2. Watermark Throttling

The policy objects presented above will return the “ideal” watermark value according to their logic; however it would be too much overhead to insert a watermark item each time the ideal watermark advances (typically a thousand times per second). WatermarkEmissionPolicy is the object that decides whether to emit a watermark item given the last emitted and the current value of the watermark. For the purpose of sliding windows there is an easy answer: suppress all watermark items that belong to the same frame as the already emitted one. Such items would have no effect since the watermark must advance beyond a frame’s end for the aggregating vertex to consider the frame completed and act upon its results. The method emitByFrame() will return a policy with this kind of throttling applied. For other cases there is emitByMinStep() which suppresses watermark items until the watermark has advanced at least minStep ahead of the previously emitted one.
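Both throttling rules reduce to a comparison between the last emitted and the current watermark. The sketch below is hypothetical plain Java, not the WatermarkEmissionPolicy API, and it assumes frames of a fixed size aligned to timestamp 0.

```java
// Hypothetical sketches of the two throttling rules; not Jet's WatermarkEmissionPolicy.
class WatermarkThrottlingSketch {

    // emitByMinStep idea: emit only once the watermark is at least minStep ahead.
    static boolean shouldEmitByMinStep(long lastEmitted, long current, long minStep) {
        return current >= lastEmitted + minStep;
    }

    // emitByFrame idea: emit only when the watermark has entered a later frame.
    static boolean shouldEmitByFrame(long lastEmitted, long current, long frameSize) {
        return frameStart(current, frameSize) > frameStart(lastEmitted, frameSize);
    }

    // Start of the frame containing ts; frames are [k * frameSize, (k + 1) * frameSize).
    static long frameStart(long ts, long frameSize) {
        return ts - Math.floorMod(ts, frameSize);   // floorMod also handles negative ts
    }
}
```

With one-second frames, advancing the watermark from 1,001 to 1,999 emits nothing, while the step from 1,999 to 2,000 crosses into a new frame and triggers an emission.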

8.5.3. Maximum Watermark Retention When Merging Substreams

When two input streams are merged into one for downstream processing, Jet waits for the watermark to advance in all substreams in order to advance the overall watermark. The process that does this is called watermark coalescing and it results in increased latency of the output with respect to the input and possibly also increased memory usage due to the retention of all the pending data.

The skew between two distributed streams is defined as the difference in their watermark values. There is always some skew in the system and it’s acceptable, but it can grow very large due to various causes such as a hiccup on one of the cluster members (a long GC pause, for example), external source hiccup, non-balanced partitions and so on.
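Coalescing can be modeled as taking the minimum over the latest watermark of each substream, and skew as the spread between the highest and the lowest of them. This is a hypothetical plain-Java sketch; it ignores the retention limit described next.

```java
import java.util.Arrays;

// Hypothetical sketch of watermark coalescing across merged substreams.
class WatermarkCoalescingSketch {
    private final long[] substreamWms;

    WatermarkCoalescingSketch(int substreamCount) {
        substreamWms = new long[substreamCount];
        Arrays.fill(substreamWms, Long.MIN_VALUE);   // nothing received yet
    }

    // Records a watermark from one substream and returns the coalesced watermark,
    // which can only be as high as the slowest substream allows.
    long observe(int substream, long wm) {
        substreamWms[substream] = Math.max(substreamWms[substream], wm);
        return Arrays.stream(substreamWms).min().getAsLong();
    }

    // Skew between the highest and lowest substream watermark
    // (meaningful only after every substream has reported a watermark).
    long skew() {
        return Arrays.stream(substreamWms).max().getAsLong()
                - Arrays.stream(substreamWms).min().getAsLong();
    }
}
```

While one substream stays at watermark 40, the coalesced watermark stays at 40 no matter how far the other substream advances, which is where the extra latency and the retained data come from.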

An option to limit the watermark retention is available through JobConfig.setMaxWatermarkRetainMillis(). It sets the maximum time to retain the watermarks while coalescing them. A negative value disables the limit and Jet will retain the watermark as long as needed. With this setting you choose a trade-off between latency and correctness that arises when dealing with stream skew.

8.6. Vertices in the Library

While formally there’s only one kind of vertex in Jet, in practice there is an important distinction between the following:

  • A source is a vertex with no inbound edges. It injects data from the environment into the Jet job.

  • A sink is a vertex with no outbound edges. It drains the output of the Jet job into the environment.

  • A computational vertex has both kinds of edges. It accepts some data from upstream vertices, performs some computation, and emits the results to downstream vertices. Typically it doesn’t interact with the environment.

The com.hazelcast.jet.processor package contains static utility classes with factory methods that return suppliers of processors, as required by the dag.newVertex(name, procSupplier) calls. There is a convention in Jet that every module containing vertex implementations contributes a utility class to the same package. Inspecting the contents of this package in your IDE should allow you to discover all vertex implementations available on the project’s classpath. For example, there are modules that connect to 3rd party resources like Kafka and Hadoop Distributed File System (HDFS). Each such module declares a class in the same package, com.hazelcast.jet.processor, exposing the module’s source and sink definitions.

The main factory class for the source vertices provided by the Jet core module is SourceProcessors. It contains sources that ingest data from Hazelcast IMDG structures like IMap, ICache, IList, etc., as well as some simple sources that get data from files and TCP sockets (readFiles, streamSocket and some more).

Paralleling the sources there’s SinkProcessors for the sink vertices, supporting the same range of resources (IMDG, files, sockets). There’s also a general writeBuffered method that takes some boilerplate out of writing custom sinks. The user must implement a few primitives: create a new buffer, add an item to it, flush the buffer. The provided code takes care of integrating these primitives into the Processor API (draining the inbox into the buffer and handling the general lifecycle).

Finally, the computational vertices are where the main action takes place. The main class with factories for built-in computational vertices is Processors.

8.7. Implement a Custom Source or Sink Vertex

The Hazelcast Jet distribution contains vertices for the sources and sinks exposed through the Pipeline API. You can extend Jet’s support for sources and sinks by writing your own vertex implementations.

One of the main concerns when implementing a source vertex is that the data source is typically distributed across multiple machines and partitions, and the work needs to be distributed across multiple members and processors.

8.7.1. How Jet Creates and Initializes a Job

These are the steps taken to create and initialize a Jet job:

  1. The user builds the DAG and submits it to the local Jet client instance.

  2. The client instance serializes the DAG and sends it to a member of the Jet cluster. This member becomes the coordinator for this Jet job.

  3. The coordinator deserializes the DAG and builds an execution plan for each member.

  4. The coordinator serializes the execution plans and distributes each to its target member.

  5. Each member acts upon its execution plan by creating all the needed tasklets, concurrent queues, network senders/receivers, etc.

  6. The coordinator sends the signal to all members to start job execution.

The most visible consequence of the above process is the ProcessorMetaSupplier type: one must be provided for each Vertex. In Step 3, the coordinator deserializes the meta-supplier as a constituent of the DAG and asks it to create ProcessorSupplier instances which go into the execution plans. A separate instance of ProcessorSupplier is created specifically for each member’s plan. In Step 4, the coordinator serializes these and sends each to its member. In Step 5 each member deserializes its ProcessorSupplier and asks it to create as many Processor instances as configured by the vertex’s localParallelism property.

This process is so involved because each Processor instance may need to be differently configured. This is especially relevant for processors driving a source vertex: typically each one will emit only a slice of the total data stream, as appropriate to the partitions it is in charge of.
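Stripped of serialization and networking, the division of work across the three levels comes down to index arithmetic. The following hypothetical sketch assumes that every member runs the same number of processors (the local parallelism) and uses the same modulo-based slicing as the integer generator example in this section: the member index and the local index combine into a global processor index, and each processor keeps only the numbers whose remainder matches it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch of the meta-supplier -> supplier -> processor split, reduced to
// index arithmetic. Assumes every member runs the same number of processors.
class SlicePlanSketch {

    // The numbers one processor instance would emit out of 0..upperBound-1.
    static List<Integer> slice(int upperBound, int totalParallelism, int globalIndex) {
        return IntStream.range(0, upperBound)
                        .filter(n -> n % totalParallelism == globalIndex)
                        .boxed()
                        .collect(Collectors.toList());
    }

    // Outer list: one entry per member ("meta-supplier" level);
    // inner list: one slice per local processor ("supplier" level).
    static List<List<List<Integer>>> plan(int upperBound, int memberCount, int localParallelism) {
        int total = memberCount * localParallelism;
        List<List<List<Integer>>> perMember = new ArrayList<>();
        for (int member = 0; member < memberCount; member++) {
            List<List<Integer>> perProcessor = new ArrayList<>();
            for (int local = 0; local < localParallelism; local++) {
                int globalIndex = member * localParallelism + local;
                perProcessor.add(slice(upperBound, total, globalIndex));
            }
            perMember.add(perProcessor);
        }
        return perMember;
    }
}
```

With two members and two processors each, every number from 0 to 9 lands in exactly one slice; for example, the first processor on the first member gets 0, 4 and 8.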

ProcessorMetaSupplier

The client serializes an instance of ProcessorMetaSupplier as part of each Vertex in the DAG. The coordinator member deserializes the instance and uses it to create ProcessorSupplier instances by calling the ProcessorMetaSupplier.get() method. Before that, the coordinator calls the init() method with a context object that you can use to get useful information. The get() method takes a List<Address> as a parameter, which you should use to determine the cluster members that will run the job, if needed.

ProcessorSupplier

Usually this type will be custom-implemented in the same cases where the meta-supplier is custom-implemented and will complete the logic of a distributed data source’s partition assignment. It creates instances of Processor ready to start executing the vertex’s logic.

Another use is to open and close external resources, such as files or connections. We provide CloseableProcessorSupplier for this.

8.7.2. Example - Distributed Integer Generator

Let’s say we want to write a simple source that will generate numbers from 0 to 1,000,000 (exclusive). It is trivial to write a single Processor which can do this using java.util.stream and Traverser:

class GenerateNumbersP extends AbstractProcessor {

    private final Traverser<Integer> traverser;

    GenerateNumbersP(int upperBound) {
        traverser = Traversers.traverseStream(range(0, upperBound).boxed());
    }

    @Override
    public boolean complete() {
        return emitFromTraverser(traverser);
    }
}

Now we can build our DAG and execute it:

JetInstance jet = Jet.newJetInstance();

int upperBound = 10;
DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
        () -> new GenerateNumbersP(upperBound));
Vertex logInput = dag.newVertex("log-input",
        DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));

try {
    jet.newJob(dag).join();
} finally {
    Jet.shutdownAll();
}

When you run this code, you will see output similar to the following:

Received number: 4
Received number: 0
Received number: 3
Received number: 2
Received number: 2
Received number: 2

Since we are using the default parallelism setting on this vertex, several instances of the source processor were created, all of which generated the same sequence of values. Generally we want the ability to parallelize the source vertex, so we have to make each processor emit only a slice of the total data set.

So far we’ve used the simplest approach to creating processors: a DistributedSupplier<Processor> function that keeps returning equal instances of processors. Now we’ll step up to Jet’s custom interface that gives us the ability to provide a list of separately configured processors: ProcessorSupplier and its method get(int processorCount).

First we must decide on a partitioning policy: which subset each processor will emit. In our example we can use a simple policy: we’ll label each processor with its index in the list and have it emit only those numbers n that satisfy n % processorCount == processorIndex. Let’s write a new constructor for our processor which implements this partitioning logic:

GenerateNumbersP(int upperBound, int processorCount, int processorIndex) {
    // keep only the numbers that belong to this processor's slice
    traverser = Traversers.traverseStream(
            range(0, upperBound)
                    .filter(n -> n % processorCount == processorIndex)
                    .boxed());
}

Given this preparation, implementing ProcessorSupplier is easy:

class GenerateNumbersPSupplier implements ProcessorSupplier {

    private final int upperBound;

    GenerateNumbersPSupplier(int upperBound) {
        this.upperBound = upperBound;
    }

    @Override @Nonnull
    public List<? extends Processor> get(int processorCount) {
        return range(0, processorCount)
                .mapToObj(index -> new GenerateNumbersP(
                        upperBound, processorCount, index))
                .collect(toList());
    }
}

Let’s use the custom processor supplier in our DAG-building code:

DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
        new GenerateNumbersPSupplier(10));
Vertex logInput = dag.newVertex("log-input",
        DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));

Now we can re-run our example and see that each number indeed occurs only once. However, note that we are still working with a single-member Jet cluster; let’s see what happens when we add another member:

JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();

int upperBound = 10;
DAG dag = new DAG();
// rest of the code same as above

Running after this change, we’ll see that both members generate the same set of numbers. This is because ProcessorSupplier is instantiated independently on each member and asked for the same number of processors, resulting in identical processors on all members. We have to solve the same problem as before, but at the higher level of cluster-wide parallelism. For that we need ProcessorMetaSupplier: an interface that acts as a factory of ProcessorSupplier instances, one for each cluster member. Under the hood, the DAG-building code always creates a meta-supplier; the examples above merely leave it implicit for convenience. They result in a simple meta-supplier that reuses the provided supplier on every member.

The meta-supplier is a bit trickier to implement because its method takes a list of Jet member addresses instead of a simple count, and the return value is a function from address to ProcessorSupplier. In our case we’ll treat the address as just an opaque ID and we’ll build a map from address to a properly configured ProcessorSupplier. Then we can simply return map::get as our function.

class GenerateNumbersPMetaSupplier implements ProcessorMetaSupplier {

    private final int upperBound;

    private transient int totalParallelism;
    private transient int localParallelism;

    GenerateNumbersPMetaSupplier(int upperBound) {
        this.upperBound = upperBound;
    }

    @Override
    public void init(@Nonnull Context context) {
        totalParallelism = context.totalParallelism();
        localParallelism = context.localParallelism();
    }

    @Override @Nonnull
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        Map<Address, ProcessorSupplier> map = new HashMap<>();
        for (int i = 0; i < addresses.size(); i++) {
            // We'll calculate the global index of each processor in the cluster:
            int globalIndexBase = localParallelism * i;
            // Capture the value of the transient field for the lambdas below:
            int divisor = totalParallelism;
            // processorCount will be equal to localParallelism:
            ProcessorSupplier supplier = processorCount ->
                    range(globalIndexBase, globalIndexBase + processorCount)
                            .mapToObj(globalIndex ->
                                    new GenerateNumbersP(upperBound, divisor, globalIndex)
                            ).collect(toList());
            map.put(addresses.get(i), supplier);
        }
        return map::get;
    }
}

We change our DAG-building code to use the meta-supplier:

DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
        new GenerateNumbersPMetaSupplier(upperBound));
Vertex logInput = dag.newVertex("log-input",
        DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));

After re-running with two Jet members, we should once again see each number generated just once.
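The global-index arithmetic can also be checked without a cluster. The hypothetical GlobalSlicing class below replays it in plain Java. Like the meta-supplier above when it computes localParallelism * i, it assumes that every member runs the same number of processors. For contrast, it also shows what the members would emit if each sliced only by its local processor count, which reproduces the duplicates seen earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical plain-Java model of the index arithmetic in GenerateNumbersPMetaSupplier.
// Assumes every member runs the same number of processors (localParallelism).
final class GlobalSlicing {

    // Numbers emitted by local processor localIndex on member memberIndex.
    static List<Integer> emittedBy(int upperBound, int memberCount, int localParallelism,
                                   int memberIndex, int localIndex) {
        int totalParallelism = memberCount * localParallelism;
        // globalIndexBase (localParallelism * memberIndex) plus the local offset
        int globalIndex = localParallelism * memberIndex + localIndex;
        return IntStream.range(0, upperBound)
                .filter(n -> n % totalParallelism == globalIndex)
                .boxed()
                .collect(Collectors.toList());
    }

    // Everything the cluster emits when processors slice by their global index.
    static List<Integer> emittedByCluster(int upperBound, int memberCount, int localParallelism) {
        List<Integer> all = new ArrayList<>();
        for (int m = 0; m < memberCount; m++) {
            for (int p = 0; p < localParallelism; p++) {
                all.addAll(emittedBy(upperBound, memberCount, localParallelism, m, p));
            }
        }
        return all;
    }

    // For contrast: each member slices by its local count only, as with a plain ProcessorSupplier.
    static List<Integer> emittedWithLocalSlicingOnly(int upperBound, int memberCount, int localParallelism) {
        List<Integer> all = new ArrayList<>();
        for (int m = 0; m < memberCount; m++) {
            for (int p = 0; p < localParallelism; p++) {
                int localIndex = p;
                IntStream.range(0, upperBound)
                        .filter(n -> n % localParallelism == localIndex)
                        .boxed()
                        .forEach(all::add);
            }
        }
        return all;
    }
}
```

With two members and two processors each, emittedByCluster(10, 2, 2) returns [0, 4, 8, 1, 5, 9, 2, 6, 3, 7], each number exactly once. emittedWithLocalSlicingOnly(10, 2, 2), by contrast, yields every number twice.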

8.7.3. Sinks

Like a source, a sink is just another kind of processor. It accepts items from the inbox and pushes them into some system external to the Jet job (Hazelcast IMap, files, databases, distributed queues, etc.). A simple way to implement it is to extend AbstractProcessor and override tryProcess(), which deals with items one at a time. However, sink processors often have to deal with batching explicitly. In that case it is better to implement Processor directly, because its process() method receives the entire Inbox, which can be drained into a buffer and flushed out in one go.
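The drain-then-flush pattern can be sketched without Jet. In the hypothetical class below, a java.util.Queue stands in for Jet's Inbox and a list of lists stands in for the external system. A real sink would drain the actual Inbox (for example with its drainTo() method) and write each batch with a single call to the target system.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of a batching sink. Queue stands in for Jet's Inbox and
// flushedBatches for the external system; neither is Jet API.
final class BatchingSinkSketch {

    // Every batch "written" to the external system, in order.
    final List<List<String>> flushedBatches = new ArrayList<>();
    private final List<String> buffer = new ArrayList<>();

    // Called with whatever has accumulated in the inbox since the last call.
    void process(Queue<String> inbox) {
        String item;
        while ((item = inbox.poll()) != null) {
            buffer.add(item);
        }
        if (!buffer.isEmpty()) {
            // One write per inbox instead of one per item.
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchingSinkSketch sink = new BatchingSinkSketch();
        Queue<String> inbox = new ArrayDeque<>();
        inbox.add("a");
        inbox.add("b");
        inbox.add("c");
        sink.process(inbox);
        inbox.add("d");
        sink.process(inbox);
        System.out.println(sink.flushedBatches); // [[a, b, c], [d]]
    }
}
```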

8.7.4. Example: File Writer

In this example we’ll implement a vertex that writes the received items to files. To avoid contention and conflicts, each processor must write to its own file. Since we’ll be using a BufferedWriter which takes care of the buffering/batching concern, we can use the simpler approach of extending AbstractProcessor:

class WriteFileP extends AbstractProcessor implements Closeable {

    private final String path;

    private transient BufferedWriter writer;

    WriteFileP(String path) {
        this.path = path;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        Path path = Paths.get(this.path, context.jetInstance().getName()
                + '-' + context.globalProcessorIndex());
        writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
    }

    @Override
    protected boolean tryProcess(int ordinal, Object item) throws Exception {
        writer.append(item.toString());
        writer.newLine();
        return true;
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }
}

Some comments:

  • The isCooperative() method declares the processor non-cooperative because it performs blocking IO operations.

  • The init() method finds a unique filename for each processor by relying on the information reachable from the Context object.

  • Note the careful implementation of close(): it first checks whether writer is null, which can happen if newBufferedWriter() fails in init(). That failure makes init() fail as well, which fails the whole job, after which our ProcessorSupplier calls close() to clean up.

Cleaning up on completion/failure is actually the only concern that we need ProcessorSupplier for: the other typical concern, specializing processors to achieve data partitioning, was achieved directly from the processor’s code. This is the supplier’s code:

class WriteFilePSupplier implements ProcessorSupplier {

    private final String path;

    private transient List<WriteFileP> processors;

    WriteFilePSupplier(String path) {
        this.path = path;
    }

    @Override
    public void init(@Nonnull Context context) {
        File homeDir = new File(path);
        boolean success = homeDir.isDirectory() || homeDir.mkdirs();
        if (!success) {
            throw new JetException("Failed to create " + homeDir);
        }
    }

    @Override @Nonnull
    public List<WriteFileP> get(int count) {
        processors = Stream.generate(() -> new WriteFileP(path))
                           .limit(count)
                           .collect(Collectors.toList());
        return processors;
    }

    @Override
    public void close(Throwable error) {
        if (processors == null) {
            // get() was never called, so there is nothing to close
            return;
        }
        try {
            for (WriteFileP p : processors) {
                p.close();
            }
        } catch (IOException e) {
            throw new JetException(e);
        }
    }
}
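In WriteFilePSupplier.close() above, if one writer's close() throws, the loop stops and the remaining writers stay open. A possible alternative is sketched below with a hypothetical helper, CloseAll.closeAll(), which is not part of Jet. It keeps closing after a failure and attaches later failures to the first one as suppressed exceptions. You could call it from close(Throwable) and wrap the resulting IOException in a JetException, as the original code does.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Hypothetical helper: closes every resource even if some close() calls fail.
final class CloseAll {

    // Rethrows the first failure, with any later failures attached as suppressed.
    static void closeAll(List<? extends Closeable> resources) throws IOException {
        if (resources == null) {
            return; // nothing was created, nothing to clean up
        }
        IOException first = null;
        for (Closeable c : resources) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

This mirrors what try-with-resources does for multiple resources. No file handle leaks because of a single failing writer, and no failure is silently lost.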

8.8. Best Practices

8.8.1. Inspecting Processor Input and Output

The structure of the DAG model is a very poor match for Java’s type system, which results in the lack of compile-time type safety between connected vertices. Developing a type-correct DAG therefore usually requires some trial and error. To facilitate this process, but also to allow many more kinds of diagnostics and debugging, Jet’s library offers ways to capture the input/output of a vertex and inspect it.

Peeking with Processor Wrappers

The first approach is to decorate a vertex declaration with a layer that will log all the data traffic going through it. This support is present in the DiagnosticProcessors factory class, which contains the following methods:

  • peekInputP(): logs items received at any edge ordinal.

  • peekOutputP(): logs items emitted to any ordinal. An item emitted to several ordinals is logged just once.

These methods take two optional parameters:

  • toStringF returns the string representation of an item. The default is to use Object.toString().

  • shouldLogF is a filtering function so you can focus your log output only on some specific items. The default is to log all items.
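In effect, the two optional parameters are a formatting function and a filter that is applied before logging. The hypothetical PeekSketch class below models that behaviour in plain Java. It is not how DiagnosticProcessors is implemented, and it collects log lines in a list instead of writing to a logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of a peeking wrapper: log the items the filter accepts,
// pass every item through unchanged.
final class PeekSketch<T> {

    private final Function<? super T, String> toStringF;
    private final Predicate<? super T> shouldLogF;
    final List<String> log = new ArrayList<>(); // stands in for the real logger

    PeekSketch(Function<? super T, String> toStringF, Predicate<? super T> shouldLogF) {
        this.toStringF = toStringF;
        this.shouldLogF = shouldLogF;
    }

    // The defaults described above: Object.toString() and "log every item".
    static <T> PeekSketch<T> withDefaults() {
        return new PeekSketch<>(Object::toString, item -> true);
    }

    // Logs the item if shouldLogF accepts it, then returns it untouched.
    T peek(T item) {
        if (shouldLogF.test(item)) {
            log.add(toStringF.apply(item));
        }
        return item;
    }
}
```

For example, new PeekSketch<Integer>(x -> "n=" + x, x -> x % 2 == 0) passes 0 to 4 through unchanged but logs only "n=0", "n=2" and "n=4".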

Example Usage

Suppose we have declared the second-stage vertex in a two-stage aggregation setup:

DAG dag = new DAG();
Vertex combine = dag.newVertex("combine",
        combineByKeyP(counting(), Util::entry)
);

We’d like to see exactly what we’re getting from the first stage, so we’ll wrap the processor supplier with peekInputP():

Vertex combine = dag.newVertex("combine",
        peekInputP(combineByKeyP(counting(), Util::entry))
);

Keep in mind that logging happens on the machine hosting the processor, so this technique is primarily targeted at Jet jobs that developers run locally in their development environment.

Attaching a Sink Vertex

Since most vertices are implemented to emit the same data stream to all attached edges, it is usually possible to attach a diagnostic sink to any vertex. For example, Jet’s standard writeFileP() sink vertex can be very useful here.

Example Usage

In the example from the Word Count tutorial we can add the following declarations:

Vertex diagnose = dag
        .newVertex("diagnose", writeFileP(
                "tokenize-output", Object::toString, UTF_8, false))
        .localParallelism(1);
dag.edge(from(tokenize, 1).to(diagnose));

This will create the directory tokenize-output, which will contain one file per processor instance running on the machine. When running in a cluster, you can inspect, on each member, the input seen by that member. By specifying the allToOne() routing policy, you can also have the output of all the processors on all the members saved on a single member (although exactly which member is chosen is arbitrary).

8.8.2. How to Unit-Test a Processor

We provide some utility classes to simplify writing unit tests for your custom processors. You can find them in the com.hazelcast.jet.core.test package. Using these utility classes you can unit test your processor by passing it some input items and asserting the expected output.

Start by calling TestSupport.verifyProcessor(), passing it a processor supplier or a processor instance.

The test process does the following:

  • initialize the processor by calling Processor.init()

  • do a snapshot+restore (optional, see below)

  • call Processor.process(0, inbox). The inbox always contains one item from the input parameter

  • every time the inbox gets empty, do a snapshot+restore

  • call Processor.complete() until it returns true (optional)

  • do a final snapshot+restore after complete() is done

The optional snapshot+restore test procedure:

  • call saveToSnapshot()

  • create a new processor instance and use it instead of the existing one

  • restore the snapshot using restoreFromSnapshot()

  • call finishSnapshotRestore()
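The snapshot+restore cycle can be pictured with a toy stateful processor. Everything below is hypothetical and much simpler than the real Processor contract: ToyProcessor, runWithSnapshots(), and a Map used as the snapshot. It only shows why state must survive the switch to a new instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical toy model of the snapshot+restore steps listed above;
// TestSupport performs the real procedure on real Jet processors.
final class SnapshotCycleSketch {

    // A stateful "processor" that emits the running sum of its input.
    static final class ToyProcessor {
        private long sum;

        long process(long item) {
            sum += item;
            return sum;
        }

        void saveToSnapshot(Map<String, Long> snapshot) {
            snapshot.put("sum", sum);
        }

        void restoreFromSnapshot(Map<String, Long> snapshot) {
            sum = snapshot.getOrDefault("sum", 0L);
        }

        void finishSnapshotRestore() {
            // nothing to finalize in this toy
        }
    }

    // Feeds one item at a time; after each item it snapshots the state,
    // replaces the processor with a fresh instance and restores the state into it.
    static List<Long> runWithSnapshots(Supplier<ToyProcessor> supplier, List<Long> input) {
        List<Long> output = new ArrayList<>();
        ToyProcessor processor = supplier.get();
        for (long item : input) {
            output.add(processor.process(item));
            Map<String, Long> snapshot = new HashMap<>();
            processor.saveToSnapshot(snapshot);
            processor = supplier.get();
            processor.restoreFromSnapshot(snapshot);
            processor.finishSnapshotRestore();
        }
        return output;
    }
}
```

For the input 1, 2, 3 the output is [1, 3, 6]. If saveToSnapshot() did not store the sum, each fresh instance would start from zero and the output would be [1, 2, 3]. The real procedure is designed to expose exactly this kind of bug.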

The test optionally asserts that the processor made progress on each call to any processing method. To be judged as having made progress, the callback method must do at least one of these:

  • take something from the inbox

  • put something to the outbox

  • return true (applies only to boolean-returning methods)

Cooperative Processors

The test will provide a 1-capacity outbox to cooperative processors. The outbox will already be full on every other call to process(). This tests the edge case: process() may be called even when the outbox is full, giving the processor a chance to process the inbox without emitting anything.

The test will also assert that the processor doesn’t spend more time in any callback than the limit specified in cooperativeTimeout(long).
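The toy model below shows what the full-outbox edge case demands from a cooperative processor. When offer() is refused, the item must be kept and retried on a later call rather than dropped. OneSlotOutbox and runUpperCase() are hypothetical stand-ins, not TestSupport's classes. Here the outbox is full on every other call because the harness empties it only after every second call, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical 1-capacity outbox.
final class OneSlotOutbox {
    private String slot;
    int rejected; // offers refused because the slot was taken

    boolean offer(String item) {
        if (slot != null) {
            rejected++;
            return false;
        }
        slot = item;
        return true;
    }

    boolean isEmpty() {
        return slot == null;
    }

    String drain() {
        String item = slot;
        slot = null;
        return item;
    }
}

final class OutboxBackpressureSketch {

    // Upper-cases the input through the outbox, which is emptied only after every
    // second call, so the "processor" regularly finds it full and has to retry.
    static List<String> runUpperCase(List<String> input, OneSlotOutbox outbox) {
        List<String> received = new ArrayList<>();
        int next = 0; // index of the next input item to emit
        int calls = 0;
        while (next < input.size() || !outbox.isEmpty()) {
            // Processor side: advance only when the outbox accepted the item;
            // otherwise keep it and try again on the next call.
            if (next < input.size() && outbox.offer(input.get(next).toUpperCase())) {
                next++;
            }
            // Test side: drain the outbox only on even-numbered calls.
            calls++;
            if (calls % 2 == 0 && !outbox.isEmpty()) {
                received.add(outbox.drain());
            }
        }
        return received;
    }
}
```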

Cases Not Covered

This class does not cover these cases:

  • testing of processors which distinguish input or output edges by ordinal

  • checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier)

  • calling Processor.tryProcess(): the test never calls it

Example Usage

This example tests one of the processors provided by Jet:

TestSupport.verifyProcessor(mapP((String s) -> s.toUpperCase()))
           .disableCompleteCall()       (1)
           .disableLogging()            (1)
           .disableProgressAssertion()  (1)
           .disableSnapshots()          (1)
           .cooperativeTimeout(2000)                         (2)
           .outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER)  (3)
           .input(asList("foo", "bar"))                      (4)
           .expectOutput(asList("FOO", "BAR"));
1 Enabled by default
2 The default is 1000ms
3 The default is Objects::equals
4 The default is emptyList()

Other Utility Classes

The com.hazelcast.jet.core.test package contains these classes, which you can use as implementations of Jet interfaces in tests:

  • TestInbox

  • TestOutbox

  • TestProcessorContext

  • TestProcessorSupplierContext

  • TestProcessorMetaSupplierContext

The class JetAssert contains a few of the assertX() methods normally found in JUnit’s Assert class. We had to reimplement them to avoid a dependency on JUnit from our production code.

8.9. JMX and Management Center Integration

8.9.1. Configuration

The metrics collection is enabled by default. You can configure it using the hazelcast-jet.xml file:

<metrics enabled="true" jmxEnabled="true">
    <!-- The number of seconds the metrics will be retained on
         the instance -->
    <retention-seconds>5</retention-seconds>

    <!-- The metrics collection interval in seconds -->
    <collection-interval-seconds>5</collection-interval-seconds>

    <!-- whether metrics should be collected for data structures.
         Metrics collection can have some overhead if there is a
         large number of data structures -->
    <metrics-for-data-structures>false</metrics-for-data-structures>
</metrics>

or using the JetConfig object:

JetConfig jetConfig = new JetConfig();
// use set-methods on this object
MetricsConfig metricsConfig = jetConfig.getMetricsConfig();

8.9.2. Exposed JMX MBeans

Jet exposes metrics using the JVM’s standard JMX interface. You can use tools such as Java Mission Control or JConsole to display them. All Jet-related beans are stored under com.hazelcast.jet/Metrics/<instanceName>/ node.

List of the Beans and their Attributes
Property Description

cooperativeWorker-<N>

* iterationCount: The total number of iterations the driver of tasklets in cooperative thread N has made. It should increase by at least 250 iterations/s. A lower rate means that some of the cooperative processors block for too long.

* taskletCount: The number of tasklets assigned to cooperative thread N

Job-related metrics, nested under
job=<jobId>
   /exec=<executionId>
   /vertex=<vertexName>

All job-related metrics are nested under these nodes. If the vertex is a source or a sink, source=true or sink=true is additionally inserted into the tree.

/ordinal=<N>

This MBean groups metrics for input or output ordinal N.

Values in this section are 0 for non-distributed edges: they only account for data actually transmitted over the network between members. These numbers include watermarks, snapshot barriers, etc.

* distributedBytesIn: Total number of bytes received from remote members

* distributedBytesOut: Total number of bytes sent to remote members

* distributedItemsIn: Total number of items received from remote members

* distributedItemsOut: Total number of items sent to remote members

/proc=N

This MBean groups metrics for processor instance N, where N is the global processor index. A processor is the parallel worker that does the work of the vertex.

* lastReceivedWm: The value of watermark received last

* queuesCapacity: The total capacity of input queues
* queuesSize: The total number of items waiting in input queues

The above two metrics sum up all input queues of all edges to the processor. If the size is close to the capacity, backpressure is applied and this processor is a bottleneck.

If input edges have different priorities, the edges with higher priority are reflected first. After these are completed, the edges with the next priority are reflected, and so on.

/proc=N
   /ordinal=<M>

This MBean groups metrics pertaining to processor instance N and input or output edge M. M can be a number, or it can be snapshot for output items written to state snapshot.

* emittedCount: The number of emitted items. This number includes watermarks, snapshot barriers, etc. Unlike distributedItemsOut, it also includes items emitted to local processors.

* receivedCount: The number of received items. This number does not include watermarks, snapshot barriers etc. It’s the number of items the Processor.process method will receive.

* receivedBatches: The number of received batches. Processor.process receives a batch of items at a time; this is the number of such batches. Dividing receivedCount by receivedBatches gives the average batch size. It will be 1 under low load.
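Two of the attributes above are easiest to interpret with a little arithmetic. iterationCount is a running total, so its rate comes from two readings taken some seconds apart. The average batch size is receivedCount divided by receivedBatches. The hypothetical MetricsMath helper below only does that arithmetic on values you read yourself, for example in JConsole; it does not connect to JMX.

```java
// Hypothetical helpers for interpreting metrics read from the MBeans above.
final class MetricsMath {

    // Guideline from the cooperativeWorker-<N> entry.
    static final double MIN_ITERATIONS_PER_SECOND = 250.0;

    // Rate of iterationCount between two readings taken secondsBetween apart.
    static double iterationsPerSecond(long earlierCount, long laterCount, double secondsBetween) {
        if (secondsBetween <= 0) {
            throw new IllegalArgumentException("secondsBetween must be positive");
        }
        return (laterCount - earlierCount) / secondsBetween;
    }

    // True if the cooperative thread meets the 250 iterations/s guideline.
    static boolean cooperativeThreadLooksHealthy(long earlierCount, long laterCount, double secondsBetween) {
        return iterationsPerSecond(earlierCount, laterCount, secondsBetween) >= MIN_ITERATIONS_PER_SECOND;
    }

    // receivedCount / receivedBatches; 0 when nothing has been received yet.
    static double averageBatchSize(long receivedCount, long receivedBatches) {
        return receivedBatches == 0 ? 0.0 : (double) receivedCount / receivedBatches;
    }
}
```

For example, readings of 1,000 and 2,000 taken 5 seconds apart give 200 iterations/s, below the guideline. A processor with receivedCount = 1,024 and receivedBatches = 8 has an average batch size of 128.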

8.9.3. Jet Management Center

The above metrics are also exposed in the Jet Management Center. Please refer to its reference manual for more information.

Appendix A: Phone Homes

Hazelcast uses phone home data to learn about the usage of Hazelcast Jet.

Hazelcast Jet instances call our phone home server initially when they are started and then every 24 hours. This applies to all the instances joined to the cluster.

A.1. What is sent in?

The following information is sent in a phone home:

  • Hazelcast Jet version

  • Local Hazelcast Jet member UUID

  • Download ID

  • A hash value of the cluster ID

  • Cluster size bands for 5, 10, 20, 40, 60, 100, 150, 300, 600 and > 600

  • Number of connected clients bands for 5, 10, 20, 40, 60, 100, 150, 300, 600 and > 600

  • Cluster uptime

  • Member uptime

  • Environment Information:

    • Name of operating system

    • Kernel architecture (32-bit or 64-bit)

    • Version of operating system

    • Version of installed Java

    • Name of Java Virtual Machine

  • Hazelcast IMDG Enterprise specific:

    • Number of clients by language (Java, C++, C#)

    • Flag for Hazelcast Enterprise

    • Hash value of license key

    • Native memory usage

A.2. Phone Home Code

The phone home code itself is open source. Please see here.

A.3. Disabling Phone Homes

Set the hazelcast.phone.home.enabled system property to false either in the config or on the Java command line.

Starting with Hazelcast Jet 0.5, you can also disable the phone home using the environment variable HZ_PHONE_HOME_ENABLED. Simply add the following line to your .bash_profile:

export HZ_PHONE_HOME_ENABLED=false

A.4. Phone Home URL

The URL used for phone home requests is

http://phonehome.hazelcast.com/ping

Appendix B: License Questions

Hazelcast Jet is distributed under the Apache License 2; therefore, permission is granted to use, reproduce and distribute it along with any kind of open source and closed source application.

Depending on the feature set used, Hazelcast Jet has certain runtime dependencies which might have different licenses. The following are the dependencies and their respective licenses.

B.1. Embedded Dependencies

Embedded dependencies are merged (shaded) with the Hazelcast Jet codebase at compile-time. These dependencies become an integral part of the Hazelcast Jet distribution.

For license files of embedded dependencies, please see the license directory of the Hazelcast Jet distribution, available at our download page.

B.1.1. minimal-json

minimal-json is a JSON parsing and generation library which is a part of the Hazelcast Jet distribution. It is used for communication between the Hazelcast Jet cluster and the Management Center.

minimal-json is distributed under the MIT license and offers the same rights to add, use, modify, and distribute the source code as the Apache License 2.0 that Hazelcast uses. However, some other restrictions might apply.

B.1.2. Runtime Dependencies

Depending on the features used, additional dependencies might be added to the dependency set. Those runtime dependencies might have other licenses. See the following list of additional runtime dependencies.

B.1.3. Apache Hadoop

Hazelcast integrates with Apache Hadoop and can use it as a data sink or source. Jet has a dependency on the libraries required to read from and write to the Hadoop File System.

Apache Hadoop is distributed under the terms of the Apache License 2.

B.1.4. Apache Kafka

Hazelcast integrates with Apache Kafka and can make use of it as a data sink or source. Hazelcast has a dependency on Kafka client libraries.

Apache Kafka is distributed under the terms of the Apache License 2.

B.1.5. Spring

Hazelcast integrates with Spring and can be configured using Spring Context. Jet has a dependency on the libraries required to create a Spring context.

Spring is distributed under the terms of the Apache License 2.

Appendix C: FAQ

You can refer to the FAQ page to see the answers to frequently asked questions related to topics such as the relationship and differences between Hazelcast Jet and Hazelcast IMDG, Jet’s APIs and roadmap.

Appendix D: Common Exceptions

You may see the following exceptions thrown when working with Jet:

  • JetException: A general exception thrown if a job failure occurs. It has the original exception as its cause.

  • TopologyChangedException: Thrown when a member participating in a job leaves the cluster. If auto-restart is enabled, Jet will restart the job automatically, without throwing the exception to the user.

  • JobNotFoundException: Thrown when the coordinator node is not able to find the metadata for a given job.

There are also several Hazelcast exceptions that might be thrown when interacting with JetInstance. For a description of Hazelcast IMDG exceptions, please refer to the IMDG Reference manual.

Glossary

Accumulation

The act of building up an intermediate result inside a mutable object (called the accumulator) as a part of performing an aggregate operation. After all accumulation is done, a finishing function is applied to the object to produce the result of the operation.

Aggregate Operation

A set of functional primitives that instructs Jet how to calculate some aggregate function over one or more data sets. Used in the group-by, co-group and windowing transforms.

Aggregation

The act of applying an aggregate function to a stream of items. The result of the function can be simple, like a sum or average, or complex, like a collection of all aggregated items.

At-Least-Once Processing Guarantee

The system guarantees to process each item of the input stream(s), but doesn’t guarantee it will process it just once.

Batch Processing

The act of processing a finite dataset, such as one stored in Hazelcast IMDG or HDFS.

Client Server Topology

Hazelcast topology where members run outside the user application and are connected to clients using client libraries. The client library is installed in the user application.

Co-Grouping

An operation that is a mix of an SQL JOIN and GROUP BY with specific restrictions. Sometimes called a “stream join”. Each item of each stream that is being joined must be mapped to its grouping key. All items with the same grouping key (from all streams) are aggregated together into a single result. However, the result can be structured and preserve all input items separated by their stream of origin. In that form the operation effectively becomes a pure JOIN with no aggregation.

DAG

Directed Acyclic Graph which Hazelcast Jet uses to model the relationships between individual steps of the data processing.

Edge

A DAG element which holds the logic on how to route the data from one vertex’s processing units to the next one’s.

Embedded Topology

Hazelcast topology where the members are in-process with the user application and act as both client and server.

Event time

A data item in an infinite stream typically contains a timestamp data field. This is its event time. As the stream items go by, the event time passes as the items' timestamps increase. A typical distributed stream has a certain amount of event time disorder (items aren’t strictly ordered by their timestamp) so the “passage of event time” is a somewhat fuzzy concept. Jet uses the watermark to superimpose order over the disordered stream.

Exactly-Once Processing Guarantee

The system guarantees that it will process each item of the input stream(s) and will never process an item more than once.

Fault Tolerance

The property of a distributed computation system that gives it resilience to changes in the topology of the cluster running the computation. If a member leaves the cluster, the system adapts to the change and resumes the computation without loss.

Hash-Join

A special-purpose stream join optimized for the use case of data enrichment. Each item of the primary stream is joined with one item from each of the enriching streams. Items are matched by the join key. The name "hash-join" stems from the fact that the contents of the enriching streams are held in hashtables for fast lookup. Hashtables are replicated on each cluster member, which is why this operation is also known as a “replicated join”.

Hazelcast IMDG

An In-Memory Data Grid (IMDG) is a data structure that resides entirely in memory, and is distributed among many machines in a single location (and possibly replicated across different locations). IMDGs can support millions of in-memory data updates per second, and they can be clustered and scaled in ways that support large quantities of data. Hazelcast IMDG is the in-memory data grid offered by Hazelcast.

HDFS

Hadoop Distributed File System. Hazelcast Jet can use it both as a data source and a sink.

Jet Job

A unit of distributed computation that Jet executes. One job has one DAG specifying what to do. A distributed array of Jet processors performs the computation.

Kafka

Apache Kafka is a product that offers a distributed publish-subscribe message queue with guarantees of delivery and message persistence. It is the most commonly used component over which heterogeneous distributed systems exchange data.

Latency

The time that passes from the occurrence of an event that triggers some response to the occurrence of the response. In the case of Hazelcast Jet’s stream processing, latency refers to the time that passes from the point in time the last item that belongs to a window enters the system to the point where the result for that window appears in the output.

Member

A Hazelcast Jet instance (node) that is a member of a cluster. A single JVM can host one or more Jet members, but in production there should be one member per physical machine.

Partition (Data)

To guarantee that all items with the same grouping key are processed by the same processor, Hazelcast Jet uses a total surjective function to map each data item to the ID of its partition and assigns to each processor its unique subset of all partition IDs. A partitioned edge then routes all items with the same partition ID to the same processor.
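The two-step routing described above (key to partition ID, partition ID to processor) can be sketched in plain Java. The hypothetical PartitionRouting class uses key.hashCode() and a modulo assignment of partitions to processors. Both choices are assumptions for illustration: Hazelcast itself hashes the serialized form of the key, so the concrete IDs differ. What carries over is the property that equal keys always end up on the same processor.

```java
// Hypothetical sketch of key -> partition ID -> processor routing.
final class PartitionRouting {

    // Every key maps to exactly one ID in [0, partitionCount); floorMod keeps it non-negative.
    static int partitionId(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Each partition ID is owned by exactly one processor (assumed modulo assignment).
    static int processorFor(int partitionId, int processorCount) {
        return partitionId % processorCount;
    }

    // Equal keys give equal hash codes, hence the same partition and the same processor.
    static int route(Object key, int partitionCount, int processorCount) {
        return processorFor(partitionId(key, partitionCount), processorCount);
    }
}
```

For example, with 271 partitions (Hazelcast's default partition count) and 4 processors, every occurrence of a given key is routed to the same processor in the range 0 to 3.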

Partition (Network)

A malfunction in network connectivity that splits the cluster into two or more parts that are mutually unreachable, but the connections among nodes within each part remain intact. May cause each of the parts to behave as if it was “the” cluster that lost the other members. Also known as “split brain”.

Pipeline

Hazelcast Jet’s name for the high-level description of a computation job constructed using the Pipeline API. Topologically it is a DAG, but the vertices have different semantics than the Core API vertices and are called pipeline stages. Edges are implicit and not expressed in the API. Each stage (except for source/sink stages) has an associated transform that it performs on its input data.

Processor

The unit which contains the code of the computation to be performed by a vertex. Each vertex’s computation is implemented by a processor. On each Jet cluster member there are one or more instances of the processor running in parallel for a single vertex.

Session Window

A window that groups an infinite stream’s items by their timestamp. It groups together bursts of events closely spaced in time (by less than the configured session timeout).
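As an illustration of the session rule, the hypothetical SessionWindows.sessions() below groups timestamps into sessions. A new session starts whenever the gap to the previous event reaches the session timeout. The sketch assumes the timestamps are already sorted and belong to a single key; Jet's real implementation also has to handle disorder and grouping keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splits sorted event timestamps into session windows.
final class SessionWindows {

    // Events closer together than sessionTimeout stay in the same session.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long sessionTimeout) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (long ts : sortedTimestamps) {
            if (previous != null && ts - previous >= sessionTimeout) {
                result.add(current); // gap too large: close the current session
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

With a timeout of 5, the timestamps 1, 2, 3, 10, 11, 30 form the sessions [1, 2, 3], [10, 11] and [30].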

Sliding Window

A window that groups an infinite stream’s items by their timestamp. It groups together events that belong to a segment of fixed size on the timeline. As the time passes, the segment slides along, always extending from the present into the recent past. In Jet, the window doesn’t literally slide, but hops in steps of user-defined size. (“Time” here refers to the stream’s own notion of time, i.e., event time.)

Source

A resource present in a Jet job’s environment that delivers a data stream to it. Hazelcast Jet uses a source connector to access the resource. Alternatively, source may refer to the DAG vertex that hosts the connector.

Sink

A resource present in a Jet job’s environment that accepts its output data. Hazelcast Jet uses a sink connector to access the resource. Alternatively, sink may refer to the vertex that hosts the connector.

Skew

A generalization of the term “clock skew” applied to distributed stream processing. In this context it refers to the deviation in event time as opposed to wall-clock time in the classical usage. Several substreams of a distributed stream may at the same time emit events with timestamps differing by some delta, due to various lags that accumulate in the delivery pipeline for each substream. This is called stream skew. Event skew refers to the disorder within a substream, where data items appear out of order with respect to their timestamps.

Split Brain

A popular name for a network partition; see Partition (Network) above.

Stream Processing

The act of processing an infinite stream of data, typically implying that the data is processed as soon as it appears. Such a processing job must explicitly deal with the notion of time in order to make sense of the data. It achieves this with the concept of windowing.

Throughput

A measure for the volume of data a system is capable of processing per unit of time. Typical ways to express it for Hazelcast Jet are in terms of events per second and megabytes per second.

Tumbling Window

A window that groups an infinite stream’s items by their timestamp. It groups together events that belong to a segment of fixed size on the timeline. As the time passes, the segment “tumbles” along, never covering the same point in time twice. This means that each event belongs to just one tumbling window position. (“Time” here refers to the stream’s own notion of time, i.e., event time.)
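The difference between sliding and tumbling windows shows up in how many window positions contain a given timestamp. The hypothetical windowStarts() below lists them, assuming windows of the form [start, start + size) whose start is a multiple of the slide step, and a size that is a multiple of the slide. A tumbling window is the special case where the slide equals the size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which window positions contain a timestamp.
// Windows are [start, start + size) with start a multiple of slide.
final class WindowAssignment {

    // Returns window start times, latest first.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        if (size <= 0 || slide <= 0 || size % slide != 0) {
            throw new IllegalArgumentException("size must be a positive multiple of slide");
        }
        List<Long> starts = new ArrayList<>();
        long latestStart = Math.floorDiv(timestamp, slide) * slide;
        for (long start = latestStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For timestamp 7 and size 10, a sliding window with step 5 gives two positions, [5, 15) and [0, 10). A tumbling window with step 10 gives only [0, 10). In general each timestamp falls into size / slide sliding positions and into exactly one tumbling position.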

Vertex

The DAG element that performs a step in the overall computation. It receives data from its inbound edges and sends the results of its computation to its outbound edges. There are three kinds of vertices: source (has only outbound edges), sink (has only inbound edges) and computational (has both kinds of edges).

Watermark

A concept that superimposes order over a disordered underlying data stream. An infinite data stream’s items represent timestamped events, but they don’t occur in the stream ordered by the timestamp. The value of the watermark at a certain location in the processing pipeline denotes the lowest value of the timestamp that is expected to occur in the upcoming items. Items that don’t meet this criterion are discarded because they arrived too late to be processed.
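The discard rule can be illustrated with a small sketch. In the hypothetical WatermarkFilter below, the watermark trails the highest accepted timestamp by a fixed allowed lag, and any item whose timestamp is below the current watermark is dropped as late. This lag-based way of advancing the watermark is one common policy, assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of watermark-based late-item filtering.
final class WatermarkFilter {

    private final long allowedLag;
    private long watermark = Long.MIN_VALUE;
    final List<Long> accepted = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>();

    WatermarkFilter(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    void onItem(long timestamp) {
        if (timestamp < watermark) {
            dropped.add(timestamp); // arrived too late to be processed
            return;
        }
        accepted.add(timestamp);
        // The watermark never moves backwards.
        watermark = Math.max(watermark, timestamp - allowedLag);
    }

    long watermark() {
        return watermark;
    }
}
```

With an allowed lag of 2 and items arriving as 10, 9, 12, 9, 11, the first 9 is still accepted (the watermark is 8). After 12 arrives the watermark rises to 10, so the second 9 is dropped, while 11 is accepted.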

Windowing

The act of splitting an infinite stream’s data into windows according to some rule, most typically one that involves the items’ timestamps. Each window becomes the target of an aggregate function, which outputs one data item per window (and per grouping key).