This document is for an old version of the former Hazelcast IMDG product.

We've combined the in-memory storage of IMDG with the stream processing power of Jet to bring you an all new platform: Hazelcast 5.0

Hazelcast Jet Reference Manual

Version 3.2

Welcome to the Hazelcast Jet Reference Manual. This manual includes concepts, instructions, and samples to guide you on how to use Hazelcast Jet to build and execute computation jobs.

As a reader of this manual, you should be familiar with the Java programming language.

Summary of Contents

  1. Overview presents the basic concepts of Hazelcast Jet and provides high-level orientation.

  2. Get Started leads you through the steps to get Hazelcast Jet up and running on your machine. You can copy-paste the provided code to execute your first Jet job.

  3. Work With Jet shows you how to configure Jet, start a Jet cluster, manage jobs, etc.

  4. The Pipeline API introduces Jet’s Pipeline API. This is the API you use to build the computation you want Jet to execute.

  5. Source and Sink Connectors presents the source and sink connectors that Jet provides.

  6. Configuration explains how to configure Jet. You can tune its performance, level of safety in fault-tolerant jobs, configure the underlying IMDG cluster, etc.

  7. Jet Concepts presents some fundamental concepts that make Jet work. Understanding them will help you get the most out of Hazelcast Jet.

  8. Expert Zone — The Core API presents the material you need to understand if you want to successfully use the Core API. You don’t need this knowledge to use Jet. It is relevant only if you plan to implement custom processors or build the DAG by directly using the Core API.

Preface

Naming

  • Hazelcast Jet or just Jet refers to the distributed data processing engine provided by Hazelcast, Inc.

  • Hazelcast IMDG or just Hazelcast refers to the Hazelcast in-memory data grid middleware. Hazelcast is also the name of the company (Hazelcast, Inc.) providing Hazelcast IMDG and Hazelcast Jet.

Licensing

Hazelcast Jet and Hazelcast Jet Reference Manual are free and provided under the Apache License, Version 2.0.

Trademarks

Hazelcast is a registered trademark of Hazelcast, Inc. All other trademarks in this manual are held by their respective owners.

Getting Help

The Jet team provides support to its community via these channels:

For information on the commercial support for Hazelcast Jet, please see this page on hazelcast.com.

1. Overview of Jet

Jet is a distributed data processing engine that treats the data as a stream. It can process both static datasets (i.e., batch jobs) and live event streams. It can compute aggregate functions over infinite data streams by using the concept of windowing: dividing the stream into a sequence of subsets (windows) and applying the aggregate function over each window.

Unbounded event streams raise the issue of failures in the cluster. Jet can tolerate failures of individual members without data loss, offering the exactly-once processing guarantee.

You deploy Jet to a cluster of machines and then submit your processing jobs to it. Jet will automatically make all the cluster members participate in processing your data. It can connect to a distributed data source such as Kafka, Hadoop Distributed File System, or a Hazelcast IMDG cluster. Each member of the Jet cluster will consume a slice of the distributed data, which makes it easy to scale up to any level of throughput.

Jet is built on top of the Hazelcast IMDG technology and thanks to this the Jet cluster can also play the role of the data source and sink. If you use Jet this way, you can achieve perfect data locality and top-of-the-class throughputs.

Architecture Overview

The main programming paradigm you’ll use with Jet is that of a processing pipeline, which is a network of interconnected stages. The stages form a directed acyclic graph (a DAG) and the connections between them indicate the path along which the data flows. Data processing happens inside each stage. You describe the pipeline using the Pipeline API.

Internally Jet converts the pipeline into its native representation, the Core API DAG, which describes the layout of the processing units and the rules for routing the data between them. It is possible to use the Core API to directly create the native representation, but it’s much more complex and error-prone.

To run the computation Jet uses a cooperative multithreading execution engine, which means it can run many Core API Processors on the same JVM thread. A processor corresponds to a standalone single-threaded task that processes a stream of data. The processors are implemented in such a way that they don’t block the thread while waiting for more data, or while waiting for the receiving processor to accept it. By not depending on OS-level thread scheduling, Jet can achieve greater throughput and better saturate the CPU cores. It can also drive many more processors than the number of native threads the OS can manage.
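
To make the cooperative contract concrete, here is a minimal, hedged sketch of a custom Core API processor (assuming Jet 3.2’s com.hazelcast.jet.core.AbstractProcessor; the class name UppercaseP is purely illustrative). Instead of blocking, it returns false from tryProcess when the outbox can’t accept the item, and Jet calls it again later:

import com.hazelcast.jet.core.AbstractProcessor;

public class UppercaseP extends AbstractProcessor {
    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        // tryEmit() returns false when the outbox is full; Jet then yields
        // the thread and calls tryProcess() again with the same item later.
        return tryEmit(item.toString().toUpperCase());
    }
}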

2. Get Started

In this section we’ll get you started using Hazelcast Jet. We’ll show you how to set up a Java project with the proper dependencies and a quick Hello World example to verify your setup.

2.1. Requirements

In the good tradition of Hazelcast products, Jet is distributed as a JAR with no other dependencies. It requires JRE version 8 or higher to run.

2.2. Install Hazelcast Jet

The easiest way to start using Hazelcast Jet is to add it as a dependency to your project.

Hazelcast Jet is published on the Maven repositories. Add the following lines to your pom.xml:

<dependencies>
  <dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>3.2</version>
  </dependency>
</dependencies>

If you prefer to use Gradle, add the following to your build.gradle dependencies:

compile 'com.hazelcast.jet:hazelcast-jet:3.2'

Alternatively you can download the latest distribution package of Hazelcast Jet and add the hazelcast-jet-3.2.jar file to your classpath.

2.3. Install Hazelcast Jet Enterprise (Optional)

Hazelcast Jet Enterprise is a commercial edition of Hazelcast Jet. It’s built on top of Hazelcast Jet open source and extends it with the following features:

  • Security Suite

  • Lossless Restart (in Jet 3.0)

  • Job Upgrades (in Jet 3.0)

  • Enterprise PaaS Deployment Environment (Pivotal Cloud Foundry, OpenShift Container Platform (Jet 3.0))

Hazelcast Jet Enterprise is available on a Hazelcast Maven repository. Add the following lines to your pom.xml:

<repositories>
  <repository>
    <id>Hazelcast Private Snapshot Repository</id>
    <url>https://repository.hazelcast.com/snapshot/</url>
  </repository>
  <repository>
    <id>Hazelcast Private Release Repository</id>
    <url>https://repository.hazelcast.com/release/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-enterprise</artifactId>
    <version>3.2</version>
  </dependency>
</dependencies>

You can download the Hazelcast Jet Enterprise package from hazelcast.com.

2.3.1. Set the License Key

To use Hazelcast Jet Enterprise, you must set the license key using one of the configuration methods shown below. You can request a trial license key at hazelcast.com.

Hazelcast Jet Enterprise license keys are required only to run the Jet cluster. A Jet client can access the Enterprise features without the license key.

The license key can be configured using one of the following methods:

Hazelcast Configuration File

Replace the value for the <license-key> tag inside the hazelcast.xml file in the config folder:

<hazelcast ..>
    ...
    <license-key>ENTER LICENSE KEY HERE</license-key>
    ...
</hazelcast>
Programmatic Configuration

The license key can also be set in the Jet config object as follows:

JetConfig config = new JetConfig();
config.getHazelcastConfig().setLicenseKey( "Your Enterprise License Key" );
System Property

Set the following system property:

-Dhazelcast.enterprise.license.key=Your Enterprise License Key

2.3.2. Hazelcast Jet Management Center

Hazelcast Jet Management Center is a management and monitoring suite providing a live overview of the Hazelcast Jet cluster. It’s a standalone tool with a web console.

Please see the Hazelcast Jet Management Center Reference Manual for the installation instructions.

2.4. Verify Your Setup With a Word Count Example

You can verify your setup by running this simple program. It processes the contents of a Hazelcast IList that contains lines of text, finds the number of occurrences of each word in it, and stores its results in a Hazelcast IMap. In a distributed computation job the input and output cannot be simple in-memory structures like a Java List; they must be accessible to any member of the computing cluster and must persist after a job ends. This is why we use Hazelcast structures.

/*
 * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import java.util.List;
import java.util.Map;

import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.function.Functions.wholeItem;

public class HelloWorld {
    public static void main(String[] args) {
        // Create the specification of the computation pipeline. Note
        // it's a pure POJO: no instance of Jet needed to create it.
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<String>list("text"))
         .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
         .filter(word -> !word.isEmpty())
         .groupingKey(wholeItem())
         .aggregate(counting())
         .drainTo(Sinks.map("counts"));

        // Start Jet, populate the input list
        JetInstance jet = Jet.newJetInstance();
        try {
            List<String> text = jet.getList("text");
            text.add("hello world hello hello world");
            text.add("world world hello world");

            // Perform the computation
            jet.newJob(p).join();

            // Check the results
            Map<String, Long> counts = jet.getMap("counts");
            System.out.println("Count of hello: "
                    + counts.get("hello"));
            System.out.println("Count of world: "
                    + counts.get("world"));
        } finally {
            Jet.shutdownAll();
        }
    }
}

You should expect to see a lot of logging output from Jet (sent to stderr) and two lines on stdout:

Count of hello: 4
Count of world: 5

2.5. Reuse Your java.util.stream Knowledge

If you’ve already used Java’s Stream API, you’ll find many similarities in Jet’s Pipeline API. They both construct a processing pipeline by adding processing steps (stages). Both are FP-oriented APIs with lambdas playing a key role. Simple transformations like map/filter even look exactly the same. The main concern is knowing where the similarities end. Here are some typical gotchas if the Stream API has set some expectations for you:

  • All lambdas in Jet get serialized so they can be sent to remote cluster members. If your lambda captures a variable from the surrounding scope, that variable’s contents must be serialized as well. If you refer to an instance variable, the entire instance holding it must be serialized. It’s quite easy to accidentally capture and serialize the entire this object and everything it refers to.

  • The pipeline you construct doesn’t execute itself; you must explicitly submit it to a Jet cluster.

  • Since you’re submitting the computation to an external system, you don’t get the result in the return value of a method call. The pipeline explicitly specifies where it will store the results (to a data sink).

  • Whereas in the Stream API the aggregation is the terminal step, the one that immediately makes your pipeline execute, in Jet it is just another transformation (an intermediate step).

Finally, you’ll notice that Jet’s Pipeline API is much more powerful than the Stream API.

2.5.1. Example: List Transformation

Here’s a simple example of list transformation with the Stream API:

List<String> strings = Arrays.asList("a", "b");

List<String> uppercased = strings
        .stream()
        .map(String::toUpperCase)
        .collect(toList());

uppercased.forEach(System.out::println);

Here’s the equivalent in Jet. Note that we’re transforming Hazelcast IList instances:

JetInstance jet = newJetInstance();
IList<String> strings = jet.getList("strings");
strings.addAll(Arrays.asList("a", "b"));
IList<String> uppercased = jet.getList("uppercased");

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.list(strings))
        .map(String::toUpperCase)
        .drainTo(Sinks.list(uppercased));
jet.newJob(pipeline).join();

uppercased.forEach(System.out::println);

2.5.2. Example: Grouping and Aggregation

Here’s an example of grouping and aggregation with the Stream API. We compute a histogram of words by their length:

List<String> strings = Arrays.asList("a", "b", "aa", "bb");
Map<Integer, Long> histogram = strings
        .stream()
        .collect(groupingBy(String::length, Collectors.counting()));

histogram.forEach((length, count) -> System.out.format(
        "%d chars: %d occurrences%n", length, count));

And here’s how to aggregate in Jet:

JetInstance jet = newJetInstance();
IList<String> strings = jet.getList("strings");
strings.addAll(Arrays.asList("a", "b", "aa", "bb"));
IMap<Integer, Long> histogram = jet.getMap("histogram");

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.list(strings))
        .groupingKey(String::length)
        .aggregate(AggregateOperations.counting())
        .drainTo(Sinks.map(histogram));
jet.newJob(pipeline).join();

histogram.forEach((length, count) -> System.out.format(
        "%d chars: %d occurrences%n", length, count));

Note that the result of aggregate is just another pipeline stage; you can apply more transforms to it before draining to the sink.
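
For example, here’s a hedged sketch (reusing pipeline and strings from the snippet above; the sink name "histogramLines" is illustrative) that formats each histogram entry before writing it to a list sink:

pipeline.drawFrom(Sources.list(strings))
        .groupingKey(String::length)
        .aggregate(AggregateOperations.counting())
        .map(entry -> entry.getKey() + " chars: " + entry.getValue())
        .drainTo(Sinks.list("histogramLines"));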

2.5.3. Example: Collector vs. AggregateOperation

If you have ever written your own Collector for the Stream API, you’ll find that Jet’s AggregateOperation is quite similar and you can transfer your skill to it.

Here’s a Stream API collector that computes the sum of input items:

Collector<Long, LongAccumulator, Long> summingLong = Collector.of(
        LongAccumulator::new,
        (LongAccumulator acc, Long t) -> acc.add(t),
        (acc0, acc1) -> acc0.add(acc1),
        LongAccumulator::get
);

And here’s Jet’s aggregate operation doing the same:

AggregateOperation1<Long, LongAccumulator, Long> summingLong = AggregateOperation
        .withCreate(LongAccumulator::new)
        .andAccumulate((LongAccumulator acc, Long t) -> acc.add(t))
        .andCombine((acc0, acc1) -> acc0.add(acc1))
        .andDeduct((acc0, acc1) -> acc0.subtract(acc1))
        .andExportFinish(LongAccumulator::get);

Compared to Collector, AggregateOperation defines two more primitives:

  • deduct reverses a previous combine. It’s an optional primitive and serves to optimize sliding window aggregation.

  • export is similar to finish, the difference being that export must preserve the accumulator’s state and finish doesn’t. Jet uses finish wherever applicable as it can be implemented more optimally. In this example we use the same lambda for both primitives.
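
As a hedged usage sketch (the list names "numbers" and "sum" are illustrative), the operation defined above plugs into a pipeline just like the built-in aggregate operations:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Long>list("numbers"))
 .aggregate(summingLong)            // the AggregateOperation1 defined above
 .drainTo(Sinks.list("sum"));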

2.6. Deploy Using Docker

You can deploy your Hazelcast Jet projects using Docker containers. Hazelcast Jet provides the following Docker images:

  • Hazelcast Jet

  • Hazelcast Jet Enterprise

  • Hazelcast Jet Management Center

After you pull an image from the Docker registry, you can run your image to start the Jet Management Center or a Jet instance with its default configuration. All repositories provide the latest stable releases but you can pull a specific release, too. You can also specify environment variables when running the image.

If you want to start customized instances of Jet or Jet Management Center, you can extend the above listed images by providing your own configuration file. This feature is provided as a Hazelcast Jet plugin. Please see the GitHub repositories Hazelcast Jet Docker and Hazelcast Jet Management Center Docker for details on configuration and usage.

2.7. Deploy Using Kubernetes

Hazelcast Jet provides Kubernetes-ready Docker images which you can easily deploy into Kubernetes environments. These images use the Hazelcast Kubernetes plugin to discover other Hazelcast Jet members in the Kubernetes environment by interacting with the Kubernetes API.

Using the Kubernetes API requires granting certain permissions. Therefore, you need to create a Role-Based Access Control definition (rbac.yaml) with the following content. You can check out the Hazelcast Kubernetes plugin for more details on this matter.

Here is the content of rbac.yaml:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: default-cluster
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: view
subjects:
- kind: ServiceAccount
  name: default
  namespace: default

You can grant the required permissions stated above using the following command:

kubectl apply -f rbac.yaml

Then, you need to configure Hazelcast Jet to use Kubernetes Discovery to find the other members. To do so, create a file named hazelcast-jet-config.yaml with the contents shown below.

Here is the content of hazelcast-jet-config.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: hazelcast-jet-configuration
data:
  hazelcast.yaml: |-
    hazelcast:
      network:
        join:
          multicast:
            enabled: false
          kubernetes:
            enabled: true
            namespace: default
            service-name: hazelcast-jet-service

The following command creates a ConfigMap object in the Kubernetes cluster:

kubectl apply -f hazelcast-jet-config.yaml

This object holds our Kubernetes Discovery enabled Hazelcast Jet configuration. You will use this configuration when you create pods as explained below.

You need to create a StatefulSet, which manages pods based on an identical container spec, and a Service, which is an abstraction defining a logical set of pods and a policy for accessing them. Define both in a file named hazelcast-jet.yaml, as shown below.

Here is the content of hazelcast-jet.yaml:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: hazelcast-jet
  labels:
    app: hazelcast-jet
spec:
  replicas: 2
  serviceName: hazelcast-jet-service
  selector:
    matchLabels:
      app: hazelcast-jet
  template:
    metadata:
      labels:
        app: hazelcast-jet
    spec:
      containers:
      - name: hazelcast-jet
        image: hazelcast/hazelcast-jet:latest
        imagePullPolicy: IfNotPresent
        ports:
        - name: hazelcast-jet
          containerPort: 5701
        livenessProbe:
          httpGet:
            path: /hazelcast/health/node-state
            port: 5701
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          successThreshold: 1
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /hazelcast/health/node-state
            port: 5701
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 1
          successThreshold: 1
          failureThreshold: 1
        volumeMounts:
        - name: hazelcast-jet-storage
          mountPath: /data/hazelcast-jet
        env:
        - name: JAVA_OPTS
          value: "-Dhazelcast.rest.enabled=true -Dhazelcast.config=/data/hazelcast-jet/hazelcast.yaml"
      volumes:
      - name: hazelcast-jet-storage
        configMap:
          name: hazelcast-jet-configuration
---
apiVersion: v1
kind: Service
metadata:
  name: hazelcast-jet-service
spec:
  selector:
    app: hazelcast-jet
  ports:
  - protocol: TCP
    port: 5701

After creating the hazelcast-jet.yaml file with the content above, run the following command to create the Service and the StatefulSet:

kubectl apply -f hazelcast-jet.yaml

After that, you can inspect the pod logs with the following command to verify that the members formed a cluster:

kubectl logs hazelcast-jet-0

It should output a log similar to the one below:

....
Nov 15, 2018 9:39:15 AM com.hazelcast.internal.cluster.ClusterService
INFO: [172.17.0.2]:5701 [dev] [0.7]

Members {size:2, ver:2} [
        Member [172.17.0.2]:5701 - d935437e-143e-4e3f-81d6-f3ece16eb23e this
        Member [172.17.0.6]:5701 - 88472d6c-1cae-4b0d-9681-f6da6199bc9c
]
....

For more details about deployment and information on how to submit the Hazelcast Jet jobs to your cluster, see the Kubernetes Integration code sample.

2.8. Deploy Using Helm

You can deploy Hazelcast Jet using Helm charts, which provide an easy way to package and deploy applications on Kubernetes.

You can install the chart using the following command:

helm install stable/hazelcast-jet

See Hazelcast Jet Helm Chart for more details on configuration and the Troubleshooting in Kubernetes Environments section if you encounter any issues.

Apart from the official Helm chart for Hazelcast Jet, we also provide Helm charts for Hazelcast Jet with Hazelcast Jet Management Center and for Hazelcast Jet Enterprise on the Hazelcast Enterprise Helm Chart Repository.

2.9. Scaling the Cluster in Kubernetes

A Hazelcast Jet cluster is easily scalable within Kubernetes. You can use the standard kubectl scale command to change the cluster size. See Scaling StatefulSets for more information.

Note, however, that by default Hazelcast Jet is configured to TERMINATE on receiving the SIGTERM signal from Kubernetes, which means that a container stops quickly, but the cluster’s data safety relies on the backups stored by other Hazelcast Jet members. If you suddenly scale down by more than your backup-count property (1 by default), you may lose the cluster data.

The other option is to use the GRACEFUL shutdown, which triggers partition migration before shutting down the Hazelcast Jet member. Note that this may take some time, depending on your data size.

To use the graceful shutdown approach, set the following properties:

  • terminationGracePeriodSeconds in your StatefulSet (or Deployment) configuration; the value defines how long Kubernetes waits before terminating the pod and it should be high enough to cover the data migration process. The default value is 30 seconds.

  • -Dhazelcast.shutdownhook.policy=GRACEFUL in the JVM parameters (JAVA_OPTS environment variable).

  • -Dhazelcast.graceful.shutdown.max.wait in the JVM parameters (JAVA_OPTS environment variable); the value should be high enough to cover the data migration process. The default value is 600 seconds.

The graceful shutdown configuration is already included in Hazelcast Jet Helm Chart.

With the Automatic Elasticity feature, your jobs are automatically restarted from the latest known snapshot when you downscale your cluster. For more details about snapshotting and processing guarantees, see the Fault Tolerance section.

3. Work With Jet

3.1. Start Jet

To create a Jet cluster, we simply start some Jet instances. Normally these would be started on separate machines, but for simple practice we can use the same JVM for two instances. Even though they are in the same JVM, they’ll communicate over the network interface.

JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();

These two instances should automatically discover each other using IP multicast and form a cluster. You should see a log output similar to the following:

Members [2] {
  Member [10.0.1.3]:5701 - f1e30062-e87e-4e97-83bc-6b4756ef6ea3
  Member [10.0.1.3]:5702 - d7b66a8c-5bc1-4476-a528-795a8a2d9d97 this
}

This means the members successfully formed a cluster. Since the Jet instances start their own threads, it is important to explicitly shut them down at the end of your program; otherwise the Java process will remain alive after the main() method completes:

public static void main(String[] args) {
    try {
        JetInstance jet = Jet.newJetInstance();
        Jet.newJetInstance();

        // work with Jet

    } finally {
        Jet.shutdownAll();
    }
}

It is important to use at least 2 members when developing because otherwise Jet doesn’t serialize the stream items and serialization errors will not be discovered.

3.2. Build the Computation Pipeline

The general shape of any data processing pipeline is drawFromSource → transform → drainToSink and the natural way to build it is from source to sink. The Pipeline API follows this pattern. For example,

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("input"))
 .map(String::toUpperCase)
 .drainTo(Sinks.list("result"));

Refer to the chapter on the Pipeline API for full details.

3.3. Watch out for Capturing Lambdas

A typical Jet pipeline involves lambda expressions. Since the whole pipeline definition must be serialized to be sent to the cluster, the lambda expressions must be serializable as well. The Java standard provides an essential building block: if the static type of the lambda is a subtype of Serializable, you will automatically get a lambda instance that can serialize itself.

None of the functional interfaces in the JDK extend Serializable, so we had to mirror the entire java.util.function package in our own com.hazelcast.jet.function with all the interfaces subtyped and made Serializable. Each subtype has the name of the original with Ex appended. For example, a FunctionEx is just like Function, but implements Serializable. We use these types everywhere in the Pipeline API.
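
As a small illustration (a sketch, assuming Jet 3.2’s com.hazelcast.jet.function.FunctionEx), assigning a method reference to such a type yields a serializable lambda that you can pass to pipeline stages:

import com.hazelcast.jet.function.FunctionEx;

public class SerializableLambda {
    public static void main(String[] args) {
        // The target type extends both Function and Serializable,
        // so this lambda instance can serialize itself.
        FunctionEx<String, String> toUpper = String::toUpperCase;
        System.out.println(toUpper.apply("hello jet"));
    }
}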

As always with this kind of magic, auto-serializability of lambdas has its flipside: it is easy to overlook what’s going on.

If the lambda references a variable in the outer scope, the variable is captured and must also be serializable. If it references an instance variable of the enclosing class, it implicitly captures this so the entire class will be serialized. For example, this will fail because JetJob1 doesn’t implement Serializable:

class JetJob1 {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.list("input"))
         .filter(item -> item.equals(instanceVar)); (1)
        return p;
    }
}
1 Refers to instanceVar, capturing this, but JetJob1 is not Serializable so this call will fail.

Just adding implements Serializable to JetJob1 would be a viable workaround here. However, consider something just a bit different:

class JetJob2 implements Serializable {
    private String instanceVar;
    private OutputStream fileOut; (1)

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.list("input"))
         .filter(item -> item.equals(instanceVar)); (2)
        return p;
    }
}
1 A non-serializable field.
2 Refers to instanceVar, capturing this. JetJob2 is declared as Serializable, but has a non-serializable field and this fails.

Even though we never refer to fileOut, we are still capturing the entire JetJob2 instance. We might mark fileOut as transient, but the sane approach is to avoid referring to instance variables of the surrounding class. We can simply achieve this by assigning to a local variable, then referring to that variable inside the lambda:

class JetJob3 {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        String findMe = instanceVar; (1)
        p.drawFrom(Sources.list("input"))
         .filter(item -> item.equals(findMe)); (2)
        return p;
    }
}
1 Declare a local variable that loads the value of the instance field.
2 By referring to the local variable findMe we avoid capturing this and the job runs fine.

Another common pitfall is capturing an instance of DateTimeFormatter or a similar non-serializable class:

DateTimeFormatter formatter = DateTimeFormatter
        .ofPattern("HH:mm:ss.SSS")
        .withZone(ZoneId.systemDefault());
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.drawFrom(Sources.list("input"));
src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp))); (1)
1 Captures the non-serializable formatter, so this fails.

Sometimes we can get away with using one of the preconfigured formatters available in the JDK:

src.map((Long tstamp) -> DateTimeFormatter.ISO_LOCAL_TIME (1)
        .format(Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault())));
1 Accesses the static final field ISO_LOCAL_TIME. Static fields are not subject to lambda capture, they are dereferenced when the code runs on the target machine.

This refers to a static final field in the JDK, so the instance is available on any JVM. If this is not available, you may create a static final field in your own class, but you can also use mapUsingContext(). In this case you provide a serializable factory that Jet will ask to create an object on the target member. The object it returns doesn’t have to be serializable. Here’s an example of that:

Pipeline p = Pipeline.create();
BatchStage<Long> src = p.drawFrom(Sources.list("input"));
ContextFactory<DateTimeFormatter> contextFactory = ContextFactory.withCreateFn( (1)
        x -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                              .withZone(ZoneId.systemDefault()));
src.mapUsingContext(contextFactory, (2)
        (formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp))); (3)
1 Create a ContextFactory.
2 Supply it to mapUsingContext().
3 Your mapping function now gets the object the factory created.

3.4. Submit a Jet Job and Manage its Lifecycle

This is how you submit a Jet pipeline for execution:

Pipeline pipeline = buildPipeline();
jet.newJob(pipeline).join();

If you want to submit a Core API DAG, the syntax is identical:

DAG dag = buildDag();
jet.newJob(dag).join();

Job submission is a fire-and-forget action: once a client submits it, the job has a life of its own, independent of the submitter. The client can disconnect, and any other client or Jet member can obtain its own Job instance that controls the same job.

You can use the same API to submit a job from either a client machine or directly on an instance of a Jet cluster member. The same Pipeline or DAG instance can be submitted for execution many times.
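
Here is a hedged sketch of that hand-off (otherClient stands for a second JetInstance connected to the same cluster, and the job id would be shared out of band, e.g. logged by the submitter):

// On the submitting client:
long jobId = jet.newJob(pipeline).getId();

// Later, possibly on a different client connected to the same cluster:
Job sameJob = otherClient.getJob(jobId);
System.out.println(sameJob.getStatus());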

3.4.1. JobConfig

To gain more control over how Jet will run your job, you can pass in a JobConfig instance. For example, you can give your job a human-readable name:

JobConfig cfg = new JobConfig();
cfg.setName("my-job");
jet.newJob(pipeline, cfg);

Please note that the cluster can have only a single active job (running, suspended, or waiting to be run) with a given name; you get a JobAlreadyExistsException if you make another submission. You can reuse the job name after the active job is completed.

If you have a microservice deployment where each package contains a Jet member and makes the same job submission, you can ensure that the job runs only once by using the newJobIfAbsent() API. If a named job is submitted with this API, Jet does not create a second job if there is already an active job with the same name. Instead, it returns a reference to the existing active job, as shown in the following snippet.

Pipeline pipeline = buildPipeline();
JobConfig cfg = new JobConfig();
cfg.setName("my-job");
Job job1 = jet.newJobIfAbsent(pipeline, cfg);
Job job2 = jet.newJobIfAbsent(pipeline, cfg);

assert job1.getId() == job2.getId();

3.4.2. Remember that a Jet Job is Distributed

The API to submit a job to Jet is in a way deceptively simple: "just call a method". As long as you’re toying around with Jet instances started locally in a single JVM, everything will indeed work. However, as soon as you try to deploy to an actual cluster, you’ll face the consequences of the fact that your job definition must travel over the wire to reach remote members which don’t have your code on their classpath.

Your custom code must be packaged with the Jet job. For simple examples you can have everything in a single class and use code like this:

class JetExample {
    static Job createJob(JetInstance jet) {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(JetExample.class);
        return jet.newJob(buildPipeline(), jobConfig);
    }

    static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // ...
        return p;
    }
}

If you forget to do this, or don’t add all the classes involved, you may get a quite confusing exception:

java.lang.ClassCastException:
cannot assign instance of java.lang.invoke.SerializedLambda
to field com.hazelcast.jet.core.ProcessorMetaSupplier$1.val$addressToSupplier
of type com.hazelcast.jet.function.FunctionEx
in instance of com.hazelcast.jet.core.ProcessorMetaSupplier$1

SerializedLambda actually declares readResolve(), which would normally transform it into an instance of the correct functional interface type. If this method throws an exception, Java doesn’t report it but keeps the SerializedLambda instance and continues the deserialization. Later in the process it will try to assign it to a field whose type is the target type of the lambda (FunctionEx in the example above) and at that point it will fail with the ClassCastException. If you see this kind of error, double-check the list of classes you have added to the Jet job.

For more complex jobs it will become more practical to first package the job in a JAR and then use a command-line utility to submit it, as explained next.

3.4.3. Submit a Job from the Command Line

Jet comes with the jet.sh script, which allows you to submit a Jet job packaged in a JAR file. You can find it in the Jet distribution zip file, in the bin directory. On Windows use jet.bat. To use it, follow these steps:

  • Write your main() method and your Jet code the usual way, but call JetBootstrap.getInstance() instead of Jet.newJetClient() to acquire a Jet client instance.

  • Create a runnable JAR which declares its Main-Class in MANIFEST.MF.

  • Run your JAR, but instead of java -jar jetjob.jar use jet.sh submit jetjob.jar.

  • The script will create a Jet client and configure it from hazelcast-client.xml located in the config directory of Jet’s distribution. Adjust that file to suit your needs.

For example, write a class like this:

class CustomJetJob {
    public static void main(String[] args) {
        JetInstance jet = JetBootstrap.getInstance();
        jet.newJob(buildPipeline()).join();
    }

    static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // ...
        return p;
    }
}

After building the JAR, submit the job:

$ jet.sh submit jetjob.jar

3.4.4. Manage a Submitted Job

jet.newJob() and jet.getJob(jobId) return a Job object, which you can use to monitor the job and change its status:

  • Job.suspend: the job will stop running, but its metadata will be kept and it can be resumed later. Use this for example if you want to restart the members one by one and you don’t want the job to restart multiple times in the meantime

  • Job.resume: resumes a previously suspended job

  • Job.restart: stops and restarts the job to make use of new members added to the cluster (if automatic scaling is disabled)

  • Job.cancel: the job will stop running and will be marked as completed. It cannot be restarted later

You can also get the job’s name, configuration, and submission time via job.getName(), job.getConfig(), and job.getSubmissionTime(). job.getStatus() will give you the current status of the job (running, failed, completed etc.). You can also call job.getFuture() to block until the job completes and then get its final outcome (either success or failure).
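
Here is a hedged sketch that ties these calls together (the jobId parameter and the maintenance step are placeholders):

import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JobStatus;

class JobMaintenance {
    static void suspendForMaintenance(JetInstance jet, long jobId) {
        Job job = jet.getJob(jobId);
        if (job != null && job.getStatus() == JobStatus.RUNNING) {
            job.suspend();   // execution stops, metadata and snapshot are kept
            // ... restart members one by one, upgrade hardware, etc. ...
            job.resume();    // the job continues from where it was suspended
        }
    }
}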

Jet does not support canceling the job with future.cancel(); instead, you must call job.cancel(). This is due to the mismatch in semantics between future.cancel() on one side and job.cancel() plus job.getStatus() on the other: the future immediately transitions to “completed by cancellation”, but it takes some time until the actual job in the cluster reaches that state. To avoid confusing users with these differences, we decided to make future.cancel() fail with an exception.

3.4.5. Job Upgrade

Note: Job Upgrade is only available in Jet Enterprise

Jet allows you to upgrade a job while preserving its state. This is useful for fixing bugs or improving the application.

The new pipeline has to be compatible with the old one; for more details about what you can change in the pipeline, see Update a DAG Without Losing the State.

Job Upgrades use snapshots. The job state is exported to a snapshot and the new job version starts from that snapshot.

If you want to cancel a job and preserve its state, use cancelAndExportSnapshot:

job.cancelAndExportSnapshot("foo-snapshot");

You can also export the state and keep the job running. Both the old and the new version can then run in parallel, which is useful, for example, for A/B testing.

job.exportSnapshot("foo-snapshot");

Then use the exported snapshot to submit a new job. The job will use the state in the snapshot as its initial state:

Pipeline updatedPipeline = Pipeline.create();
// create the pipeline...

JobConfig jobConfig = new JobConfig();
jobConfig.setInitialSnapshotName("foo-snapshot");
instance.newJob(updatedPipeline, jobConfig);

The exported snapshots used for Job Upgrades differ from the snapshots used for fault tolerance. Exported snapshots aren’t deleted automatically by Jet, so you can also use an exported snapshot as a point of recovery. When you no longer need the exported snapshot, delete it.

3.4.6. Get a List of All Submitted Jobs

Jet keeps an inventory of all the jobs submitted to it, including those that have already completed. Access the full list with jet.getJobs(). You can use any Job instance from that list to monitor and manage a job, whether it was you or some other client that submitted it.

This example tells you what Jet has been up to in the last five minutes:

int total = 0;
int completed = 0;
int failed = 0;
int inProgress = 0;
long fiveMinutesAgo = System.currentTimeMillis() - MINUTES.toMillis(5);
for (Job job : jet.getJobs()) {
    if (job.getSubmissionTime() < fiveMinutesAgo) {
        continue;
    }
    total++;
    switch (job.getStatus()) {
        case COMPLETED:
            completed++;
            break;
        case FAILED:
            failed++;
            break;
        default:
            inProgress++;
    }
}
System.out.format(
    "In the last five minutes %d jobs were submitted to Jet, of "
    + "which %d already completed, %d jobs failed, and %d jobs "
    + "are still running.%n",
    total, completed, failed, inProgress);

To get all jobs submitted with a particular name, call jet.getJobs(name); to get just the latest one, call jet.getJob(name).

Here’s how you can check the statistics on a job named my-job:

List<Job> myJobs = jet.getJobs("my-job");
long failedCount = myJobs.stream().filter(j -> j.getStatus() == FAILED).count();
System.out.format("Jet ran 'my-job' %d times and it failed %d times.%n",
        myJobs.size(), failedCount);

Note: data about completed jobs is evicted after 7 days.

3.5. Configure Fault Tolerance

Jet has features to allow an unbounded stream processing job to proceed correctly in the face of Jet members failing and leaving the cluster.

Jet takes snapshots of the entire state of the computation at regular intervals. It coordinates the snapshot across the cluster and synchronizes it with a checkpoint on the data source. The source must ensure that, in the case of a restart, it will be able to replay all the data it emitted after the last checkpoint. Each of the other components in the job will restore its processing state to exactly what it was at the time of the last snapshot. If a cluster member goes away, Jet will restart the job on the remaining members, rewind the sources to the last checkpoint, restore the state of processing from the last snapshot, and then seamlessly continue from that point.

3.5.1. Exactly-Once

"Exactly-once processing" means the output is consistent with processing each stream item exactly once. This is the ultimate guarantee you can ask for.

As of version 0.6, Hazelcast Jet supports exactly-once processing with the source being either a Hazelcast IMap journal or a Kafka topic, and the sink being a Hazelcast IMap.

If you configure Jet for exactly-once but use Kafka as the sink, after a job restart you may get duplicates in the output. As opposed to doubly processing an input item, this is more benign because it just means getting the exact same result twice.

3.5.2. At-Least-Once

"At-least-once processing" means the output is consistent with processing each stream item at least once. Some items may get processed again after a restart, as if they represented new events. This is a lesser guarantee that Jet can deliver at a higher throughput and lower latency. In some cases it is good enough.

In some other cases, however, duplicate processing of data items can have quite surprising consequences. There is more information about this in our Jet Concepts chapter.

3.5.3. Enable Fault Tolerance

Fault tolerance is off by default. To activate it for a job, create a JobConfig object and set the processing guarantee. You can also configure the snapshot interval.

JobConfig jobConfig = new JobConfig();
jobConfig.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
jobConfig.setSnapshotIntervalMillis(SECONDS.toMillis(10));

With less frequent snapshots, more data will have to be replayed after a restart and the temporary spike in the latency of the output will be greater. More frequent snapshots reduce throughput and introduce more latency variation during regular processing.

3.5.4. Automatic Elasticity

You can configure the behavior of what will happen when members are added or removed from the cluster.

  • If auto-scaling is enabled and a member is added or removed, the job will automatically restart. In case of member addition, it will restart after a delay.

  • If auto-scaling is disabled and a member is added, Jet takes no action and the job will not use the added member; you have to manually restart it. If a member is removed (after a shutdown or a failure), Jet suspends the job. You have to manually resume it.

By default, auto-scaling is enabled.
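
If you prefer manual control, here is a hedged sketch of turning auto-scaling off for a particular job (assuming JobConfig exposes a setAutoScaling setter, as in Jet 3.x):

JobConfig jobConfig = new JobConfig();
jobConfig.setAutoScaling(false);   // member changes won't automatically restart this job
jet.newJob(pipeline, jobConfig);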

3.5.5. Level of Safety

Jet doesn’t delegate its fault tolerance to an outside system, it backs up the state to its own IMap objects. IMap is a replicated in-memory data structure, storing each key-value pair on a configurable number of cluster members. By default it will make a single backup copy, resulting in a system that tolerates the failure of a single member at a time. You can tweak this setting when starting Jet, for example increase the backup count to two:

JetConfig config = new JetConfig();
config.getInstanceConfig().setBackupCount(2);
JetInstance instance = Jet.newJetInstance(config);

Note: if multiple members are lost simultaneously, some data from the backing IMaps can be lost. This is not currently checked and the job will restart with some state data from the snapshot missing, or it might fail if classpath resources were added and are missing. We plan to address this in future releases.

3.5.6. Split-Brain Protection

A specific kind of failure is the so-called "split brain". It happens on a network failure when some members think the other members have left the cluster, while in fact all of them are still running but can’t see each other over the network. Now there are two or more fully functioning Jet clusters where there was supposed to be one. Each one recovers and restarts the same Jet job, making it run multiple times.

Hazelcast Jet offers a mechanism to reduce this hazard: split-brain protection. It works by ensuring that a job can be restarted only in a cluster whose size is more than half of what it was before the job was suspended. Enable split-brain protection like this:

jobConfig.setSplitBrainProtection(true);

If there’s an even number of members in your cluster, this may mean the job will not be able to restart at all if the cluster splits into two equally-sized parts. We recommend having an odd number of members.

Note also that you should ensure there is no split-brain condition at the moment you are introducing new members to the cluster. If that happens, both sub-clusters may grow to more than half of the previous size, which defeats the split-brain protection mechanism.

3.6. Configure Lossless Cluster Restart (Enterprise Only)

The fault tolerance feature we described in the previous section is entirely RAM-based; the snapshotted state isn’t persisted. Given the redundancy present in the cluster, this is sufficient to maintain a running cluster across single-member failures (or multiple, depending on the backup count), but it doesn’t cover the case when the entire cluster must shut down.

Hazelcast IMDG Enterprise HD has the Hot Restart Persistence feature which Jet uses to allow you to gracefully shut down the cluster at any time and have the snapshot data of all the jobs preserved. After you restart the cluster, Jet automatically restores the data and resumes the jobs.

Hot Restart Persistence works by persisting the RAM-based snapshot data. Even with the fastest solid-state storage, this dramatically reduces the maximum snapshotting throughput available to Jet: while DRAM can take up to 50 GB/s, a high-end storage device caps at 2 GB/s. Keep in mind that this is throughput per member, so even on a minimal cluster of 3 machines, this is actually 6 GB/s available to Jet. Also, this throughput limit does not apply to the data going through the Jet pipeline, but only to the periodic saving of the state present in the pipeline. Typically the state scales with the number of distinct keys seen within the time window.

To use Lossless Cluster Restart, enable it in hazelcast-jet.xml:

<hazelcast-jet xmlns="http://www.hazelcast.com/schema/jet-config">
    <instance>
        <lossless-restart-enabled>true</lossless-restart-enabled>
    </instance>
</hazelcast-jet>

To set the base directory where the data will be stored, configure Hot Restart in the IMDG configuration:

<hazelcast xmlns="http://www.hazelcast.com/schema/config">
    <hot-restart-persistence enabled="true">
        <base-dir>/mnt/hot-restart</base-dir>
        <parallelism>2</parallelism>
    </hot-restart-persistence>
</hazelcast>

Quick programmatic example:

JetConfig cfg = new JetConfig();
cfg.getInstanceConfig().setLosslessRestartEnabled(true);
cfg.getHazelcastConfig().getHotRestartPersistenceConfig()
        .setEnabled(true)
        .setBaseDir(new File("/mnt/hot-restart"))
        .setParallelism(2);

To have the cluster shut down gracefully, as required for this feature to work, do not kill the Jet instances one by one. As you are killing them, the cluster starts working hard to rebalance the data on the remaining members. This usually leads to out-of-memory failures and the loss of all data.

The entire cluster can be shut down gracefully in several ways:

  • on the command line:

$ cluster.sh -o shutdown
  • using the Jet Management Center

  • programmatically:

jet.getCluster().shutdown();

The first two ways make use of Jet’s REST endpoint, which isn’t enabled by default. You can enable it by setting the property

hazelcast.rest.enabled=true

This can be either a system property (-Dhazelcast.rest.enabled=true) or a property in the Hazelcast Jet configuration:

<hazelcast-jet xmlns="http://www.hazelcast.com/schema/jet-config">
    <properties>
       <property name="hazelcast.rest.enabled">true</property>
    </properties>
</hazelcast-jet>

or

JetConfig config = new JetConfig();
config.getProperties().setProperty("hazelcast.rest.enabled", "true");

Since the Hot Restart data is saved locally on each member, all the members must be present after the restart for Jet to be able to reload the data. Beyond that, there’s no special action to take: as soon as the cluster re-forms, it will automatically reload the persisted snapshots and resume the jobs.

3.7. Performance Considerations

3.7.1. Standard Java Serialization is Slow

When it comes to serializing the description of a Jet job, performance is not critical. However, for the data passing through the pipeline, the cost of the serialize-deserialize cycle can easily dwarf the cost of actual data transfer, especially on high-end LANs typical for data centers. In this context the performance of Java serialization is so poor that it regularly becomes the bottleneck. This is due to its heavy usage of reflection, overheads in the serialized form, etc.

Since Hazelcast IMDG faced the same problem a long time ago, we have mature support for optimized custom serialization and in Jet you can use it for stream data. In essence, you must implement a StreamSerializer for the objects you emit from your processors and register it in Jet configuration:

SerializerConfig serializerConfig = new SerializerConfig()
        .setImplementation(new MyItemSerializer())
        .setTypeClass(MyItem.class);
JetConfig config = new JetConfig();
config.getHazelcastConfig().getSerializationConfig()
      .addSerializerConfig(serializerConfig);
JetInstance jet = Jet.newJetInstance(config);

Consult the chapter on custom serialization in Hazelcast IMDG’s reference manual for more details.

Note the limitation implied here: the serializers must be registered with Jet on startup because this is how it is supported in Hazelcast IMDG. There is a plan to improve this and allow serializers to be registered on individual Jet jobs.

3.7.2. Sharing Data Between Pipeline Stages

Jet only serializes an item when it actually sends it to another member. If the target processor is on the same member, it will receive the exact same instance. To catch (de)serialization issues early on, we recommend using a 2-member local Jet cluster for development and testing.

Sharing the same objects between stages is good for performance, but it places a higher degree of responsibility on the developer. You can follow these rules of thumb to avoid concurrency bugs:

  • don’t use an item after emitting it

  • don’t mutate an item you received

Working with immutable objects has its price: you incur allocation and GC pressure by creating the copies and you waste additional CPU time copying the data. This is why we also present fine-grained rules you must follow to stay safe while sharing the data between pipeline stages.

1. Never mutate an item you emitted.

It is never safe to mutate an item you emitted. At best, the downstream stages will see your changes and lose the original data you emitted. Since the changes are concurrent, they may also see completely broken data if the object is not thread-safe.

One use case where it’s especially tempting to mutate the item after emitting is rolling aggregation. Consider this example where we use mapUsingContext to implement a custom rolling aggregation:

Pipeline p = Pipeline.create();
ContextFactory<List<String>> contextFactory =
        ContextFactory.withCreateFn(procCtx -> new ArrayList<>());
p.drawFrom(source)
 .mapUsingContext(contextFactory, (list, item) -> {
     // Don't do this!
     list.add(item);
     return list; (1)
 });
1 You keep emitting the same list, changing it in-place

The downstream stage receives the same object each time, but it’s supposed to see it in the state it was in when it was sent. In reality, the aggregating stage keeps changing the object concurrently while the downstream stage tries to observe it. In the best case it will crash with a ConcurrentModificationException, but it may also observe null or partially initialized objects in the list, among other bugs.
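
A hedged way to make this particular example safe is to emit a defensive copy, so the downstream stage never observes the list while it is being mutated (trading extra allocation for correctness):

p.drawFrom(source)
 .mapUsingContext(contextFactory, (list, item) -> {
     list.add(item);
     return new ArrayList<>(list);   // emit a snapshot; keep mutating only the original
 });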

Jet’s AggregateOperation has a primitive specifically devoted to avoiding this problem: the exportFn. It transforms the accumulator to a new object so the accumulator can be safely updated afterwards. By contrast, finishFn is allowed to return the accumulator itself and Jet calls it only when it knows the whole operation is done and the accumulator won’t be modified again.

There’s another case where a data race can sneak up on you: a many-to-many hash-join, where you enrich an item with several items matching the same join key. Refer to the hash-join section for more details and an example.

2. If you keep using the item you emitted (for reading), don’t mutate it downstream.

There’s little reason to keep reading an item after emitting it, but if you ever have that situation, make sure you don’t mutate it in any downstream stage.

3. If you create a fork in the pipeline (send the output of one stage to several others), no stage after the fork may mutate the data.

Here’s an example with a stream of Person items. We attach two mapping stages to it: one reads the name and the other modifies it. The resulting Jet pipeline will have a data race and behave erratically, sometimes seeing the original name and sometimes the modified one:

class Person {
    private String name;

    String name() {
        return name;
    }

    Person addToName(String suffix) {
        name += suffix;
        return this;
    }
}

Pipeline p = Pipeline.create();
BatchStage<Person> sourceStage = p.drawFrom(personSource);
BatchStage<String> names = sourceStage
        .map(person -> person.name()); (1)
// don't do this!
BatchStage<Person> juniors = sourceStage
        .map(person -> person.addToName(" Jr.")); (2)
1 Get the person’s name
2 Modify the person’s name

3.7.3. Capacity of the Concurrent Queues

By default, Jet runs each internal DAG vertex, roughly equivalent to each step of the computation (such as map or aggregate), at maximum parallelism (equal to the number of CPU cores). This means that even a single Jet job uses quite a lot of parallel tasks. Since Jet’s cooperative tasklets are very cheap to switch between, there’s almost no overhead from this. However, every pair of tasklets that communicate uses a dedicated 1-to-1 concurrent queue so the number of queues scales with the square of the number of CPU cores. The default queue capacity is 1024, which translates to 4-8 kilobytes RAM overhead per tasklet pair and potentially a lot of data items in flight before the queues fill up.

If you experience RAM shortage on the Jet cluster, consider lowering the queue size. This is how you set the default queue size for the whole Jet cluster:

JetConfig cfg = new JetConfig();
cfg.getDefaultEdgeConfig().setQueueSize(128);
JetInstance jet = Jet.newJetInstance(cfg);

You can also set queue sizes individually on each Core API DAG edge. You must first convert your pipeline to the Core DAG, apply the configuration, and then submit the DAG for execution:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("a")).setName("source")
 .map(String::toLowerCase)
 .drainTo(Sinks.list("b"));

DAG dag = p.toDag();
dag.getOutboundEdges("source").get(0)
   .setConfig(new EdgeConfig().setQueueSize(128));

jet.newJob(dag);

3.8. Monitor Execution and Diagnose Problems

3.8.1. Configure Logging

Jet, like Hazelcast IMDG, does not depend on a specific logging framework and has built-in adapters for a variety of logging frameworks. You can also write a new adapter to integrate with loggers Jet doesn’t natively support. To use one of the built-in adapters, set the hazelcast.logging.type property to one of the following:

  • jdk: java.util.logging (default)

  • log4j: Apache Log4j

  • log4j2: Apache Log4j 2

  • slf4j: SLF4J

  • none: Turn off logging

For example, to configure Jet to use Log4j, you can do one of the following:

System.setProperty("hazelcast.logging.type", "log4j");

or

JetConfig config = new JetConfig();
config.getHazelcastConfig()
      .setProperty("hazelcast.logging.type", "log4j");

For more detailed information about how to configure logging, please refer to the IMDG reference manual.

3.8.2. Inspect Output of Individual Stages

While debugging your pipeline you’ll want to see the output of an individual stage. You can achieve this by using the peek() stage. For example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("inputList"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .peek() (1)
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(Sinks.map("counts"));
1 Logs all the word tokens emitted by the filtering stage

If you run it like this:

JetInstance jet = Jet.newJetInstance();
try {
    jet.getList("inputList")
       .addAll(asList("The quick brown fox", "jumped over the lazy dog"));
    jet.newJob(p).join();
} finally {
    Jet.shutdownAll();
}

this is how your output may look:

Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: quick
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#2
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: brown
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#0
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: the
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#4
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: dog
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#3
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: lazy
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#0
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: jumped
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#2
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: the
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: over
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#3
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: fox

The logger name of com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1 consists of the following parts:

  • com.hazelcast.jet.impl.processor.PeekWrappedP: the type of the processor writing the log message

  • filter: the name of the vertex the processor belongs to

  • #1: the index of the processor within the vertex. The index is unique cluster-wide.

For more information about logging when using the Core API, see the Best Practices section.

3.8.3. Monitor Metrics

Jet exposes various metrics to facilitate monitoring of the cluster state and of running jobs.

Metrics have associated tags which describe which object the metric applies to. The tags for job metrics typically indicate the specific vertex and processor the metric belongs to.

Each metric instance provided belongs to a particular Jet cluster member, so different cluster members can have their own versions of the same metric with different values.

The metric collection runs in regular intervals on each member, but note that the metric collection on different cluster members happens at different moments in time. So if you try to correlate metrics from different members, they can be from different moments of time.

There are two broad categories of Jet metrics. For clarity we will group them based on significant tags which define their granularity.

Last but not least let’s not forget about the fact that each Jet member is also an IMDG member, so Jet also exposes all the metrics available in IMDG.

Let’s look at these 3 broad categories of metrics in detail.

IMDG Metrics

There is a wide range of metrics and statistics provided by IMDG:

  • statistics of distributed data structures (see IMDG Reference Manual)

  • executor statistics (see IMDG Reference Manual)

  • partition related statistics (state, migration, replication)

  • garbage collection statistics (see IMDG API Docs)

  • memory statistics for the JVM which current IMDG member belongs to (total physical/free OS memory, max/committed/used/free heap memory and max/committed/used/free native memory)

  • network traffic related statistics (traffic and queue sizes)

  • class loading related statistics

  • thread count information (current, peak and daemon thread counts)

Jet Cluster Metrics
Names / Main tags

blockingWorkerCount: The number of non-cooperative workers employed.

none
Each Jet cluster member will have one instance of this metric.

iterationCount: The total number of iterations the driver of tasklets in cooperative thread N has made. It should increase by at least 250 iterations/s. A lower value means that some of the cooperative processors block for too long. A somewhat lower value is normal if there are many tasklets assigned to the thread. A lower value affects the latency.

taskletCount: The number of tasklets assigned to cooperative thread N.

cooperativeWorker=<N>
N is the number of the cooperative thread.

Jet Job Metrics

All job-specific metrics have their job=<jobId>, exec=<executionId> and vertex=<vertexName> tags set. This means that all these metrics will have at least one instance for each vertex of each current job execution.

Additionally, if the vertex sourcing them is a data source or data sink, then the source or sink tags will also be set to true.

Names / Main tags

distributedBytesIn: Total number of bytes received from remote members.
distributedBytesOut: Total number of bytes sent to remote members.
distributedItemsIn: Total number of items received from remote members.
distributedItemsOut: Total number of items sent to remote members.

These values are present only for distributed edges; they account only for data actually transmitted over the network between members. These numbers include watermarks, snapshot barriers, etc.

ordinal=<N>
Each Jet member will have an instance of these metrics for each ordinal of each vertex of each job execution.

topObservedWm: This value is equal to the highest coalescedWm on any input edge of this processor.
coalescedWm: The highest watermark received from all inputs that was sent to the processor to handle.
lastForwardedWm: Last watermark emitted by the processor to output.
lastForwardedWmLatency: The difference between lastForwardedWm and the system time at the moment when the metrics were collected.

queuesCapacity: The total capacity of input queues.
queuesSize: The total number of items waiting in input queues.

All input queues for all edges to the processor are summed in the above two metrics. If the size is close to the capacity, backpressure is applied and this processor is a bottleneck. Only input edges with equal priority are summed: if the processor has input edges with different priorities, only the edges with the highest priority are reflected; after those are exhausted, the edges with the next lower priority are reflected, and so on.

proc=<N>, ordinal=<not specified>
Each Jet member will have one instance of these metrics for each processor instance N, where N denotes the global processor index. A processor is the parallel worker doing the work of the vertex.

topObservedWm: The highest received watermark from any input on edge N.
coalescedWm: The highest watermark received from all upstream processors on edge N.

emittedCount: The number of emitted items. This number includes watermarks, snapshot barriers, etc. Unlike distributedItemsOut, it includes items emitted to local processors.
receivedCount: The number of received items. This number does not include watermarks, snapshot barriers, etc. It’s the number of items the Processor.process method will receive.
receivedBatches: The number of received batches. Processor.process receives a batch of items at a time; this is the number of such batches. By dividing receivedCount by receivedBatches, you get the average batch size. It will be 1 under low load.

proc=<N>, ordinal=<M>
Each Jet member will have one instance of these metrics for each edge M (input or output) of each processor N. N is the global processor index and M is either the ordinal of the edge or has the value snapshot for output items written to the state snapshot.

numInFlightOps: The number of pending (in flight) operations when using asynchronous flat-mapping processors.
See Processors.flatMapUsingContextAsyncP.

totalKeys: The number of active keys being tracked by a session window processor.
totalWindows: The number of active windows being tracked by a session window processor.
See Processors.aggregateToSessionWindowP.

totalFrames: The number of active frames being tracked by a sliding window processor.
totalKeysInFrames: The number of grouping keys associated with the current active frames of a sliding window processor.
See Processors.aggregateToSlidingWindowP.

lateEventsDropped: The number of late events dropped by various processors, due to the watermark already having passed their windows.

proc=<N>, procType=<set>
Processor-specific metrics; only certain types of processors have them. The procType tag can be used to identify the exact type of processor sourcing them.
Like all processor metrics, each Jet member will have one instance of these metrics for each processor instance N, where N denotes the global processor index.

Exposing metrics

The main way Jet exposes metrics to the outside world is the JVM’s standard JMX interface. Since Jet 3.2 there is also an alternative to JMX: the Job API, which covers only the job-specific metrics.

Over JMX

Jet exposes all of its metrics using the JVM’s standard JMX interface. You can use tools such as Java Mission Control or JConsole to display them. All Jet-related beans are stored under the com.hazelcast.jet/Metrics/<instanceName>/ node, and the various tags they have form further sub-nodes in the resulting tree structure.

IMDG metrics are stored under the com.hazelcast/Metrics/<instanceName>/ node.
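
If you want to inspect the beans programmatically from within the same JVM, you can query them through the standard JMX API. The snippet below is only a sketch; the ObjectName pattern is an assumption derived from the node layout described above:

static void listJetMetricBeans() throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // Query every bean registered in the Jet domain (assumed pattern, adjust as needed)
    Set<ObjectName> jetBeans = server.queryNames(
            new ObjectName("com.hazelcast.jet:*"), null);
    jetBeans.forEach(System.out::println);
}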

Via Job API

The Job class has a getMetrics() method which returns a JobMetrics instance. It contains the latest known metric values for the job.
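
For example (a minimal sketch, assuming a JetInstance named jet and a pipeline p):

Job job = jet.newJob(p);
// ... later, for instance after the job has finished:
JobMetrics jobMetrics = job.getMetrics();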

This functionality has been developed primarily for giving access to metrics of finished jobs, but can in fact be used for jobs in any state.

While the job is running, the metric values are updated periodically (according to a configured collection interval), assuming that both generic metrics functionality and job metrics are enabled. Otherwise empty metrics will be returned.

When a job is restarted (or resumed after being previously suspended), the metrics are reset; their values will reflect only updates from the latest execution of the job.

Once a job stops executing (successfully, after a failure, after cancellation, or temporarily while suspended), the metrics will have the values taken at the moment just before the job stopped, assuming that metrics storing was enabled. Otherwise, empty metrics will be returned.

For details on how to use and filter the metric values consult the JobMetrics API docs. A simple example for computing the number of data items emitted by a certain vertex (let’s call it vertexA), excluding items emitted to the snapshot, would look like this:

Predicate<Measurement> vertexOfInterest =
        MeasurementPredicates.tagValueEquals(MetricTags.VERTEX, "vertexA");
Predicate<Measurement> notSnapshotEdge =
        MeasurementPredicates.tagValueEquals(MetricTags.ORDINAL, "snapshot").negate();

Collection<Measurement> measurements = jobMetrics
        .filter(vertexOfInterest.and(notSnapshotEdge))
        .get(MetricNames.EMITTED_COUNT);

long totalCount = measurements.stream().mapToLong(Measurement::getValue).sum();
Configuration

The metrics collection is enabled by default. You can configure it using the hazelcast-jet.xml file:

<metrics enabled="true" jmxEnabled="true">
    <!-- The number of seconds the metrics will be retained on
         the instance -->
    <retention-seconds>5</retention-seconds>

    <!-- The metrics collection interval in seconds -->
    <collection-interval-seconds>5</collection-interval-seconds>

    <!-- whether metrics should be collected for data structures.
         Metrics collection can have some overhead if there is a
         large number of data structures -->
    <metrics-for-data-structures>false</metrics-for-data-structures>
</metrics>

or using JetConfig object:

JetConfig jetConfig = new JetConfig();
jetConfig.getMetricsConfig()
         .setEnabled(true)
         .setJmxEnabled(true)
         .setRetentionSeconds(5)
         .setCollectionIntervalSeconds(5)
         .setMetricsForDataStructuresEnabled(false);

See MetricsConfig API docs for available methods.

3.9. Jet Command Line Tool

Jet comes with a command line tool, jet.sh, which you can use to submit jobs and do basic management of their lifecycle. It supports these commands:

  • cluster: Shows current cluster state and information about members

  • list-jobs: Lists running jobs on the cluster

  • submit: Submits a job to the cluster

  • cancel: Cancels a running job

  • suspend: Suspends a running job

  • resume: Resumes a suspended job

  • restart: Restarts a running job

  • list-snapshots: Lists saved snapshots on the cluster

  • save-snapshot: Saves a named snapshot from a job (Jet Enterprise only)

  • delete-snapshot: Deletes a named snapshot

The command line tool uses the Jet client under the hood and thus needs to be configured with connection information for the cluster. By default, it uses the hazelcast-client.xml found in the config path of the distribution. It’s also possible to specify connection parameters explicitly. For example, to list all the jobs running on the cluster you can use the following command:

bin/jet.sh -a 192.168.0.1:5701,192.168.0.2:5702 -g my-group list-jobs

This command would try to connect to either 192.168.0.1:5701 or 192.168.0.2:5702 using the group name my-group and list all the running jobs on the cluster. If you want to use the tool with a different client.xml, you can do it as follows:

bin/jet.sh -f client.xml list-jobs
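
Similarly, to submit a job packaged as a self-contained JAR you can use the submit command (the file name below is just a placeholder):

bin/jet.sh submit my-jet-job.jar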

For more details on how to use the command line tool, please refer to its built-in documentation, which you can retrieve with the command jet.sh --help.

3.10. Management Center

Hazelcast Jet Management Center is a management and monitoring suite providing a live overview of the Hazelcast Jet cluster. Management Center includes a tool for diagnosing data flow within the running Hazelcast Jet job. It provides a visualization of the computational stages and allows you to peek into the stats across the data flow graph enabling you to diagnose bottlenecks.

Please refer to the Hazelcast Jet Management Center Reference Manual for installation and usage instructions.

(Figure: the Job Detail view in Hazelcast Jet Management Center)

4. The Pipeline API

4.1. The Shape of a Pipeline

The general shape of any data processing pipeline is drawFromSource → transform → drainToSink and the natural way to build it is from source to sink. The Pipeline API follows this pattern. For example,

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("input"))
 .map(String::toUpperCase)
 .drainTo(Sinks.list("result"));

In each step, such as drawFrom or drainTo, you create a pipeline stage. The stage resulting from a drainTo operation is called a sink stage and you can’t attach more stages to it. All others are called compute stages and expect you to attach stages to them.

The API differentiates between batch (bounded) and stream (unbounded) sources and this is reflected in the naming: there is a BatchStage and a StreamStage, each offering the operations appropriate to its kind. In this section we’ll mostly use batch stages, for simplicity, but the API of operations common to both kinds is identical. We’ll explain later on how to apply windowing, which is necessary to aggregate over unbounded streams.

Your pipeline can consist of multiple sources, each starting its own pipeline branch, and you are allowed to mix both kinds of stages in the same pipeline. You can merge the branches with joining transforms. For example, the hash-join transform can join a stream stage with batch stages:

Pipeline p = Pipeline.create();
StreamStage<Trade> trades = p.drawFrom(tradeStream())
                             .withoutTimestamps();
BatchStage<Entry<Integer, Product>> products =
        p.drawFrom(Sources.map("products"));
StreamStage<Tuple2<Trade, Product>> joined = trades.hashJoin(
        products,
        joinMapEntries(Trade::productId),
        Tuple2::tuple2
);

Symmetrically, you can fork the output of a stage and send it to more than one destination:

Pipeline p = Pipeline.create();
BatchStage<String> src = p.drawFrom(Sources.list("src"));
src.map(String::toUpperCase)
   .drainTo(Sinks.list("uppercase"));
src.map(String::toLowerCase)
   .drainTo(Sinks.list("lowercase"));

4.2. Choose Your Data Sources and Sinks

Hazelcast Jet has support for these data sources and sinks:

  • Hazelcast IMap and ICache, both as a batch source of just their contents and their event journal as an infinite source

  • Hazelcast IList (batch)

  • Hadoop Distributed File System (HDFS) (batch)

  • Java Database Connectivity (JDBC) (batch)

  • Java Message Service (JMS) queue and topic (infinite stream)

  • Kafka topic (infinite stream)

  • TCP socket (infinite stream)

  • a directory on the filesystem, both as a batch source of the current file contents and an infinite source of append events to the files

  • Apache Avro files (batch)

  • Amazon AWS S3 (batch)

  • any source/sink you create on your own by using the SourceBuilder and the SinkBuilder.

You can access most of these via the Sources and Sinks utility classes. The Kafka, HDFS, S3 and Avro connectors are in their own separate modules. The source and sink builder factories are in their respective classes.
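
For example, a batch job that reads the current contents of the files in a directory could be sketched like this (the directory path is a placeholder):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.files("/var/log/myapp")) // reads each file line by line
 .filter(line -> line.contains("ERROR"))
 .drainTo(Sinks.list("errorLines"));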

There’s a dedicated chapter that discusses the topic of data sources and sinks in more detail.

4.3. Clean Up and Normalize Input Using Stateless Transforms

The simplest kind of transformation is one that can be done on each item individually and independent of other items. These are map, filter and flatMap. We already saw them in use in the previous examples. map transforms each item to another item; filter discards items that don’t match its predicate; and flatMap transforms each item into zero or more output items. These transforms are most often used to perform simple cleanup of the input stream by discarding unneeded data, adapting it to your data model, flattening nested lists into the top-level stream, etc.

We won’t spend much time on these transforms here; you can refer to their Javadoc for finer detail.
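
As a quick illustration, here is a sketch that combines all three, assuming a list named "lines" that holds strings:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("lines"))
 .map(String::trim)                                  // adapt each item
 .filter(line -> !line.isEmpty())                    // discard unneeded items
 .flatMap(line -> traverseArray(line.split("\\W+"))) // flatten each line into words
 .drainTo(Sinks.list("words"));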

4.4. Detect Patterns Using Stateful Transforms

When you start with basic mapping and add to it an arbitrary object available over the entire lifetime of the processing job, you get a quite general and powerful tool: stateful mapping.

The major use case of stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in the input of any complexity.

Instead of keeping just one state object, you can define a grouping key and keep a separate object per distinct key. This is what you need for the above example of matching on correlation ID.

When you define a grouping key, you should also consider key expiration. Usually a given key will occur in the input stream only for a while and then become stale. Jet offers you a way to specify the time-to-live (TTL) for keys. If more than TTL milliseconds (in terms of event time) pass without observing the same key again, Jet evicts the state object from memory. You can also provide a function Jet will call when discarding the object. On the example of transaction tracking, you can detect stuck transactions here and react to them.

Here’s the code sample that implements pattern matching on a stream of transaction-start and transaction-end events:

StreamStage<Entry<String, Long>> transactionOutcomes = eventStream() (1)
    .groupingKey(TransactionEvent::transactionId)
    .mapStateful(
        SECONDS.toMillis(2),
        () -> new TransactionEvent[2], (2)
        (startEnd, transactionId, transactionEvent) -> { (3)
            switch (transactionEvent.type()) {
                case TRANSACTION_START:
                    startEnd[0] = transactionEvent;
                    break;
                case TRANSACTION_END:
                    startEnd[1] = transactionEvent;
                    break;
                default:
            }
            return (startEnd[0] != null && startEnd[1] != null)
                    ? entry(transactionId, startEnd[1].timestamp() - startEnd[0].timestamp())
                    : null;
        },
        (startEnd, transactionId, wm) -> (4)
            (startEnd[0] == null || startEnd[1] == null)
                ? entry(transactionId, TIMED_OUT)
                : null
    );
1 transactionOutcomes emits pairs (transactionId, transactionDurationMillis). Transaction duration can also be the special value TIMED_OUT.
2 This function creates the state object: an array of two TransactionEvent objects. The first slot is for the start event, the second for the end event.
3 The mapping function takes the state, the grouping key, and the input item. It places the event into the appropriate array slot and emits the output if both slots are now occupied.
4 This function reacts to state eviction. It takes the state object being evicted, the key associated with it, and the current watermark (i.e., current event time). If it detects that a slot in the array was left unoccupied, it outputs "transaction timed out".

Another thing you can do with stateful mapping is rolling aggregation, like accumulating the sum of a quantity across events, or its extreme values. You can also achieve these with the dedicated rolling aggregation transform to reuse Jet’s ready-made aggregate operations. However, if you can’t find an existing aggregate operation to reuse, it’s usually simpler to implement it with stateful mapping.
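
For example, a running total of traded quantity could be sketched like this. The quantity() accessor returning a long is a hypothetical field of the Trade class, and trades is a StreamStage<Trade> like the ones used in the hash-join examples:

StreamStage<Long> runningQuantity = trades.mapStateful(
        LongAccumulator::new,            // state object: a running sum kept for the job's lifetime
        (sum, trade) -> {
            sum.add(trade.quantity());   // hypothetical accessor
            return sum.get();            // emit the current total
        });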

There is one major caveat when considering to solve your problem with stateful mapping: it’s sensitive to the order of the events in the stream. If the sequence of events in the stream can be different from the order of their occurrence in the real world, your state-updating logic will have to account for that, possibly leading to an explosion in complexity.

4.5. Suppress Duplicates

The distinct operation suppresses the duplicates from a stream. If you perform it after adding a grouping key, it emits a single item for every distinct key. The operation works on both batch and stream stages. In the latter case it emits distinct items within a window. Two different windows are processed independently.

In this example we have a batch of Person objects and we choose an arbitrary one from each 5-year age bracket:

Pipeline p = Pipeline.create();
BatchSource<Person> personSource = Sources.list("people");
p.drawFrom(personSource)
 .groupingKey(person -> person.getAge() / 5)
 .distinct()
 .drainTo(Sinks.list("sampleByAgeBracket"));

4.6. Merge Streams

The merge operation combines two pipeline stages in the simplest manner: it just emits all the items from both stages. In this example we merge the trading events from the New York and Tokyo stock exchanges:

Pipeline p = Pipeline.create();
IMap<Long, Trade> tradesNewYorkMap = instance.getMap("trades-newyork");
IMap<Long, Trade> tradesTokyoMap = instance.getMap("trades-tokyo");
StreamStage<Trade> tradesNewYork = p.drawFrom(
        Sources.mapJournal(tradesNewYorkMap, mapPutEvents(),
                mapEventNewValue(), START_FROM_CURRENT))
        .withNativeTimestamps(5_000);
StreamStage<Trade> tradesTokyo = p.drawFrom(
        Sources.mapJournal(tradesTokyoMap, mapPutEvents(),
                mapEventNewValue(), START_FROM_CURRENT))
        .withNativeTimestamps(5_000);
StreamStage<Trade> merged = tradesNewYork.merge(tradesTokyo);

4.7. Enrich Your Stream

As the data comes in, before you perform any reasoning on it, you must look up and attach to each item all the knowledge you have about it. If the data item represents a trade event, you want the data on the valuable being traded, the buyer, seller, etc. We call this step data enrichment.

Jet offers two basic techniques to enrich your data stream:

  1. Hash-join the stream with one or more datasets that you ingest as bounded streams. If your enriching dataset doesn’t change for the duration of the Jet job, this is your best tool.

  2. Directly look up the enriching data for each item by contacting the system that stores it. If you’re enriching an infinite stream and want to observe updates to the enriching dataset, you should use this approach.

4.8. Hash-Join

Hash-join is a kind of join operation optimized for the use case of data enrichment. It is like a many-to-one SQL JOIN that matches a foreign key in the stream-to-be-enriched with the primary key in the enriching dataset. You can perform several such joins in one operation, enriching your stream from arbitrarily many sources. You can also apply a many-to-many join on a non-unique key (see below).

The stream-to-be-enriched (we’ll call it the primary stream for short) can be an unbounded data stream. The enriching streams must be bounded and Jet will consume them in full before starting to enrich the primary stream. It will store their contents in hash tables for fast lookup, which is why we call this the "hash-join".

For each enriching stream you specify a pair of key-extracting functions: one for the enriching item and one for the primary item. This means that you can define a different join key for each of the enriching streams. The following example shows a three-way hash-join between the primary stream of stock trade events and two enriching streams: products and brokers:

Pipeline p = Pipeline.create();

// The primary stream (stream to be enriched): trades
IMap<Long, Trade> tradesMap = instance.getMap("trades");
StreamStage<Trade> trades = p.drawFrom(
        Sources.mapJournal(tradesMap, mapPutEvents(),
                mapEventNewValue(), START_FROM_CURRENT))
        .withoutTimestamps();

// The enriching streams: products and brokers
BatchStage<Entry<Integer, Product>> prodEntries =
        p.drawFrom(Sources.map("products"));
BatchStage<Entry<Integer, Broker>> brokEntries =
        p.drawFrom(Sources.map("brokers"));

// Join the trade stream with the product and broker streams
StreamStage<Tuple3<Trade, Product, Broker>> joined = trades.hashJoin2(
        prodEntries, joinMapEntries(Trade::productId),
        brokEntries, joinMapEntries(Trade::brokerId),
        Tuple3::tuple3
);

Products are joined on Trade.productId and brokers on Trade.brokerId. joinMapEntries() returns a JoinClause, which is a holder of the three functions that specify how to perform a join:

  1. the key extractor for the primary stream’s item

  2. the key extractor for the enriching stream’s item

  3. the projection function that transforms the enriching stream’s item into the item that will be used for enrichment.

Typically the enriching streams will be Map.Entry objects coming from a key-value store, but you want just the entry value to appear as the enriching item. In that case you’ll specify Map.Entry::getValue as the projection function. This is what joinMapEntries() does for you. It takes just one function, the primary stream’s key extractor, and fills in Entry::getKey and Entry::getValue as the enriching stream’s key extractor and the projection function, respectively.

You can also achieve a many-to-many join because the key in the enriching stream doesn’t have to be unique. If a given item’s foreign key matches several enriching items, Jet will produce another item in the output for each of them. If you’re joining with two enriching streams, and in both of them you have several matches, Jet will produce another output item for each combination of the enriching items (M x N items if there are M enriching items from one stream and N from the other).

If you’re doing many-to-many enrichment, never mutate the enriched item, otherwise you would break the rule of not mutating an item you emitted (see Sharing Data Between Pipeline Stages). For example:

StreamStage<Trade> joined = trades.hashJoin(
        prodEntries,
        joinMapEntries(Trade::productId),
        Trade::setProduct (1)
);
1 Sets the product on the trade object, returns this. Broken when there are several products for one trade!

With code like this and a many-to-many relationship between trades and products, Jet will re-emit the same Trade object several times, each time setting a different product on it. This will create a data race with the downstream stage, which may observe the same product several times while missing the other ones.

The safe way to perform a many-to-many hash join is to either package the individual objects into a tuple (as in the earlier examples) or to create a new Trade object each time.

In the interest of performance Jet pulls the entire enriching dataset into each cluster member. That’s why this operation is also known as a replicated join. This is something to keep in mind when estimating the RAM requirements for a hash-join operation.

4.8.1. Hash-Join With Four or More Streams Using the Builder

You can hash-join a stream with up to two enriching streams using the API we demonstrated above. If you have more than two enriching streams, you’ll use the hash-join builder. For example, you may want to enrich a trade with its associated product, broker, and market:

Pipeline p = Pipeline.create();

// The stream to be enriched: trades
IMap<Long, Trade> tradesMap = instance.getMap("trades");
StreamStage<Trade> trades = p.drawFrom(
        Sources.mapJournal(tradesMap, mapPutEvents(),
                mapEventNewValue(), START_FROM_CURRENT))
        .withoutTimestamps();

// The enriching streams: products, brokers and markets
BatchStage<Entry<Integer, Product>> prodEntries =
        p.drawFrom(Sources.map("products"));
BatchStage<Entry<Integer, Broker>> brokEntries =
        p.drawFrom(Sources.map("brokers"));
BatchStage<Entry<Integer, Market>> marketEntries =
        p.drawFrom(Sources.map("markets"));

// Obtain a hash-join builder object from the stream to be enriched
StreamHashJoinBuilder<Trade> builder = trades.hashJoinBuilder();

// Add enriching streams to the builder
Tag<Product> productTag = builder.add(prodEntries,
        joinMapEntries(Trade::productId));
Tag<Broker> brokerTag = builder.add(brokEntries,
        joinMapEntries(Trade::brokerId));
Tag<Market> marketTag = builder.add(marketEntries,
        joinMapEntries(Trade::marketId));

// Build the hash join pipeline
StreamStage<Tuple2<Trade, ItemsByTag>> joined = builder.build(Tuple2::tuple2);

The data type on the hash-joined stage is Tuple2<Trade, ItemsByTag>. The next snippet shows how to use it to access the primary and enriching items:

StreamStage<String> mapped = joined.map(
        (Tuple2<Trade, ItemsByTag> tuple) -> {
            Trade trade = tuple.f0();
            ItemsByTag ibt = tuple.f1();
            Product product = ibt.get(productTag);
            Broker broker = ibt.get(brokerTag);
            Market market = ibt.get(marketTag);
            return trade + ": " + product + ", " + broker + ", " + market;
        });

4.9. Enrich Using Direct Lookup

If you’re enriching an infinite stream, you most likely need to observe the changes that happen to the enriching dataset over the long timespan of the Jet job. In this case you can’t use the hash-join, which basically takes a snapshot of the enriching dataset at the beginning of the job. You may also encounter RAM shortage if your enriching dataset is very large.

The xUsingY transforms (such as filterUsingContext or mapUsingIMap) can enrich a stream by looking up from the original data source each time. There’s direct support for Hazelcast maps and Jet exposes the underlying machinery as well so you can write your own code to join with an arbitrary external dataset.

4.9.1. Look Up from Hazelcast Map

Hazelcast Jet allows you to enrich your stream directly from a Hazelcast IMap or ReplicatedMap. Since it must look up the data again for each item, performance is lower than with a hash-join, but the data is kept fresh this way. This matters especially for unbounded streaming jobs, where a hash-join would use data frozen in time at the beginning of the job.

If you enrich from a Hazelcast map (IMap or ReplicatedMap) that is stored inside the Jet cluster, you can achieve data locality. For ReplicatedMap that’s trivial because its entire contents are present on every cluster member. IMap, on the other hand, is partitioned so a given member holds only a part of the data. You must give Jet the key-extracting function so it can do the following for each item in your stream:

  1. extract the lookup key

  2. find its partition ID

  3. send the item to the member holding the IMap data with that partition ID

  4. on the target member, use the lookup key again to fetch the enriching data

We didn’t completely avoid the network here, but we replaced the request-response cycle with just a one-way send. We eliminated the cost of the request-response latency and pay just the overhead of network transfer.

In this example we enrich a stream of trades with detailed stock info. The stock info (the enriching dataset) is stored in a Hazelcast IMap so we use groupingKey() to let Jet partition our stream and use local IMap lookups:

IMap<String, StockInfo> stockMap = jet.getMap("stock-info"); (1)
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);

Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
 .withoutTimestamps()
 .groupingKey(Trade::ticker) (2)
 .mapUsingIMap(stockMap, Trade::setStockInfo) (3)
 .drainTo(Sinks.list("result"));
1 Obtain the IMap from a Hazelcast Jet instance
2 Set the lookup key, this enables data locality
3 Enrich the stream by setting the looked-up StockInfo on Trade items. This syntax works if the setter has fluent API and returns this.

In the example above the syntax looks very simple, but it hides a layer of complexity: you can’t just fetch a Hazelcast map instance and add it to the pipeline. It’s a proxy object that’s valid only in the JVM process where you obtained it. mapUsingIMap actually remembers just the name of the map and will obtain a map with the same name from the Jet cluster on which the job runs.

4.9.2. Look Up From an External System

Hazelcast Jet exposes the facility to look up from an external system. In this case you must define a factory object that creates a client instance and through which you will fetch the data from the remote system. Jet will use the factory to create a context object for each Processor in the cluster that executes your transforming step.

The feature is available in two variants:

  • asynchronous: the functions return a CompletableFuture. This version provides higher throughput because Jet can issue many concurrent requests without blocking a thread

  • synchronous: the functions return the result directly. Use it if your external service doesn’t provide an async API. It’s also useful if the context is, for example, a pre-loaded machine learning model which is only CPU-bound and doesn’t do any IO.
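
For instance, the synchronous variant over a blocking client could look roughly like this. It's only a sketch: MyDbClient with its connect(), lookupStockInfo() and close() methods is a placeholder for your own code, and a blocking client would normally also be marked non-cooperative (see the ContextFactory Javadoc):

ContextFactory<MyDbClient> dbFactory = ContextFactory
        .withCreateFn(jet -> MyDbClient.connect("jdbc:placeholder")) // hypothetical client
        .withDestroyFn(MyDbClient::close);

p.drawFrom(tradesSource)
 .withoutTimestamps()
 .mapUsingContext(dbFactory,
         (db, trade) -> trade.setStockInfo(db.lookupStockInfo(trade.ticker())))
 .drainTo(Sinks.list("result"));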

You can also specify a function that extracts a key from your items using groupingKey(). Even though Jet won’t do any lookup or grouping by that key, it will set up the job so that all the items with the same key get paired with the same context object.

For example, you may be fetching data from a remote Hazelcast cluster. To optimize performance, you can enable the near-cache on the client and you can save a lot of memory by specifying the key function: since Jet executes your enriching code in parallel, each worker has its own Hazelcast client instance, with its own near-cache. With the key function, each near-cache will hold a non-overlapping subset of the keys:

ContextFactory<IMap<String, StockInfo>> ctxFac = ContextFactory
        .withCreateFn(x -> {
            ClientConfig cc = new ClientConfig();
            cc.getNearCacheConfigMap().put("stock-info",
                    new NearCacheConfig());
            HazelcastInstance client = newHazelcastClient(cc);
            IMap<String, StockInfo> map = client.getMap("stock-info");
            return map;
        })
        .withLocalSharing();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);

Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
 .withoutTimestamps()
 .groupingKey(Trade::ticker)
 .mapUsingContextAsync(ctxFac,
         (map, key, trade) -> toCompletableFuture(map.getAsync(key))
                 .thenApply(trade::setStockInfo))
 .drainTo(Sinks.list("result"));

In a similar fashion you can integrate other external systems with a Jet pipeline.

4.9.3. Weak Consistency of Direct Lookup

When you use the xUsingY transform to enrich an infinite stream, your output will reflect the fact that the enriching dataset is changing. Two items with the same key may be enriched with different data. This is expected. However, the output may also behave in a surprising way. Say you update the enriching data for some key from A to B. You expect to see a sequence …A-A-B-B… in the output, but when you sort it by timestamp, you can observe any order, such as A-B-A-B or B-B-A-A. This is because Jet doesn’t enforce the processing order to strictly follow the timestamp order of events.

In effect, your lookups will be eventually consistent, but won’t have the monotonic read consistency. Currently there is no feature in Jet that would achieve monotonic reads while enriching an event stream from a changing dataset.

4.10. Group and Aggregate

Data aggregation is the cornerstone of distributed stream processing. It computes an aggregate function (simple examples: sum or average) over the data items. Typically you don’t want to aggregate all the items together, but classify them by some key and then aggregate over each group separately. This is the group-and-aggregate transformation. You can join several streams on a key and simultaneously group-aggregate them on the same key. This is the cogroup-and-aggregate transformation. If you’re processing an unbounded event stream, you must define a bounded window over the stream within which Jet will aggregate the data.

This is how a very simple batch aggregation without grouping may look (the list named text contains lines of text):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("text"))
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

We count all the lines and push the count to a list named “result”. Add this code to try it out:

JetInstance jet = Jet.newJetInstance();
jet.getList("text").addAll(asList("foo foo bar", "foo bar foo"));
jet.newJob(p).join();
jet.getList("result").forEach(System.out::println);
Jet.shutdownAll();

The program will print 2 because we added two items to the text list.

To get cleaner output, without Jet’s logging, add System.setProperty("hazelcast.logging.type", "none"); to the top.

Let’s count all the words instead:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

We split up the lines into words using a regular expression. The program prints 6 now because there are six words in the input.

Now let’s turn this into something more insightful: a word frequency histogram, giving us for each distinct word the number of its occurrences:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(Sinks.list("result"));

We added a grouping key; in this case it is the word itself (Functions.wholeItem() returns the identity function). Now the program will perform the counting aggregation on each group of equal words. It will print

bar=2
foo=4

What we’ve just built up is the classical Word Count task.

The definition of the aggregate operation hides behind the counting() method call. This is a static method in our AggregateOperations utility class, which provides you with some predefined aggregate operations. You can also implement your own aggregate operations; refer to the section dedicated to this.

4.11. Correlate and Join Streams

In Jet you can join any number of streams on a common key, which can be anything you can calculate from the data item. This works great for correlating the data from two or more streams. In the same step you apply an aggregate function to all the grouped items, separated by the originating stream. The aggregate function can produce a summary value like an average or linear trend, but it can also keep a list of all the items, effectively avoiding actual aggregation. You can achieve any kind of join: inner/outer (full, left and right). The constraint, as already noted, is that the join condition cannot be fully general, but must amount to matching a computed key.

We have a full code sample at our code samples repository. Here we’ll show a simple example where we calculate the ratio of page visits to add-to-cart actions for each user. We accumulate the event counts and then do the maths on them in the end:

(1)
JetInstance jet = Jet.newJetInstance();
IList<PageVisit> pageVisits = jet.getList("pageVisit");
IList<AddToCart> addToCarts = jet.getList("addToCart");
IList<Entry<Integer, Double>> results = jet.getList("result");

(2)
pageVisits.add(new PageVisit(1));
pageVisits.add(new PageVisit(1));
addToCarts.add(new AddToCart(1));

pageVisits.add(new PageVisit(2));
addToCarts.add(new AddToCart(2));

(3)
Pipeline p = Pipeline.create();
BatchStageWithKey<PageVisit, Integer> pageVisit =
        p.drawFrom(Sources.list(pageVisits))
         .groupingKey(pv -> pv.userId());
BatchStageWithKey<AddToCart, Integer> addToCart =
        p.drawFrom(Sources.list(addToCarts))
         .groupingKey(atc -> atc.userId());

BatchStage<Entry<Integer, Tuple2<Long, Long>>> coAggregated = pageVisit
        .aggregate2(counting(),         (4)
                addToCart, counting()); (5)
coAggregated
        .map(e -> {                     (6)
            long visitCount = e.getValue().f0();
            long addToCartCount = e.getValue().f1();
            return entry(e.getKey(), (double) addToCartCount / visitCount);
        })
        .drainTo(Sinks.list(results));
(7)
jet.newJob(p).join();
results.forEach(System.out::println);
Jet.shutdownAll();
1 Start Jet, acquire source and sink lists from it
2 Fill the source lists (the numbers 1 and 2 are user IDs)
3 Construct the pipeline
4 Supply the aggregate operation for pageVisits
5 Supply the aggregate operation for addToCarts
6 Compute the ratio of page visits to add-to-cart actions
7 Run the job, print the results

The way we prepared the data, user 1 made two visits and added one item to the shopping cart; user 2 made one visit and added one item. Given that, we should expect to see the following output:

1=0.5
2=1.0

Note how we supplied a separate aggregate operation for each input stage. We calculate the overall result based on the results obtained for each input stage in isolation.

You also have the option of constructing a two-input aggregate operation that has immediate access to all the items. Refer to the section on AggregateOperation.

4.11.1. Join Without Aggregating

Let’s assume you have these input stages:

BatchStageWithKey<PageVisit, Integer> pageVisit =
        p.drawFrom(Sources.<PageVisit>list("pageVisit"))
         .groupingKey(PageVisit::userId);
BatchStageWithKey<AddToCart, Integer> addToCart =
        p.drawFrom(Sources.<AddToCart>list("addToCart"))
         .groupingKey(AddToCart::userId);

You may have expected to find a joining transform in the Pipeline API that outputs a stream of all matching pairs of items:

BatchStage<Tuple2<PageVisit, AddToCart>> joined = pageVisit.join(addToCart);

This would more closely match the semantics of an SQL JOIN, but in the context of a Java API it doesn’t work well. For M page visits joined with N add-to-carts, Jet would have to materialize M * N tuples and feed them into the next stage. It would also be forced to buffer all the data from one stream before receiving the other.

To allow you to get the best performance, Jet strongly couples joining with aggregation and encourages you to frame your solution in these terms.

This doesn’t stop you from getting the "exploded" output with all the joined data, it just makes it a less straightforward option. If your use case can’t be solved any other way than by keeping all individual items, you can specify toList() as the aggregate operation and get all the items in lists:

BatchStage<Tuple2<List<PageVisit>, List<AddToCart>>> joinedLists =
    pageVisit.aggregate2(toList(), addToCart, toList())
             .map(Entry::getValue);

If you need something else than the full join, you can filter out some pairs of lists. In this example we create a LEFT OUTER JOIN by removing the pairs with empty left-hand list:

BatchStage<Tuple2<List<PageVisit>, List<AddToCart>>> leftOuterJoined =
        joinedLists.filter(pair -> !pair.f0().isEmpty());

If you specifically need to get a stream of all the combinations of matched items, you can add a flatmapping stage:

BatchStage<Tuple2<PageVisit, AddToCart>> fullJoined = joinedLists
    .flatMap(pair -> traverseStream(
            nonEmptyStream(pair.f0())
                .flatMap(pVisit -> nonEmptyStream(pair.f1())
                        .map(addCart -> tuple2(pVisit, addCart)))));

We used this helper method:

static <T> Stream<T> nonEmptyStream(List<T> input) {
    return input.isEmpty() ? Stream.of((T) null) : input.stream();
}

4.11.2. Join Four or More Streams Using the Aggregate Builder

If you need to join more than three streams, you’ll have to use the builder object. For example, your goal may be correlating events coming from different systems, where all the systems serve the same user base. In an online store you may have separate event streams for product page visits, adding-to-cart events, payments, and deliveries. You want to correlate all the events associated with the same user. The example below calculates statistics per category for each user:

Pipeline p = Pipeline.create();

(1)
BatchStageWithKey<PageVisit, Integer> pageVisits =
        p.drawFrom(Sources.<PageVisit>list("pageVisit"))
         .groupingKey(PageVisit::userId);
BatchStageWithKey<AddToCart, Integer> addToCarts =
        p.drawFrom(Sources.<AddToCart>list("addToCart"))
         .groupingKey(AddToCart::userId);
BatchStageWithKey<Payment, Integer> payments =
        p.drawFrom(Sources.<Payment>list("payment"))
         .groupingKey(Payment::userId);
BatchStageWithKey<Delivery, Integer> deliveries =
        p.drawFrom(Sources.<Delivery>list("delivery"))
         .groupingKey(Delivery::userId);

(2)
GroupAggregateBuilder<Integer, List<PageVisit>> builder =
        pageVisits.aggregateBuilder(toList());

(3)
Tag<List<PageVisit>> visitTag = builder.tag0();
Tag<List<AddToCart>> cartTag = builder.add(addToCarts, toList());
Tag<List<Payment>> payTag = builder.add(payments, toList());
Tag<List<Delivery>> deliveryTag = builder.add(deliveries, toList());

(4)
BatchStage<String> coGrouped = builder
        .build()
        .map(e -> {
            ItemsByTag ibt = e.getValue();
            return String.format("User ID %d: %d visits, %d add-to-carts," +
                            " %d payments, %d deliveries",
                    e.getKey(), ibt.get(visitTag).size(), ibt.get(cartTag).size(),
                    ibt.get(payTag).size(), ibt.get(deliveryTag).size());
        });
1 Create four source streams
2 Obtain a builder object for the co-group transform, specify the aggregate operation for PageVisits
3 Add the co-grouped streams to the builder, specifying the aggregate operation to perform on each
4 Build the co-group transform, retrieve the individual aggregation results using the tags you got in step 3 (ibt is an ItemsByTag)

4.12. Aggregate an Unbounded Stream over a Window

The process of data aggregation takes a finite batch of data and produces a result. We can make it work with an infinite stream if we break up the stream into finite chunks. This is called windowing and it’s almost always defined in terms of a range of event timestamps (a time window). For this reason Jet requires you to set up the policy of determining an event’s timestamp before you can apply windowing to your stream.

4.12.1. Add Timestamps to the Stream

The Pipeline API guides you to set up the timestamp policy right after you obtain a source stage. pipeline.drawFrom(someStreamSource) returns a SourceStreamStage which offers just these methods:

  • withNativeTimestamps() declares that the stream will use source’s native timestamps. This typically refers to the timestamps that the external source system sets on each event.

  • withTimestamps(timestampFn) provides a function to the source that determines the timestamp of each event.

  • withoutTimestamps() declares that the source stage has no timestamps. Use this if you don’t need them (i.e., your pipeline won’t perform windowed aggregation).

In exceptional cases you may need to call withoutTimestamps() on the source stage, perform some transformations that determine the event timestamps, and then call addTimestamps(timestampFn) to instruct Jet where to find them in the events. Typical examples are an enrichment stage that retrieves the timestamps from a side input, or flat-mapping the stream to unpack a series of events from each original item. If you do this, however, it will no longer be the source that determines the watermark.
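
Such a pipeline could be sketched like this (the envelope source and its events() accessor are hypothetical):

StreamStage<Event> events = p
        .drawFrom(envelopeSource())                      // hypothetical source emitting batched envelopes
        .withoutTimestamps()                             // the envelope itself carries no useful timestamp
        .flatMap(env -> traverseIterable(env.events()))  // unpack the individual events
        .addTimestamps(Event::timestamp, SECONDS.toMillis(5)); // now tell Jet where the timestamps are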

Prefer Assigning Timestamps at the Source

In some source implementations, especially partitioned ones like Kafka, there is a risk of high event time skew occurring across partitions as the Jet processor pulls the data from them in batches, in a round-robin fashion. This problem is especially pronounced when Jet recovers from a failure and restarts your job. In this case Jet must catch up with all the events that arrived since taking the last snapshot. It will receive these events at the maximum system throughput and thus each partition will have a lot of data each time Jet polls it. This means that the interleaving of data from different partitions will become much more coarse-grained and there will be sudden jumps in event time at the points of transition from one partition to the next. The jumps can easily exceed the configured allowedLateness and cause Jet to drop whole swaths of events as late.

In order to mitigate this issue, Jet has special logic in its partitioned source implementations that keeps separate track of the timestamps within each partition and knows how to reconcile the temporary differences that occur between them.

This feature works only if you set the timestamping policy in the source using withTimestamps() or withNativeTimestamps().

4.12.2. Specify the Window

In order to explain windowing, let’s start with a simple batch job, the Word Count. If you have a batch of tweets you want to analyze for word frequencies, this is how the pipeline can look:

BatchStage<Tweet> tweets = p.drawFrom(Sources.list("tweets"));

tweets.flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.map("counts"));

To make the same kind of computation work on an infinite stream of tweets, we must add the event timestamps and the specification of the window:

StreamStage<Tweet> tweets = p.drawFrom(twitterStream())
                             .withNativeTimestamps(0); (1)

tweets.flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1))) (2)
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.list("result"));
1 use the source system’s native timestamps, don’t tolerate event time disorder (allowedLateness parameter set to 0)
2 specify the window (one minute long, slides by one second)

Now we’ve got a Jet job that does live tracking of words currently trending in tweets. The sliding window definition tells Jet to aggregate the events that occurred within the last minute and update this result every second.

Using native timestamps often works well, but it’s not the most precise way of determining the event time. It records the time when the event was submitted to the system that Jet uses as the source and that may happen quite a bit after the event originally occurred. The best approach is to provide Jet with the function that extracts the timestamp directly from the event object. In this case you must also deal with the fact that Jet can receive events in an order different from their timestamp order. We discuss these concerns in the Jet Concepts chapter.

To extract the event timestamp, create the source stage like this:

StreamStage<Tweet> tweets = p.drawFrom(twitterStream())
                 .withTimestamps(Tweet::timestamp, SECONDS.toMillis(5));

We specified two things: how to extract the timestamp and how much event "lateness" we want to tolerate. We said that any event we receive can be at most five seconds behind the highest timestamp we already received. If it’s older than that, we’ll have to ignore it. On the flip side, this means that we’ll have to wait for an event that occurred five seconds after the given window’s end before we can assume we have all the data to emit that result.

4.12.3. Get Early Results Before the Window is Complete

If you had to allow a lot of event lateness, or if you just use large time windows, you may want to track the progress of a window while it is still accumulating events. You can order Jet to give you, at regular intervals, the current status on all the windows it has some data for, but aren’t yet complete. On our running example of trending words, it may look like this:

StreamStage<KeyedWindowResult<String, Long>> result =
    p.drawFrom(twitterStream())
     .withTimestamps(Tweet::timestamp, SECONDS.toMillis(15))
     .flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
     .filter(word -> !word.isEmpty())
     .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1))
             .setEarlyResultsPeriod(SECONDS.toMillis(1)))
     .groupingKey(wholeItem())
     .aggregate(counting());

In the above snippet we set up an aggregation stage whose window size is one minute and there’s an additional 15-second wait time for the late-coming events. This amounts to waiting up to 75 seconds from receiving a given event to getting the result it contributed to. Therefore we ask Jet to give us updates on the current progress every second.

The output of the windowing stage is in the form of KeyedWindowResult<String, Long>, where String is the word and Long is the frequency of the word in the given window. KeyedWindowResult also has an isEarly property that says whether the result is early or final.

Generally, Jet doesn’t guarantee that a stage will receive the items in the same order its upstream stage emitted them. For example, it executes a map transform with many parallel tasks. One task may get the early result and another one the final result. They may emit the transformed result to the sink in any order.

This can lead to a situation where your sink receives an early result after it has already received the final result.

4.13. Kinds of Windows

Jet supports these kinds of windows:

  • tumbling window: a window of constant size that "tumbles" along the time axis: the consecutive positions of the window don’t overlap. If you use a window size of 1 second, Jet will group together all events that occur within the same second and you’ll get window results for intervals [0-1) seconds, then [1-2) seconds, and so on.

  • sliding window: a window of constant size that slides along the time axis. It slides in discrete steps that are a fraction of the window’s length. A typical setting is to slide by 1% of the window size. Jet outputs the aggregation result each time the window moves on. If you use a window of size 1 second sliding by 10 milliseconds, Jet will output window results for intervals [0.00-1.00) seconds, then [0.01-1.01) seconds, and so on.

  • session window: it captures a burst of events separated by periods of quiescence. You define the "session timeout", i.e., the length of the quiet period that causes the window to close. If you define a grouping key, there is a separate, independent session window for each key.
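
For reference, this is how the corresponding window definitions are constructed, assuming the static imports of WindowDefinition.tumbling/sliding/session used elsewhere in this chapter (the sizes are arbitrary, chosen only for illustration):

WindowDefinition oneSecondTumbling = tumbling(SECONDS.toMillis(1));
WindowDefinition minuteBySecond = sliding(MINUTES.toMillis(1), SECONDS.toMillis(1));
WindowDefinition sessionWindow = session(SECONDS.toMillis(30)); // closed after 30 seconds of quiescence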

You can find a more in-depth explanation of Jet’s windows in the Jet Concepts chapter.
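As a quick reference, here is a minimal sketch of how each kind of window is declared through the WindowDefinition factory methods (the sizes and the session timeout below are just example values):

// tumbling window of 1 second
WindowDefinition oneSecondTumbling = WindowDefinition.tumbling(1_000);

// sliding window of 1 minute that slides in steps of 1 second
WindowDefinition oneMinuteSliding = WindowDefinition.sliding(60_000, 1_000);

// session window that closes after 30 seconds of inactivity
WindowDefinition session = WindowDefinition.session(30_000);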

4.14. Rolling Aggregation

Jet supports a way to aggregate an unbounded stream without windowing: for each input item you get the current aggregation value as output, as if this item was the last one to be aggregated. You use the same AggregateOperation implementations that work with Jet’s aggregate API. Note that Jet doesn’t enforce processing in the order of event time; what you get accounts for the items that Jet happens to have processed so far.

This kind of aggregation is useful in jobs that monitor a stream for extremes or totals. For example, if you have a stream of web server monitoring metrics, you can keep track of the worst latency ever experienced, the highest throughput seen, total number of transactions processed, and so on.

In the following example we process a stream of trading events and get the "largest" trade seen so far (with the highest worth in dollars). Note that the rolling aggregation outputs an item every time, not just after a new record-breaking trade.

Pipeline p = Pipeline.create();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
        mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
StreamStage<Trade> currLargestTrade =
        p.drawFrom(tradesSource)
         .withoutTimestamps()
         .rollingAggregate(maxBy(
                 ComparatorEx.comparing(Trade::worth)));

Rolling aggregation is just a special case of stateful mapping, equivalent to this:

stage.mapStateful(aggrOp.createFn(), (acc, item) -> {
    aggrOp.accumulateFn().accept(acc, item);
    return aggrOp.exportFn().apply(acc);
});

The main advantage of rolling aggregation over stateful mapping is that it’s the simplest way to reuse an existing AggregateOperation. If you’re starting from scratch with the rolling aggregation logic, it will probably be simpler to implement it as stateful mapping.

4.15. Apply a User-Defined Transformation

Once you move past the basics, you’ll notice it isn’t always practical to write out the full pipeline definition in a single long chained expression. You’ll want to apply the usual principles of modularization and code reuse.

One way to do it without any API support is to write a method that takes a pipeline stage, applies some transforms, and returns the resulting stage, like this:

static BatchStage<String> cleanUp(BatchStage<String> input) {
    return input.map(String::toLowerCase)
                .filter(s -> s.startsWith("success"));
}

You could use it like this:

BatchStage<String> stage = p.drawFrom(source);
BatchStage<String> cleanedUp = PipelineTransforms.cleanUp(stage);
BatchStage<Long> counted = cleanedUp.aggregate(counting());

This has the unfortunate consequence of breaking up the method chain and forcing you to introduce more local variables. To allow you to seamlessly integrate such method calls with the pipeline expression, Jet defines the apply transform:

BatchStage<Long> counted = p.drawFrom(source)
               .apply(PipelineTransforms::cleanUp)
               .aggregate(counting());

4.16. Developing and testing your pipeline

When developing your pipeline, it can be useful to work with some mock data sources to avoid integrating with a real data source. Similarly with sinks, for testing you may want to assert the output of a pipeline in a convenient way. Jet’s Pipeline API offers some tools to help testing a pipeline without having to integrate with a real data source or sink.

These APIs are currently marked as @Beta and are not covered by the backward-compatibility guarantees.

4.16.1. Test Sources

Jet provides some test and development sources which can be used both for batch and streaming jobs.

Batch

Jet provides test batch sources which emit the supplied items and then complete:

pipeline.drawFrom(TestSources.items(1, 2, 3, 4));

It’s also possible to use this source with a collection:

List<Integer> list = IntStream.range(0, 1000)
      .boxed()
      .collect(Collectors.toList());
pipeline.drawFrom(TestSources.items(list))
        .drainTo(Sinks.logger());

These sources are non-distributed and do not participate in the fault-tolerance protocol.

Streaming

For streaming jobs there is a mock source that generates an infinite stream of data. By default the source is throttled to emit at the specified rate of events per second. It also generates a sequence number and a timestamp for each event, which you can use to create mock data:

pipeline.drawFrom(TestSources.itemStream(10,
    (timestamp, sequence) -> new Trade(sequence, timestamp)))
        .withNativeTimestamps(0)
        .window(WindowDefinition.tumbling(1000))
        .aggregate(AggregateOperations.counting())
        .drainTo(Sinks.logger());

Similar to the batch sources, these sources are non-distributed and do not support fault tolerance: after a job restart the sequence is reset to 0. The timestamp is the current system time.

4.16.2. Assertions

Jet also provides assertions which can be used for asserting the intermediate or final output of a pipeline. Assertions are implemented as sinks which collect incoming items and then run the desired assertion on the collected items.

Batch

Assertions in batch jobs are typically run only after all the items have been received.

An example is given below:

pipeline.drawFrom(TestSources.items(1, 2, 3, 4))
        .apply(Assertions.assertCollected(list ->
            assertEquals("4 items must be received", list.size(), 4))
        )
        .drainTo(Sinks.logger());

In the given example, the assertion will be run after the source has finished emitting all the items. If the assertion fails, the job will also fail with an AssertionError. It’s possible to have the assertion inline inside the pipeline and do further computations. This creates a fork in the pipeline where the assertion acts as an intermediary sink.

Streaming

For streaming assertions we can’t rely on the source ever finishing its emission of items. Instead, we assert that the desired condition is eventually reached within a given timeout.

Example:

int itemsPerSecond = 10;
pipeline.drawFrom(TestSources.itemStream(itemsPerSecond))
        .withoutTimestamps()
        .apply(Assertions.assertCollectedEventually(10, list ->
            assertTrue("At least 20 items must be received", list.size() > 20))
        )
        .drainTo(Sinks.logger());

Contrary to the batch example above, this assertion is run periodically until it passes. Once the assertion passes, the job is cancelled immediately with an AssertionCompletedException. If the assertion is still failing after the given timeout (10 seconds in the example above), the job fails with an AssertionError instead. For this reason, if you use an eventual assertion in a job, don’t combine it with other assertions: the job may be cancelled before those other assertions have had the chance to pass.

You can see a sample of how a pipeline containing this assertion can be submitted and asserted below:

try {
    jet.newJob(pipeline).join();
    fail("Job should have completed with an AssertionCompletedException," +
        " but instead completed normally"
    );
} catch (CompletionException e) {
    assertContains(e.toString(), AssertionCompletedException.class.getName());
}

4.17. Pipeline API Cheatsheet

Each entry below names a transform, gives a short description, and shows a code sample.

Map

Apply a mapping function to each input item independently. You can also do filtering by mapping to null.

This example converts the input lines of text to lowercase:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<String> lowercased = lines.map(line -> line.toLowerCase());

Filter

Apply a filtering function to each input item to decide whether to pass it to the output.

This example removes all empty strings from the stream:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<String> nonEmpty = lines.filter(line -> !line.isEmpty());

Flatmap

Map each item to arbitrarily many items using a function that returns a Traverser over the result items.

This example splits the lines of text into individual words:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<String> words = lines.flatMap(
        line -> traverseArray(line.split("\\W+")));

Stateful map

Transform input into output with access to persistent, mutable state. Useful for pattern matching and custom rolling aggregation. To reuse an existing AggregateOperation, use rollingAggregate.

This example processes a stream of reports on requests completed by a cluster of servers and emits output whenever it detects a request with record-breaking latency. If a server is inactive for 2 minutes, it emits the maximum latency for the server’s period of activity that has just ended and then resets the score for that server:

StreamStage<Entry<String, Long>> topLatencies = latencies
    .groupingKey(Entry::getKey)
    .mapStateful(
        MINUTES.toMillis(2),
        LongAccumulator::new,
        (topLatencyState, key, e) -> {
            long currLatency = e.getValue();
            long topLatency = topLatencyState.get();
            topLatencyState.set(Math.max(currLatency, topLatency));
            return currLatency > topLatency
                ? entry(String.format("%s:newRecord", key), e.getValue())
                : null;
        },
        (topLatencyState, key, time) -> entry(String.format(
            "%s:maxForSession:%d", key, time), topLatencyState.get())
    );

Distinct

Suppress duplicate items from a stream. If you apply a grouping key, two items mapping to the same key will be duplicates. This operation applies primarily to batch streams, but also works on a windowed unbounded stream.

This example takes a batch stage with strings and creates a stage with distinct strings and another where the four-character prefix of the strings is unique:

BatchStage<String> strings = someStrings();
BatchStage<String> distinctStrings = strings.distinct();
BatchStage<String> distinctByPrefix =
        strings.groupingKey(s -> s.substring(0, 4)).distinct();

Merge

Merge the contents of two streams into one. The item type in the right-hand stage must be the same or a subtype of the one in the left-hand stage.

This example merges the streams from New York and Tokyo stock exchanges:

StreamStage<Trade> tradesNewYork = tradeStream("new-york");
StreamStage<Trade> tradesTokyo = tradeStream("tokyo");
StreamStage<Trade> tradesNyAndTokyo =
        tradesNewYork.merge(tradesTokyo);

Enrich by Many-to-One Join

Perform a many-to-one join with arbitrarily many enriching streams. The stream on which you invoke hashJoin holds foreign keys for the items in the enriching streams.

This example enriches a stream of stock trades with detailed info on the stock involved:

BatchStage<Trade> trades = p.drawFrom(list("trades"));
BatchStage<Entry<String, StockInfo>> stockInfo =
        p.drawFrom(list("stockInfo"));
BatchStage<Trade> joined = trades.hashJoin(stockInfo,
        joinMapEntries(Trade::ticker), Trade::setStockInfo);

Enrich by Map Lookup

For each stream item, look up a value from a Hazelcast map and transform (enrich) the item using it. Similar to a hash-join with the map’s entry set, but values don’t get stale (at the expense of throughput).

This example enriches a stream of stock trades with detailed info on the stock involved:

IMap<String, StockInfo> stockMap = jet.getMap("stock-info");
StreamSource<Trade> tradesSource = tradesSource();

p.drawFrom(tradesSource)
 .withoutTimestamps()
 .groupingKey(Trade::ticker)
 .mapUsingIMap(stockMap, Trade::setStockInfo)
 .drainTo(Sinks.list("result"));

Aggregate

Aggregates all the stream items with the AggregateOperation you supply.

This example counts the stream items:

BatchStage<String> lines = p.drawFrom(list("lines"));
BatchStage<Long> count = lines.aggregate(counting());

Group and Aggregate

Group the items by key and perform an AggregateOperation on each group.

This example calculates the number of occurrences of each word in the stream:

BatchStage<String> words = p.drawFrom(list("words"));
BatchStage<Entry<String, Long>> wordsAndCounts =
    words.groupingKey(word -> word)
         .aggregate(counting());

Windowed Group and Aggregate

Perform grouping and aggregation on an unbounded stream by splitting it into bounded windows.

This example calculates the number of occurrences of each hashtag in a stream of tweets over the last second, updated every 10 milliseconds:

StreamStage<Tweet> tweets = tweetStream();

tweets
    .flatMap(tweet -> Traversers.traverseArray(tweet.hashtags()))
    .groupingKey(hashtag -> hashtag)
    .window(sliding(1000, 10))
    .aggregate(counting());

Join on Common Key

Perform a many-to-many join of several streams on a common key. Apply an AggregateOperation on each group. As a special case, you can specify an aggregate operation that gives you all the joined items without transformation.

This example joins a "page visits" stream with a "payments" stream in a Web Shop application. For each user it gives you all the recorded page views and payments:

BatchStage<PageVisit> pageVisits = p.drawFrom(Sources.list("pageVisit"));
BatchStage<Payment> payments = p.drawFrom(Sources.list("payment"));

BatchStageWithKey<PageVisit, Integer> pageVisitsByUserId =
        pageVisits.groupingKey(pageVisit -> pageVisit.userId());

BatchStageWithKey<Payment, Integer> paymentsByUserId =
        payments.groupingKey(payment -> payment.userId());

pageVisitsByUserId.aggregate2(toList(), paymentsByUserId, toList());
// the output will be two lists: one containing
// the page visits and the other the payments, both for the
// same user

Streaming Join on a Common Key

Like the above, but also apply a window to the unbounded stream. It joins all the items belonging to the same window.

This example joins two unbounded streams, "page visits" and "payments". For each user it gives you all the page views and payments that they performed within the last minute and updates the result every second:

StreamStage<PageVisit> pageVisits = pageVisitsStream();
StreamStage<Payment> payments = paymentsStream();

StreamStageWithKey<Payment, Integer> paymentsByUserId =
    payments.groupingKey(payment -> payment.userId());

pageVisits.groupingKey(pageVisit -> pageVisit.userId())
          .window(sliding(60_000, 1000))
          .aggregate2(toList(), paymentsByUserId, toList());

// the output will be a window with two lists: one containing
// the payments and the other the page visits, both for the
// same user

Rolling Aggregation

Keep performing the same aggregate operation forever, getting the current result after each item.

This example tracks the largest trade observed in a stream:

StreamSource<Trade> tradesSource = tradesSource();
StreamStage<Trade> currLargestTrade =
    p.drawFrom(tradesSource)
     .withoutTimestamps()
     .rollingAggregate(maxBy(comparing(Trade::worth)));

Apply a User-Defined Transform

Modularize and reuse your pipeline code by extracting transforms to methods.

For example, in this pipeline:

p.drawFrom(source)
 .map(String::toLowerCase)
 .filter(s -> s.startsWith("success"))
 .aggregate(counting())

you can extract the map and filter stages to a method:

static BatchStage<String> cleanUp(BatchStage<String> input) {
    return input.map(String::toLowerCase)
                .filter(s -> s.startsWith("success"));
}

and then use it like this:

p.drawFrom(source)
 .apply(PipelineTransforms::cleanUp)
 .aggregate(counting())

Transform with Custom Processor

Add a stage with a custom Core API processor to the pipeline.

Suppose you have this identity-mapping processor:

public static class IdentityMapP extends AbstractProcessor {
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        return tryEmit(item);
    }
}

Then you can add it to the pipeline this way:

p.drawFrom(source)
 .customTransform("name", IdentityMapP::new)

Note that the Core API is for advanced use cases and you’ll rarely need it.

4.18. Implement Your Aggregate Operation

The single most important kind of processing Jet does is aggregation. In general it is a transformation of a set of input values into a single output value. The function that does this transformation is called the “aggregate function”. A basic example is sum applied to a set of integer numbers, but the result can also be a complex value, for example a list of all the input items.

Jet’s library contains a range of predefined aggregate functions, but it also exposes an abstraction, called AggregateOperation, that allows you to plug in your own. Since Jet does the aggregation in a parallelized and distributed way, you can’t simply supply a piece of Java code that implements the aggregate function; we need you to break it down into several smaller pieces that fit into Jet’s processing engine.

The ability to compute the aggregate function in parallel comes at a cost: Jet must be able to give a slice of the total data set to each processing unit and then combine the partial results from all the units. The combining step is crucial: it only makes sense if we’re combining the partial results of a commutative, associative function (CA for short). In the example of sum this is trivial: we know from elementary school that + is a CA operation. If you have a stream of numbers {17, 37, 5, 11, 42}, you can sum up {17, 5} separately from {42, 11, 37} and then combine the partial sums (also note the reordering of the elements).

If you need something more complex, like average, it doesn’t by itself have this property; however if you add one more ingredient, the finish function, you can express it easily. Jet allows you to first compute some CA function, whose partial results can be combined, and then at the very end apply the finish function on the fully combined result. To compute the average, your CA function will output the pair (sum, count). Two such pairs are trivial to combine by summing each component. The finish function will be sum / count.

In addition to the mathematical side, there is also the practical one: you have to provide Jet with a specific mutable object, called the accumulator, which will keep the “running score” of the operation in progress. For the average example, it would be something like

public class AvgAccumulator {
    private long sum;
    private long count;

    public void accumulate(long value) {
        sum += value;
        count++;
    }

    public void combine(AvgAccumulator that) {
        this.sum += that.sum;
        this.count += that.count;
    }

    public double finish() {
        return (double) sum / count;
    }
}

This object will also have to be serializable, preferably with Hazelcast’s serialization rather than Java’s, because in a group-and-aggregate operation there is one accumulator per key and all of them have to be sent across the network to be combined and finished.

Instead of requiring you to write a complete class from scratch, Jet separates the concern of holding the accumulated state from that of the computation performed on it. This means that you just need one accumulator class for each kind of structure that holds the accumulated data, as opposed to one for each aggregate operation. Jet’s library offers several such classes in the com.hazelcast.jet.accumulator package, one of them being LongLongAccumulator, which is a match for our average function. You’ll just have to supply the logic on top of it.

Specifically, you have to provide a set of six functions (we call them “primitives”):

  • create a new accumulator object.

  • accumulate the data of an item by mutating the accumulator’s state.

  • combine the contents of the right-hand accumulator into the left-hand one.

  • deduct the contents of the right-hand accumulator from the left-hand one (undo the effects of combine).

  • finish accumulation by transforming the accumulator object into the final result.

  • export the result of aggregation in a way that’s not destructive for the accumulator (used in rolling aggregations).

We already mentioned most of these above. The deduct primitive is optional and Jet can manage without it, but if you are computing a sliding window over an infinite stream, this primitive can give a significant performance boost because it allows Jet to reuse the results of the previous calculations.

In a similar fashion Jet distinguishes between the export and finish primitives for optimization purposes. Every function that works as the export primitive will also work as finish, but you can specify a different finish that reuses the state already allocated in the accumulator. Jet applies finish only if it will never again use that accumulator.

If you happen to have a deeper familiarity with JDK’s java.util.stream API, you’ll find AggregateOperation quite similar to Collector, which is also a holder of several functional primitives. Jet’s definitions are slightly different, though, and there are the additional optimizing primitives we just mentioned.

Let’s see how this works with our average function. Using LongLongAccumulator we can express our accumulate primitive as

(acc, n) -> {
    acc.set1(acc.get1() + n);
    acc.set2(acc.get2() + 1);
}

The export/finish primitive will be

acc -> (double) acc.get1() / acc.get2()

Now we have to define the other three primitives to match our main logic. For create we just refer to the constructor: LongLongAccumulator::new. The combine primitive expects you to update the left-hand accumulator with the contents of the right-hand one, so:

(left, right) -> {
    left.set1(left.get1() + right.get1());
    left.set2(left.get2() + right.get2());
}

Deducting must undo the effect of a previous combine:

(left, right) -> {
    left.set1(left.get1() - right.get1());
    left.set2(left.get2() - right.get2());
}

All put together, we can define our averaging operation as follows:

AggregateOperation1<Long, LongLongAccumulator, Double> aggrOp = AggregateOperation
        .withCreate(LongLongAccumulator::new)
        .<Long>andAccumulate((acc, n) -> {
            acc.set1(acc.get1() + n);
            acc.set2(acc.get2() + 1);
        })
        .andCombine((left, right) -> {
            left.set1(left.get1() + right.get1());
            left.set2(left.get2() + right.get2());
        })
        .andDeduct((left, right) -> {
            left.set1(left.get1() - right.get1());
            left.set2(left.get2() - right.get2());
        })
        .andExportFinish(acc -> (double) acc.get1() / acc.get2());

Let’s stop for a second to look at the type we got: AggregateOperation1<Long, LongLongAccumulator, Double>. Its type parameters are:

  1. Long: the type of the input item

  2. LongLongAccumulator: the type of the accumulator

  3. Double: the type of the result

Specifically note the 1 at the end of the type’s name: it signifies that it’s the specialization of the general AggregateOperation to exactly one input stream. In Hazelcast Jet you can also perform a co-aggregating operation, aggregating several input streams together. Since the number of input types is variable, the general AggregateOperation type cannot statically capture them and we need separate subtypes. We decided to statically support up to three input types; if you need more, you’ll have to resort to the less type-safe, general AggregateOperation.
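To round this off, here is a minimal, hypothetical usage of the aggrOp we just defined, computing the average of the Long items stored in an IList (the list name "numbers" is just an illustration):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Long>list("numbers"))
 .aggregate(aggrOp)                 // yields a BatchStage<Double> holding the average
 .drainTo(Sinks.logger());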

4.18.1. Aggregating over multiple inputs

Hazelcast Jet can join several streams and simultaneously perform aggregation on all of them. You specify a separate aggregate operation for each input stream and have the opportunity to combine their results when done. You can use aggregate operations provided in the library (see the section on co-aggregation for an example).
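As a minimal sketch of the library-based approach (reusing the PageVisit and Payment types from the example below and assuming static imports from AggregateOperations), you can combine two single-input operations with aggregateOperation2 and get a Tuple2 of their results:

AggregateOperation2<PageVisit, Payment, ?, Tuple2<Long, Long>> visitsAndSpend =
        aggregateOperation2(
                counting(),                     // count the page visits
                summingLong(Payment::amount));  // sum the payment amounts

You would then pass such an operation to aggregate2 on the two grouped stages, just like the custom operation built below.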

If you cannot express your aggregation logic using this approach, you can also specify a custom multi-input aggregate operation that can combine the items into the accumulator immediately as it receives them.

We’ll present a simple example on how to build a custom multi-input aggregate operation. Note that the same logic can also be expressed using separate single-input operations; the point of the example is introducing the API.

Say we are interested in the behavior of users in an online shop application and want to gather the following statistics for each user:

  1. total load time of the visited product pages

  2. quantity of items added to the shopping cart

  3. amount paid for bought items

This data is dispersed among separate datasets: PageVisit, AddToCart and Payment. Note that in each case we’re dealing with a simple sum applied to a field in the input item. We can perform a cogroup-and-aggregate transform with the following aggregate operation:

Pipeline p = Pipeline.create();
BatchStage<PageVisit> pageVisit = p.drawFrom(Sources.list("pageVisit"));
BatchStage<AddToCart> addToCart = p.drawFrom(Sources.list("addToCart"));
BatchStage<Payment> payment = p.drawFrom(Sources.list("payment"));

AggregateOperation3<PageVisit, AddToCart, Payment, LongAccumulator[], long[]>
    aggrOp = AggregateOperation
        .withCreate(() -> new LongAccumulator[] {
                new LongAccumulator(),
                new LongAccumulator(),
                new LongAccumulator()
        })
        .<PageVisit>andAccumulate0((accs, pv) -> accs[0].add(pv.loadTime()))
        .<AddToCart>andAccumulate1((accs, atc) -> accs[1].add(atc.quantity()))
        .<Payment>andAccumulate2((accs, pm) -> accs[2].add(pm.amount()))
        .andCombine((accs1, accs2) -> {
            accs1[0].add(accs2[0]);
            accs1[1].add(accs2[1]);
            accs1[2].add(accs2[2]);
        })
        .andExportFinish(accs -> new long[] {
                accs[0].get(),
                accs[1].get(),
                accs[2].get()
        });

BatchStage<Entry<Integer, long[]>> coGrouped =
        pageVisit.groupingKey(PageVisit::userId)
                 .aggregate3(
                         addToCart.groupingKey(AddToCart::userId),
                         payment.groupingKey(Payment::userId),
                         aggrOp);

Note how we got an AggregateOperation3 and how it captured each input type. When we use it as an argument to a cogroup-and-aggregate transform, the compiler will ensure that the stages we attach it to have the correct types and are in the correct order.

On the other hand, if you use the co-aggregation builder object, you’ll construct the aggregate operation by calling andAccumulate(tag, accFn) with all the tags you got from the co-aggregation builder, and the static type will be just AggregateOperation. The compiler won’t be able to match up the inputs to their treatment in the aggregate operation.

5. Source and Sink Connectors

Jet accesses data sources and sinks via its connectors. They are a computation job’s point of contact with the outside world.

5.1. Concerns

Although the connectors do their best to unify the various kinds of resources under the same “data stream” paradigm, there are still many concerns that need your attention.

5.1.1. Is it Unbounded?

The first decision when building a Jet computation job is whether it will deal with bounded or unbounded data. A typical example of a bounded resource is a persistent storage system, whereas an unbounded one is usually like a FIFO queue, discarding old data. This is true both for sources and sinks.

Bounded data is handled in batch jobs and there are fewer concerns to deal with. Examples of finite resources are the Hazelcast IMap/ICache and the Hadoop Distributed File System (HDFS). In the unbounded category the most popular choice is Kafka, but a Hazelcast IMap/ICache can also be used as an infinite source of update events (via the Event Journal feature). You can also set up an IMap/ICache as a sink for an infinite amount of data, either by ensuring that the size of the keyset will be finite or by allowing the eviction of old entries.

5.1.2. Is it Replayable?

Most finite data sources are replayable because they come from persistent storage. You can easily replay the whole dataset. However, an infinite data source may be of such nature that it can be consumed only once. An example is the TCP socket connector. Such sources are bad at fault tolerance: if anything goes wrong during the computation, it cannot be retried.

Does it Support Checkpointing?

You cannot retry processing an infinite data stream from the very beginning. You must save the complete state at regular intervals, then replay the input stream from the last saved position (checkpoint). Jet can create snapshots of its internal processing state, but for this to be useful the data source must have the ability to replay its data from the chosen point, discarding everything before it. Both Kafka and the Hazelcast Event Journal support this.

5.1.3. Is it Distributed?

A distributed computation engine prefers to work with distributed data resources. If the resource is not distributed, all Jet members will have to contend for access to a single endpoint. Kafka, HDFS, IMap and ICache are all distributed. On the other hand, an IList is not: it resides on a single member. When used as a source, only one Jet member pulls its data. When used as a sink, all Jet members send their data to the one that holds it.

A file source/sink operating in local mode is a sort of a "manually distributed" resource, each member accessing its own local filesystem. You have to manually arrange the files so that on each member there is a subset of the full dataset. When used as a sink, you have to manually gather all the pieces that Jet created.

The file source/sink can also operate in shared mode, accessing a shared filesystem mounted as a local directory.

5.1.4. What about Data Locality?

If you’re looking to achieve record-breaking throughput for your application, you’ll have to think carefully how close you can deliver your data to the location where Jet will consume and process it. For example, if your source is HDFS, you should align the topologies of the Hadoop and Jet clusters so that each machine that hosts an HDFS member also hosts a Jet member. Jet will automatically figure this out and arrange for each member to consume only the slice of data stored locally.

If you’re using IMap/ICache as data sources, you have two basic choices: have Jet connect to a Hazelcast IMDG cluster, or use Jet itself to host the data (since a Jet cluster is at the same time a Hazelcast IMDG cluster). In the second case Jet will automatically ensure a data-local access pattern, but there’s a caveat: if the Jet job causes an error of unrestricted scope, such as OutOfMemoryError or StackOverflowError, it will have unpredictable consequences for the state of the whole Jet member, jeopardizing the integrity of the data stored on it.

5.2. Overview of Sources and Sinks

The table below gives you a high-level overview of the source and sink connectors we deliver with Jet. There are links to Javadoc and code samples. The sections following this one present each connector in more detail.

The Jet Connector Hub contains the complete connector list including the connectors that aren’t packaged with Jet.

Table 1. Sources and Sinks
For each resource, whether it acts as a source, a sink, or both:

  • IMap: source and sink

  • ICache: source and sink

  • IMap in another cluster: source and sink

  • ICache in another cluster: source and sink

  • IMap’s Event Journal: source

  • ICache’s Event Journal: source

  • Event Journal of IMap in another cluster: source

  • Event Journal of ICache in another cluster: source

  • IList: source and sink

  • IList in another cluster: source and sink

  • HDFS: source and sink

  • Kafka: source and sink

  • Files: source and sink

  • File Watcher: source

  • Avro: source and sink

  • TCP Socket: source and sink

  • JMS: queue and topic, both source and sink

  • JDBC: source and sink

  • Amazon AWS S3: source and sink

  • Application Log: sink

5.3. Hazelcast IMap and ICache

Hazelcast IMDG’s IMap and ICache are very similar in the way Jet uses them and largely interchangeable. IMap has a bit more features. The simplest way to use them is as finite sources of their contents, but if you enable the Event Journal on a map/cache, you’ll be able to use it as a source of an infinite stream of update events (see below).

The most basic usage is very simple. Here are snippets that use IMap and ICache as a source and a sink:

Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> stage = p.drawFrom(Sources.map("myMap"));
stage.drainTo(Sinks.map("myMap"));

Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> stage = p.drawFrom(Sources.cache("inCache"));
stage.drainTo(Sinks.cache("outCache"));

In these snippets we draw from and drain to the same kind of structure, but you can use any combination.

5.3.1. Access an External Cluster

To access a Hazelcast IMDG cluster separate from the Jet cluster, you have to provide Hazelcast client configuration for the connection. In this simple example we use programmatic configuration to draw from and drain to remote IMap and ICache. Just for variety, we funnel the data from IMap to ICache and vice versa:

ClientConfig cfg = new ClientConfig();
cfg.getGroupConfig().setName("myGroup");
cfg.getNetworkConfig().addAddress("node1.mydomain.com", "node2.mydomain.com");

Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> fromMap =
        p.drawFrom(Sources.remoteMap("inputMap", cfg));
BatchStage<Entry<String, Long>> fromCache =
        p.drawFrom(Sources.remoteCache("inputCache", cfg));
fromMap.drainTo(Sinks.remoteCache("outputCache", cfg));
fromCache.drainTo(Sinks.remoteMap("outputMap", cfg));

For a full discussion on how to configure your client connection, refer to the Hazelcast IMDG documentation on this topic.

5.3.2. Optimize Data Traffic at the Source

If your use case calls for some filtering and/or transformation of the data you retrieve, you can optimize the traffic volume by providing a filtering predicate and an arbitrary transformation function to the source connector itself and they’ll get applied on the remote side, before sending:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, String, Person>remoteMap(
        "inputMap", clientConfig,
        e -> e.getValue().getAge() > 21,
        e -> e.getValue().getAge()));

The same optimization works on a local IMap, too, but has less impact. However, Hazelcast IMDG goes a step further in optimizing your filtering and mapping to a degree that matters even locally. If you don’t need fully general functions, but can express your predicate via Predicates or PredicateBuilder, they will create a specialized predicate instance that can test the object without deserializing it. Similarly, if the mapping you need is of a constrained kind where you just extract one or more object fields (attributes), you can specify a projection instead of a general mapping lambda: Projections.singleAttribute() or Projections.multiAttribute(). These will extract the listed attributes without deserializing the whole object. For these optimizations to work, however, your objects must employ Hazelcast’s portable serialization. They are especially relevant if the volume of data you need in the Jet job is significantly less than the volume of the stored data.
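As a minimal sketch (assuming the stored values are Person objects with an age attribute and that they use portable serialization), the same filtering and projection can be expressed with Predicates and Projections so they run without deserializing the stored objects:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.remoteMap(
        "inputMap", clientConfig,
        Predicates.greaterThan("age", 21),     // evaluated without deserializing the value
        Projections.singleAttribute("age")));  // extracts just the "age" attribute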

Note that the above feature is not available on ICache. It is, however, available on ICache’s event journal, which we introduce next.

5.3.3. Receive an Infinite Stream of Update Events

You can use IMap/ICache as sources of infinite event streams. For this to work you have to enable the Event Journal on your data structure. This is a feature you set in the Jet/IMDG instance configuration, which means you cannot change it while the cluster is running.

This is how you enable the Event Journal on an IMap:

JetConfig cfg = new JetConfig();
cfg.getHazelcastConfig()
   .getMapEventJournalConfig("inputMap")
   .setEnabled(true)
   .setCapacity(1000)         // how many events to keep before evicting
   .setTimeToLiveSeconds(10); // evict events older than this
JetInstance jet = Jet.newJetInstance(cfg);

The default journal capacity is 10,000 and the default time-to-live is 0 (which means “unlimited”). Since the entire event journal is kept in RAM, you should take care to adjust these values to match your use case.

The configuration API for ICache is identical:

cfg.getHazelcastConfig()
   .getCacheEventJournalConfig("inputCache")
   .setEnabled(true)
   .setCapacity(1000)
   .setTimeToLiveSeconds(10);

Once properly configured, you use Event Journal sources like this:

Pipeline p = Pipeline.create();
StreamSourceStage<Entry<String, Long>> fromMap = p.drawFrom(
        Sources.mapJournal("inputMap", START_FROM_CURRENT));
StreamSourceStage<Entry<String, Long>> fromCache = p.drawFrom(
        Sources.cacheJournal("inputCache", START_FROM_CURRENT));

IMap and ICache are on an equal footing here. The second argument, START_FROM_CURRENT here, means "start receiving from events that occur after the processing starts". If you specify START_FROM_OLDEST, you’ll get all the events still on record.

This version of the methods emits only the ADDED and UPDATED event types. Also, it maps the event object to a simple Map.Entry with the key and the new value. To make a different choice, use the overloads that allow you to specify your own projection and filtering. For example, you can request all event types and the full event objects:

Pipeline p = Pipeline.create();
StreamSourceStage<EventJournalMapEvent<String, Long>> allFromMap = p.drawFrom(
    Sources.mapJournal("inputMap",
            alwaysTrue(), identity(), START_FROM_CURRENT));
StreamSourceStage<EventJournalCacheEvent<String, Long>> allFromCache = p.drawFrom(
    Sources.cacheJournal("inputMap",
            alwaysTrue(), identity(), START_FROM_CURRENT));

Note the type of the stream element: EventJournalMapEvent and EventJournalCacheEvent. These are almost the same and have these methods:

  • getKey()

  • getOldValue()

  • getNewValue()

  • getType()

The only difference is the return type of getType() which is specific to each kind of structure and gives detailed insight into what kind of event it reports. Add, remove and update are the basic ones, but there are also evict, clear, expire and some others.
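For instance, here is a minimal sketch, building on the allFromMap stage above, that keeps only the REMOVED events and logs the affected keys:

allFromMap
    .withoutTimestamps()
    .filter(event -> event.getType() == EntryEventType.REMOVED)
    .map(EventJournalMapEvent::getKey)
    .drainTo(Sinks.logger());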

Finally, you can get all of the above from a map/cache in another cluster: just prepend remote to the source names and add a ClientConfig, for example:

Pipeline p = Pipeline.create();
StreamSourceStage<Entry<String, Long>> fromRemoteMap = p.drawFrom(
    Sources.remoteMapJournal("inputMap",
            someClientConfig, START_FROM_CURRENT));
StreamSourceStage<Entry<String, Long>> fromRemoteCache = p.drawFrom(
    Sources.remoteCacheJournal("inputCache",
            someClientConfig, START_FROM_CURRENT));

5.3.4. Update Entries in IMap

When you use an IMap as a sink, instead of just pushing the data into it you may have to merge the new with the existing data or delete the existing data. Hazelcast Jet supports this with map-updating sinks which rely on Hazelcast IMDG’s Entry Processor feature. An entry processor allows you to atomically execute a piece of code against a map entry, in a data-local manner.

The updating sinks come in three variants:

  1. mapWithMerging, where you provide a function that computes the map value from the stream item and a merging function that gets called if a value already exists in the map. Here’s an example that concatenates string values:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<String, Long>map("inMap"))
            .drainTo(Sinks.mapWithMerging("outMap",
                    Entry::getKey,
                    Entry::getValue,
                    (oldValue, newValue) -> oldValue + newValue)
            );
    This operation is NOT lock-aware: it will process the entries regardless of whether they are locked. Jet gains a significant performance boost by applying the update function in batches, but this doesn’t respect the locks. If you use this method on locked entries, it will break the mutual exclusion contract. Use mapWithEntryProcessor if you need proper locking behavior.
  2. mapWithUpdating, where you provide a single updating function that combines the roles of the two functions in mapWithMerging. It will be called on the stream item and the existing value, if any. Here’s an example that concatenates string values:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<String, Long>map("inMap"))
        .drainTo(Sinks.<Entry<String, Long>, String, Long>mapWithUpdating(
            "outMap", Entry::getKey,
            (oldV, item) -> (oldV != null ? oldV : 0L) + item.getValue())
        );
    This operation is NOT lock-aware: it will process the entries regardless of whether they are locked. Jet gains a significant performance boost by applying the update function in batches, but this doesn’t respect the locks. If you use this method on locked entries, it will break the mutual exclusion contract. Use mapWithEntryProcessor if you need proper locking behavior.
  3. mapWithEntryProcessor, where you provide a function that returns a full-blown EntryProcessor instance that will be submitted to the map. This is the most general variant, but it can’t use the batching that the other variants do and thus has a higher cost per item. You should use it only if you need a specialized entry processor that can’t be expressed in terms of the other variants. This example takes the values of the map and submits an entry processor that increments the values by 5:

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<String, Integer>map("mymap"))
            .drainTo(Sinks.mapWithEntryProcessor("mymap",
                    Entry::getKey,
                    entry -> new IncrementEntryProcessor(5)
            ));
    
    class IncrementEntryProcessor implements EntryProcessor<String, Integer> {
    
        private int incrementBy;
    
        public IncrementEntryProcessor(int incrementBy) {
            this.incrementBy = incrementBy;
        }
    
        @Override
        public Object process(Entry<String, Integer> entry) {
            return entry.setValue(entry.getValue() + incrementBy);
        }
    
        @Override
        public EntryBackupProcessor<String, Integer> getBackupProcessor() {
            return null;
        }
    }

5.4. Hazelcast IList

Whereas IMap and ICache are the recommended choice of data sources and sinks in Jet jobs, Jet supports IList purely for convenience during prototyping, unit testing and similar non-production situations. It is not a partitioned data structure and only one cluster member has all the contents. In a distributed Jet job all the members will compete for access to the single member holding it.

With that said, IList is very simple to use. Here’s an example how to fill it with test data, consume it in a Jet job, dump its results into another list, and fetch the results (we assume you already have a Jet instance in the variable jet):

IList<Integer> inputList = jet.getList("inputList");
for (int i = 0; i < 10; i++) {
    inputList.add(i);
}

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer>list("inputList"))
 .map(i -> "item" + i)
 .drainTo(Sinks.list("resultList"));

jet.newJob(p).join();

IList<String> resultList = jet.getList("resultList");
System.out.println("Results: " + new ArrayList<>(resultList));

You can access a list in an external cluster as well, by providing a ClientConfig object:

ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig()
            .setName("myGroup");
clientConfig.getNetworkConfig()
            .addAddress("node1.mydomain.com", "node2.mydomain.com");

Pipeline p = Pipeline.create();
p.drawFrom(Sources.remoteList("inputlist", clientConfig))
 .drainTo(Sinks.remoteList("outputList", clientConfig));

5.5. Kafka

Apache Kafka is a production-worthy choice of both source and sink for infinite stream processing jobs. It supports fault tolerance and snapshotting. The basic paradigm is that of a distributed publish/subscribe message queue. Jet’s Kafka Source subscribes to a Kafka topic and the sink publishes events to a Kafka topic.

The following code will consume from topics t1 and t2 and then write to t3:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.serializer", IntegerSerializer.class.getCanonicalName());
props.setProperty("value.deserializer", IntegerDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");

Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(props, "t1", "t2"))
 .withoutTimestamps()
 .drainTo(KafkaSinks.kafka(props, "t3"));

5.5.1. Using Kafka as a Source

The Kafka source emits entries of type Map.Entry<Key,Value> which can be transformed using an optional mapping function. It never completes. The job will end only if explicitly cancelled or aborted due to an error.
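As a minimal sketch of the projection overload (assuming the String keys and Integer values configured in the snippet above), this emits just the record values:

StreamSource<Integer> valuesOnly = KafkaSources.kafka(props,
        (ConsumerRecord<String, Integer> record) -> record.value(),   // projection
        "t1", "t2");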

Internally Jet creates one KafkaConsumer per Processor instance using the supplied properties. Jet uses manual partition assignment to arrange the available Kafka partitions among the available processors and will ignore the group.id property.

Currently there is a requirement that the global parallelism of the Kafka source be at most the number of partitions you are subscribing to. The local parallelism of the Kafka source is 2 and if your Jet cluster has 4 members, this means that a minimum of 8 Kafka partitions must be available.

If any new partitions are added while the job is running, Jet will automatically assign them to the existing processors and consume them from the beginning.

5.5.2. Processing Guarantees

The Kafka source supports snapshots. Upon each snapshot it saves the current offset for each partition. When the job is restarted from a snapshot, the source can continue reading from the saved offset.

If snapshots are disabled, the source will commit the offset of the last record it read to the Kafka cluster. The fact that the source read an item doesn’t mean that the whole Jet pipeline processed it, so this doesn’t guarantee against data loss.

5.5.3. Using Kafka as a Sink

The Kafka sink creates one KafkaProducer per cluster member and shares it among all the sink processors on that member. You can provide a mapping function that transforms the items the sink receives into ProducerRecords.
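Here is a minimal sketch of such a mapping function (assuming the sink receives Map.Entry<String, Integer> items and publishes to topic t3):

Sink<Entry<String, Integer>> sink = KafkaSinks.kafka(props,
        e -> new ProducerRecord<>("t3", e.getKey(), e.getValue()));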

5.5.4. Limitations

Apache Kafka introduced client backward compatibility with version 1.0.0. The compatibility is two-way: new brokers support older clients and new clients support older brokers.

The Kafka sink and source are based on version 2.2.0, which means the Kafka connector will work with any client and broker with version 1.0.0 or later.

5.6. HDFS

The Hadoop Distributed File System is a production-worthy choice for both a data source and a data sink in a batch computation job. It is a distributed, replicated storage system that handles these concerns automatically, exposing a simple unified view to the client.

The HDFS source and sink require a configuration object of type JobConf which supplies the input and output paths and formats. They don’t actually create a MapReduce job; this config is simply used to describe the required inputs and outputs. You can share the same JobConf instance between several source/sink instances.

Here’s a configuration sample:

JobConf jobConfig = new JobConf();
jobConfig.setInputFormat(TextInputFormat.class);
jobConfig.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(jobConfig, new Path("input-path"));
TextOutputFormat.setOutputPath(jobConfig, new Path("output-path"));

The word-count pipeline can then be expressed using HDFS as follows:

Pipeline p = Pipeline.create();
p.drawFrom(HdfsSources.hdfs(jobConfig, (k, v) -> v.toString()))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+"))
                       .filter(w -> !w.isEmpty()))
 .groupingKey(wholeItem())
 .aggregate(counting())
 .drainTo(HdfsSinks.hdfs(jobConfig));

5.6.1. Data Locality When Reading

Jet will split the input data across the cluster, with each processor instance reading a part of the input. If the Jet members run alongside the HDFS datanodes, Jet can make use of data locality by reading the blocks locally where possible. This can bring a significant increase in read speed.

5.6.2. Output

Each processor will write to a different file in the output folder, identified by the unique processor ID. The files are in a temporary state until the job completes, at which point they are committed. For streaming jobs, they are committed when the job is cancelled. We plan to introduce a rolling sink for HDFS in the future for better streaming support.

5.6.3. Dealing with Writables

Hadoop types implement their own serialization mechanism through the use of Writable. Jet provides an adapter that registers a Writable for Hazelcast serialization without requiring additional serialization code. To use it, extend WritableSerializerHook for your own Writable types and register the hook.

5.6.4. Hadoop JARs and Classpath

When submitting JARs along with a job, avoid sending the Hadoop JARs; they should instead be present on the classpath of the running members. The Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.

5.7. Files

Hazelcast Jet provides file and TCP/IP socket connectors that have limited production use, but are simple and can be very useful in an early rapid prototyping phase. They assume the data is in the form of plain text and emit/receive data items which represent individual lines of text.

These connectors are not fault-tolerant. On job restart they behave as if you started a new job. The sources don’t do snapshotting and the sinks don’t suppress duplicate data.

5.7.1. File Sources

The file sources are designed to work with both local and shared file systems. For the local file system, the sources expect to see on each member just the files that member should read; you can achieve the effect of a distributed source if you manually prepare a different set of files on each member. For a shared file system, the sources split the work so that each member reads a part of the files.
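Here is a minimal sketch using the file source builder to read from a directory on a shared network file system (the mount point /mnt/shared/input and the *.log glob are just example values):

Pipeline p = Pipeline.create();
p.drawFrom(Sources.filesBuilder("/mnt/shared/input")
                  .glob("*.log")
                  .sharedFileSystem(true)   // the directory is visible to all members
                  .build())
 .drainTo(Sinks.logger());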

There are two flavors of the file source: bounded and unbounded.

Here’s an example with the bounded source:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.files("/home/jet/input"))
 .drainTo(Sinks.logger());

This will log on each Jet member the contents of all the files in the specified directory. When the source reads all the files, the job completes. If the files change while the job is running, the behavior is undefined.

Here’s an example with the unbounded source:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.fileWatcher("/home/jet/input"))
 .withoutTimestamps()
 .drainTo(Sinks.logger());

It will watch the directory for changes. It will emit only new contents added after startup: both new files and new content appended to existing ones. Files must be updated in an append-only fashion; if the existing content changes, the behavior is undefined.

If you delete the watched directory, the job will complete.

5.7.2. File Sink

The file sink can work with either a local or a shared network file system. Each member will write to different file names. You can achieve the effect of a distributed sink if you manually collect all the output files on all members and combine their contents.

Here’s a small example of usage:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.files("/home/jet/output"));

5.7.3. Avro

Hazelcast Jet provides an Apache Avro file source and sink for batch processing jobs. This is a specialized version of the file connector; it assumes the data is in the Avro Object Container File format.

Avro File Source

The Avro file source reads the files in the specified directory using the given datum reader supplier and emits the records downstream.

Here’s an example with a reflect datum reader:

Pipeline p = Pipeline.create();
p.drawFrom(AvroSources.files("/home/jet/input", Person.class))
 .drainTo(Sinks.logger());

This will log on each Jet member the records of all the files in the specified directory. When the source reads all the files, the job completes. If the files change while the job is running, the behavior is undefined.

Avro File Sink

The Avro file sink writes the records to files using the given datum writer and schema suppliers. The sink always overwrites existing files; it does not append.

Here is an example with a generic datum writer:

String schemaString = "{\"type\":\"record\",\"name\":\"Person\"," +
        "\"namespace\":\"datamodel\",\"fields\":[{" +
        "\"name\":\"id\",\"type\":\"int\"}]}";
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<GenericRecord>list("inputList"))
 .drainTo(AvroSinks.files("/home/jet/output",
         () -> new Schema.Parser().parse(schemaString)));

5.8. TCP/IP Socket

5.8.1. Socket Source

The socket source opens a blocking client TCP/IP socket and receives data over it. The data must be lines of plain text.

Each underlying worker of the Socket Source connector opens its own client socket and asks for data from it. The user supplies the host:port connection details. The server side should ensure a meaningful dispersion of data among all the connected clients, but how it does it is outside of Jet’s control.

Here’s a simple example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.socket("localhost", 8080, StandardCharsets.UTF_8))
 .withoutTimestamps()
 .drainTo(Sinks.logger());

You can study a comprehensive code sample including a sample socket server using Netty.

Sink

The socket sink opens a blocking client TCP/IP socket and sends data over it. The data must be in the form of lines of plain text. To get meaningful behavior, the server side must collect and combine them from all the concurrently connected clients.

Here’s a simple example:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.socket("localhost", 8080));

5.9. JDBC

Jet contains both a JDBC source and a sink. They can be used to read from or write to relational databases or any other source that supports the standard JDBC API.

The source is only a batch-style source: it reads records that are produced by one SQL query and once all records are emitted, the source completes. The sink can work in both batch and streaming jobs.

The connector is not fault-tolerant. On job restart it behaves as if you started a new job: the source does not do snapshotting and the sink does not suppress duplicate data.

To use the JDBC connector, include a JDBC driver on the classpath.

5.9.1. Using the JDBC Source

The JDBC source comes in two versions:

  1. Parallel: You will need to provide a ToResultSetFunction that will create one query for each parallel worker to query part of the data.

  2. Non-parallel: There will be only one parallel worker that will read all the records.

Example of parallel source

Here we use the modulo operator to select a part of the keys, but any query returning non-overlapping subsets is possible. Note that this approach might not actually perform better than the non-parallel version unless the underlying table is physically partitioned by this key.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jdbc(
    () -> DriverManager.getConnection(DB_CONNECTION_URL),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement("SELECT * FROM PERSON WHERE MOD(id, ?) = ?");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet ->
        new Person(resultSet.getInt(1), resultSet.getString(2))
)).drainTo(Sinks.logger());

Example of non-parallel source

A single worker of the source will fetch the whole result set with a single query. You need to provide just one SQL query.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jdbc(
    DB_CONNECTION_URL,
    "select * from PERSON",
    resultSet ->
            new Person(resultSet.getInt(1), resultSet.getString(2))
)).drainTo(Sinks.logger());

The output function receives the ResultSet and creates the desired output object. The function is called once for each row of the result set; you should not call ResultSet#next() or any other cursor-navigating methods.

The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.

Any SQLException will cause the job to fail. The default local parallelism for this processor is 1.

5.9.2. Using the JDBC Sink

The JDBC sink connects to the specified database using the given connection supplier, prepares a statement using the given update query and inserts/updates the items.

The update query should be a parametrized query. The bind function will receive a PreparedStatement created for this query and should bind parameters to it. It should not execute the query, call commit, or call any other method.

The records are committed after each batch of records, and batch mode is used if the driver supports it. Auto-commit is disabled on the connection.

In case of an SQLException the processor will automatically try to reconnect and the job won’t fail, unless the exception is an SQLNonTransientException (or one of its subclasses). The default local parallelism for this sink is 1.

No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee. For this reason you should not use an INSERT statement, which can fail on a duplicate primary key. Rather, use an insert-or-update statement that can tolerate duplicate writes.

The following code snippet shows writing the Person objects to a database table:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Person>list("inputList"))
 .drainTo(Sinks.jdbc(
         "REPLACE INTO PERSON (id, name) values(?, ?)",
         DB_CONNECTION_URL,
         (stmt, item) -> {
             stmt.setInt(1, item.id);
             stmt.setString(2, item.name);
         }));

5.10. JMS

The JMS (Java Message Service) connector can be used both as a source and as a sink for infinite stream processing. The connector is not fault-tolerant. On job restart it behaves as if you had started a new job. The source does not do snapshotting; the sink does not suppress duplicate data.

In order to use the JMS connector, you should include a JMS client on the classpath. IO failures are generally handled by the JMS client and do not cause the connector to fail. Most clients offer a configuration parameter to enable auto-reconnection; refer to the client documentation for details.

5.10.1. Using JMS as a Source

The JMS source opens a connection to the JMS server for each member. Then each underlying worker of the source creates a session and a message consumer using that connection. The user supplies necessary functions to create the connection, session and message consumer.

The JMS source uses the non-blocking API to receive messages and transforms each message to the desired output object using the supplied projection function. The source flushes the session after receiving each message, using the supplied flush function.

The following code snippets show streaming messages from a JMS queue and a JMS topic using ActiveMQ JMS Client.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.jmsQueue(() -> new ActiveMQConnectionFactory(
        "tcp://localhost:61616"), "queue"))
 .withoutTimestamps()
 .drainTo(Sinks.logger());
Pipeline p = Pipeline.create();
p.drawFrom(Sources.jmsTopic(() -> new ActiveMQConnectionFactory(
        "tcp://localhost:61616"), "topic"))
 .withoutTimestamps()
 .drainTo(Sinks.logger());

The JMS topic is a non-distributed source: if messages are consumed by multiple consumers, all of them will get the same messages. Therefore the source operates on a single member with a local parallelism of 1. Setting the local parallelism to a value other than 1 causes an IllegalArgumentException.

5.10.2. Using JMS as a Sink

The JMS sink opens a connection to the JMS server for each member. Then each underlying worker of the sink creates a session and a message producer using that connection. The user supplies necessary functions and parameters to create the connection, session and message producer.

The JMS sink uses the supplied function to create a Message object for each input item and sends this message using the supplied send function. After a batch of messages is sent, the sink flushes the session using the supplied flush function.

The following code snippets show writing to a JMS queue and a JMS topic using ActiveMQ JMS Client.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.jmsQueue(() -> new ActiveMQConnectionFactory(
         "tcp://localhost:61616"), "queue"));
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
 .drainTo(Sinks.jmsTopic(() -> new ActiveMQConnectionFactory(
         "tcp://localhost:61616"), "topic"));

5.11. Amazon AWS S3

Amazon AWS S3 Object Storage is a production-worthy choice for both a data source and a data sink in a batch computation job.

The S3 source and sink use the Amazon S3 SDK to connect to the storage. The connectors expect the user to provide either an S3Client instance or credentials (access key ID, secret access key) to create the client. The source and sink assume the data is in the form of plain text and emit/receive data items which represent individual lines of text.

The connectors are not fault-tolerant. On job restart they behave as if you had started a new job. The source does not do snapshotting. The sink always overwrites previously written objects.

5.11.1. S3 Source

The S3 source distributes the workload by splitting the objects among the processor instances. The source reads each object line by line and emits the desired output downstream using the provided transform function.

The example below logs, on each Jet member, the contents of all the objects in the specified bucket. When the source has read all the objects, the job completes. If the objects change while the job is running, the behavior is undefined.

String accessKeyId = "";
String accessKeySecret = "";
String prefix = "";

AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, accessKeySecret);
S3Client s3 = S3Client
        .builder()
        .credentialsProvider(StaticCredentialsProvider.create(credentials))
        .build();

Pipeline p = Pipeline.create();
p.drawFrom(S3Sources.s3(singletonList("input-bucket"), prefix, () -> s3))
 .drainTo(Sinks.logger());

5.11.2. S3 Sink

The S3 sink writes items to the specified bucket, transforming each item to a line using the given transform function. The sink creates an object in the bucket for each processor instance. The object name consists of a user-provided prefix (if defined) followed by the processor’s global index; for example, the processor with index 2 and prefix my-object- creates the object my-object-2.

Here’s a small example of usage:

String accessKeyId = "";
String accessKeySecret = "";

AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, accessKeySecret);
S3Client s3 = S3Client
        .builder()
        .credentialsProvider(StaticCredentialsProvider.create(credentials))
        .build();

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("input-list"))
 .drainTo(S3Sinks.s3("output-bucket", () -> s3));

5.11.3. Using HDFS as an S3 Connector

AWS S3 can be registered as a filesystem with HDFS, which can then be used as a source or sink via our hadoop module. See the Hadoop-AWS module documentation for more information on the dependencies and configuration.

The example below reads from a bucket, transforms each line to uppercase and writes to another bucket using the hadoop module.

JobConf jobConfig = new JobConf();
jobConfig.setInputFormat(TextInputFormat.class);
jobConfig.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(jobConfig, new Path("s3a://input-bucket"));
TextOutputFormat.setOutputPath(jobConfig, new Path("s3a://output-bucket"));


Pipeline p = Pipeline.create();
p.drawFrom(HdfsSources.<String, String>hdfs(jobConfig))
 .map(e -> Util.entry(e.getKey(), e.getValue().toUpperCase()))
 .drainTo(HdfsSinks.hdfs(jobConfig));

5.12. Source and Sink Builders

If Jet doesn’t natively support the data source/sink you need, you can build a connector for it yourself by using the SourceBuilder and SinkBuilder.

5.12.1. Source Builder

To make your custom source connector you need two basic ingredients:

  1. a context object that holds all the resources and state you need to keep track of

  2. a stateless function, fillBufferFn, taking two parameters: the state object and a buffer object provided by Jet

Jet repeatedly calls fillBufferFn whenever it needs more data items. Optimally, the function will fill the buffer with the items it can acquire without blocking. A hundred items at a time is enough to eliminate any per-call overheads within Jet. The function may block as well (it’s running inside a non-cooperative processor), but taking longer than a second to complete can have negative effects on the overall performance of the processing pipeline.

Build a Bounded (Batch) Source

In this example we build a source that emits the lines of a file:

BatchSource<String> fileSource = SourceBuilder
    .batch("file-source", x ->                               (1)
        new BufferedReader(new FileReader("input.txt")))
    .<String>fillBufferFn((in, buf) -> {                     (2)
        String line = in.readLine();
        if (line != null) {
            buf.add(line);
        } else {
            buf.close();                                     (3)
        }
    })
    .destroyFn(BufferedReader::close)
    .build();
Pipeline p = Pipeline.create();
BatchStage<String> srcStage = p.drawFrom(fileSource);
1 here we pass createFn, the factory of state objects
2 the main logic goes to fillBufferFn
3 call buf.close() to mark the end of the bounded data stream

The file must be available on all the members of the Jet cluster for this to work. Only one member will actually read it, but you can’t choose or predict which one.

The code above emits a single item per fillBufferFn call. To ensure we get the best performance, we can improve it to emit the data in chunks:

BatchSource<String> fileSource = SourceBuilder
    .batch("file-source", x ->
        new BufferedReader(new FileReader("input.txt")))
    .<String>fillBufferFn((in, buf) -> {
        for (int i = 0; i < 128; i++) {
            String line = in.readLine();
            if (line == null) {
                buf.close();
                return;
            }
            buf.add(line);
        }
    })
    .destroyFn(BufferedReader::close)
    .build();
Build an Unbounded (Stream) Source

Here’s how you can build a simple source that keeps polling a URL, emitting all the lines it gets in the response:

StreamSource<String> httpSource = SourceBuilder
    .stream("http-source", ctx -> HttpClients.createDefault())
    .<String>fillBufferFn((httpc, buf) -> {
        HttpGet request = new HttpGet("http://localhost:8008");
        InputStream content = httpc.execute(request)
                                   .getEntity()
                                   .getContent();
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(content)
        );
        reader.lines().forEach(buf::add);
    })
    .destroyFn(CloseableHttpClient::close)
    .build();
Pipeline p = Pipeline.create();
StreamSourceStage<String> srcStage = p.drawFrom(httpSource);

Our state object is an instance of the Apache HTTP Client. It maintains a connection pool so it will reuse the same connection for all our requests. Note that we’re making a blocking call here, but it’s expected to respond quickly. In a more serious production setting we could configure timeouts on the HTTP client to limit the blocking time. We’d also have to add error handling so we just retry on failure instead of causing the whole Jet job to fail.
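As a sketch of such hardening with the Apache HTTP client (the timeout values are arbitrary and the retry handling is only indicated), the createFn could build a client with explicit timeouts:

StreamSource<String> httpSource = SourceBuilder
    .stream("http-source", ctx -> {
        // limit how long a single poll may block (illustrative values)
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(1_000)
                .setSocketTimeout(1_000)
                .build();
        return HttpClients.custom()
                .setDefaultRequestConfig(requestConfig)
                .build();
    })
    // fillBufferFn and destroyFn stay as in the example above; wrapping the
    // request in a try/catch lets you retry a failed poll instead of failing the job
    .<String>fillBufferFn((httpc, buf) -> { /* same polling logic as above */ })
    .destroyFn(CloseableHttpClient::close)
    .build();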

Build a Stream Source with Event Timestamps

To make the data usable for windowed aggregation, each event item must be timestamped. If the source doesn’t emit timestamped events, you’ll have to add them while building the processing pipeline. It’s more convenient and efficient to add the timestamps right at the source. Jet offers you an API variant specifically tailored to this need. Let’s say that in the above example the first 9 characters denote the event’s timestamp. We can extract them as follows:

StreamSource<String> httpSource = SourceBuilder
    .timestampedStream("http-source", ctx -> HttpClients.createDefault())
    .<String>fillBufferFn((httpc, buf) -> {
        HttpGet request = new HttpGet("http://localhost:8008");
        InputStream content = httpc.execute(request)
                                   .getEntity()
                                   .getContent();
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(content)
        );
        reader.lines().forEach(item -> {
            long timestamp = Long.valueOf(item.substring(0, 9));
            buf.add(item.substring(9), timestamp);
        });
    })
    .destroyFn(CloseableHttpClient::close)
    .build();
Distributed Stream Source

In the examples we showed so far the source was non-distributed: Jet will create just a single processor in the whole cluster to serve all the data. This is an easy and obvious way to create a source connector.

If you want to create a distributed source, the challenge is coordinating all the parallel instances to appear as a single, unified source. Each instance must emit its unique slice of the whole data stream. Jet passes the Processor.Context to your createFn and you can use it to identify each state object you create. Consult the properties totalParallelism and globalProcessorIndex: Jet will call createFn exactly once with each globalProcessorIndex from 0 to totalParallelism - 1 and you can use this to slice up the data.

Here’s a rudimentary example that shows how such a scheme could work:

class SourceContext {
    private final int limit;
    private final int step;
    private int currentValue;

    SourceContext(Processor.Context ctx, int limit) {
        this.limit = limit;
        this.step = ctx.totalParallelism();
        this.currentValue = ctx.globalProcessorIndex();
    }

    void addToBuffer(SourceBuffer<Integer> buffer) {
        if (currentValue < limit) {
            buffer.add(currentValue);
            currentValue += step;
        } else {
            buffer.close();
        }
    }
}

BatchSource<Integer> sequenceSource = SourceBuilder
    .batch("seq-source", procCtx -> new SourceContext(procCtx, 1_000))
    .fillBufferFn(SourceContext::addToBuffer)
    .distributed(2)  (1)
    .build();
1 we request two parallel processors on each Jet member

This is a distributed source that generates a bounded sequence of integers. If the Jet cluster has two members, the source will have four parallel generators. The first one will generate numbers 0, 4, 8, …​, the second one 1, 5, 9, …​, etc.

In the real world there are some fundamental differences. The data source is partitioned in advance and you cannot expect the number of partitions to perfectly match your topology. Instead you must write some logic that distributes, more or less evenly, M partitions over N processors.
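For illustration only, here is one possible (hypothetical) assignment rule: give partition i to the processor whose globalProcessorIndex equals i modulo totalParallelism. The partitionCount is assumed to come from the external system’s metadata API.

// sketch: pick this processor's share of the external system's partitions
static List<Integer> assignedPartitions(Processor.Context ctx, int partitionCount) {
    List<Integer> assigned = new ArrayList<>();
    for (int partition = 0; partition < partitionCount; partition++) {
        if (partition % ctx.totalParallelism() == ctx.globalProcessorIndex()) {
            assigned.add(partition);
        }
    }
    return assigned;
}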

There’s a particularly nasty problem hiding in this scheme: when we assign several partitions to a single processor and consume the data from each partition in chunks, the difference between the timestamp of the last event we fetch from partition i and the first item from partition j can be very large, much larger than the maximum lateness we configured. By the time Jet receives these events, it has already completed the processing of their window and moved on. The items must be dropped as "too late to process".

Jet has a mechanism that mitigates this issue by individually tracking the highest timestamp from each partition and emitting the appropriate watermark items that account for the lagging partitions. It is available if you create a custom source processor using the Core API, but it’s not currently exposed through the source builder. Refer to the section on custom source vertices and to the Javadoc of EventTimeMapper.

Add Fault Tolerance to your Source

If you want your source to behave correctly within a streaming Jet job that has a processing guarantee configured (at-least-once or exactly-once), you must help Jet with saving the operational state of your context object to the snapshot storage. There are two functions you must supply:

  1. createSnapshotFn returns a serializable object that has all the data you’ll need to restore the operational state

  2. restoreSnapshotFn applies the previously saved snapshot to the current context object

While a job is running, Jet calls createSnapshotFn at regular intervals to save the current state. When Jet resumes a job, it will:

  1. create your context object the usual way, by calling createFn

  2. retrieve the latest snapshot object from its storage

  3. pass the context and snapshot objects to restoreSnapshotFn

  4. start calling fillBufferFn, which must start by emitting the same item it was about to emit when createSnapshotFn was called.

You’ll find that restoreSnapshotFn, somewhat unexpectedly, accepts not one snapshot object but a list of them. If you’re building a simple, non-distributed source, this list will have just one element. However, the same logic must work for distributed sources as well, and a distributed source runs on many parallel processors at the same time. Each of them will produce its own snapshot object. After a restart the number of parallel processors may be different than before (because you added a Jet cluster member, for example), so there’s no one-to-one mapping between the processors before and after the restart. This is why Jet passes all the snapshot objects to all the processors, and your logic must work out which part of their data to use. See the Distributed Stream Source section for more information.

Here’s a brief example with a fault-tolerant streaming source that generates a sequence of integers:

StreamSource<Integer> faultTolerantSource = SourceBuilder
    .stream("fault-tolerant-source", processorContext -> new int[1])
    .<Integer>fillBufferFn((numToEmit, buffer) ->
        buffer.add(numToEmit[0]++))
    .createSnapshotFn(numToEmit -> numToEmit[0])                (1)
    .restoreSnapshotFn(
        (numToEmit, saved) -> numToEmit[0] = saved.get(0))  (2)
    .build();
1 the snapshotting function returns the current number to emit
2 the restoring function sets the number from the snapshot to the current state

5.12.2. Sink Builder

To make your custom sink connector you need two basic ingredients:

  1. an object that will hold all the state you need to keep track of

  2. a stateless function, receiveFn, taking two parameters: the state object and a data item sent to the sink

This is a simple example with a file sink:

Sink<Object> sink = sinkBuilder(
    "file-sink", x -> new PrintWriter(new FileWriter("output.txt")))
    .receiveFn((out, item) -> out.println(item.toString()))
    .destroyFn(PrintWriter::close)
    .build();
Pipeline p = Pipeline.create();
p.drawFrom(list("input"))
 .drainTo(sink);

This will create the file output.txt on each member, so the overall job output consists of the contents of all these files put together.

Note that we’re using blocking IO here. Jet will use non-cooperative processors for the sink, so we’re allowed to do that. For good overall job performance we should still ensure that the call to receiveFn doesn’t take more than a second to complete.

In the above example our state object is a PrintWriter, which has internal buffering. Jet allows us to make buffering a first-class concern and deal with it explicitly by taking an optional flushFn which it will call at regular intervals. Here’s our example modified to do explicit buffering and flushing:

Sink<Object> sink = sinkBuilder("file-sink", x -> new StringBuilder())
    .receiveFn((buf, item) -> buf.append(item).append('\n'))
    .flushFn(buf -> {
        try (Writer out = new FileWriter("output.txt", true)) {
            out.write(buf.toString());
            buf.setLength(0);
        }
    })
    .build();

In this case we don’t need the destroyFn because we keep the file closed while not appending to it. Through the use of buffering we drown out the overhead of opening and closing the file each time and we get an overall more robust solution.

Sink Parallelism

Jet builds a sink that is distributed: each member of the Jet cluster has a processor running it. You can configure how many parallel processors there are on each member (the local parallelism) by calling SinkBuilder.preferredLocalParallelism(). By default there will be one processor per member.
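For example, adapting the file sink from above, a sketch that requests two sink processors per member (and writes a separate file per processor so that two writers don’t clash on the same file) could look like this:

Sink<Object> sink = sinkBuilder(
    "file-sink", ctx -> new PrintWriter(
        // one output file per parallel sink processor
        new FileWriter("output." + ctx.globalProcessorIndex() + ".txt")))
    .receiveFn((out, item) -> out.println(item.toString()))
    .destroyFn(PrintWriter::close)
    .preferredLocalParallelism(2)
    .build();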

Fault Tolerance

The sink you get from the sink builder doesn’t participate in the fault tolerance protocol. You can’t preserve any internal state if a job fails and gets restarted. In a job with snapshotting enabled your sink will still receive every item at least once. If the system you’re storing the data into is idempotent, ignoring duplicate items, this will have the effect of an exactly-once guarantee.

6. Configuration

This chapter starts with a reference table describing all the options to configure Hazelcast Jet. Then it describes the ways you can configure Jet: programmatically, declaratively, and within a Spring context.

6.1. Configuration Options

The following tables list and describe all the methods and/or properties used to configure Hazelcast Jet, grouped by the features they are used for. Some of the listed options below can also be configured declaratively. See the Declarative Configuration section for an example.

6.1.1. Jet Instance

Configuration API Method Description

setCooperativeThreadCount

The number of threads Jet creates in its cooperative multithreading pool. Its default value is Runtime.getRuntime().availableProcessors().

setFlowControlPeriodMs

Jet uses a flow control mechanism between cluster members to prevent a slower vertex from getting overflowed with data from a faster upstream vertex. Each receiver regularly reports to each sender how much more data it may send over a given DAG edge. This method sets the duration (in milliseconds) of the interval between flow-control packets. Its default value is 100 milliseconds.

setBackupCount

The number of synchronous backups to configure on the IMap that Jet needs internally to store job metadata and snapshots. The maximum allowed value is 6. Its default value is 1.

setScaleUpDelayMillis

The delay after which the auto-scaled jobs restart if a new member is added to the cluster. It has no effect on jobs with auto scaling disabled. Its default value is 10000 milliseconds.

setLosslessRestartEnabled

Specifies whether the lossless job restart feature is enabled. With this feature, you can restart the whole cluster without losing the jobs and their state. It is implemented on top of Hazelcast IMDG’s Hot Restart Store feature, which persists the data to disk. You need to have the Hazelcast Jet Enterprise edition and configure Hazelcast IMDG’s Hot Restart Store to use this feature. Its default value is false, i.e., disabled.
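For illustration, here’s a minimal programmatic sketch using the instance-level methods above (the chosen values are arbitrary examples):

JetConfig config = new JetConfig();
InstanceConfig instanceConfig = config.getInstanceConfig();
instanceConfig.setCooperativeThreadCount(4);
instanceConfig.setFlowControlPeriodMs(100);
instanceConfig.setBackupCount(2);
instanceConfig.setScaleUpDelayMillis(5_000);
JetInstance jet = Jet.newJetInstance(config);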

6.1.2. Fault Tolerance

Configuration API Method Description

setProcessingGuarantee

Processing guarantee for failures. If a cluster member leaves the cluster during a job execution and the job is restarted automatically, this setting defines the point in the stream from which the job resumes. Available values are EXACTLY_ONCE, AT_LEAST_ONCE and NONE. If you choose one of the first two values, distributed snapshotting is enabled for the job. See the Javadoc for a detailed explanation. Its default value is NONE.

setSnapshotIntervalMillis

The interval in milliseconds between the completion of the previous snapshot and the start of a new one. It must be a positive value and it is only relevant when the processing guarantee is set as either EXACTLY_ONCE or AT_LEAST_ONCE. Its default value is set to 10000 milliseconds.

setAutoScaling

Specifies whether Jet scales the job up or down when a member is added or removed from the cluster. Its default value is true, i.e., enabled.

setSplitBrainProtection

Specifies whether the split-brain protection feature is enabled. When enabled, Jet restarts the job after a topology change only if the cluster quorum is satisfied. See the Split-Brain Protection section.
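These options are set per job on JobConfig when you submit the job. Here’s a minimal sketch, assuming a jet instance and a pipeline already exist:

JobConfig jobConfig = new JobConfig();
jobConfig.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE); // enables distributed snapshotting
jobConfig.setSnapshotIntervalMillis(10_000);
jobConfig.setAutoScaling(true);
jobConfig.setSplitBrainProtection(true);
Job job = jet.newJob(pipeline, jobConfig);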

6.1.3. Performance

Configuration API Method Description

setPacketSizeLimit

For a distributed edge, data is sent to a remote member via Hazelcast network packets. Each packet is dedicated to the data of a single edge, but may contain any number of data items. This setting limits the size of the packet in bytes. Packets should be large enough to drown out any fixed overheads, but small enough to allow good interleaving with other packets. Its default value is 16384.

setQueueSize

The capacity of processor-to-processor concurrent queues. The value is rounded upwards to the next power of 2. Its default value is 1024.

setReceiveWindowMultiplier

The scaling factor used by the adaptive receive window sizing function. Its default value is 3.
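As a sketch, these edge-level options can be applied cluster-wide through the default EdgeConfig (the values below are simply the documented defaults):

JetConfig config = new JetConfig();
EdgeConfig edgeConfig = new EdgeConfig();
edgeConfig.setPacketSizeLimit(16384);
edgeConfig.setQueueSize(1024);
edgeConfig.setReceiveWindowMultiplier(3);
config.setDefaultEdgeConfig(edgeConfig);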

6.1.4. Security

Configuration Element Description

ssl

Allows you to set the following properties to configure TLS for all member-to-member and member-to-client communications:

  • keyStore: Path of your keystore file.

  • keyStorePassword: Password to access the key from your keystore file.

  • keyManagerAlgorithm: Name of the algorithm based on which the authentication keys are provided.

  • keyStoreType: Type of the keystore. Its default value is JKS. Another commonly used type is PKCS12. Available keystore/truststore types depend on your operating system and the Java runtime.

  • trustStore: Path of your truststore file. The truststore is a keystore file that contains a collection of certificates trusted by your application.

  • trustStorePassword: Password to unlock the truststore file.

  • trustManagerAlgorithm: Name of the algorithm based on which the trust managers are provided.

  • trustStoreType: Type of the truststore. Its default value is JKS. Another commonly used type is PKCS12. Available keystore/truststore types depend on your operating system and the Java runtime.

  • mutualAuthentication: Mutual authentication configuration. It’s empty by default, which means the client side of the connection is not authenticated. See the Mutual Authentication section in the Hazelcast IMDG Reference Manual for further details.

  • ciphersuites: Comma-separated list of cipher suite names allowed to be used. Its default value contains all the supported suites in your Java runtime.

  • protocol: Name of the protocol used in your TLS/SSL connections. Its default value is TLS. When you use the default value, your application chooses the TLS version according to your Java version. So, we recommend you provide TLS with its version information, e.g., TLSv1.3.

You need to have the Hazelcast Jet Enterprise edition to use this feature. See the Security chapter for more information.

6.1.5. Monitoring

Configuration Method/Property Description

hazelcast.logging.type

The predefined types of built-in logging adapters. Available values are jdk, log4j, log4j2, slf4j and none. See the Configure Logging section.

MetricsConfig

Allows you to provide the following configuration options for metrics collection:

  • setJmxEnabled: Specifies whether the metrics are exposed through JMX. Its default value is true, i.e., enabled.

  • setRetentionSeconds: Duration in seconds for which the metrics are retained on the Jet instance. Its default value is 5 seconds.

  • setCollectionIntervalSeconds: Metrics collection interval in seconds. Its default value is 5 seconds.

  • setMetricsForDataStructuresEnabled: Specifies whether the data structure statistics are included in the metrics. Its default value is false, i.e., disabled.

See Javadoc for more information.
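A minimal programmatic sketch, assuming the metrics settings are reachable from JetConfig via getMetricsConfig():

JetConfig config = new JetConfig();
MetricsConfig metricsConfig = config.getMetricsConfig();
metricsConfig.setJmxEnabled(true);
metricsConfig.setRetentionSeconds(5);
metricsConfig.setCollectionIntervalSeconds(5);
metricsConfig.setMetricsForDataStructuresEnabled(false);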

You can also use Hazelcast Jet Management Center to monitor your Jet clusters. See the Hazelcast Jet Management Center Reference Manual for more information.

6.2. Ways of Configuration

You can configure Hazelcast Jet either programmatically or declaratively (XML or YAML).

6.2.1. Programmatic Configuration

Programmatic configuration is the simplest way to configure Jet. You instantiate a JetConfig and set the desired properties. For example, the following will configure Jet to use only two threads in its cooperative multithreading pool:

JetConfig config = new JetConfig();
config.getInstanceConfig().setCooperativeThreadCount(2);
JetInstance jet = Jet.newJetInstance(config);

If you want to specify your own configuration file to create JetConfig, Hazelcast Jet supports several ways of doing so, including loading it from the filesystem, the classpath, an InputStream, or an in-memory string.

Building JetConfig from the XML declarative configuration:

  • JetConfig cfg = JetConfig.loadXmlFromStream(inputStream);

  • JetConfig cfg = JetConfig.loadFromClasspath(classloader, xmlFileName);

  • JetConfig cfg = JetConfig.loadFromFile(configFile);

  • JetConfig cfg = JetConfig.loadXmlFromString(xml);

Building JetConfig from the YAML declarative configuration:

  • JetConfig cfg = JetConfig.loadYamlFromStream(inputStream);

  • JetConfig cfg = JetConfig.loadFromClasspath(classloader, yamlFileName);

  • JetConfig cfg = JetConfig.loadFromFile(configFile);

  • JetConfig cfg = JetConfig.loadYamlFromString(yaml);

6.2.2. Declarative Configuration

If you don’t pass an explicit JetConfig object when constructing a Jet instance, it will perform the following lookup procedure:

  1. Read the system property hazelcast.jet.config. If it starts with classpath:, treat it as a classpath resource, otherwise it’s a file pathname. If it’s defined but Jet can’t find the file it specifies, startup fails.

  2. Look for hazelcast-jet.xml in the working directory.

  3. Look for hazelcast-jet.xml in the classpath.

  4. Look for hazelcast-jet.yaml in the working directory.

  5. Look for hazelcast-jet.yaml in the classpath.

  6. Load the default XML configuration packaged in Jet’s JAR.

The YAML files need to have a .yaml or .yml extension if they are passed in via the hazelcast.jet.config property.
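For instance, here’s a minimal sketch of pointing Jet at a custom configuration on the classpath via this system property (the file name is hypothetical):

// must be set before the Jet instance is created
System.setProperty("hazelcast.jet.config", "classpath:my-hazelcast-jet.yaml");
JetInstance jet = Jet.newJetInstance();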

An example hazelcast-jet.xml is shown below:

<hazelcast-jet>
    <instance>
       <cooperative-thread-count>8</cooperative-thread-count>
       <flow-control-period>100</flow-control-period>
       <temp-dir>/var/tmp/jet</temp-dir>
       <backup-count>1</backup-count>
       <scale-up-delay-millis>10000</scale-up-delay-millis>
    </instance>
    <properties>
       <property name="custom.property">custom property</property>
    </properties>
    <edge-defaults>
       <queue-size>1024</queue-size>
       <packet-size-limit>16384</packet-size-limit>
       <receive-window-multiplier>3</receive-window-multiplier>
    </edge-defaults>
    <metrics>
       <retention-seconds>5</retention-seconds>
       <collection-interval-seconds>5</collection-interval-seconds>
       <metrics-for-data-structures>false</metrics-for-data-structures>
    </metrics>
</hazelcast-jet>

The equivalent part of the configuration in hazelcast-jet.yaml format is shown below:

hazelcast-jet:
  instance:
    cooperative-thread-count: 8
    flow-control-period: 100
    backup-count: 1
    scale-up-delay-millis: 10000
    lossless-restart-enabled: false
  edge-defaults:
    queue-size: 1024
    packet-size-limit: 16384
    receive-window-multiplier: 3
  metrics:
    enabled: true
    jmx-enabled: true
    collection-interval-seconds: 5
    retention-seconds: 5
    metrics-for-data-structures: false
Composing Declarative Configuration

Declarative configurations (either XML or YAML) support composing multiple configuration snippets into a single configuration.

In order to compose a declarative configuration, you can import different declarative configuration files.

Let’s say you want to compose the declarative configuration for Hazelcast Jet out of two XML configurations: development-metrics-config.xml and development-instance-config.xml. These two configurations are shown below.

development-metrics-config.xml:

<hazelcast-jet>
    <metrics enabled="true" jmxEnabled="true">
        <retention-seconds>5</retention-seconds>
        <collection-interval-seconds>5</collection-interval-seconds>
        <metrics-for-data-structures>true</metrics-for-data-structures>
    </metrics>
</hazelcast-jet>

development-instance-config.xml:

<hazelcast-jet>
    <instance>
        <flow-control-period>100</flow-control-period>
        <backup-count>1</backup-count>
        <scale-up-delay-millis>10000</scale-up-delay-millis>
        <lossless-restart-enabled>false</lossless-restart-enabled>
    </instance>
</hazelcast-jet>

To get your example Hazelcast Jet declarative configuration out of the above two, use the <import/> element as shown below.

<hazelcast-jet>
    <import resource="development-metrics-config.xml"/>
    <import resource="development-instance-config.xml"/>
</hazelcast-jet>

The above example using the YAML configuration files looks like the following:

development-metrics-config.yaml:

hazelcast-jet:
   metrics:
      enabled: true
      jmx-enabled: true
      collection-interval-seconds: 5
      retention-seconds: 5
      metrics-for-data-structures: true

development-instance-config.yaml:

hazelcast-jet:
  instance:
    flow-control-period: 100
    backup-count: 1
    scale-up-delay-millis: 10000
    lossless-restart-enabled: false

To compose the above two YAML configuration files, import them as shown below.

hazelcast-jet:
  import:
    - development-metrics-config.yaml
    - development-instance-config.yaml
Use the <import/> element and the import mapping at the top level of the XML and YAML hierarchies, respectively.

Hazelcast Jet and Hazelcast IMDG share the same syntax for declarative configuration composition. For more examples on importing the resources from the classpath and file system, or resources with variables in their names, refer to the relevant section in the Hazelcast IMDG Reference Manual.

6.2.3. Using Variables

In your Hazelcast Jet declarative configuration, you can use variables to set the values of the elements. This works when you set a system property programmatically or on the command line. A variable in the declarative configuration then resolves to the value of the corresponding system property.

For example, see the following command line option that sets a system property.

-Dcooperative.thread.count=16

Let’s get the value of this system property in the declarative configuration of Hazelcast Jet, as shown below.

In the XML configuration:

<hazelcast-jet>
    <instance>
       <cooperative-thread-count>${cooperative.thread.count}</cooperative-thread-count>
    </instance>
</hazelcast-jet>

In the YAML configuration:

hazelcast-jet:
  instance:
    cooperative-thread-count: ${cooperative.thread.count}

If you do not want to rely on the system properties, you can use the XmlJetConfigBuilder or YamlJetConfigBuilder and explicitly set a Properties instance, as shown below.

Properties properties = new Properties();

// fill the properties, e.g., from database/LDAP, etc.

XmlJetConfigBuilder builder = new XmlJetConfigBuilder();
builder.setProperties(properties);
JetConfig config = builder.build();
JetInstance hz = Jet.newJetInstance(config);

6.2.4. Variable Replacers

Hazelcast Jet supports variable replacers, which are used to replace custom strings while loading the configuration.

Hazelcast Jet and Hazelcast IMDG share the same syntax for variable replacers. You can configure your variable replacers in the Hazelcast Jet configuration via XML or YAML. Please see the Variable Replacers section in the Hazelcast IMDG Reference Manual for more information and examples on the variable replacers.

6.3. Configure the Underlying Hazelcast Instance

Each Jet member or client has its underlying Hazelcast member or client. Please refer to the Hazelcast IMDG Reference Manual for specific configuration options for Hazelcast IMDG.

6.3.1. Programmatic

You can configure the underlying Hazelcast IMDG member as follows:

JetConfig jetConfig = new JetConfig();
jetConfig.getHazelcastConfig().getGroupConfig().setName("test");
JetInstance jet = Jet.newJetInstance(jetConfig);

Similarly, you can also configure the underlying Hazelcast client:

ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig().setName("test");
JetInstance jet = Jet.newJetClient(clientConfig);

6.3.2. Declarative

Hazelcast IMDG can also be configured declaratively. Please refer to the Hazelcast IMDG Reference Manual for information on how to do this.

6.4. Spring Integration

You can configure and start a Hazelcast Jet instance as a component in the Spring Application Context. You can use the plain bean element and define individual properties on a JetConfig instance, but Jet provides its own schema-based configuration, which makes this much less verbose.

6.4.1. Approach 1: Use the Plain bean Element

You can declare Hazelcast Jet objects using the default Spring beans namespace. Here is an example for a Hazelcast Jet Instance declaration:

<bean id="instance" class="com.hazelcast.jet.Jet" factory-method="newJetInstance">
    <constructor-arg>
        <bean class="com.hazelcast.jet.config.JetConfig">
            <property name="hazelcastConfig">
                <bean class="com.hazelcast.config.Config">
                    <!-- ... -->
                </bean>
            </property>
            <property name="instanceConfig">
                <bean class="com.hazelcast.jet.config.InstanceConfig">
                    <property name="cooperativeThreadCount" value="2"/>
                </bean>
            </property>
            <property name="defaultEdgeConfig">
                <bean class="com.hazelcast.jet.config.EdgeConfig">
                    <property name="queueSize" value="2048"/>
                </bean>
            </property>
            <property name="properties">
                <props>
                    <prop key="foo">bar</prop>
                </props>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="my-map"/>
</bean>

6.4.2. Approach 2: Use jet:instance

Hazelcast Jet provides its own Spring config schema. Add the namespace declaration xmlns:jet="http://www.hazelcast.com/schema/jet-spring" to the beans element and then use the jet namespace prefix. Make sure you have added hazelcast-jet-spring.jar to the classpath.

Here’s how your namespace and schema instance declarations may look:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jet="http://www.hazelcast.com/schema/jet-spring"
       xmlns:hz="http://www.hazelcast.com/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.hazelcast.com/schema/spring
        http://www.hazelcast.com/schema/spring/hazelcast-spring-3.12.xsd
        http://www.hazelcast.com/schema/jet-spring
        http://www.hazelcast.com/schema/jet-spring/hazelcast-jet-spring-3.2.xsd">
        <!-- ... -->
 </beans>
Configuring the Hazelcast Jet Instance
<jet:instance id="instance">
    <hz:config>
        <hz:spring-aware/>
        <hz:group name="jet"/>
        <hz:network port="5701" port-auto-increment="false">
            <hz:join>
                <hz:multicast enabled="false"/>
                <hz:tcp-ip enabled="true">
                    <hz:member>127.0.0.1:5701</hz:member>
                </hz:tcp-ip>
            </hz:join>
        </hz:network>
        <hz:map name="map" backup-count="3">
        </hz:map>
    </hz:config>
    <jet:instance-config cooperative-thread-Count="2"/>
    <jet:default-edge-config queue-size="2048"/>
    <jet:properties>
        <hz:property name="foo">bar</hz:property>
    </jet:properties>
</jet:instance>
Configure a Jet Client
<jet:client id="jet-client">
    <jet:group name="jet"/>
    <jet:network>
        <hz:member>127.0.0.1:5701</hz:member>
    </jet:network>
    <jet:spring-aware/>
</jet:client>
Additional Bean Types Supported by the Jet Namespace

You can obtain the underlying HazelcastInstance from the Jet instance as a bean and use it to obtain these Hazelcast IMDG beans:

  • map

  • list

  • multiMap

  • replicatedMap

  • queue

  • topic

  • set

  • executorService

  • idGenerator

  • atomicLong

  • atomicReference

  • semaphore

  • countDownLatch

  • lock

Here are short examples for each of them:

<jet:hazelcast jet-instance-ref="jet-instance" id="hazelcast-instance"/>

<jet:map instance-ref="jet-instance" name="my-map" id="my-map-bean"/>

<jet:list instance-ref="jet-client" name="my-list" id="my-list-bean"/>

<hz:multiMap id="multiMap" instance-ref="hazelcast-instance" name="my-multiMap"/>

<hz:replicatedMap id="replicatedMap" instance-ref="hazelcast-instance" name="my-replicatedMap"/>

<hz:queue id="queue" instance-ref="hazelcast-instance" name="my-queue"/>

<hz:topic id="topic" instance-ref="hazelcast-instance" name="my-topic"/>

<hz:set id="set" instance-ref="hazelcast-instance" name="my-set"/>

<hz:executorService id="executorService" instance-ref="hazelcast-instance" name="my-executorService"/>

<hz:idGenerator id="idGenerator" instance-ref="hazelcast-instance" name="my-idGenerator"/>

<hz:atomicLong id="atomicLong" instance-ref="hazelcast-instance" name="my-atomicLong"/>

<hz:atomicReference id="atomicReference" instance-ref="hazelcast-instance" name="my-atomicReference"/>

<hz:semaphore id="semaphore" instance-ref="hazelcast-instance" name="my-semaphore"/>

<hz:countDownLatch id="countDownLatch" instance-ref="hazelcast-instance" name="my-countDownLatch"/>

<hz:lock id="lock" instance-ref="hazelcast-instance" name="my-lock"/>

Hazelcast Jet also supports lazy-init, scope and depends-on bean attributes.

<jet:instance id="instance" lazy-init="true" scope="singleton">
<!-- ... -->
</jet:instance>
<jet:client id="client" scope="prototype" depends-on="instance">
<!-- ... -->
</jet:client>

6.4.3. Annotation-Based Configuration

Annotation-Based Configuration does not require any XML definition. Simply create a configuration class annotated with @Configuration and provide a JetInstance as a bean by annotating the method with @Bean.

@Configuration
public class AppConfig {

    @Bean
    public JetInstance instance() {
        return Jet.newJetInstance();
    }
}

6.4.4. Enabling SpringAware Objects

Hazelcast IMDG has a special annotation, @SpringAware, which enables you to initialize the object with the Spring context.

When a job is submitted to the cluster, processors are created by Hazelcast Jet on each member. By marking your processor with @SpringAware, you make the Spring context accessible to your processor, which gives you the ability:

  • to apply bean properties

  • to apply factory callbacks such as ApplicationContextAware, BeanNameAware

  • to apply bean post-processing annotations such as InitializingBean, @PostConstruct

You need to configure Hazelcast Jet with the <hz:spring-aware/> tag or set SpringManagedContext programmatically to enable Spring-aware objects. Code samples for declarative and annotation-based configurations are available at our Code Samples repo.
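For illustration, here is a hedged sketch of a Core API processor marked with @SpringAware; PersonRepository stands in for whatever Spring bean you want injected:

@SpringAware
public class EnrichingProcessor extends AbstractProcessor {

    @Autowired
    private PersonRepository repository; // hypothetical Spring bean, injected on the member

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        // enrich each incoming item with the injected bean and emit the result
        return tryEmit(repository.enrich(item));
    }
}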

7. Security

It is possible to configure Jet to use TLS for all member-to-member and member-to-client communications. TLS requires the use of the Hazelcast Jet Enterprise edition.

Once TLS is enabled on a member, all other members in the same cluster and all clients connecting to the cluster must also have TLS enabled.

The way TLS is configured is the same as in Hazelcast IMDG Enterprise. To configure TLS, you need a server certificate and a matching private key, and you need to do the necessary configuration on the member side.

The following is an example configuration snippet for hazelcast.xml. The configuration must be present on all the members.

<network>
    <ssl enabled="true">
        <factory-class-name>com.hazelcast.nio.ssl.BasicSSLContextFactory</factory-class-name>
        <properties>
            <property name="protocol">TLS</property>
            <property name="keyStore">/opt/hazelcast.keystore</property>
            <property name="keyStorePassword">password12345</property>
            <property name="trustStore">/opt/hazelcast.truststore</property>
            <property name="trustStorePassword">password12345</property>
            <property name="keyManagerAlgorithm">SunX509</property>
            <property name="trustManagerAlgorithm">SunX509</property>
            <property name="mutualAuthentication">REQUIRED</property>
        </properties>
    </ssl>
</network>

And similarly, a matching snippet must be present on the clients inside the hazelcast-client.xml file:

<network>
    <ssl enabled="true">
        <factory-class-name>com.hazelcast.nio.ssl.BasicSSLContextFactory</factory-class-name>
        <properties>
            <property name="protocol">TLS</property>

            <property name="trustStore">/opt/hazelcast-client.truststore</property>
            <property name="trustStorePassword">secret.123456</property>
            <property name="trustStoreType">JKS</property>

            <!-- Following properties are only needed when the mutual authentication is used. -->
            <property name="keyStore">/opt/hazelcast-client.keystore</property>
            <property name="keyStorePassword">keystorePassword123</property>
            <property name="keyStoreType">JKS</property>
            <property name="mutualAuthentication">REQUIRED</property>
        </properties>
    </ssl>
</network>

A detailed list of TLS configuration options can be found in the Hazelcast IMDG Reference Manual.

7.1. Mutual Authentication

When a client connects to a node, the node can authenticate the client by using mutual authentication. In this mode, the client also has to present a certificate to the member upon connection. This requires some additional configuration. See the Mutual Authentication section in the Hazelcast IMDG Reference Manual for further details.

7.2. OpenSSL Configuration

By default Jet uses the Java implementation of SSL. For better performance, it is also possible to use Jet with OpenSSL instead. For configuring OpenSSL please refer to the Hazelcast IMDG Reference Manual.

7.3. TLS Impact on Performance

TLS can have some effect on performance when running jobs, as all node-to-node communications need to be encrypted. During an aggregation or another partitioned operation in a job, large amounts of data might be transferred from one node to another.

Jet makes use of several techniques that reduce the need for data to be transferred across the nodes; however, this is sometimes unavoidable. When using Java SSL, the performance impact can be between 5% and 30%, depending on how heavily the network is utilized. When using OpenSSL, we measured the impact to be minimal regardless of the amount of network traffic. Detailed benchmarks are available upon request.

7.4. Remote Hazelcast Sources and Sinks

Currently it is not possible to connect to remote Hazelcast sources and sinks using TLS. This issue will be resolved in a future patch release.

8. Jet Concepts

In this chapter we will take a deep dive into the fundamentals of distributed computing and Jet’s specific take on it. You’ll find that having some intuition and insight into how distributed computing actually works in Jet makes a big difference when diagnosing your pipeline and improving its performance.

Jet performs high performance in-memory data processing by modeling the computation as a Directed Acyclic Graph (DAG), where vertices represent computation and edges represent data flows. A vertex receives data from its inbound edges, performs a step in the computation, and emits data to its outbound edges. Both the edge and the vertex are distributed entities: there are many parallel instances of the Processor type that perform a single vertex’s computation work on each cluster member. An edge between two vertices is implemented with many data connections, both within a member (concurrent SPSC queues) and between members (Hazelcast network connections).

One of the major reasons to divide the full computation task into several vertices is data partitioning: the ability to split the data stream into slices which can be processed in parallel, independently of each other. This is how Jet can parallelize and distribute the group-and-aggregate stream transformation, the major workhorse in distributed computing. To make this work, there must be a function which computes the partitioning key for each item and makes all related items map to the same key. Jet can then route all such items to the same processor instance, but has the freedom to route items with different keys to different processors.

Typically your computation job consists of a mapping vertex, where you pre-process the input data into a form that’s ready to be partitioned, followed by the grouping-and-aggregating vertex. The edge between them contains the partitioning logic.

8.1. Modeling the Computation as a DAG

We’ll take one specific problem, the Word Count, dissect it and explain how it gets computed in a Jet cluster. Let us first see the definition of the computation in the Pipeline API:

Pattern delimiter = Pattern.compile("\\W+");
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Long, String>map("book-lines"))
 .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())
 .aggregate(AggregateOperations.counting())
 .drainTo(Sinks.map("counts"));

Now let’s step back from this and start from the single-threaded Java code that solves the problem for a basic data structure such as an ArrayList. If you have some familiarity with the java.util.stream API, this is how you’d express it:

Map<String, Long> counts =
        lines.stream()
             .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
             .filter(word -> !word.isEmpty())
             .collect(Collectors.groupingBy(word -> word, Collectors.counting()));

You can notice a strong similarity with the Pipeline API formulation, but the way it’s executed is radically different. Java will compute it in a single thread, basically running this code:

List<String> lines = someExistingList();
Map<String, Long> counts = new HashMap<>();
for (String line : lines) {
    for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
            counts.merge(word, 1L, (count, one) -> count + one);
        }
    }
}

The j.u.s. formulation helps us see the steps taken to process each data item:

  1. lines.stream(): read the items (lines of text) from the data source (we’ll call this the “source” step).

  2. flatMap()+filter(): split each line into lowercase words, avoiding empty strings (the tokenizing step).

  3. collect(): group equal words together and count them (the accumulating step).

Our next move is to express these steps as a DAG. We’ll start with a single-threaded model and then make several transformations to reach a parallelized, distributed one, discussing at each step the concerns that arise and how to meet them.

We can represent the steps outlined above in a directed acyclic graph (DAG):

Word-counting DAG

The simplest, single-threaded code (shown above) deals with each item as it is produced: the outer loop reads the lines, the inner loop that runs for each line deals with the words on that line, and inside the inner loop we populate the result map with running counts.

However, just by modeling the computation as a DAG, we’ve split the work into isolated steps with clear data interfaces between them. We can perform the same computation by running a separate thread for each step. Roughly speaking, these are the snippets the threads would be executing:

// Source thread
for (String line : readLines()) {
    emit(line);
}
// Tokenizer thread
for (String line : receive()) {
    for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
            emit(word);
        }
    }
}
// Accumulator thread
Map<String, Long> counts = new HashMap<>();
for (String word : receive()) {
    counts.merge(word, 1L, (count, one) -> count + one);
}
// finally, when done receiving:
for (Entry<String, Long> wordAndCount : counts.entrySet()) {
    emit(wordAndCount);
}

The source loop feeds the tokenizer loop over a concurrent queue, the tokenizer feeds the accumulator loop, and after the accumulator is done receiving, it emits its results to the sink. Diagrammatically it looks like this:

Word-counting DAG with concurrent queues shown

This transformation brought us a pipelined architecture: while the tokenizer is busy with the regex work, the accumulator is updating the map using the data the tokenizer is done with; and the source and sink stages are pumping the data from/to the environment. Our design is now able to engage more than one CPU core and will complete that much sooner; however, we’re still limited by the number of vertices. We’ll be able to utilize only two or three cores regardless of how many are available. To move forward we must try to parallelize the work of each individual vertex.

Given that our input is an in-memory list of lines, the bottleneck occurs in the processing stages (tokenizing and accumulating). Let’s first attack the tokenizing stage: it is a so-called "embarrassingly parallelizable" task because the processing of each line is completely self-contained. At this point we have to make a clear distinction between the notions of vertex and processor: there can be several processors doing the work of a single vertex. Let’s add another tokenizing processor:

Word-counting DAG with tokenizer vertex parallelized

The input processor can now use all the available tokenizers as a pool and submit to any one whose queue has some room.

The next step is parallelizing the accumulator vertex, but this is trickier: accumulators count word occurrences so using them as a pool will result in each processor observing almost all distinct words (entries taking space in its hashtable), but the counts will be partial and will need combining. The common strategy to reduce memory usage is to ensure that all occurrences of the same word go to the same processor. This is called “data partitioning” and in Jet we’ll use a partitioned edge between the tokenizer and the accumulator:

Word-counting DAG with tokenizer and accumulator parallelized

As a word is emitted from the tokenizer, it goes through a “switchboard” stage where it’s routed to the correct downstream processor. To determine where a word should be routed, we can calculate its hashcode and use the lowest bit to address either accumulator 0 or accumulator 1.
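In code, the routing decision for two local accumulators could be as simple as this:

// pick accumulator 0 or 1 based on the lowest bit of the word's hash code
int accumulatorIndex = word.hashCode() & 1;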

At this point we have a blueprint for a fully functional parallelized computation job which can max out all the CPU cores given enough instances of tokenizing and accumulating processors. The next challenge is making this work across machines.

For starters, our input can no longer be a simple in-memory list because that would mean each machine processes the same data. To exploit the cluster as a unified computation device, each cluster member must observe only a slice of the dataset. Given that a Jet instance is also a fully functional Hazelcast IMDG instance and a Jet cluster is also a Hazelcast IMDG cluster, the natural choice is to pre-load our data into an IMap, which will be automatically partitioned and distributed across the members. Now each Jet member can just read the slice of data that was stored locally on it.

When run in a cluster, Jet will instantiate a replica of the whole DAG on each member. On a two-member cluster there will be two source processors, four tokenizers, and so on. The trickiest part is the partitioned edge between tokenizer and accumulator: each accumulator is supposed to receive its own subset of words. That means that, for example, a word emitted from tokenizer 0 will have to travel across the network to reach accumulator 3, if that’s the one that happens to own it. On average we can expect every other word to need network transport, causing both serious network traffic and serialization/deserialization CPU load.

There is a simple trick we can employ to avoid most of this traffic, closely related to what we pointed out above as a source of problems when parallelizing locally: members of the cluster can be used as a pool, each doing its own partial word counts, and then sending their results to a combining vertex. Note that this means sending only one item per distinct word. Here’s the rough equivalent of the code the combining vertex executes:

// Combining vertex
Map<String, Long> combined = new HashMap<>();
for (Entry<String, Long> wordAndCount : receive()) {
    combined.merge(wordAndCount.getKey(), wordAndCount.getValue(),
            (accCount, newCount) -> accCount + newCount);
}
// finally, when done receiving:
for (Entry<String, Long> wordAndCount : combined.entrySet()) {
    emit(wordAndCount);
}

As noted above, such a scheme takes more memory due to more hashtable entries on each member, but it saves network traffic (an issue we didn’t have within a member). Given that memory costs scale with the number of distinct keys (English words in our case), the memory cost is more or less constant regardless of how much book material we process. On the other hand, network traffic scales with the total data size, so the more material we process, the more we save on network traffic.

Jet distinguishes between local and distributed edges, so we’ll use a local partitioned edge for tokenize → accumulate and a distributed partitioned edge for accumulate → combine. With this move we’ve finalized our DAG design, which can be illustrated by the following diagram:

Word-counting DAG parallelized and distributed

8.2. Unbounded Stream Processing

So far we’ve worked with a bounded (finite) stream processing task. In general, you provide Jet with one or more pre-existing datasets and order it to mine them for interesting information. The most important workhorse in this area is the "join, group and aggregate" operation: you define a classifying function that computes a grouping key for each item of the datasets and an aggregate operation that will be performed on all the items in each group, yielding one result item per distinct key. Jet can apply the same operation on unbounded data streams as well.

8.2.1. The Importance of “Right Now”

In batch jobs the data we process represents a point-in-time snapshot of our state of knowledge (for example, warehouse inventory where individual data items represent items on stock). We can recapitulate each business day by setting up regular snapshots and batch jobs. However, there is more value hiding in the freshest data — our business can win by reacting to minute-old or even second-old updates. To get there we must make a shift from the finite to the infinite: from the snapshot to a continuous influx of events that update our state of knowledge. For example, an event could pop up in our stream every time an item is checked in or out of the warehouse.

A single word that captures the above story is latency: we want our system to minimize the latency from observing an event to acting upon it.

8.2.2. Windowing

In an unbounded stream, the dimension of time is always there. Consider a batch job: it may process a dataset labeled “Wednesday”, but the computation itself doesn’t have to know this. Its results will be understood from the outside to be “about Wednesday”. An endless stream, on the other hand, delivers information about the reality as it is unfolding, in near-real time, and the computation itself must deal with time explicitly.

Another point: in a batch it is obvious when to stop aggregating and emit the results: when we have exhausted the whole dataset. However, with unbounded streams we need a policy on how to select bounded chunks whose aggregate results we are interested in. This is called windowing. We imagine the window as a time interval laid over the time axis. A given window contains only the events that belong to that interval.

A very basic type of window is the tumbling window: it advances in discrete steps, tumbling from one position to the next, with no overlap between the successive positions. In other words, it splits the time-series data into batches delimited by points on the time axis. The result is very similar to running a sequence of batch jobs, one per time interval.

A more useful and powerful policy is the sliding window: instead of splitting the data at fixed boundaries, it lets it roll in incrementally, new data gradually displacing the old. The window (pseudo)continuously slides along the time axis.

Another popular policy is called the session window and it’s used to detect bursts of activity by correlating events bunched together on the time axis. In an analogy to a user’s session with a web application, the session window “closes” when the specified session timeout elapses with no further events.
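
For reference, this is roughly how the three window types are expressed with the Pipeline API’s WindowDefinition factories (a sketch; the durations are arbitrary values in milliseconds):

// Sketch: the three windowing policies as WindowDefinitions (arbitrary durations).
WindowDefinition tumbling = WindowDefinition.tumbling(1_000);          // 1-second batches
WindowDefinition sliding  = WindowDefinition.sliding(30_000, 10_000);  // 30 s window, 10 s step
WindowDefinition session  = WindowDefinition.session(5_000);           // 5-second session timeout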

8.2.3. Time Ordering and the Watermark

Usually the time of observing an event is explicitly written in a field of the stream item. There is no guarantee that items will occur in the stream ordered by the value of that field; in fact in many cases it is certain that they won’t. Consider events gathered from users of a mobile app: for all kinds of reasons the items will arrive to our datacenter out of order, even with significant delays due to connectivity issues.

This disorder in the event stream makes it more difficult to formally specify a rule that tells us at which point all the data for a given window has been gathered, allowing us to emit the aggregated result.

To approach these challenges we use the concept of the watermark. It is a timestamped item Jet inserts into the stream that says "from this point on there will be no more items with timestamp less than this". Unfortunately, we almost never know for sure when such a statement becomes true and there is always a chance some events will arrive even later. If we do observe such an offending item, we must categorize it as “too late” and just filter it out.

Note the tension in defining the “perfect” watermark for a given use case: both waiting longer and waiting less before emitting a given watermark have a cost. The more we wait, the higher the latency of getting the results of the computation; the less we wait, the worse their accuracy due to missed events.

For these reasons Jet cannot determine the watermark on its own; you must decide how much disorder to accept (and expect).
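
In the Pipeline API this decision takes the shape of the allowed lag you pass when assigning timestamps. Here is a minimal sketch; pipeline and tradeSource are placeholders, and Trade is the event class used in the examples later in this manual:

// Sketch: tolerate up to 2 seconds of event disorder. Events arriving more
// than 2,000 ms behind the watermark are treated as late.
StreamStage<Trade> withTimestamps = pipeline
        .drawFrom(tradeSource)                      // tradeSource: a StreamSource<Trade>
        .withTimestamps(Trade::timestamp, 2000);    // allowed lag: 2,000 ms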

8.3. Sliding and Tumbling Window

Many quantities, like "the current rate of change of a price", require you to aggregate your data over some time period. This is what makes the sliding window so important: it tracks the value of such a quantity in real time.

Calculating a single sliding window result can be quite computationally intensive, but we also expect it to slide smoothly and give a new result often, even many times per second. This is why we gave special attention to optimizing this computation.

We optimize especially heavily for those aggregate operations that have a cheap way of combining partial results and even more so for those which can cheaply undo the combining. For cheap combining you have to express your operation in terms of a commutative and associative (CA for short) function; to undo a combine you need the notion of “negating” an argument to the function. A great many operations can be expressed through CA functions: average, variance, standard deviation and linear regression are some examples. All of these also support the undoing (which we call deduct). The computation of extreme values (min/max) is an example that has CA, but no good notion of negation and thus doesn’t support deducting.

This is the way we leverage the above properties: our sliding window actually “hops” in fixed-size steps. The length of the window is an integer multiple of the step size. Under such a definition, the tumbling window becomes just a special case with one step per window.

This allows us to divide the timestamp axis into frames of equal length and assign each event to its frame. Instead of keeping the event object, we immediately pass it to the aggregate operation’s accumulate primitive. To compute a sliding window, we take all the frames covered by it and combine them. Finally, to compute the next window, we just deduct the trailing frame and combine the leading frame into the existing result.

Even without deduct the above process is much cheaper than the most naïve approach where you’d keep all data and recompute everything from scratch each time. After accumulating an item just once, the rest of the process has fixed cost regardless of input size. With deduct, the fixed cost approaches zero.
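
To make the mechanics concrete, here is a rough sketch (plain Java, counting only, not Jet code) of the accumulate/combine/deduct steps for 10-second frames and a 30-second window:

// Illustration only: counting events with 10-second frames and a 30-second
// window (3 frames per window). Timestamps are in milliseconds and frames
// are keyed by their end timestamp.
Map<Long, Long> countByFrame = new HashMap<>();

void accumulate(long eventTimestamp) {
    // assign the event to the frame it falls into
    long frameEnd = eventTimestamp - (eventTimestamp % 10_000) + 10_000;
    countByFrame.merge(frameEnd, 1L, Long::sum);
}

long combine(long windowEnd) {
    // window result = sum of the three frames the window covers
    long sum = 0;
    for (long frameEnd = windowEnd - 20_000; frameEnd <= windowEnd; frameEnd += 10_000) {
        sum += countByFrame.getOrDefault(frameEnd, 0L);
    }
    return sum;
}

long slideForward(long previousResult, long newWindowEnd) {
    // deduct the frame that dropped out of the window, combine the one that came in
    return previousResult
            - countByFrame.getOrDefault(newWindowEnd - 30_000, 0L)
            + countByFrame.getOrDefault(newWindowEnd, 0L);
}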

8.3.1. Example: 30-second Window Sliding by 10 Seconds

We’ll now illustrate the above story with a specific example: we’ll construct a 30-second window which slides by 10 seconds (i.e., three steps per window). The aggregate operation is to simply count the number of events. In the diagrams we label the events as minutes:seconds. This is the outline of the process:

  1. Throw each event into its “bucket” (the frame whose time interval it belongs to).

  2. Instead of keeping the items in the frame, just keep the item count.

  3. Combine the frames into three different positions of the sliding window, yielding the final result: the number of events that occurred within the window’s timespan.

Grouping disordered events by frame and then to sliding window

This would be a useful interpretation of the results: "At the time 1:30, the 30-second running average was 8/30 = 0.27 events per second. Over the next 20 seconds it increased to 10/30 = 0.33 events per second."

Keep in mind that the whole diagram represents what happens on just one cluster member and for just one grouping key. The same process is going on simultaneously for all the keys on all the members.

8.3.2. Two-stage aggregation

The concept of frame combining helps us implement two-stage aggregation as well. In the first stage the individual members come up with their partial results by frame and send them over a distributed edge to the second stage, which combines the frames with the same timestamp. After having combined all the partial frames from members, it combines the results along the event time axis into the sliding window.

Combining partial frames in two-stage aggregation

8.4. Session Window

In the abstract sense, the session window is a quite intuitive concept: it simply captures a burst of events. If no new events occur within the configured session timeout, the window closes. However, because the Jet processor encounters events out of their original order, this kind of window becomes quite tricky to compute.

The way Jet computes the session windows is easiest to explain in terms of the event interval: the range [eventTimestamp, eventTimestamp + sessionTimeout]. Initially an event causes a new session window to be created, covering exactly the event interval.

Session window: single event

A following event under the same key belongs to this window if and only if its interval overlaps it; in that case the window is extended to cover the entire interval of the new event.

Session window: extend with another event

If the event intervals don’t overlap, Jet creates a new session window for the new event.

Session window: create a new window after session timeout

An event may happen to belong to two existing windows if its interval bridges the gap between them; in that case they are combined into one.

Session window: an event may merge two existing windows

Once the watermark has passed the closing time of a session window, Jet can close it and emit the result of its aggregation.
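
The interval arithmetic can be sketched in a few lines of plain Java (illustration only; Jet’s actual implementation also deals with keys, aggregation and watermarks):

// Illustration only: maintain the session windows of one key as {start, end} intervals.
long sessionTimeout = 5_000;
List<long[]> windows = new ArrayList<>();

void addEvent(long eventTimestamp) {
    long start = eventTimestamp;
    long end = eventTimestamp + sessionTimeout;
    // merge with every existing window whose interval overlaps the event's interval
    Iterator<long[]> it = windows.iterator();
    while (it.hasNext()) {
        long[] w = it.next();
        if (w[0] <= end && start <= w[1]) {
            start = Math.min(start, w[0]);
            end = Math.max(end, w[1]);
            it.remove();            // absorb the overlapping window
        }
    }
    windows.add(new long[] {start, end});
}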

8.5. Fault Tolerance and Processing Guarantees

One less-than-obvious consequence of stepping up from finite to infinite streams is the difficulty of forever maintaining the continuity of the output, even in the face of changing cluster topology. A Jet node may leave the cluster due to an internal error, loss of networking, or deliberate shutdown for maintenance. This will cause the computation job to be suspended. Except for the obvious problem of new data pouring in while we’re down, we have a much more fiddly issue of restarting the computation in a differently laid-out cluster exactly where it left off and neither miss anything nor process it twice. The technical term for this is the "exactly-once processing guarantee".

Jet achieves fault tolerance in streaming jobs by making a snapshot of the internal processing state at regular intervals. If a member of the cluster fails while a job is running, Jet will detect this and restart the job on the new cluster topology. It will restore its internal state from the snapshot and tell the source to start sending data from the last “committed” position (where the snapshot was taken). The data source must have built-in support to replay the data from the given checkpoint. The sink must either support transactions or be idempotent, tolerating duplicate submission of data.

In a Jet cluster, one member is the coordinator. It tells other members what to do and they report to it any status changes. The coordinator may fail and the cluster will automatically re-elect another one. If any other member fails, the coordinator restarts the job on the remaining members.

8.6. Update a DAG Without Losing the State

If you need to change the pipeline or the DAG, you have to submit a new job: the old job must be cancelled and a new one with the modified pipeline submitted. To preserve the state, export it before you cancel the old job and then start the new job from the exported state.

There are two ways of exporting the state: Job.exportSnapshot() writes a named snapshot while the job keeps running, and Job.cancelAndExportSnapshot() writes the snapshot and then cancels the job. The following example uses the latter:

JetInstance client = Jet.newJetClient();
Job job = client.getJob("foo-job");
// This call will block until the snapshot is written
job.cancelAndExportSnapshot("foo-state-name");

// Code for the new pipeline
Pipeline p = Pipeline.create();
JobConfig config = new JobConfig()
        .setInitialSnapshotName("foo-state-name");
client.newJob(p, config);

// Now make sure the new job is running and restored from the
// state correctly. When you no longer need the snapshot, delete
// it. Jet never deletes exported snapshots automatically.
client.getJobStateSnapshot("foo-state-name").destroy();

8.6.1. State Compatibility

The state has to be compatible with the updated pipeline. The snapshot contains separate data for each vertex, identified by the vertex name. If a snapshot doesn’t contain data for a vertex, it will be restored with an empty state. If a snapshot contains data for a vertex that does not exist, that data will be ignored. If the vertex name matches, then it depends on the specific processor type what type of change is allowed.

From this it follows that:

  • you can add new stateful stages and remove existing ones without breaking compatibility. This includes adding/removing a source or sink or adding a new aggregation path from existing sources (see also Update a Job with Auto-Assigned Stage Names).

  • you can freely add, remove or change stateless stages, such as filter/map/flatMap stages, sinks and others

Recombining the Frames of Sliding Windows

Jet supports changing the sliding window size or the slide step. In some situations, however, the output can contain incorrect data.

Sliding windows in Jet accumulate input items into frames (see Sliding and Tumbling Window). The frame size is equal to the slide step of the window. The processor saves the aggregation’s accumulator to the snapshot, one for each key and frame.

If the slide step changes, the frames in the snapshot won’t match those needed for the new slide step. You can still make this change, but the restored frames will be recombined into new frames based on their end timestamps.

Recombining restored frames
Figure 1. Example recombining 10-second frames into 15-second frames

As you can see in the figure, frame 1:10 will be put into frame 1:15, and frames 1:20 and 1:30 will be combined into frame 1:30. This means that events with a timestamp between 1:10 and 1:15 will be incorrectly accumulated into frame 1:15-1:30.

You can also extend the window size, but the first windows emitted after the restore will miss older events. The reason is that those frames were already purged; see the image:

Example extending window size
Figure 2. Example extending window size

In the example above, the 20-second window was extended to a 30-second one while the slide step (i.e. the frame size) was unchanged. As you can see, the frames after the restore match the frames from when the snapshot was saved, so no recombining occurs. However, the next window to be emitted, at 1:30, will miss the events between 0:50 and 1:00, because the frame containing them was already purged: it wasn’t needed for the original 20-second window. The window at 1:40 will be correct. Reducing the window size has no such issue.

State Compatibility of Other Processors

Here are examples of other supported changes to the parameters of pipeline stages. For details, consult the javadoc of each stage.

  • change session window timeout

  • change connection parameters of sources/sinks

  • change parameters of aggregate operation: for example, change the comparator of AggregateOperation.minBy()

  • replace the aggregate operation, if the accumulator type is the same: for example, change counting() to summingLong(), if that makes sense

  • any change to stateless stages

The following changes are not supported:

  • change a sliding window to a session window

  • replace the aggregate operation with one that has an incompatible accumulator type

  • assign a different name to a stage

Update a Job with Auto-Assigned Stage Names

The Pipeline API allows you to set the name of a stage using Stage.setName, but if you don’t set it, Jet generates the name based on the stage type. If this would result in the same name for multiple stages, Jet appends -N to the name, where N is a sequence number. For example, if you have 3 sliding window aggregations in the job, their names will be sliding-window, sliding-window-2 and sliding-window-3. The number sequence follows the order in which you added the stages to the Pipeline.

When Jet translates the pipeline into the Core API DAG, it uses the name of the stage to name the vertex that implements it. Since the vertex name identifies the data element in the state snapshot, it’s very important to keep it the same when you change the pipeline to a new version. For example, you can’t reorder the aggregations or remove the first one, because that would change the names and incorrect state would be restored to the vertices. Therefore we recommend that you explicitly assign a unique name to every stage in a pipeline you intend to use and maintain over the long term.
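
Here is a minimal sketch of naming a stage explicitly; trades stands for a StreamStage<Trade> built earlier in the pipeline and the stage chain itself is just an example:

// Sketch: give the aggregation stage a stable, explicit name so that the
// snapshot data keeps matching it across pipeline versions.
trades.groupingKey(Trade::productId)
      .window(WindowDefinition.sliding(30_000, 10_000))
      .aggregate(AggregateOperations.counting())
      .setName("trade-counts-sliding-window");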

Caveats When Changing the Grouping Key

You can change the key-extracting function you pass to groupingKey(), but keep in mind that the snapshotted state contains keys extracted with the old key extractor. Keys that aren’t equal to the old ones will create new groups.

You can also change the key type. In this case, however, the job may fail with a ClassCastException because the downstream might expect a different type than it will actually receive. For example, if you change the key type from Integer to Long, the downstream stage of a window aggregation will expect KeyedWindowResult<Long, V>, but can actually get KeyedWindowResult<Integer, V>.

State Compatibility with Future Versions

Jet does not guarantee snapshot compatibility between major releases: a snapshot created in an older version might not work in a newer major version. Bugfix releases are compatible.

8.7. Jet’s Execution Model

At the heart of Jet is the TaskletExecutionService. It manages the threads that perform all the computation in a Jet job. Although this class is not formally a part of Jet’s public API, understanding how it schedules code for execution is essential if you want to implement a cooperative processor.

8.7.1. Cooperative Multithreading

Cooperative multithreading is one of the core features of Jet and can be roughly compared to green threads. It is purely a library-level feature and does not involve any low-level system or JVM tricks; the Processor API is simply designed in such a way that the processor can do a small amount of work each time it is invoked, then yield back to the Jet engine. The engine manages a thread pool of fixed size and on each thread, the processors take their turn in a round-robin fashion.

The point of cooperative multithreading is better performance. Several factors contribute to this:

  • The overhead of context switching between processors is much lower since the operating system’s thread scheduler is not involved.

  • The worker thread driving the processors stays on the same core for longer periods, preserving the CPU cache lines.

  • The worker thread has direct knowledge of the ability of a processor to make progress (by inspecting its input/output buffers).

8.7.2. Tasklet

The execution service doesn’t deal with processors directly; instead it deals with tasklets. Tasklet is a very simple functional interface derived from the standard Java Callable<ProgressState>. The execution service manages a pool of worker threads, each being responsible for a list of tasklets. The worker thread simply invokes the call() methods on its tasklets in a round-robin fashion. The method’s return value tells whether the tasklet made progress and whether it is now done.

The most important tasklet is the one driving a processor (ProcessorTasklet); there are a few others that deal with network sending/receiving and taking snapshots.

8.7.3. Exponential Backoff

If none of the worker’s tasklets report having made progress, the worker will go to a short sleep. If this happens again after it wakes up, it will sleep for twice as long. Once it reaches 1 ms sleep time, it will continue retrying once per millisecond to see if any tasklets can make progress.
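
The behavior is roughly equivalent to this sketch (illustrative only, not Jet’s actual code; the initial sleep value and the helper methods are assumptions):

// Illustration only: a worker thread backing off while its tasklets are idle.
long idleNanos = 25_000;                            // assumed initial sleep
final long maxIdleNanos = MILLISECONDS.toNanos(1);  // cap at 1 ms
while (running) {
    boolean madeProgress = runAllTaskletsOnce();    // hypothetical: one round-robin pass
    if (madeProgress) {
        idleNanos = 25_000;                         // reset the backoff
    } else {
        LockSupport.parkNanos(idleNanos);           // sleep, then retry
        idleNanos = Math.min(idleNanos * 2, maxIdleNanos);
    }
}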

8.7.4. ProcessorTasklet

ProcessorTasklet is the one that drives a processor. It manages its inbox, outbox, inbound/outbound concurrent queues, and tracks the current processor state so it knows which of its callback methods to call.

During each tasklet.call(), ProcessorTasklet makes one call into one of its processor’s callbacks. It determines the processor’s progress status and reports it to the execution service.

8.7.5. Non-Cooperative Processor

If a processor declares itself as non-cooperative, the execution service will start a dedicated Java thread for its tasklet to run on.

Even if it’s non-cooperative, the processor’s callback methods must still make sure they don’t run for longer than a second or so at a time. Jet can’t start a snapshot until all the processors have yielded control back to it.

8.8. What Happens When you Submit a Job

When you submit a job, Jet replicates the DAG to the whole Jet cluster and executes a copy of it on each member.

DAG Distribution

Jet executes the job on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread. Each worker thread has a list of tasklets it is in charge of and as tasklets complete at different rates, the remaining ones are moved between workers to keep the load balanced.

Each instance of a Processor is wrapped in one tasklet which the execution service repeatedly executes until it is done. A vertex with a parallelism of 8 running on 4 members would have a total of 32 tasklets running at the same time. Each member has the same number of tasklets running.

Tasklet execution model

When you make a request to execute a Job, the corresponding DAG and additional resources are deployed to the Jet cluster. Jet builds an execution plan for the DAG on each member, which creates the associated tasklets for each Vertex and connects them to their inputs and outputs.

Jet uses Single Producer/Single Consumer ringbuffers to transfer the data between processors on the same member. They are data-type agnostic, so any data type can be used to transfer the data between vertices.

Ringbuffers, being bounded queues, introduce natural backpressure into the system; if a consumer’s ringbuffer is full, the producer will have to back off until it can enqueue the next item. When data is sent to another member over the network, there is no natural backpressure, so Jet uses explicit signaling in the form of adaptive receive windows.

8.9. Distributed Snapshot

The technique Jet uses to achieve fault tolerance is called a “distributed snapshot”, described in a paper by Chandy and Lamport. At regular intervals, Jet raises a global flag that says "it’s time for another snapshot". All processors belonging to source vertices observe the flag, create a checkpoint on their source, emit a barrier item to the downstream processors and resume processing.

As the barrier item reaches a processor, it stops what it’s doing and emits its state to the snapshot storage. Once complete, it forwards the barrier item to its downstream processors.

Due to parallelism, in most cases a processor receives data from more than one upstream processor. It will receive the barrier item from each of them at separate times, but it must start taking a snapshot at a single point in time. There are two approaches it can take, as explained below.

8.9.1. Exactly-Once Snapshotting

With exactly-once configured, as soon as the processor gets a barrier item in any input stream (from any upstream processor), it must stop consuming that stream until it gets the same barrier item in all the streams:

Exactly-once processing: received one barrier
  1. At the barrier in stream X, but not Y. Must not accept any more X items.

    Exactly-once processing: received both barriers
  2. At the barrier in both streams, taking a snapshot.

    Exactly-once processing: forward the barrier
  3. Snapshot done, barrier forwarded. Can resume consuming all streams.

8.9.2. At-Least-Once Snapshotting

With at-least-once configured, the processor can keep consuming all the streams until it gets all the barriers, at which point it will stop to take the snapshot:

At-Least-once processing: received one barrier
  1. At the barrier in stream X, but not Y. Carry on consuming all streams.

    At-Least-once processing: received both barriers
  2. At the barrier in both streams, already consumed x1 and x2. Taking a snapshot.

    At-Least-once processing: forward the barrier
  3. Snapshot done, barrier forwarded.

Even though x1 and x2 occur after the barrier, the processor consumed and processed them, updating its state accordingly. If the computation job stops and restarts, this state will be restored from the snapshot and then the source will replay x1 and x2. The processor will think it got two new items.

8.10. Dropped Late Events

At times, you might encounter a line like the following in your logs:

Event dropped, late by 123ms. currentWatermark=14:23:10.256, eventTime=14:23:10.123, event=FooEvent

The most obvious thing to check is whether there are two events E1 and E2 in some source partition where E2 is positioned after E1 in the stream and E1.timestamp - allowedLag > E2.timestamp.

Besides this obvious reason, there are a few more possible causes:

  • If timestamps are not added at the source, source streams might be split and merged in an unpredictable way. See Prefer Assigning Timestamps at the Source. Map Journal and Cache Journal sources currently don’t coalesce watermarks at the source, so they fall into this category too.

  • Source partitions can be marked as idle. For the Pipeline API the idle timeout is hardcoded to DEFAULT_IDLE_TIMEOUT. If a partition isn’t really idle but merely stalled, the events it emits after it resumes can be late, because the partition was excluded from watermark coalescing.

Note: items are not dropped unless there is a need to. For example, the map operation never drops items, even if they come late. Currently only windowing aggregations drop events, because they purge data from memory as the event time passes.

8.11. Stream Skew

We explained how we use the concept of watermark to impose order onto a disordered data stream. However, items arriving out of order aren’t our only challenge; modern stream sources like Kafka are partitioned and distributed so “the stream” is actually a set of independent substreams, moving on in parallel. Substantial time difference may arise between events being processed on each one, but our system must produce coherent output as if there was only one stream. We meet this challenge by coalescing watermarks: as the data travels over a partitioned/distributed edge, we make sure the downstream processor observes the correct watermark value, which is the least of watermarks received from the contributing substreams.

8.11.1. Rules of Watermark Propagation

Watermark objects are sent interleaved with other stream items, but are handled specially:

  • The value of the watermark a processor emits must be strictly increasing. Jet will throw an exception if it detects a non-increasing watermark.

  • When a processor receives a watermark, it should add it to the outbox. The processor is allowed to delay the watermark: for example, if the processor does async mapping, it should emit the watermark after it received responses for all items that came before it. Sink processors don’t have to emit watermarks.

  • The watermark item is always broadcast, regardless of the edge type. This means that all N upstream processors send their watermark to all M downstream processors.

  • The processor will observe only a watermark that was received from all upstream processors and from all upstream edges. This is called watermark coalescing.

Jet’s internal class WatermarkCoalescer manages watermarks received from multiple inputs. As it receives watermark items from them, its duty is to decide when to forward the watermark downstream. This happens at two levels (see the sketch after this list):

  • between the multiple queues backing a single edge

  • between the multiple input edges to a single processor
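
The coalescing rule itself is simple; here is a rough sketch in plain Java (Jet’s WatermarkCoalescer does more bookkeeping than this, for example for idle inputs):

// Illustration only: forward the minimum of the watermarks received on all
// inputs, and only when that minimum advances.
class CoalescerSketch {
    private final long[] lastWmPerInput;
    private long lastForwardedWm = Long.MIN_VALUE;

    CoalescerSketch(int inputCount) {
        lastWmPerInput = new long[inputCount];
        Arrays.fill(lastWmPerInput, Long.MIN_VALUE);
    }

    /** Returns the watermark to forward downstream, or Long.MIN_VALUE if there is none. */
    long observe(int inputIndex, long wmValue) {
        lastWmPerInput[inputIndex] = wmValue;
        long min = Long.MAX_VALUE;
        for (long wm : lastWmPerInput) {
            min = Math.min(min, wm);
        }
        if (min > lastForwardedWm) {
            lastForwardedWm = min;
            return min;
        }
        return Long.MIN_VALUE;
    }
}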

8.11.2. Idle inputs

A special object called the idle message can be emitted from a source processor when it sees no events for the configured idle timeout. This can happen in real life when some external partitions receive no events while others do.

When an idle message is received from an input, that input is excluded from watermark coalescing: Jet no longer waits to receive a watermark from it, which allows the other, active inputs to be processed without further delay. If the idle timeout is disabled and some processor doesn’t emit any watermarks (because it sees no events), the processing will stall indefinitely.

8.12. The Pitfalls of At-Least-Once Processing

In some cases at-least-once semantics can have consequences of quite an unexpected magnitude, as we discuss next.

8.12.1. Apparent Data Loss

Imagine a very simple kind of processor: it matches up the items that belong to a pair based on some rule. If it receives item A first, it remembers it. Later on, when it receives item B, it emits that fact to its outbound edge and forgets about the two items. It may also first receive B and wait for A.

Now imagine this sequence: A → BARRIER → B. In at-least-once the processor may observe both A and B, emit its output, and forget about them, all before taking the snapshot. After the restart, item B will be replayed because it occurred after the last barrier, but item A won’t. Now the processor is stuck forever in a state where it’s expecting A and has no idea it already got it and emitted that fact.

Problems similar to this may happen with any state the processor keeps until it has got enough information to emit the results and then forgets it. By the time it takes a snapshot, the post-barrier items will have caused it to forget facts about some pre-barrier items. After a restart it will behave as though it has never observed those pre-barrier items, resulting in behavior equivalent to data loss.

8.12.2. Non-Monotonic Watermark

One special case of the above story concerns watermark items. Thanks to watermark coalescing, processors are typically implemented against the invariant that the watermark value always increases. However, in at-least-once the post-barrier watermark items will advance the processor’s watermark value. After the job restarts and the state gets restored to the snapshotted point, the watermark will appear to have gone back, breaking the invariant. This can again lead to apparent data loss.

9. Expert Zone — The Core API

This section covers the Core API, Jet’s low-level API that directly exposes the computation engine’s raw features. If you are looking for the API to build your computation pipeline, please refer to the Pipeline API section.

Creating a Core API DAG requires expert-level familiarity with concepts like partitioning schemes, vertex parallelism, distributed vs. local edges, etc. Furthermore, this API offers no static type safety and it is very easy to create a DAG that fails with a ClassCastException when executed. Even though it is possible, this API is not intended to create DAGs by hand; it offers the infrastructure on top of which to build high-level DSLs and APIs that describe computation jobs.

Implementing a Core API Processor requires even greater expertise than building a DAG. Among other things, you have to be acquainted in detail with Jet’s concept of cooperative multithreading. While we provide as much convenience as we can for extending Jet with your custom processors, we cannot remove the dangers of using these facilities improperly.

9.1. DAG

The DAG-building API is centered around the DAG class. This is a pure data class and can be instantiated on its own, without a Jet instance. This makes it simple to separate the job-describing code from the code that manages the lifecycle of Jet instances.

You can "compile" a Jet pipeline into a DAG:

DAG dag = pipeline.toDag();

Studying these DAGs may be a useful aid while learning the Core API.

To start building a DAG from scratch, write

DAG dag = new DAG();

A good practice is to structure the DAG-building code into the following sections:

  1. Create all the vertices.

  2. Configure the local parallelism of vertices.

  3. Create the edges.

Example:

(1)
Vertex source = dag.newVertex("source",
        SourceProcessors.readFilesP(".", UTF_8, "*", false, (file, line) -> line)
);
Vertex transform = dag.newVertex("transform", mapP(
        (String line) -> entry(line, line.length())
));
Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("sinkMap"));

(2)
source.localParallelism(1);

(3)
dag.edge(between(source, transform));
dag.edge(between(transform, sink));
1 Create the vertices
2 Configure local parallelism
3 Create the edges

9.1.1. Creating a Vertex

The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:

  1. SupplierEx<Processor> directly returns processor instances from its get() method. It is expected to be stateless and return equivalent instances on each call. It doesn’t provide any initialization or cleanup code.

  2. ProcessorSupplier returns in a single call all the processors that will run on a single cluster member. It may specialize each instance, for example to achieve local data partitioning. It is also in charge of the member-local lifecycle (initialization and destruction).

  3. ProcessorMetaSupplier returns in a single call an object that will be in charge of creating all the processors for a vertex. Given a list of member addresses, the object it returns is a Function<Address, ProcessorSupplier> which will then be called with each of the addresses from the list to retrieve the ProcessorSupplier specialized for the given member.

ProcessorMetaSupplier is the most fundamental facility: it is a factory of ProcessorSuppliers. SupplierEx<Processor> exists purely as a convenience over ProcessorSupplier for the simplest kinds of vertices.

You choose which of the three to use for a particular vertex when you implement it. When you build a DAG from already implemented vertices, you don’t have to care, or even know, which one a vertex is using: you’ll call a factory method that returns one or another of them and they all integrate the same way into your newVertex() calls.
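
As a rough sketch, the same processor class could be handed to newVertex() in any of the three forms (MyProcessor is a hypothetical Processor implementation with a no-arg constructor):

// Sketch: three ways to supply processors for a vertex.
Vertex v1 = dag.newVertex("v1", MyProcessor::new);                        // SupplierEx<Processor>
Vertex v2 = dag.newVertex("v2", ProcessorSupplier.of(MyProcessor::new));  // per-member control
Vertex v3 = dag.newVertex("v3",
        ProcessorMetaSupplier.of(ProcessorSupplier.of(MyProcessor::new))); // cluster-wide control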

9.1.2. Local and Global Parallelism of Vertex

The vertex is implemented at runtime by one or more instances of Processor on each member. Each vertex can specify how many of its processors will run per cluster member using the localParallelism property; every member will have the same number of processors. A new Vertex instance has this property set to -1, which requests to use the default value equal to the configured size of the cooperative thread pool. The latter defaults to Runtime.availableProcessors() and is configurable via InstanceConfig.setCooperativeThreadCount().

In most cases the only level of local parallelism that you’ll want to explicitly configure is 1 for the cases where no parallelism is desirable (e.g. on a source processor reading from a file).

The total parallelism of a vertex is the total number of its processors running in the whole cluster. In Jet this is always equal to local parallelism times cluster size because it allocates the same number of processors on each member. This number can be critical to the proper functioning of a vertex that represents a distributed, partitioned data source. If you let the total parallelism of such a vertex exceed the number of partitions in the data source, some processors will end up with no partitions assigned to them. This has serious consequences in an event time-based streaming job: Jet must produce a unique value of the watermark that doesn’t overtake the timestamps coming out of any processors. A processor with no source partitions will emit no data and its local watermark will permanently stay at the initial value of Long.MIN_VALUE. This will keep the global watermark from advancing until the idle partition detection mechanism kicks in. Jet will produce no output until this happens and the default "partition idle" timeout is 60 seconds.

9.1.3. Edge Ordinal

An edge is connected to a vertex at a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows which ordinal it came from. Things are similar on the outbound side: the processor emits an item to a given ordinal, but also has the option to emit the same item to all ordinals. This is the most typical case and allows easy replication of a data stream across several edges.

When you use the between(a, b) edge factory, the edge will be connected at ordinal 0 at both ends. When you need a different ordinal, use the from(a, ord1).to(b, ord2) form. There must be no gaps in ordinal assignment, which means a vertex will have inbound edges with ordinals 0..N and outbound edges with ordinals 0..M.

This example shows the usage of between() and from().to() forms to build a DAG with one source feeding two computational vertices:

Vertex source = dag.newVertex("source",
        SourceProcessors.readFilesP(".", UTF_8, "*", false, (file, line) -> line)
);
Vertex toUpper = dag.newVertex("toUpper", mapP((String in) -> in.toUpperCase()));
Vertex toLower = dag.newVertex("toLower", mapP((String in) -> in.toLowerCase()));

dag.edge(between(source, toUpper));
dag.edge(from(source, 1).to(toLower));

9.1.4. Local and Distributed Edge

A major choice to make in terms of routing the data coming out of a processor is whether the candidate set of the target vertex’s processors is unconstrained, encompassing all its processors across the cluster, or constrained to just those running on the same cluster member. You control this with the distributed property of the edge. By default the edge is local and calling the distributed() method removes this restriction.

You can minimize network traffic by employing local edges. They are implemented with the most efficient kind of concurrent queue: single-producer, single-consumer array-backed queue. It employs wait-free algorithms on both sides and avoids even the latency of volatile writes by using lazySet.

A good example of employing a local-distributed edge combo is two-stage aggregation. Here’s how it looks on our Word Count example:

dag.edge(between(tokenize, accumulate)
           .partitioned(wholeItem(), HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()));

Note that only the edge from accumulate to combine is distributed.

9.1.5. Routing Policies

The routing policy decides which of the processors in the candidate set to route each particular item to.

Unicast

This is the default routing policy, the one you get when you write

dag.edge(between(input, output));

For each item it chooses a single destination processor with no further restrictions on the choice. The only guarantee given by this policy is that exactly one processor will receive the item, but Jet also takes care to “spray” the items equally over all the reception candidates.

This choice makes sense when the data doesn’t have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.

Isolated

This is a more restricted kind of unicast policy: any given downstream processor receives data from exactly one upstream processor. In some DAG setups you may need to apply selective backpressure to individual upstream processors; this policy lets you achieve it. If the connected vertices have equal parallelism, an isolated edge creates a one-to-one mapping between their processors, preserving the encounter order and partitioning.

Activate this policy by calling isolated() on the edge:

dag.edge(between(input, output).isolated());
Broadcast

A broadcasting edge sends each item to all candidate receivers. Due to the redundancy it creates, this kind of edge isn’t appropriate for the main data stream. Its purpose is dispatching a small amount of data to all the processors of a vertex, typically as a part of setting up before processing the data stream. For this reason a broadcast edge is often high-priority as well. For example, in a hash-join one vertex creates the lookup hashtable and sends the same instance to all the processors of the next vertex, the one that processes the main data stream.

Activate this policy by calling broadcast() on the edge:

dag.edge(between(input, output).broadcast());
Partitioned

A partitioned edge sends each item to the one processor responsible for the item’s partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.

Jet automatically assigns partitions to processors during job initialization. The number of partitions is fixed for the lifetime of a cluster and you configure it on the IMDG level with the system property hazelcast.partition.count. The number of partitions must not be less than the highest global parallelism of any vertex, otherwise some processors will get no partitions.

You can also refer to the Hazelcast Reference Manual for more details on partitioning in Hazelcast IMDG.

This is the default algorithm to determine the partition ID of an item:

  1. Apply the key extractor function defined on the edge to retrieve the partitioning key.

  2. Serialize the partitioning key to a byte array using Hazelcast serialization.

  3. Apply Hazelcast’s standard MurmurHash3-based algorithm to get the key’s hash value.

  4. Partition ID is the hash value modulo the number of partitions.

The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.

Another common choice is to use Java’s standard Object.hashCode(). It is often significantly faster and it’s safe to use on a local edge. On a distributed edge it is not a safe strategy in general because hashCode()'s contract does not require repeatable results across JVMs or even different instances of the same JVM version. If a given class’s Javadoc explicitly specifies the hashing function it uses, then its instances are safe to partition with hashCode().

You can provide your own implementation of Partitioner to gain full control over the partitioning strategy.
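
For illustration, a custom Partitioner could look roughly like this; it assumes Partitioner’s single abstract method is getPartition(item, partitionCount) and simply partitions strings by their first character:

// Sketch: partition String keys by their first character (assumed method signature).
class FirstCharPartitioner implements Partitioner<String> {
    @Override
    public int getPartition(String key, int partitionCount) {
        int firstChar = key.isEmpty() ? 0 : key.charAt(0);
        return firstChar % partitionCount;
    }
}

// usage on an edge:
dag.edge(between(tokenize, accumulate)
        .partitioned(wholeItem(), new FirstCharPartitioner()));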

We use both partitioning strategies in the Word Count example:

dag.edge(between(tokenize, accumulate)
        .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)
           .distributed()
           .partitioned(entryKey()));

Note that the local-partitioned edge uses partitioning by hash code and the distributed edge uses the default Hazelcast partitioning. It turns out that in this particular example we could safely use the hash code both times, but we leave it like this for didactic purposes. Since much less data travels towards the combiner than towards the accumulator, the performance of the whole job is hardly affected by this choice.

All-To-One

The all-to-one routing policy is a special case of the partitioned policy which assigns the same partition ID to all items. allToOne(key) is semantically equivalent to partitioned(t -> key), but the former is slightly faster because Jet avoids recalculating the partition ID for each stream item.

Two all-to-one edges with an equal key will go to the same partition and will be processed by the same member. If you have multiple all-to-one edges in a job, or in multiple jobs, assign a different key to each of them so that they don’t all run on the same member. On the other hand, if you have two all-to-one edges going to the same vertex, they should have the same key so that all the items are really processed by the same member.

You should always set the local parallelism of the target vertex to 1, otherwise there will be idle processors that never get any items.

On a local edge this policy doesn’t make sense since simply setting the local parallelism of the target vertex to 1 constrains the local choice to just one processor instance.

One case where the all-to-one routing is appropriate is global aggregation (without grouping). A single processor must see all the data. Here’s how we would set it between the first-stage and the second-stage aggregating vertex:

dag.edge(between(stage1, stage2).distributed().allToOne(123));

9.1.6. Priority

By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a “hash join” which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the enriching stream contains the data for the lookup table and must be consumed in full before consuming the stream to be enriched.

The priority property controls the order of consuming the edges. Edges are sorted by their priority number (ascending) and consumed in that order. Edges with the same priority are consumed without particular ordering (as the data arrives).

We can see a prioritized edge in action in the TF-IDF example:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1));

The tokenize vertex receives a set of stopwords and filters out their occurrences from the input text. It must receive the entire set before beginning to process the text.

A Fault Tolerance Caveat

As explained in the section on the Processor API, Jet takes regular snapshots of processor state when fault tolerance is enabled. A processor will get a special item in its input stream, called a barrier. When working in the exactly-once mode, as soon as it receives the barrier, it must stop pulling data from that stream and continue pulling from all other streams until it receives the same barrier in all of them, and then emit its state to the snapshot storage. This is in direct contradiction with the contract of edge prioritization: the processor is not allowed to consume any other streams before having fully exhausted the prioritized ones.

For this reason Jet does not initiate a snapshot until all the higher-priority edges have been fully consumed.

Although strictly speaking this only applies to the exactly once mode, Jet postpones taking the snapshot in at least once mode as well. Even though the snapshot could begin early, it would still not be able to complete until the processor has consumed all the prioritized edges, started consuming non-prioritized ones, and received the barrier in all of them. The result would be many more items processed twice after the restart.

9.1.7. Fine-Tuning Edges

Edges can be configured with an EdgeConfig instance, which specifies additional fine-tuning parameters. For example,

dag.edge(between(tickerSource, generateTrades)
        .setConfig(new EdgeConfig().setQueueSize(512)));

Please refer to the Javadoc of EdgeConfig for details.

9.2. How to Build a DAG

9.2.1. Bounded Stream (Batch) DAG

Let’s use the Core API to build the Word Count DAG, already described in an earlier section:

Word Count DAG

We start by instantiating the DAG class and adding the source vertex:

DAG dag = new DAG();
Vertex source = dag.newVertex("source", SourceProcessors.readMapP("lines"));

Note how we can build the DAG outside the context of any running Jet instances: it is a pure POJO.

The source vertex will read the lines from the IMap and emit items of type Map.Entry<Integer, String> to the next vertex. The key of the entry is the line number, and the value is the line itself. The built-in map-reading processor will do just what we want: on each member it will read only the data local to that member.

The next vertex is the tokenizer, which does a simple "flat-mapping" operation (transforms one input item into zero or more output items). The low-level support for such a processor is a part of Jet’s library; we just need to provide the mapping function:

// (lineNum, line) -> words
Pattern delimiter = Pattern.compile("\\W+");
Vertex tokenize = dag.newVertex("tokenize",
        Processors.flatMapP((Entry<Integer, String> e) ->
                traverseArray(delimiter.split(e.getValue().toLowerCase()))
                        .filter(word -> !word.isEmpty()))
);

This creates a processor that applies the given function to each incoming item, obtaining zero or more output items, and emits them. Specifically, our processor accepts items of type Entry<Integer, String>, splits the entry value into lowercase words, and emits all non-empty words. The function must return a Traverser, which is a functional interface used to traverse a sequence of non-null items. Its purpose is equivalent to the standard Java Iterator, but avoids the cumbersome two-method API. Since a lot of support for cooperative multithreading in Hazelcast Jet deals with sequence traversal, this abstraction simplifies many of its aspects.
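
For illustration, here is a hand-written Traverser roughly equivalent to traverseArray over three items (a sketch; next() returns null once the sequence is exhausted):

// Sketch: a Traverser over a fixed array; next() returns null when done.
String[] items = {"alpha", "beta", "gamma"};
Traverser<String> traverser = new Traverser<String>() {
    private int index;
    @Override
    public String next() {
        return index < items.length ? items[index++] : null;
    }
};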

The next vertex will do the actual word count. We can use the built-in accumulateByKey processor for this:

// word -> (word, count)
Vertex accumulate = dag.newVertex("accumulate",
        Processors.accumulateByKeyP(singletonList(wholeItem()), counting())
);

This processor maintains a hashtable that maps each distinct key to its accumulated value. We specify wholeItem() as the key extractor function: our input item is just the word, which is also the grouping key. The second argument is the kind of aggregate operation we want to perform: counting. We are relying on Jet’s out-of-the-box definitions here, but it is easy to define your own aggregate operations and key extractors. The processor emits nothing until it has received all the input, and at that point it emits the hashtable as a stream of Entry<String, Long>.

Next is the combining step which computes the grand totals from individual members' contributions. This is the code:

// (word, count) -> (word, count)
Vertex combine = dag.newVertex("combine",
        Processors.combineByKeyP(counting(), Util::entry)
);

combineByKey is designed to be used downstream of accumulateByKey, which is why it doesn’t need an explicit key extractor. The aggregate operation must be the same as on accumulateByKey.

The final vertex is the sink; we want to store the output in another IMap:

Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("counts"));

Now that we have all the vertices, we must connect them into a graph and specify the edge type as discussed in the previous section. Here’s the code:

dag.edge(between(source, tokenize))
   .edge(between(tokenize, accumulate)                            (1)
           .partitioned(wholeItem(), Partitioner.HASH_CODE))
   .edge(between(accumulate, combine)                             (2)
           .distributed()
           .partitioned(entryKey()))
   .edge(between(combine, sink));
1 Here we chose a local partitioned edge. For each word, there will be a processor responsible for it on each member so that no items must travel across the network. In the partitioned() call we specify two things: the function that extracts the partitioning key (wholeItem() — same as the grouping key extractor), and the policy object that decides how to compute the partition ID from the key. Here we use the built-in HASH_CODE, which will derive the ID from Object.hashCode(). As long as the definitions of equals()/hashCode() on the key object match our expected notion of key equality, this policy is always safe to use on a local edge.
2 is a distributed partitioned edge: for each word there is a single combiner processor in the whole cluster responsible for it and items will be sent over the network if needed. The partitioning key is again the word, but here it is the key part of the Map.Entry<String, Long>. We are using the default partitioning policy here (Hazelcast’s own partitioning scheme). It is the slower-but-safe choice on a distributed edge. Detailed inspection shows that hashcode-based partitioning would be safe as well because all of String, Long, and Map.Entry have the hash function specified in their Javadoc.

You can access a full, self-contained Java program with the above DAG code at the Hazelcast Jet Code Samples repository.

9.2.2. Unbounded Stream DAG

For this example we’ll build a simple Jet job that monitors trading events on a stock market, categorizes the events by stock ticker, and reports the number of trades per time unit (the time window). In terms of DAG design, not much changes going from batch to streaming. This is how it looks:

Trade monitoring DAG

We have the same cascade of source, two-stage aggregation, and sink. The source is the event journal of a Hazelcast IMap (we assume some other process continuously updates this map with trade events). On the sink side there’s another mapping vertex, format-output, that transforms the window result items into lines of text. The sink vertex writes these lines to a file.

Here’s the DAG-building code in full:

ToLongFunctionEx<? super Trade> timestampFn = Trade::timestamp;
FunctionEx<? super Trade, ?> keyFn = Trade::productId;
SlidingWindowPolicy winPolicy = slidingWinPolicy(
        SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS);

DAG dag = new DAG();
Vertex tradeSource = dag.newVertex("trade-source",
        SourceProcessors.<Trade, Long, Trade>streamMapP(
                TRADES_MAP_NAME,
                alwaysTrue(),                              (1)
                EventJournalMapEvent::getNewValue,         (1)
                JournalInitialPosition.START_FROM_OLDEST,  (2)
                eventTimePolicy(
                        timestampFn,                       (3)
                        limitingLag(SECONDS.toMillis(3)),  (4)
                        winPolicy.frameSize(),             (5)
                        winPolicy.frameOffset(),
                        SECONDS.toMillis(3)                (6)
                )));
Vertex slidingStage1 = dag.newVertex("sliding-stage-1",
        Processors.accumulateByFrameP(
                singletonList(keyFn),
                singletonList(timestampFn),
                TimestampKind.EVENT,
                winPolicy, counting()
        ));
Vertex slidingStage2 = dag.newVertex("sliding-stage-2",
    Processors.combineToSlidingWindowP(winPolicy, counting(),
            KeyedWindowResult::new));
Vertex formatOutput = dag.newVertex("format-output", mapUsingContextP(    (7)
    ContextFactory.withCreateFn(x -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")),
    (DateTimeFormatter timeFormat, KeyedWindowResult<String, Long> kwr) ->
        String.format("%s %5s %4d",
            timeFormat.format(Instant.ofEpochMilli(kwr.end())
                                     .atZone(ZoneId.systemDefault())),
                kwr.getKey(), kwr.getValue())
));
Vertex sink = dag.newVertex("sink", SinkProcessors.writeFileP(
        OUTPUT_DIR_NAME, Object::toString, UTF_8, false));

tradeSource.localParallelism(1);

return dag
        .edge(between(tradeSource, slidingStage1)
                .partitioned(keyFn, HASH_CODE))
        .edge(between(slidingStage1, slidingStage2)
                .partitioned(entryKey(), HASH_CODE)
                .distributed())
        .edge(between(slidingStage2, formatOutput)
                .isolated())
        .edge(between(formatOutput, sink));

You can see quite a lot of code going into the setup of the streaming source. Let’s zoom in on it:

1 filtering and mapping functions we supply directly to the source. Hazelcast IMDG will apply them before serializing and sending to Jet so this saves network traffic and CPU.
2 where to start from in map’s event journal: the oldest entry still available.
3 function to apply to the event object to get its timestamp.
4 watermark policy. Here we use the simplest kind, limitingLag, which will make the watermark lag behind the top observed event timestamp by the fixed amount we specified (3 seconds).
5 watermark emission policy that tells Jet when to actually send a watermark event. Since the sliding window processor ignores all watermark events that belong to the same frame, we configure a matching policy that emits only one watermark per frame.
6 partition idle timeout. This is a countermeasure to stalling problems that occur when some of the IMap partitions don’t receive any updates. Due to watermark coalescing this could stall the entire stream, but with this setting a partition will be marked as idle after 3 seconds of inactivity and then the rest of the system behaves as if it didn’t exist.
7 Here we use mapUsingContextP which allows us to create an object available to the processor at a late point, after all the job serialization-deserialization is done. In this case we need it because the Java 8 DateTimeFormatter isn’t serializable.

The full code of this sample is in StockExchangeCoreApi.java; running it you’ll get an endless stream of data accumulating on disk. To spare your filesystem we’ve limited the execution time to 10 seconds.

9.2.3. Advanced Batch DAG — Inverted TF-IDF Index

In this tutorial we’ll explore what the Core API DAG model offers beyond the capabilities of the Pipeline API. Our DAG will feature splits, joins, broadcast, and prioritized edges. We’ll access data from the file system and show a simple technique to distribute file reading across Jet members. Several vertices we use can’t be implemented in terms of out-of-the-box processors, so we’ll also show you how to implement your own with minimum boilerplate.

The full code is available at the hazelcast-jet-code-samples repository.

Let us first introduce the problem. The inverted index is a basic data structure in the domain of full-text search. First used in the 1950s, it is still at the core of modern information retrieval systems such as Lucene. The goal is to be able to quickly find the documents that contain a given set of search terms, and to sort them by relevance. To understand it we’ll need to throw in some terminology.

  • A document is treated as a list of words that has a unique ID. It is useful to define the notion of a document index which maps each document ID to the list of words it contains. We won’t build this index; it’s just for the sake of explanation.

  • The inverted index is the inverse of the document index: it maps each word to the list of documents that contain it. This is the fundamental building block in our search algorithm: it will allow us to find in O(1) time all documents relevant to a search term.

  • In the inverted index, each entry in the list is assigned a TF-IDF score which quantifies how relevant the document is to the search request.

  • Let DF (document frequency) be the length of the list: the number of documents that contain the word.

  • Let D be the total number of documents that were indexed.

  • IDF (inverse document frequency) is equal to log(D/DF).

  • TF (term frequency) is the number of occurrences of the word in the document.

  • TF-IDF score is simply the product of TF * IDF.
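
To make the formula concrete, here is a tiny numeric sketch in Java (the figures are invented for illustration and are not taken from the sample data set):

// D = 100 documents indexed, the word occurs in DF = 4 of them,
// and TF = 3 times in the document we are scoring:
double idf = Math.log(100.0 / 4);   // IDF = log(D/DF) ≈ 3.22
double tfIdf = 3 * idf;             // TF-IDF = TF * IDF ≈ 9.66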

Note that IDF is a property of the word itself: it quantifies the relevance of each entered word to the search request as a whole. The list of entered words can be perceived as a list of filtering functions that we apply to the full set of documents. A more relevant word will apply a stronger filter. Specifically, common words like “the”, “it”, “on” act as pure "pass-through" filters and consequently have an IDF of zero, making them completely irrelevant to the search.

TF, on the other hand, is the property of the combination of word and document, and tells us how relevant the document is to the word, regardless of the relevance of the word itself.

When the user enters a search phrase:

  1. each individual term from the phrase is looked up in the inverted index;

  2. an intersection is found of all the lists, resulting in the list of documents that contain all the words;

  3. each document is scored by summing the TF-IDF contributions of each word;

  4. the result list is sorted by score (descending) and presented to the user.

Let’s have a look at a specific search phrase:

the man in the black suit murdered the king

The list of documents that contain all the above words is quite long…​ how do we decide which are the most relevant? The TF-IDF logic will make those stand out that have an above-average occurrence of words that are generally rare across all documents. For example, “murdered” occurs in far fewer documents than “black”…​ so given two documents where one has the same number of “murdered” as the other one has of “black”, the one with “murdered” wins because its word is more salient in general. On the other hand, “suit” and “king” might have a similar IDF, so the document that simply contains more of both wins.

Also note the limitation of this technique: a phrase is treated as just the sum of its parts; a document may contain the exact phrase and this will not affect its score.

Building the Inverted Index with Java Streams

To warm us up, let’s see what it takes to build the inverted index with just thread parallelism and without the ability to scale out across many machines. It is expressible in Java Streams API without too much work. The full code is here.

We’ll start by preparing a Stream<Entry<Long, String>> docWords: a stream of all the words found in all the documents. We use Map.Entry as a holder of a pair of values (a 2-tuple) and here we have a pair of Long docId and String word:

Stream<Entry<Long, String>> docWords = docId2Name
        .entrySet()
        .parallelStream()
        .flatMap(TfIdfJdkStreams::docLines)
        .flatMap(this::tokenize);

We know the number of all documents so we can compute double logDocCount, the logarithm of the document count:

final double logDocCount = Math.log(docId2Name.size());

Calculating TF is very easy, just count the number of occurrences of each distinct pair and save the result in a Map<Entry<Long, String>, Long>:

Map<Entry<Long, String>, Long> tfMap = docWords
        .parallel()
        .collect(groupingBy(identity(), counting()));

And now we build the inverted index. We start from tfMap, group by word, and the list under each word already matches our final product: the list of all the documents containing the word. We finish off by applying a transformation to the list: currently it’s just the raw entries from the tf map, but we need pairs (docId, tfIdfScore).

invertedIndex = tfMap
    .entrySet()
    .parallelStream()
    .collect(groupingBy(
        e -> e.getKey().getValue(),
        collectingAndThen(
            toList(),
            entries -> {
                double idf = logDocCount - Math.log(entries.size());
                return entries.stream().map(e ->
                        tfidfEntry(e, idf)).collect(toList());
            }
        )
    ));

The search function can be implemented with another Streams expression, which you can review in the SearchGui class. You can also run the TfIdfJdkStreams class and take the inverted index for a spin, making actual searches.

There is one last concept in this model that we haven’t mentioned yet: the stopword set. It contains those words that are known in advance to be common enough to occur in every document. Without treatment, these words are the worst case for the inverted index: the document list under each such word is the longest possible, and the score of all documents is zero due to zero IDF. They raise the index’s memory footprint without providing any value. The cure is to prepare a file, stopwords.txt, which is read in advance into a Set<String> and used to filter out the words in the tokenization phase. The same set is used to cross out words from the user’s search phrase, as if they weren’t entered. We’ll add this feature to our DAG based model in the following section.
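
A rough sketch of that treatment, using plain JDK calls instead of the sample’s own helpers (the regex and the example phrase are assumptions made here for illustration):

Set<String> stopwords = new HashSet<>(
        Files.readAllLines(Paths.get("stopwords.txt")));
Pattern delimiter = Pattern.compile("\\W+");
// the same filter is applied during tokenization and to the user's search phrase
List<String> terms = Arrays
        .stream(delimiter.split("the man in the black suit".toLowerCase()))
        .filter(word -> !stopwords.contains(word))   // cross out the stopwords
        .collect(Collectors.toList());               // e.g. [man, black, suit]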

Translating to Jet DAG

Our DAG as a whole will look relatively complex, but it can be understood as a “backbone” (cascade of vertices) starting from a source and ending in a sink with several more vertices attached on the side. This is just the backbone:

Backbone of the TF-IDF DAG
  1. The data source is a Hazelcast IMap which holds a mapping from document ID to its filename. The source vertex will emit all the map’s entries, but only a subset on each cluster member.

  2. doc-lines opens each file named by the map entry and emits all its lines in the (docId, line) format.

  3. tokenize transforms each line into a sequence of its words, again paired with the document ID, so it emits (docId, word).

  4. tf builds a set of all distinct pairs emitted from tokenize and maintains the count of each pair’s occurrences (its TF score).

  5. tf-idf takes that set, groups the pairs by word, and calculates the TF-IDF scores. It emits the results to the sink, which saves them to a distributed IMap.

Edge types follow the same pattern as in the word-counting job: after flatmapping there is first a local, then a distributed partitioned edge. The logic behind it is not the same, though: TF can actually compute the final TF scores by observing just the local data. This is because it treats each document separately (document ID is a part of the grouping key) and the source data is already partitioned by document ID. The TF-IDF vertex does something similar to word count’s combining, but there’s again a twist: it will group the TF entries by word, but instead of just merging them into a single result per word, it will keep them all in lists.

To this cascade we add a stopword-source which reads the stopwords file, parses it into a HashSet, and sends the whole set as a single item to the tokenize vertex. We also add a vertex that takes the data from doc-source and simply counts its items; this is the total document count used in the TF-IDF formula. We end up with this DAG:

The TF-IDF DAG

The choice of edge types into and out of doc-count may look surprising, so let’s examine it. We start with the doc-source vertex, which emits one item per document, but its output is distributed across the cluster. To get the full document count on each member, each doc-count processor must get all the items, and that’s just what the distributed broadcast edge will achieve. We’ll configure doc-count with local parallelism of 1, so there will be one processor on every member, each observing all the doc-source items. The output of doc-count must reach all tf-idf processors on the same member, so we use the local broadcast edge.
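
As a rough sketch, those two edges might be wired like this (the vertex variable names are assumed here; priorities and the rest of the DAG are omitted, so consult the full sample for the exact wiring):

Vertex docCount = dag.newVertex("doc-count", Processors.aggregateP(counting()))
                     .localParallelism(1);
dag.edge(between(docSource, docCount)         // distributed broadcast edge in:
                .distributed().broadcast())   // every doc-count sees every doc-source item
   .edge(from(docCount).to(tfIdf, 0)          // local broadcast edge out:
                .broadcast());                // the count reaches all local tf-idf processors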

Another thing to note is the two flat-mapping vertices: doc-lines and tokenize. From a purely semantic standpoint, composing flatmap with flatmap yields just another flatmap. As we’ll see below, we’re using custom code for these two processors…​ so why did we choose to separate the logic this way? There are actually two good reasons. The first one has to do with Jet’s cooperative multithreading model: doc-lines makes blocking file IO calls, so it must be declared non-cooperative; tokenization is pure computation so it can be in a cooperative processor. The second one is more general: the workload of doc-lines is very uneven. It consists of waiting, then suddenly coming up with a whole block of data. If we left tokenization there, performance would suffer because first the CPU would be forced to sit idle, then we’d be late in making the next IO call while tokenizing the input. The separate vertex can proceed at full speed all the time.

Implementation Code

As we announced, some of the processors in our DAG will need custom implementation code. Let’s start with the source vertex, which is easy: just the standard IMap reader:

dag.newVertex("doc-source", readMapP(DOCID_NAME));

The stopwords-producing processor has custom code, but it’s quite simple:

dag.newVertex("stopword-source", StopwordsP::new);
private static class StopwordsP extends AbstractProcessor {
    @Override
    public boolean complete() {
        return tryEmit(docLines("stopwords.txt").collect(toSet()));
    }
}

Since this is a source processor, all its action happens in complete(). It emits a single item: the HashSet built directly from the text file’s lines.

The doc-count processor can be built from the primitives provided in Jet’s library:

dag.newVertex("doc-count", Processors.aggregateP(counting()));

The doc-lines processor is more of a mouthful, but still built from existing primitives:

Vertex docLines = dag.newVertex("doc-lines", flatMapUsingContextP(
        ContextFactory.withCreateFn(jet -> null).toNonCooperative(),
        (Object ctx, Entry<Long, String> e) ->
        traverseStream(docLines("books/" + e.getValue())
            .map(line -> entry(e.getKey(), line)))));

Let’s break down this expression…​ flatMapUsingContextP returns a standard processor that emits an arbitrary number of items for each received item. We already saw a flatmapping processor in the introductory Word Count example. There we created a traverser from an array; here we create it from a Java stream. We additionally call toNonCooperative() on the context factory, which declares all the created processors non-cooperative. We already explained why we do this: this processor will make blocking I/O calls.

tokenize is another custom vertex:

dag.newVertex("tokenize", TokenizeP::new);
private static class TokenizeP extends AbstractProcessor {
    private Set<String> stopwords;
    private final FlatMapper<Entry<Long, String>, Entry<Long, String>>
            flatMapper = flatMapper(e -> traverseStream(
            Arrays.stream(DELIMITER.split(e.getValue().toLowerCase()))
                  .filter(word -> !stopwords.contains(word))
                  .map(word -> entry(e.getKey(), word))));

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess0(@Nonnull Object item) {
        stopwords = (Set<String>) item;
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess1(@Nonnull Object item) {
        return flatMapper.tryProcess((Entry<Long, String>) item);
    }
}

This is a processor that must deal with two different inbound edges. It receives the stopword set over edge 0 and then it does a flatmapping operation on edge 1. The logic presented here uses the same approach as the implementation of the provided Processors.flatMap() processor: there is a single instance of FlatMapper that holds the business logic of the transformation, and tryProcess1() directly delegates into it. If FlatMapper is done emitting the previous items, it will accept the new item, apply the user-provided transformation, and start emitting the output items. If the outbox refuses a pending item, it will return false, which will make the framework call the same tryProcess1() method later, with the same input item.

Let’s show the code that creates tokenize’s two inbound edges:

dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1))
   .edge(from(docLines).to(tokenize, 1));

Especially note the .priority(-1) part: this ensures that there will be no attempt to deliver any data coming from docLines before all the data from stopwordSource is already delivered. The processor would fail if it had to tokenize a line before it has its stopword set in place.

tf is another simple vertex, built purely from the provided primitives:

dag.newVertex("tf", aggregateByKeyP(
        singletonList(wholeItem()), counting(), Util::entry));

tf-idf is the most complex processor:

dag.newVertex("tf-idf", TfIdfP::new);
private static class TfIdfP extends AbstractProcessor {
    private double logDocCount;

    private final Map<String, List<Entry<Long, Double>>> wordDocTf =
            new HashMap<>();
    private final Traverser<Entry<String, List<Entry<Long, Double>>>>
            invertedIndexTraverser = lazy(() ->
            traverseIterable(wordDocTf.entrySet())
                    .map(this::toInvertedIndexEntry));

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        logDocCount = Math.log((Long) item);
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess1(@Nonnull Object item) {
        Entry<Entry<Long, String>, Long> e =
                (Entry<Entry<Long, String>, Long>) item;
        long docId = e.getKey().getKey();
        String word = e.getKey().getValue();
        long tf = e.getValue();
        wordDocTf.computeIfAbsent(word, w -> new ArrayList<>())
                 .add(entry(docId, (double) tf));
        return true;
    }

    @Override
    public boolean complete() {
        return emitFromTraverser(invertedIndexTraverser);
    }

    private Entry<String, List<Entry<Long, Double>>> toInvertedIndexEntry(
            Entry<String, List<Entry<Long, Double>>> wordDocTf
    ) {
        String word = wordDocTf.getKey();
        List<Entry<Long, Double>> docidTfs = wordDocTf.getValue();
        return entry(word, docScores(docidTfs));
    }

    private List<Entry<Long, Double>> docScores(
            List<Entry<Long, Double>> docidTfs
    ) {
        double logDf = Math.log(docidTfs.size());
        return docidTfs.stream()
                       .map(tfe -> tfidfEntry(logDf, tfe))
                       .collect(toList());
    }

    private Entry<Long, Double> tfidfEntry(
            double logDf, Entry<Long, Double> docidTf
    ) {
        Long docId = docidTf.getKey();
        double tf = docidTf.getValue();
        double idf = logDocCount - logDf;
        return entry(docId, tf * idf);
    }
}

This is quite a lot of code, but each of the three pieces is not too difficult to follow:

  1. tryProcess0() accepts a single item, the total document count.

  2. tryProcess1() performs a boilerplate group-and-aggregate operation, collecting a list of items under each key.

  3. complete() outputs the accumulated results, also applying the final transformation on each one: replacing the TF score with the final TF-IDF score. It relies on a lazy traverser, which holds a Supplier<Traverser> and will obtain the inner traverser from it the first time next() is called. This makes it very simple to write code that obtains a traverser from a map after it has been populated.

Finally, our DAG is terminated by a sink vertex:

dag.newVertex("sink", SinkProcessors.writeMapP(INVERTED_INDEX));

9.3. Processor

Processor is the main type whose implementation is up to the user of the Core API: it contains the code of the computation to be performed by a vertex. There are a number of Processor building blocks in the Core API which allow you to just specify the computation logic, while the provided code handles the processor’s cooperative behavior. Please refer to the AbstractProcessor section.

A processor’s work can be conceptually described as follows: "receive data from zero or more input streams and emit data into zero or more output streams." Each stream maps to a single DAG edge (either inbound or outbound). There is no requirement on the correspondence between input and output items; a processor can emit any data it sees fit, including none at all. The same Processor abstraction is used for all kinds of vertices, including sources and sinks.

The implementation of a processor can be stateful and does not need to be thread-safe because Jet guarantees to use the processor instances from one thread at a time, although not necessarily always the same thread.

9.3.1. Cooperativeness

Processor instances are cooperative by default. The processor can opt out of cooperative multithreading by overriding isCooperative() to return false. Jet will then start a dedicated thread for it.

To maintain an overall good throughput, a cooperative processor must take care not to hog the thread for too long (a rule of thumb is up to a millisecond at a time). Jet’s design strongly favors cooperative processors and most processors can and should be implemented to fit these requirements. The major exceptions are sources and sinks because they often have no choice but to call into blocking I/O APIs.

9.3.2. The Outbox

The processor sends its output items to its Outbox which has a separate bucket for each outbound edge. The buckets have limited capacity and will refuse an item when full. A cooperative processor should be implemented such that when the outbox refuses its item, it saves its processing state and returns from the processing method. The execution engine will then drain the outbox buckets.
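
As a minimal sketch of this save-and-return pattern, here is a 1-to-1 mapping step using the tryEmit() convenience of AbstractProcessor (described later in this chapter); transform() is a hypothetical helper:

@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) {
    // tryEmit() returns false when the outbox bucket is full. By returning
    // false ourselves we leave the item in the inbox and Jet will call this
    // method again later with the same item.
    return tryEmit(transform(item));
}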

9.3.3. Data Processing Callbacks

process(ordinal, inbox)

Jet passes the items received over a given edge to the processor by calling process(ordinal, inbox). The inbox contains all the items received since the last process() call, as well as any items the processor didn’t remove in a previous process() call. There is a separate instance of Inbox for each inbound edge, so any given process() call involves items from only one edge.

The processor must not remove an item from the inbox until it has fully processed it. This is important with respect to the cooperative behavior: the processor may not be allowed to emit all items corresponding to a given input item and may need to return from the process() call early, saving its state. In such a case the item should stay in the inbox so Jet knows the processor has more work to do even if no new items are received.
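
A minimal sketch of this rule with a plain Processor implementation doing a 1-to-1 mapping (transform() is a hypothetical helper):

class MappingP implements Processor {

    private Outbox outbox;

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Context context) {
        this.outbox = outbox;
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        for (Object item; (item = inbox.peek()) != null; ) {
            if (!outbox.offer(transform(item))) {
                // The outbox is full: leave the item in the inbox and return.
                // Jet will call process() again with the same item still there.
                return;
            }
            inbox.remove();
        }
    }

    private Object transform(Object item) {
        return item; // hypothetical transformation
    }
}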

tryProcessWatermark(watermark)

When a new highest watermark is received from all input edges and all input processor instances, Jet calls the tryProcessWatermark(watermark) method. The watermark value is always greater than in the previous call.

The implementation may choose to process only partially and return false, in which case it will be called again later with the same timestamp before any other processing method is called. When the method returns true, the watermark is forwarded to the downstream processors.

tryProcess()

If a processor’s inbox is empty, Jet will call its tryProcess() method instead. This allows the processor to perform work that is not input data-driven. The method has a boolean return value and if it returns false, it will be called again before any other methods are called. This way it can retry emitting its output until the outbox accepts it.

An important use case for this method is the emission of watermark items. A job that processes an infinite data stream may experience occasional lulls: periods with no items arriving. On the other hand, a windowing processor is not allowed to act upon each item immediately due to event skew; it must wait for a watermark item to arrive. During a stream lull this becomes problematic because the watermark itself is primarily data-driven and advances in response to the observation of event timestamps. The watermark-inserting processor must be able to advance the watermark even during a stream lull, based on the passage of wall-clock time, and it can do it inside the tryProcess() method.

complete()

Jet calls complete() when all the input edges are exhausted. It is the last method to be invoked on the processor before disposing of it. Typically this is where a batch processor emits the results of an aggregating operation. If it can’t emit everything in a given call, it should return false and will be called again later.

9.3.4. Snapshotting Callbacks

Hazelcast Jet supports fault-tolerant processing jobs by taking distributed snapshots. In regular time intervals each of the source vertices will perform a snapshot of its own state and then emit a special item to its output stream: a barrier. The downstream vertex that receives the barrier item makes its own snapshot and then forwards the barrier to its outbound edges, and so on towards the sinks.

At the level of the Processor API the barrier items are not visible; ProcessorTasklet handles them internally and invokes the snapshotting callback methods described below.

saveToSnapshot()

Jet will call saveToSnapshot() when it determines it’s time for the processor to save its state to the current snapshot. Except for source vertices, this happens when the processor has received the barrier item from all its inbound streams and processed all the data items preceding it. The method must emit all its state to the special snapshotting bucket in the Outbox, by calling outbox.offerToSnapshot(). If the outbox doesn’t accept all the data, it must return false to be called again later, after the outbox has been flushed.

When this method returns true, ProcessorTasklet will forward the barrier item to all the outbound edges.

restoreFromSnapshot()

When a Jet job is restarting after having been suspended, it will first reload all the state from the last successful snapshot. Each processor will get its data through the invocations of restoreFromSnapshot(). Its parameter is the Inbox filled with a batch of snapshot data. The method will be called repeatedly until it consumes all the snapshot data.

finishSnapshotRestore()

After it has delivered all the snapshot data to restoreFromSnapshot(), Jet will call finishSnapshotRestore(). The processor may use it to initialize some transient state from the restored state.
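
Here is a rough sketch, not taken from the code samples, of how a processor keeping a per-key count might implement these callbacks. It uses the AbstractProcessor variant of restoreFromSnapshot() that receives individual key-value pairs:

class CountingP extends AbstractProcessor {

    private final Map<String, Long> counts = new HashMap<>();
    private Traverser<Entry<String, Long>> snapshotTraverser;

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        counts.merge((String) item, 1L, Long::sum);
        return true;
    }

    @Override
    public boolean saveToSnapshot() {
        if (snapshotTraverser == null) {
            snapshotTraverser = Traversers.traverseIterable(counts.entrySet());
        }
        // Keeps offering entries to the snapshot bucket; returns false while
        // the outbox refuses them, so Jet will call us again until done.
        boolean done = emitFromTraverserToSnapshot(snapshotTraverser);
        if (done) {
            snapshotTraverser = null;
        }
        return done;
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        counts.put((String) key, (Long) value);
    }

    @Override
    public boolean finishSnapshotRestore() {
        // Nothing derived to rebuild here; a processor with transient state
        // would initialize it from the restored map at this point.
        return true;
    }
}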

9.3.5. Best Practice: Document At-Least-Once Behavior

As we discuss in the Jet Concepts chapter, the behavior of a processor under at-least-once semantics can deviate from correctness in extremely non-trivial and unexpected ways. Therefore the processor should always document its possible behaviors for that case.

9.4. AbstractProcessor

AbstractProcessor is a convenience class designed to deal with most of the boilerplate in implementing the full Processor API.

9.4.1. Receiving items

On the reception side the first line of convenience is the family of tryProcessN() methods. While in the inbox the watermark and data items are interleaved, these methods take care of the boilerplate needed to filter out the watermarks. Additionally, they get one item at a time, eliminating the need to write a suspendable loop over the input items.

There is a separate method specialized for each edge from 0 to 4 (tryProcess0..tryProcess4) and a catch-all method tryProcess(ordinal, item). If the processor doesn’t need to distinguish between the inbound edges, the latter method is a good match; otherwise, it is simpler to implement one or more of the ordinal-specific methods. The catch-all method is also the only way to access inbound edges beyond ordinal 4, but such cases are very rare in practice.

Paralleling the above there are tryProcessWm(ordinal, wm) and tryProcessWmN(wm) methods that get just the watermark items.

9.4.2. Emitting items

AbstractProcessor has a private reference to its outbox and lets you access all its functionality indirectly. The tryEmit() methods offer your items to the outbox. If you get a false return value, you must stop emitting items and return from the current callback method of the processor. For example, if you called tryEmit() from tryProcess0(), you should return false so Jet will call tryProcess0() again later, when there’s more room in the outbox. Similar to these methods there are tryEmitToSnapshot() and emitFromTraverserToSnapshot(), to be used from the saveToSnapshot() callback.

Implementing a processor to respect the above rule is quite tricky and error-prone. Things get especially tricky when there are several items to emit, such as:

  • when a single input item maps to many output items

  • when the processor performs a group-by-key operation and emits each group as a separate item

You can avoid most of the complexity if you wrap all your output in a Traverser. Then you can simply say return emitFromTraverser(myTraverser). It will:

  1. try to emit as many items as possible

  2. return false if the outbox refuses an item

  3. hold on to the refused item and continue from it when it’s called again with the same traverser.

There is one more layer of convenience relying on emitFromTraverser(): the nested class FlatMapper, which makes it easy to implement a flatmapping kind of transformation. It automatically handles the concern of creating a new traverser when the previous one is exhausted and reusing the previous one until exhausted.

9.4.3. Traverser

Traverser is a very simple functional interface whose shape matches that of a Supplier, but with a contract specialized for the traversal over a sequence of non-null items: each call to its next() method returns another item of the sequence until exhausted, then keeps returning null. A traverser may also represent an infinite, non-blocking stream of items: it may return null when no item is currently available, then later return more items.

The point of this type is the ability to implement traversal over any kind of dataset or lazy sequence with minimum hassle: often just by providing a one-liner lambda expression. This makes it very easy to integrate with Jet’s convenience APIs for cooperative processors.

Traverser also supports some default methods that facilitate building a simple transformation layer over the underlying sequence: map, filter, flatMap, etc.
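
For example, here is a minimal sketch of such a transformation chain (the items are arbitrary):

Traverser<String> t = Traversers.traverseIterable(asList(1, 2, 3, 4))
        .filter(n -> n % 2 == 0)
        .map(n -> "even-" + n);
// t.next() returns "even-2", then "even-4", then null from that point on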

The following example shows how you can implement a simple flatmapping processor:

class ItemAndSuccessorP extends AbstractProcessor {
    private final FlatMapper<Integer, Integer> flatMapper =
            flatMapper(i -> traverseIterable(asList(i, i + 1)));

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        return flatMapper.tryProcess((int) item);
    }
}

For each received Integer this processor emits the number and its successor. If the outbox refuses an item, flatMapper.tryProcess() returns false and stays ready to resume the next time it is invoked. The fact that it returned false signals Jet to invoke ItemAndSuccessorP.tryProcess() again with the same arguments.

9.5. Watermark Policy

As mentioned in the Time Ordering and the Watermark chapter, determining the watermark is somewhat of a black art; it’s about superimposing order over a disordered stream of events. We must decide at which point it stops making sense to wait even longer for data about past events to arrive. There’s a tension between two opposing forces here:

  • wait as long as possible to account for all the data;

  • get results as soon as possible.

While there are ways to (kind of) achieve both, there’s a significant associated cost in terms of complexity and overall performance. Hazelcast Jet takes a simple approach and strictly triages stream items into “still on time” and “late”, discarding the latter.

WatermarkPolicy is the abstraction that computes the value of the watermark for a (sub)stream of disordered data items. It takes as input the timestamp of each observed item and outputs the current watermark value.

9.5.1. Predefined Watermark Policy

We provide a simple, data-agnostic watermark policy, the limitingLag() policy. This policy will maintain a watermark that lags behind the highest observed event timestamp by a configured amount. In other words, each time an event with the highest timestamp so far is encountered, this policy advances the watermark to eventTimestamp - lag. This puts a limit on the spread between timestamps in the stream: all events whose timestamp is more than the configured lag behind the highest timestamp are considered late.

9.5.2. Watermark Throttling

The policy objects presented above will return the “ideal” watermark value according to their logic; however it would be too much overhead to insert a watermark item each time the ideal watermark advances (typically every millisecond). The EventTimePolicy.watermarkThrottlingFrameSize() and EventTimePolicy.watermarkThrottlingFrameOffset() are the parameters that control the watermark interval.

Since the watermark is only needed for windowing aggregation, we can emit only those watermarks that will cause a new window to be emitted. In the case of sliding windows, the throttling frame size and offset should correspond to the sliding step and frame offset of the sliding window. Session windows can be emitted at any moment, so a high throttling frame size will increase latency because event time only advances with the arrival of a newer watermark. For session windows in the Pipeline API we use a throttling frame size of 100 ms, or less if the session timeout is short.

9.6. Vertices in the Library

While formally there’s only one kind of vertex in Jet, in practice there is an important distinction between the following:

  • A source is a vertex with no inbound edges. It injects data from the environment into the Jet job.

  • A sink is a vertex with no outbound edges. It drains the output of the Jet job into the environment.

  • A computational vertex has both kinds of edges. It accepts some data from upstream vertices, performs some computation, and emits the results to downstream vertices. Typically it doesn’t interact with the environment.

The com.hazelcast.jet.processor package contains static utility classes with factory methods that return suppliers of processors, as required by the dag.newVertex(name, procSupplier) calls. There is a convention in Jet that every module containing vertex implementations contributes a utility class to the same package. Inspecting the contents of this package in your IDE should allow you to discover all vertex implementations available on the project’s classpath. For example, there are modules that connect to 3rd party resources like Kafka and Hadoop Distributed File System (HDFS). Each such module declares a class in the same package, com.hazelcast.jet.processor, exposing the module’s source and sink definitions.

The main factory class for the source vertices provided by the Jet core module is SourceProcessors. It contains sources that ingest data from Hazelcast IMDG structures like IMap, ICache, IList, etc., as well as some simple sources that get data from files and TCP sockets (readFiles, streamSocket and some more).

Paralleling the sources there’s SinkProcessors for the sink vertices, supporting the same range of resources (IMDG, files, sockets). There’s also a general writeBuffered method that takes some boilerplate out of writing custom sinks. The user must implement a few primitives: create a new buffer, add an item to it, flush the buffer. The provided code takes care of integrating these primitives into the Processor API (draining the inbox into the buffer and handling the general lifecycle).

Finally, the computational vertices are where the main action takes place. The main class with factories for built-in computational vertices is Processors.

9.7. Implement a Custom Source or Sink Vertex

The Hazelcast Jet distribution contains vertices for the sources and sinks exposed through the Pipeline API. You can extend Jet’s support for sources and sinks by writing your own vertex implementations.

One of the main concerns when implementing a source vertex is that the data source is typically distributed across multiple machines and partitions, and the work needs to be distributed across multiple members and processors.

9.7.1. How Jet Creates and Initializes a Job

These are the steps taken to create and initialize a Jet job:

  1. The user builds the DAG and submits it to the local Jet client instance.

  2. The client instance serializes the DAG and sends it to a member of the Jet cluster. This member becomes the coordinator for this Jet job.

  3. The coordinator deserializes the DAG and builds an execution plan for each member.

  4. The coordinator serializes the execution plans and distributes each to its target member.

  5. Each member acts upon its execution plan by creating all the needed tasklets, concurrent queues, network senders/receivers, etc.

  6. The coordinator sends the signal to all members to start job execution.

The most visible consequence of the above process is the ProcessorMetaSupplier type: one must be provided for each Vertex. In Step 3, the coordinator deserializes the meta-supplier as a constituent of the DAG and asks it to create ProcessorSupplier instances which go into the execution plans. A separate instance of ProcessorSupplier is created specifically for each member’s plan. In Step 4, the coordinator serializes these and sends each to its member. In Step 5 each member deserializes its ProcessorSupplier and asks it to create as many Processor instances as configured by the vertex’s localParallelism property.

This process is so involved because each Processor instance may need to be differently configured. This is especially relevant for processors driving a source vertex: typically each one will emit only a slice of the total data stream, as appropriate to the partitions it is in charge of.

ProcessorMetaSupplier

The client serializes an instance of ProcessorMetaSupplier as part of each Vertex in the DAG. The coordinator member deserializes the instance and uses it to create ProcessorSuppliers by calling the ProcessorMetaSupplier.get() method. Before that, the coordinator calls the init() method with a context object that you can use to get useful information. The get() method takes a List<Address> as a parameter, which you should use to determine the cluster members that will run the job, if needed.

ProcessorSupplier

Usually this type will be custom-implemented in the same cases where the meta-supplier is custom-implemented and will complete the logic of a distributed data source’s partition assignment. It creates instances of Processor ready to start executing the vertex’s logic.

Another use is to open and close external resources, such as files or connections. We provide CloseableProcessorSupplier for this.

9.7.2. Example - Distributed Integer Generator

Let’s say we want to write a simple source that will generate numbers from 0 to 1,000,000 (exclusive). It is trivial to write a single Processor which can do this using java.util.stream and a Traverser:

class GenerateNumbersP extends AbstractProcessor {

    private final Traverser<Integer> traverser;

    GenerateNumbersP(int upperBound) {
        traverser = Traversers.traverseStream(range(0, upperBound).boxed());
    }

    @Override
    public boolean complete() {
        return emitFromTraverser(traverser);
    }
}

Now we can build our DAG and execute it:

JetInstance jet = Jet.newJetInstance();

int upperBound = 10;
DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
        () -> new GenerateNumbersP(upperBound));
Vertex logInput = dag.newVertex("log-input",
        DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));

try {
    jet.newJob(dag).join();
} finally {
    Jet.shutdownAll();
}

When you run this code, you will see the output as below:

Received number: 4
Received number: 0
Received number: 3
Received number: 2
Received number: 2
Received number: 2

Since we are using the default parallelism setting on this vertex, several instances of the source processor were created, all of which generated the same sequence of values. Generally we want the ability to parallelize the source vertex, so we have to make each processor emit only a slice of the total data set.

So far we’ve used the simplest approach to creating processors: a SupplierEx<Processor> function that keeps returning equal instances of processors. Now we’ll step up to Jet’s custom interface that gives us the ability to provide a list of separately configured processors: ProcessorSupplier and its method get(int processorCount).

First we must decide on a partitioning policy: which subset each processor will emit. In our simple example we can use a simple policy: we’ll label each processor with its index in the list and have it emit only those numbers n that satisfy n % processorCount == processorIndex. Let’s write a new constructor for our processor which implements this partitioning logic:

GenerateNumbersP(int upperBound, int processorCount, int processorIndex) {
    traverser = Traversers.traverseStream(
            range(0, upperBound)
                    .filter(n -> n % processorCount == processorIndex)
                    .boxed());
}

Given this preparation, implementing ProcessorSupplier is easy:

class GenerateNumbersPSupplier implements ProcessorSupplier {

    private final int upperBound;

    GenerateNumbersPSupplier(int upperBound) {
        this.upperBound = upperBound;
    }

    @Override @Nonnull
    public List<? extends Processor> get(int processorCount) {
        return
                range(0, processorCount)
                .mapToObj(index -> new GenerateNumbersP(
                        upperBound, processorCount, index))
                .collect(toList());
    }
}

Let’s use the custom processor supplier in our DAG-building code:

DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
        new GenerateNumbersPSupplier(10));
Vertex logInput = dag.newVertex("log-input",
        DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));

Now we can re-run our example and see that each number indeed occurs only once. However, note that we are still working with a single-member Jet cluster; let’s see what happens when we add another member:

JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();

int upperBound = 10;
DAG dag = new DAG();
// rest of the code same as above

Running after this change we’ll see that both members are generating the same set of numbers. This is because ProcessorSupplier is instantiated independently for each member and asked for the same number of processors, resulting in identical processors on all members. We have to solve the same problem as we just did, but at the higher level of cluster-wide parallelism. For that we’ll need the ProcessorMetaSupplier: an interface which acts as a factory of ProcessorSuppliers, one for each cluster member. Under the hood it is actually always the meta-supplier that’s created by the DAG-building code; the above examples are just implicit about it for the sake of convenience. They result in a simple meta-supplier that reuses the provided suppliers everywhere.

The meta-supplier is a bit trickier to implement because its method takes a list of Jet member addresses instead of a simple count, and the return value is a function from address to ProcessorSupplier. In our case we’ll treat the address as just an opaque ID and we’ll build a map from address to a properly configured ProcessorSupplier. Then we can simply return map::get as our function.

class GenerateNumbersPMetaSupplier implements ProcessorMetaSupplier {

    private final int upperBound;

    private transient int totalParallelism;
    private transient int localParallelism;

    GenerateNumbersPMetaSupplier(int upperBound) {
        this.upperBound = upperBound;
    }

    @Override
    public void init(@Nonnull Context context) {
        totalParallelism = context.totalParallelism();
        localParallelism = context.localParallelism();
    }

    @Override @Nonnull
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        Map<Address, ProcessorSupplier> map = new HashMap<>();
        for (int i = 0; i < addresses.size(); i++) {
            // We'll calculate the global index of each processor in the cluster:
            int globalIndexBase = localParallelism * i;
            // Capture the value of the transient field for the lambdas below:
            int divisor = totalParallelism;
            // processorCount will be equal to localParallelism:
            ProcessorSupplier supplier = processorCount ->
                    range(globalIndexBase, globalIndexBase + processorCount)
                            .mapToObj(globalIndex ->
                                    new GenerateNumbersP(upperBound, divisor, globalIndex)
                            ).collect(toList());
            map.put(addresses.get(i), supplier);
        }
        return map::get;
    }
}

We change our DAG-building code to use the meta-supplier:

DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
        new GenerateNumbersPMetaSupplier(upperBound));
Vertex logInput = dag.newVertex("log-input",
        DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));

After re-running with two Jet members, we should once again see each number generated just once.

9.7.3. Sinks

Like a source, a sink is just another kind of processor. It accepts items from the inbox and pushes them into some system external to the Jet job (Hazelcast IMap, files, databases, distributed queues, etc.). A simple way to implement it is to extend AbstractProcessor and override tryProcess, which deals with items one at a time. However, sink processors must often explicitly deal with batching. In this case directly implementing Processor is better because its process() method gets the entire Inbox which can be drained to a buffer and flushed out.
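
A rough sketch of such a sink (this is not the library’s writeBuffered implementation; flushToExternalSystem() is a hypothetical helper):

class BufferingSinkP implements Processor {

    private final List<Object> buffer = new ArrayList<>();

    @Override
    public boolean isCooperative() {
        // flushing typically blocks on I/O, so ask for a dedicated thread
        return false;
    }

    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        // take everything currently in the inbox...
        inbox.drainTo(buffer);
        // ...and push the batch to the external system
        flushToExternalSystem(buffer);
        buffer.clear();
    }

    private void flushToExternalSystem(List<Object> items) {
        // write the batch to a file, database, message queue, etc.
    }
}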

9.7.4. Example: File Writer

In this example we’ll implement a vertex that writes the received items to files. To avoid contention and conflicts, each processor must write to its own file. Since we’ll be using a BufferedWriter which takes care of the buffering/batching concern, we can use the simpler approach of extending AbstractProcessor:

class WriteFileP extends AbstractProcessor implements Closeable {

    private final String path;

    private transient BufferedWriter writer;

    WriteFileP(String path) {
        this.path = path;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        Path path = Paths.get(this.path, context.jetInstance().getName()
                + '-' + context.globalProcessorIndex());
        writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
        writer.append(item.toString());
        writer.newLine();
        return true;
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }
}

Some comments:

  • The processor declares itself non-cooperative by overriding isCooperative() to return false, because it will perform blocking IO operations.

  • The init() method finds a unique filename for each processor by relying on the information reachable from the Context object.

  • Note the careful implementation of close(): it first checks if writer is null, which can happen if newBufferedWriter() fails in init(). This would make init() fail as well, which would make the whole job fail and then our ProcessorSupplier would call close() to clean up.

Cleaning up on completion/failure is actually the only concern that we need ProcessorSupplier for: the other typical concern, specializing processors to achieve data partitioning, was achieved directly from the processor’s code. This is the supplier’s code:

class WriteFilePSupplier implements ProcessorSupplier {

    private final String path;

    private transient List<WriteFileP> processors;

    WriteFilePSupplier(String path) {
        this.path = path;
    }

    @Override
    public void init(@Nonnull Context context) {
        File homeDir = new File(path);
        boolean success = homeDir.isDirectory() || homeDir.mkdirs();
        if (!success) {
            throw new JetException("Failed to create " + homeDir);
        }
    }

    @Override @Nonnull
    public List<WriteFileP> get(int count) {
        processors = Stream.generate(() -> new WriteFileP(path))
                           .limit(count)
                           .collect(Collectors.toList());
        return processors;
    }

    @Override
    public void close(Throwable error) {
        try {
            for (WriteFileP p : processors) {
                p.close();
            }
        } catch (IOException e) {
            throw new JetException(e);
        }
    }
}

9.8. Best Practices

9.8.1. Inspecting Processor Input and Output

The structure of the DAG model is a very poor match for Java’s type system, which results in the lack of compile-time type safety between connected vertices. Developing a type-correct DAG therefore usually requires some trial and error. To facilitate this process, but also to allow many more kinds of diagnostics and debugging, Jet’s library offers ways to capture the input/output of a vertex and inspect it.

Peeking with Processor Wrappers

The first approach is to decorate a vertex declaration with a layer that will log all the data traffic going through it. This support is present in the DiagnosticProcessors factory class, which contains the following methods:

  • peekInput(): logs items received at any edge ordinal.

  • peekOutput(): logs items emitted to any ordinal. An item emitted to several ordinals is logged just once.

These methods take two optional parameters:

  • toStringF returns the string representation of an item. The default is to use Object.toString().

  • shouldLogF is a filtering function so you can focus your log output only on some specific items. The default is to log all items.

Example Usage

Suppose we have declared the second-stage vertex in a two-stage aggregation setup:

DAG dag = new DAG();
    Vertex combine = dag.newVertex("combine",
            combineByKeyP(counting(), Util::entry)
    );

We’d like to see what exactly we’re getting from the first stage, so we’ll wrap the processor supplier with peekInput():

Vertex combine = dag.newVertex("combine",
        peekInputP(combineByKeyP(counting(), Util::entry))
);
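
If we also want to use the two optional parameters, a sketch might look like this (assuming the overload that takes the string and filtering functions ahead of the wrapped supplier; the threshold is an arbitrary example):

Vertex combine = dag.newVertex("combine",
        peekInputP(
                item -> "first stage sent: " + item,              // toStringF
                item -> ((Entry<?, Long>) item).getValue() > 10,  // shouldLogF
                combineByKeyP(counting(), Util::entry))
);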

Keep in mind that logging happens on the machine hosting the processor, so this technique is primarily targeted at Jet jobs the developer runs locally in a development environment.

Attaching a Sink Vertex

Since most vertices are implemented to emit the same data stream to all attached edges, it is usually possible to attach a diagnostic sink to any vertex. For example, Jet’s standard writeFileP() sink vertex can be very useful here.

Example Usage

In the example from the Word Count tutorial we can add the following declarations:

Vertex diagnose = dag
        .newVertex("diagnose", writeFileP(
                "tokenize-output", Object::toString, UTF_8, false))
        .localParallelism(1);
dag.edge(from(tokenize, 1).to(diagnose));

This will create the directory tokenize-output which will contain one file per processor instance running on the machine. When running in a cluster, you can inspect on each member the input seen on that member. By specifying the allToOne() routing policy you can also have the output of all the processors on all the members saved on a single member (although the choice of exactly which member will be arbitrary).

9.8.2. How to Unit-Test a Processor

We provide some utility classes to simplify writing unit tests for your custom processors. You can find them in the com.hazelcast.jet.core.test package. Using these utility classes you can unit test your processor by passing it some input items and asserting the expected output.

Start by calling TestSupport.verifyProcessor(), passing it a processor supplier or a processor instance.

The test process does the following:

  • initialize the processor by calling Processor.init()

  • do a snapshot+restore (optional, see below)

  • call Processor.process(0, inbox). The inbox always contains one item from the input parameter

  • every time the inbox gets empty, do a snapshot+restore

  • call Processor.complete() until it returns true (optional)

  • do a final snapshot+restore after complete() is done

The optional snapshot+restore test procedure:

  • call saveToSnapshot()

  • create a new processor instance and use it instead of the existing one

  • restore the snapshot using restoreFromSnapshot()

  • call finishSnapshotRestore()

The test optionally asserts that the processor made progress on each call to any processing method. To be judged as having made progress, the callback method must do at least one of these:

  • take something from the inbox

  • put something to the outbox

  • return true (applies only to boolean-returning methods)

Cooperative Processors

The test will provide a 1-capacity outbox to cooperative processors. The outbox will already be full on every other call to process(). This tests the edge case: process() may be called even when the outbox is full, giving the processor a chance to process the inbox without emitting anything.

The test will also assert that the processor doesn’t spend more time in any callback than the limit specified in cooperativeTimeout(long).

Cases Not Covered

This class does not cover these cases:

  • testing of processors which distinguish input or output edges by ordinal

  • checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier)

  • it never calls Processor.tryProcess()

Example Usage

This will test one of the jet-provided processors:

TestSupport.verifyProcessor(mapP((String s) -> s.toUpperCase()))
           .disableCompleteCall()       (1)
           .disableLogging()            (1)
           .disableProgressAssertion()  (1)
           .disableSnapshots()          (1)
           .cooperativeTimeout(2000)                         (2)
           .outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER)  (3)
           .input(asList("foo", "bar"))                      (4)
           .expectOutput(asList("FOO", "BAR"));
1 Enabled by default
2 The default is 1000ms
3 The default is Objects::equals
4 The default is emptyList()
Other Utility Classes

com.hazelcast.jet.core.test contains these classes that you can use as implementations of Jet interfaces in tests:

  • TestInbox

  • TestOutbox

  • TestProcessorContext

  • TestProcessorSupplierContext

  • TestProcessorMetaSupplierContext

The class JetAssert contains a few of the assertX() methods normally found in JUnit’s Assert class. We had to reimplement them to avoid a dependency on JUnit from our production code.

9.9. Management Center Integration

Jet exposes metrics using the JVM’s standard JMX interface. See the Monitor Metrics section.

These metrics are also exposed in the Jet Management Center. Please refer to the Hazelcast Jet Management Center Reference Manual for more information.

Appendix A: Jet Version Compatibility

The following rules currently apply to Jet for compile-time and runtime compatibility.

A.1. Semantic Versioning

Hazelcast Jet uses Semantic Versioning which can be summarized as follows:

  • MAJOR version when you make incompatible API changes,

  • MINOR version when you add functionality in a backwards-compatible manner, and

  • PATCH version when you make backwards-compatible bug fixes.

This means that a Jet job written using the Pipeline API in a previous minor version should compile in later minor versions.

However some exceptions apply:

  • Classes in the com.hazelcast.jet.core package, which form the Core API of Jet, only provide a PATCH-level compatibility guarantee.

  • Classes in impl packages do not provide any compatibility guarantees between versions.

A.2. Summary Table

The compatibility guarantees can be summarized as follows:

Type           Component                     Guarantee

Compile Time   Job API                       MINOR
Compile Time   Pipeline API                  MINOR
Compile Time   Core API                      PATCH
Runtime        Member to Member              NONE
Runtime        Management Center to Member   NONE
Runtime        Client to Member              PATCH
Runtime        Job State                     PATCH
Runtime        Command Line Tools            MINOR
Runtime        Configuration XML files       PATCH
Runtime        Metrics (JMX)                 PATCH

A.3. Runtime Compatibility

A.3.1. Members

Jet requires that all members in a cluster use the same PATCH version. When updating Jet to a newer PATCH version, the whole cluster must be shut down and restarted with the newer version at once.

A.3.2. Management Center

Management Center, like members, is only compatible with the same PATCH version. This means that Management Center and the cluster must have the exact same PATCH version to be compatible.

A.3.3. Clients

Jet clients are compatible with the members running on the same MINOR version. This means that a client using an older or newer PATCH version should be able to connect and work with a cluster that’s running a different PATCH version.

A.4. Job State Compatibility

Job state is only compatible across the same MINOR version and is only backwards-compatible, i.e. a newer PATCH version is able to understand the job state written by a previous PATCH version.

This means that if you have a running job, you can use the job upgrade and lossless recovery features to move the cluster to a newer PATCH version without losing the state of the running job.
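
As an aside, the job upgrade feature mentioned above follows this general flow: export the state of the running job into a named snapshot, then submit the updated pipeline so that it starts from that snapshot. The following is a hedged sketch; the names updatedPipeline and my-snapshot are placeholders, and the exact API should be checked against your Jet version.

import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;

public class JobUpgradeSketch {
    static Job upgrade(JetInstance jet, Job job, Pipeline updatedPipeline) {
        // Export the running job's state into a named snapshot and cancel the job
        job.cancelAndExportSnapshot("my-snapshot");
        // Submit the new version of the pipeline, initializing it from the exported state
        JobConfig config = new JobConfig().setInitialSnapshotName("my-snapshot");
        return jet.newJob(updatedPipeline, config);
    }
}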

A.5. Command Line Tools and Configuration Files

The provided command line tools, such as jet.sh, and the configuration XML files are backwards-compatible between MINOR versions. This means that when upgrading a cluster to a new minor version, the XML configuration from the previous version can be used without any modification.

A.6. Metrics

Jet currently provides metrics to Management Center and also through other means such as JMX. The metric names may change between MINOR versions but not between PATCH versions.

Appendix B: System Properties

The table below lists the system properties supported by Jet. Jet also supports the system properties defined by Hazelcast IMDG; please see the System Properties chapter in the Hazelcast IMDG Reference Manual.

When you want to reconfigure a system property, you need to restart the members for which the property is modified.
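
For example, a property from the table can be set programmatically before the member starts. This is a hedged sketch with an illustrative value; it has the same effect as passing -Djet.job.scan.period=10000 on the java command line.

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

public class StartWithProperty {
    public static void main(String[] args) {
        // Must be set before the member starts; changing it later requires a restart
        System.setProperty("jet.job.scan.period", "10000");
        JetInstance jet = Jet.newJetInstance();
        // ... submit jobs ...
        jet.shutdown();
    }
}
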
Table 2. System Properties

Property Name: jet.job.scan.period
Default Value: 5000
Type: long [milliseconds]
Description: Jet will periodically check for new jobs to start and perform cleanup of unused resources. This property configures how often this check and cleanup are done. The value is in milliseconds.

Property Name: jet.shutdownhook.enabled
Default Value: true
Type: boolean
Description: Whether a JVM shutdown hook is registered to shut down the node gracefully when the process is terminated. The shutdown hook terminates all running jobs and then gracefully terminates the node, in a way that is equivalent to calling JetInstance.shutdown().

Property Name: jet.job.results.ttl.seconds
Default Value: 604800 (7 days)
Type: long [seconds]
Description: Maximum time in seconds for which job results are kept in the cluster. They are automatically deleted after this period has passed.

Property Name: jet.job.results.max.size
Default Value: 1000
Type: integer
Description: Maximum number of job results to keep in the cluster. The oldest results are automatically deleted once this size is exceeded.

Property Name: jet.home
Default Value: Jet installation path
Type: string
Description: Root of the Jet installation. Used as the default location for the lossless recovery store. By default it is set to the Jet installation path.

Property Name: jet.idle.cooperative.min.microseconds
Default Value: 25
Type: long [microseconds]
Description: The minimum time in microseconds the cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.
Note: the underlying LockSupport.parkNanos(long) call may actually sleep longer depending on the operating system (up to 15000µs on Windows). See the Hazelcast blog post about this subject for more details.

Property Name: jet.idle.cooperative.max.microseconds
Default Value: 500
Type: long [microseconds]
Description: The maximum time in microseconds the cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.
Note: the underlying LockSupport.parkNanos(long) call may actually sleep longer depending on the operating system (up to 15000µs on Windows). See the Hazelcast blog post about this subject for more details.

Property Name: jet.idle.noncooperative.min.microseconds
Default Value: 25
Type: long [microseconds]
Description: The minimum time in microseconds the non-cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.
Note: the underlying LockSupport.parkNanos(long) call may actually sleep longer depending on the operating system (up to 15000µs on Windows). See the Hazelcast blog post about this subject for more details.

Property Name: jet.idle.noncooperative.max.microseconds
Default Value: 5000
Type: long [microseconds]
Description: The maximum time in microseconds the non-cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.
Note: the underlying LockSupport.parkNanos(long) call may actually sleep longer depending on the operating system (up to 15000µs on Windows). See the Hazelcast blog post about this subject for more details.

Appendix C: Common Exceptions

You may see the following exceptions thrown when working with Jet:

  • JetException: A general exception thrown if a job failure occurs. It has the original exception as its cause.

  • TopologyChangedException: Thrown when a member participating in a job leaves the cluster. If auto-restart is enabled, Jet will restart the job automatically, without throwing the exception to the user.

  • JobNotFoundException: Thrown when the coordinator node is not able to find the metadata for a given job.

There are also several Hazelcast exceptions that might be thrown when interacting with JetInstance. For a description of Hazelcast IMDG exceptions, please refer to the Hazelcast IMDG Reference Manual.
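
For example, a job failure typically surfaces when you wait for the job's completion. The following is a minimal, hedged sketch (pipeline construction is omitted and the exact exception wrapping can vary by version):

Job job = jet.newJob(pipeline);   // "pipeline" stands for any Pipeline whose execution fails
try {
    job.join();                   // rethrows the job's failure
} catch (Exception e) {
    // the cause chain carries the original error, wrapped in a JetException
    e.printStackTrace();
}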

Appendix D: Phone Homes

Hazelcast uses phone home data to learn about the usage of Hazelcast Jet.

Hazelcast Jet instances call our phone home server initially when they are started and then every 24 hours. This applies to all the instances joined to the cluster.

D.1. What is sent in?

The following information is sent in a phone home:

  • Hazelcast Jet version

  • Local Hazelcast Jet member UUID

  • Download ID

  • A hash value of the cluster ID

  • Cluster size bands for 5, 10, 20, 40, 60, 100, 150, 300, 600 and > 600

  • Number of connected clients bands of 5, 10, 20, 40, 60, 100, 150, 300, 600 and > 600

  • Cluster uptime

  • Member uptime

  • Environment Information:

    • Name of operating system

    • Kernel architecture (32-bit or 64-bit)

    • Version of operating system

    • Version of installed Java

    • Name of Java Virtual Machine

  • Hazelcast IMDG Enterprise specific:

    • Number of clients by language (Java, C++, C#)

    • Flag for Hazelcast Enterprise

    • Hash value of license key

    • Native memory usage

D.2. Phone Home Code

The phone home code itself is open source. Please see here.

D.3. Disabling Phone Homes

Set the hazelcast.phone.home.enabled system property to false either in the config or on the Java command line.

Starting with Hazelcast Jet 0.5, you can also disable the phone home using the environment variable HZ_PHONE_HOME_ENABLED. Simply add the following line to your .bash_profile:

export HZ_PHONE_HOME_ENABLED=false

D.4. Phone Home URL

The URL used for phone home requests is

http://phonehome.hazelcast.com/ping

Appendix E: FAQ

You can refer to the FAQ page to see the answers to frequently asked questions related to topics such as the relationship and differences between Hazelcast Jet and Hazelcast IMDG, Jet’s APIs and roadmap.

Appendix F: License Questions

Hazelcast Jet is distributed under the Apache License 2; therefore, permissions are granted to use, reproduce and distribute it along with any kind of open source and closed source applications.

Depending on the feature set used, Hazelcast Jet has certain runtime dependencies which might have different licenses. The following are the dependencies and their respective licenses.

F.1. Embedded Dependencies

Embedded dependencies are merged (shaded) with the Hazelcast Jet codebase at compile-time. These dependencies become an integral part of the Hazelcast Jet distribution.

For license files of embedded dependencies, please see the license directory of the Hazelcast Jet distribution, available at our download page.

F.1.1. minimal-json

minimal-json is a JSON parsing and generation library which is a part of the Hazelcast Jet distribution. It is used for communication between the Hazelcast Jet cluster and the Management Center.

minimal-json is distributed under the MIT license and offers the same rights to add, use, modify, and distribute the source code as the Apache License 2.0 that Hazelcast uses. However, some other restrictions might apply.

F.1.2. picocli

picocli is a command line parser which is used for the implementation of the jet.sh command line tool.

picocli is distributed under the terms of the Apache License 2.

F.1.3. Runtime Dependencies

Depending on the features used, additional dependencies might be added to the dependency set. Those runtime dependencies might have other licenses. See the following list of additional runtime dependencies.

F.1.4. Apache Hadoop

Hazelcast integrates with Apache Hadoop and can use it as a data sink or source. Jet has a dependency on the libraries required to read from and write to the Hadoop File System.

Apache Hadoop is distributed under the terms of the Apache License 2.

F.1.5. Apache Kafka

Hazelcast integrates with Apache Kafka and can make use of it as a data sink or source. Hazelcast has a dependency on Kafka client libraries.

Apache Kafka is distributed under the terms of the Apache License 2.

F.1.6. Spring

Hazelcast integrates with Spring and can be configured using Spring Context. Jet has a dependency on the libraries required to create a Spring context.

Spring is distributed under the terms of the Apache License 2.

Glossary

Accumulation

The act of building up an intermediate result inside a mutable object (called the accumulator) as a part of performing an aggregate operation. After all accumulation is done, a finishing function is applied to the object to produce the result of the operation.

Aggregate Operation

A set of functional primitives that instructs Jet how to calculate some aggregate function over one or more data sets. Used in the group-by, co-group and windowing transforms.

Aggregation

The act of applying an aggregate function to a stream of items. The result of the function can be simple, like a sum or average, or complex, like a collection of all aggregated items.

At-Least-Once Processing Guarantee

The system guarantees to process each item of the input stream(s), but doesn’t guarantee it will process it just once.

Batch Processing

The act of processing a finite dataset, such as one stored in Hazelcast IMDG or HDFS.

Client Server Topology

Hazelcast topology where members run outside the user application and are connected to clients using client libraries. The client library is installed in the user application.

Co-Grouping

An operation that is a mix of an SQL JOIN and GROUP BY with specific restrictions. Sometimes called a “stream join”. Each item of each stream that is being joined must be mapped to its grouping key. All items with the same grouping key (from all streams) are aggregated together into a single result. However, the result can be structured and preserve all input items separated by their stream of origin. In that form the operation effectively becomes a pure JOIN with no aggregation.

DAG

Directed Acyclic Graph which Hazelcast Jet uses to model the relationships between individual steps of the data processing.

Edge

A DAG element which holds the logic on how to route the data from one vertex’s processing units to the next one’s.

Embedded Topology

Hazelcast topology where the members are in-process with the user application and act as both client and server.

Event time

A data item in an infinite stream typically contains a timestamp data field. This is its event time. As the stream items go by, the event time passes as the items' timestamps increase. A typical distributed stream has a certain amount of event time disorder (items aren’t strictly ordered by their timestamp) so the “passage of event time” is a somewhat fuzzy concept. Jet uses the watermark to superimpose order over the disordered stream.

Exactly-Once Processing Guarantee

The system guarantees that it will process each item of the input stream(s) and will never process an item more than once.

Fault Tolerance

The property of a distributed computation system that gives it resilience to changes in the topology of the cluster running the computation. If a member leaves the cluster, the system adapts to the change and resumes the computation without loss.

Hash-Join

A special-purpose stream join optimized for the use case of data enrichment. Each item of the primary stream is joined with one item from each of the enriching streams. Items are matched by the join key. The name "hash-join" stems from the fact that the contents of the enriching streams are held in hashtables for fast lookup. Hashtables are replicated on each cluster member, which is why this operation is also known as a “replicated join”.

Hazelcast IMDG

An In-Memory Data Grid (IMDG) is a data structure that resides entirely in memory, and is distributed among many machines in a single location (and possibly replicated across different locations). IMDGs can support millions of in-memory data updates per second, and they can be clustered and scaled in ways that support large quantities of data. Hazelcast IMDG is the in-memory data grid offered by Hazelcast.

HDFS

Hadoop Distributed File System. Hazelcast Jet can use it both as a data source and a sink.

Jet Job

A unit of distributed computation that Jet executes. One job has one DAG specifying what to do. A distributed array of Jet processors performs the computation.

Kafka

Apache Kafka is a product that offers a distributed publish-subscribe message queue with guarantees of delivery and message persistence. The most commonly used component over which heterogeneous distributed systems exchange data.

Latency

The time that passes from the occurrence of an event that triggers some response to the occurrence of the response. In the case of Hazelcast Jet’s stream processing, latency refers to the time that passes from the point in time the last item that belongs to a window enters the system to the point where the result for that window appears in the output.

Member

A Hazelcast Jet instance (node) that is a member of a cluster. A single JVM can host one or more Jet members, but in production there should be one member per physical machine.

Partition (Data)

To guarantee that all items with the same grouping key are processed by the same processor, Hazelcast Jet uses a total surjective function to map each data item to the ID of its partition and assigns to each processor its unique subset of all partition IDs. A partitioned edge then routes all items with the same partition ID to the same processor.

Partition (Network)

A malfunction in network connectivity that splits the cluster into two or more parts that are mutually unreachable, but the connections among nodes within each part remain intact. May cause each of the parts to behave as if it was “the” cluster that lost the other members. Also known as “split brain”.

Pipeline

Hazelcast Jet’s name for the high-level description of a computation job constructed using the Pipeline API. Topologically it is a DAG, but the vertices have different semantics than the Core API vertices and are called pipeline stages. Edges are implicit and not expressed in the API. Each stage (except for source/sink stages) has an associated transform that it performs on its input data.

Processor

The unit which contains the code of the computation to be performed by a vertex. Each vertex’s computation is implemented by a processor. On each Jet cluster member there are one or more instances of the processor running in parallel for a single vertex.

Session Window

A window that groups an infinite stream’s items by their timestamp. It groups together bursts of events closely spaced in time (by less than the configured session timeout).

Sliding Window

A window that groups an infinite stream’s items by their timestamp. It groups together events that belong to a segment of fixed size on the timeline. As the time passes, the segment slides along, always extending from the present into the recent past. In Jet, the window doesn’t literally slide, but hops in steps of user-defined size. (“Time” here refers to the stream’s own notion of time, i.e., event time.)

Source

A resource present in a Jet job’s environment that delivers a data stream to it. Hazelcast Jet uses a source connector to access the resource. Alternatively, source may refer to the DAG vertex that hosts the connector.

Sink

A resource present in a Jet job’s environment that accepts its output data. Hazelcast Jet uses a sink connector to access the resource. Alternatively, sink may refer to the vertex that hosts the connector.

Skew

A generalization of the term “clock skew” applied to distributed stream processing. In this context it refers to the deviation in event time as opposed to wall-clock time in the classical usage. Several substreams of a distributed stream may at the same time emit events with timestamps differing by some delta, due to various lags that accumulate in the delivery pipeline for each substream. This is called stream skew. Event skew refers to the disorder within a substream, where data items appear out of order with respect to their timestamps.

Split Brain

A popular name for a network partition; see the entry above.

Stream Processing

The act of processing an infinite stream of data, typically implying that the data is processed as soon as it appears. Such a processing job must explicitly deal with the notion of time in order to make sense of the data. It achieves this with the concept of windowing.

Throughput

A measure for the volume of data a system is capable of processing per unit of time. Typical ways to express it for Hazelcast Jet are in terms of events per second and megabytes per second.

Tumbling Window

A window that groups an infinite stream’s items by their timestamp. It groups together events that belong to a segment of fixed size on the timeline. As the time passes, the segment “tumbles” along, never covering the same point in time twice. This means that each event belongs to just one tumbling window position. (“Time” here refers to the stream’s own notion of time, i.e., event time.)

Vertex

The DAG element that performs a step in the overall computation. It receives data from its inbound edges and sends the results of its computation to its outbound edges. There are three kinds of vertices: source (has only outbound edges), sink (has only inbound edges) and computational (has both kinds of edges).

Watermark

A concept that superimposes order over a disordered underlying data stream. An infinite data stream’s items represent timestamped events, but they don’t occur in the stream ordered by the timestamp. The value of the watermark at a certain location in the processing pipeline denotes the lowest value of the timestamp that is expected to occur in the upcoming items. Items that don’t meet this criterion are discarded because they arrived too late to be processed.

Windowing

The act of splitting an infinite stream’s data into windows according to some rule, most typically one that involves the item’s timestamps. Each window becomes the target of an aggregate function, which outputs one data item per window (and per grouping key).