Hazelcast Jet Reference Manual
Version 3.2
Welcome to the Hazelcast Jet Reference Manual. This manual includes concepts, instructions, and samples to guide you on how to use Hazelcast Jet to build and execute computation jobs.
As the reader of this manual you should be familiar with the Java programming language.
Summary of Contents
-
Overview presents the basic concepts of Hazelcast Jet and provides high-level orientation.
-
Get Started leads you through the steps to get Hazelcast Jet up and running on your machine. You can copy-paste the provided code to execute your first Jet job.
-
Work With Jet shows you how to configure Jet, start a Jet cluster, manage jobs, etc.
-
The Pipeline API introduces Jet’s Pipeline API. This is the API you use to build the computation you want Jet to execute.
-
Source and Sink Connectors presents the source and sink connectors that Jet provides.
-
Configuration explains how to configure Jet. You can tune its performance, level of safety in fault-tolerant jobs, configure the underlying IMDG cluster, etc.
-
Jet Concepts presents some fundamental concepts that make Jet work. Understanding them will help you get the most out of Hazelcast Jet.
-
Expert Zone — The Core API presents the material you need to understand if you want to successfully use the Core API. You don’t need this knowledge to use Jet. It is relevant only if you plan to implement custom processors or build the DAG by directly using the Core API.
Preface
Naming
-
Hazelcast Jet or just Jet refers to the distributed data processing engine provided by Hazelcast, Inc.
-
Hazelcast IMDG or just Hazelcast refers to the Hazelcast in-memory data grid middleware. Hazelcast is also the name of the company (Hazelcast, Inc.) providing Hazelcast IMDG and Hazelcast Jet.
Licensing
Hazelcast Jet and Hazelcast Jet Reference Manual are free and provided under the Apache License, Version 2.0.
Trademarks
Hazelcast is a registered trademark of Hazelcast, Inc. All other trademarks in this manual are held by their respective owners.
Getting Help
The Jet team provides support to its community via these channels:
-
Stack Overflow (ask a question on how to use Jet properly and troubleshoot your setup)
-
Hazelcast Jet mailing list (propose features and discuss your ideas with the team)
-
GitHub’s issue tracking (report your confirmed issues)
For information on the commercial support for Hazelcast Jet, please see this page on hazelcast.com.
1. Overview of Jet
Jet is a distributed data processing engine that treats the data as a stream. It can process both static datasets (i.e., batch jobs) and live event streams. It can compute aggregate functions over infinite data streams by using the concept of windowing: dividing the stream into a sequence of subsets (windows) and applying the aggregate function over each window.
With unbounded event streams there arises the issue of failures in the cluster. Jet can tolerate failures of individual members without loss, offering the exactly-once processing guarantee.
You deploy Jet to a cluster of machines and then submit your processing jobs to it. Jet will automatically make all the cluster members participate in processing your data. It can connect to a distributed data source such as Kafka, Hadoop Distributed File System, or a Hazelcast IMDG cluster. Each member of the Jet cluster will consume a slice of the distributed data, which makes it easy to scale up to any level of throughput.
Jet is built on top of the Hazelcast IMDG technology and thanks to this the Jet cluster can also play the role of the data source and sink. If you use Jet this way, you can achieve perfect data locality and top-of-the-class throughput.
The main programming paradigm you’ll use with Jet is that of a processing pipeline, which is a network of interconnected stages. The stages form a directed acyclic graph (a DAG) and the connections between them indicate the path along which the data flows. Data processing happens inside each stage. You describe the pipeline using the Pipeline API.
Internally Jet converts the pipeline into its native representation, the Core API DAG, which describes the layout of the processing units and the rules for routing the data between them. It is possible to use the Core API to directly create the native representation, but it’s much more complex and error-prone.
To run the computation Jet uses a cooperative multithreading execution engine, which means it can run many Core API Processors on the same JVM thread. A processor corresponds to a standalone single-threaded task that processes a stream of data. The processors are implemented in such a way that they don’t block the thread while waiting for more data, or while waiting for the receiving processor to accept it. By not depending on OS-level thread scheduling, Jet can achieve greater throughput and better saturate the CPU cores. It can also drive many more processors than the OS can manage native threads.
2. Get Started
In this section we’ll get you started using Hazelcast Jet. We’ll show you how to set up a Java project with the proper dependencies and a quick Hello World example to verify your setup.
2.1. Requirements
In the good tradition of Hazelcast products, Jet is distributed as a JAR with no other dependencies. It requires JRE version 8 or higher to run.
2.2. Install Hazelcast Jet
The easiest way to start using Hazelcast Jet is to add it as a dependency to your project.
Hazelcast Jet is published on the Maven repositories. Add the following lines to your pom.xml:
<dependencies>
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>3.2</version>
</dependency>
</dependencies>
If you prefer to use Gradle, add the following line to the dependencies block of your build script:
compile 'com.hazelcast.jet:hazelcast-jet:3.2'
Alternatively you can download the latest distribution package of Hazelcast Jet and add the hazelcast-jet-3.2.jar file to your classpath.
2.3. Install Hazelcast Jet Enterprise (Optional)
Hazelcast Jet Enterprise is a commercial edition of Hazelcast Jet. It’s built on top of Hazelcast Jet open source and extends it with the following features:
-
Lossless Restart (in Jet 3.0)
-
Job Upgrades (in Jet 3.0)
-
Enterprise PaaS deployment environments (Pivotal Cloud Foundry, OpenShift Container Platform; since Jet 3.0)
Hazelcast Jet Enterprise is available on a Hazelcast Maven repository.
Add the following lines to your pom.xml:
<repositories>
    <repository>
        <id>Hazelcast Private Snapshot Repository</id>
        <url>https://repository.hazelcast.com/snapshot/</url>
    </repository>
    <repository>
        <id>Hazelcast Private Release Repository</id>
        <url>https://repository.hazelcast.com/release/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>com.hazelcast.jet</groupId>
        <artifactId>hazelcast-jet-enterprise</artifactId>
        <version>3.2</version>
    </dependency>
</dependencies>
You can download the Hazelcast Jet Enterprise package from hazelcast.com.
2.3.1. Set the License Key
To use Hazelcast Jet Enterprise, you must set the license key using one of the configuration methods shown below. You can request a trial license key at hazelcast.com.
Hazelcast Jet Enterprise license keys are required only to run the Jet cluster. A Jet client can access the Enterprise features without the license key.
The license key can be configured using one of the following methods:
Hazelcast Configuration File
Replace the value of the <license-key> tag inside the hazelcast.xml file in the config folder:
<hazelcast ..>
...
<license-key>ENTER LICENSE KEY HERE</license-key>
...
</hazelcast>
Programmatic Configuration
The license key can also be set in the Jet config object as follows:
JetConfig config = new JetConfig();
config.getHazelcastConfig().setLicenseKey( "Your Enterprise License Key" );
System Property
Set the following system property:
-Dhazelcast.enterprise.license.key=Your Enterprise License Key
2.3.2. Hazelcast Jet Management Center
Hazelcast Jet Management Center is a management and monitoring suite providing a live overview of the Hazelcast Jet cluster. It’s a standalone tool with a web console.
Please see the Hazelcast Jet Management Center Reference Manual for the installation instructions.
2.4. Verify Your Setup With a Word Count Example
You can verify your setup by running this simple program. It processes the contents of a Hazelcast IList that contains lines of text, finds the number of occurrences of each word in it, and stores its results in a Hazelcast IMap. In a distributed computation job the input and output cannot be simple in-memory structures like a Java List; they must be accessible to any member of the computing cluster and must persist after a job ends. This is why we use Hazelcast structures.
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.util.List;
import java.util.Map;
import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.function.Functions.wholeItem;
public class HelloWorld {
public static void main(String[] args) {
// Create the specification of the computation pipeline. Note
// it's a pure POJO: no instance of Jet needed to create it.
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
.flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.groupingKey(wholeItem())
.aggregate(counting())
.drainTo(Sinks.map("counts"));
// Start Jet, populate the input list
JetInstance jet = Jet.newJetInstance();
try {
List<String> text = jet.getList("text");
text.add("hello world hello hello world");
text.add("world world hello world");
// Perform the computation
jet.newJob(p).join();
// Check the results
Map<String, Long> counts = jet.getMap("counts");
System.out.println("Count of hello: "
+ counts.get("hello"));
System.out.println("Count of world: "
+ counts.get("world"));
} finally {
Jet.shutdownAll();
}
}
}
You should expect to see a lot of logging output from Jet (sent to stderr) and two lines on stdout:
Count of hello: 4
Count of world: 5
2.5. Reuse Your java.util.stream Knowledge
If you’ve already used Java’s Stream API, you’ll find many similarities in Jet’s Pipeline API. They both construct a processing pipeline by adding processing steps (stages). Both are FP-oriented APIs with lambdas playing a key role. Simple transformations like map/filter even look exactly the same. The main concern is knowing where the similarities end. Here are some typical gotchas if the Stream API has set some expectations for you:
-
All lambdas in Jet get serialized so they can be sent to remote cluster members. If your lambda captures a variable from the surrounding scope, that variable’s contents must be serialized as well. If you refer to an instance variable, the entire instance holding it must be serialized. It’s quite easy to accidentally capture and serialize the entire this object and everything it refers to.
-
The pipeline you construct doesn’t execute itself; you must explicitly submit it to a Jet cluster.
-
Since you’re submitting the computation to an external system, you don’t get the result in the return value of a method call. The pipeline explicitly specifies where it will store the results (to a data sink).
-
Whereas in the Stream API the aggregation is the terminal step, the one that immediately makes your pipeline execute, in Jet it is just another transformation (an intermediate step).
Finally, you’ll notice that Jet’s Pipeline API is much more powerful than the Stream API, as the following examples illustrate.
2.5.1. Example: List Transformation
Here’s a simple example of list transformation with the Stream API:
List<String> strings = Arrays.asList("a", "b");
List<String> uppercased = strings
.stream()
.map(String::toUpperCase)
.collect(toList());
uppercased.forEach(System.out::println);
Here’s the equivalent in Jet. Note that we’re transforming Hazelcast ILists:
JetInstance jet = newJetInstance();
IList<String> strings = jet.getList("strings");
strings.addAll(Arrays.asList("a", "b"));
IList<String> uppercased = jet.getList("uppercased");
Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.list(strings))
.map(String::toUpperCase)
.drainTo(Sinks.list(uppercased));
jet.newJob(pipeline).join();
uppercased.forEach(System.out::println);
2.5.2. Example: Grouping and Aggregation
Here’s an example of grouping and aggregation with the Stream API. We compute a histogram of words by their length:
List<String> strings = Arrays.asList("a", "b", "aa", "bb");
Map<Integer, Long> histogram = strings
.stream()
.collect(groupingBy(String::length, Collectors.counting()));
histogram.forEach((length, count) -> System.out.format(
"%d chars: %d occurrences%n", length, count));
And here’s how to aggregate in Jet:
JetInstance jet = newJetInstance();
IList<String> strings = jet.getList("strings");
strings.addAll(Arrays.asList("a", "b", "aa", "bb"));
IMap<Integer, Long> histogram = jet.getMap("histogram");
Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.list(strings))
.groupingKey(String::length)
.aggregate(AggregateOperations.counting())
.drainTo(Sinks.map(histogram));
jet.newJob(pipeline).join();
histogram.forEach((length, count) -> System.out.format(
"%d chars: %d occurrences%n", length, count));
Note that the result of aggregate is just another pipeline stage; you can apply more transforms to it before draining to the sink.
2.5.3. Example: Collector vs. AggregateOperation
If you have ever written your own Collector for the Stream API, you’ll find that Jet’s AggregateOperation is quite similar and you can transfer your skill to it.
Here’s a Stream API collector that computes the sum of input items:
Collector<Long, LongAccumulator, Long> summingLong = Collector.of(
LongAccumulator::new,
(LongAccumulator acc, Long t) -> acc.add(t),
(acc0, acc1) -> acc0.add(acc1),
LongAccumulator::get
);
And here’s Jet’s aggregate operation doing the same:
AggregateOperation1<Long, LongAccumulator, Long> summingLong = AggregateOperation
.withCreate(LongAccumulator::new)
.andAccumulate((LongAccumulator acc, Long t) -> acc.add(t))
.andCombine((acc0, acc1) -> acc0.add(acc1))
.andDeduct((acc0, acc1) -> acc0.subtract(acc1))
.andExportFinish(LongAccumulator::get);
Compared to Collector, AggregateOperation defines two more primitives:
-
deduct reverses a previous combine. It’s an optional primitive and serves to optimize sliding window aggregation.
-
export is similar to finish, the difference being that export must preserve the accumulator’s state and finish doesn’t. Jet uses finish wherever applicable as it can be implemented more optimally. In this example we use the same lambda for both primitives.
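For everyday use you rarely need to hand-build such a basic operation; a ready-made factory in AggregateOperations covers it. A minimal sketch, assuming the summingLong factory method in that class:
AggregateOperation1<Long, LongAccumulator, Long> builtInSum =
        AggregateOperations.summingLong(Long::longValue); // sums the long value extracted from each item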
2.6. Deploy Using Docker
You can deploy your Hazelcast Jet projects using Docker containers. Hazelcast Jet has the following images on Docker Hub:
-
Hazelcast Jet
-
Hazelcast Jet Enterprise
-
Hazelcast Jet Management Center
After you pull an image from the Docker registry, you can run your image to start the Jet Management Center or a Jet instance with its default configuration. All repositories provide the latest stable releases but you can pull a specific release, too. You can also specify environment variables when running the image.
If you want to start customized instances of Jet or Jet Management Center, you can extend the above listed images by providing your own configuration file. This feature is provided as a Hazelcast Jet plugin. Please see their GitHub repos at Hazelcast Jet Docker and Hazelcast Jet Management Center Docker for details on configuration and usage.
2.7. Deploy Using Kubernetes
Hazelcast Jet provides Kubernetes-ready Docker images which you can easily deploy into Kubernetes environments. These images use the Hazelcast Kubernetes plugin to discover other Hazelcast Jet members in the Kubernetes environment by interacting with the Kubernetes API.
Using the Kubernetes API requires granting certain permissions, so you need to create a Role-Based Access Control definition (rbac.yaml) with the following content. You can check out the Hazelcast Kubernetes plugin for more details on this matter.
Here is the content of rbac.yaml:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: default-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: view
subjects:
- kind: ServiceAccount
name: default
namespace: default
You can grant the required permissions stated above using the following command:
kubectl apply -f rbac.yaml
Then you need to configure Hazelcast Jet to use Kubernetes Discovery to discover the other members. To do so, create a file named hazelcast-jet-config.yaml with the following content:
apiVersion: v1
kind: ConfigMap
metadata:
name: hazelcast-jet-configuration
data:
hazelcast.yaml: |-
hazelcast:
network:
join:
multicast:
enabled: false
kubernetes:
enabled: true
namespace: default
service-name: hazelcast-jet-service
The following command will create a ConfigMap object in the Kubernetes Cluster.
kubectl apply -f hazelcast-jet-config.yaml
This object holds the Hazelcast Jet configuration with Kubernetes Discovery enabled. You will use this configuration when you create the pods, as explained below.
You need to create a StatefulSet, which manages pods based on an identical container spec, and a Service, which is an abstraction defining a logical set of pods and a policy for accessing them. Define both in a file named hazelcast-jet.yaml with the following content:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: hazelcast-jet
labels:
app: hazelcast-jet
spec:
replicas: 2
serviceName: hazelcast-jet-service
selector:
matchLabels:
app: hazelcast-jet
template:
metadata:
labels:
app: hazelcast-jet
spec:
containers:
- name: hazelcast-jet
image: hazelcast/hazelcast-jet:latest
imagePullPolicy: IfNotPresent
ports:
- name: hazelcast-jet
containerPort: 5701
livenessProbe:
httpGet:
path: /hazelcast/health/node-state
port: 5701
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
readinessProbe:
httpGet:
path: /hazelcast/health/node-state
port: 5701
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 1
successThreshold: 1
failureThreshold: 1
volumeMounts:
- name: hazelcast-jet-storage
mountPath: /data/hazelcast-jet
env:
- name: JAVA_OPTS
value: "-Dhazelcast.rest.enabled=true -Dhazelcast.config=/data/hazelcast-jet/hazelcast.yaml"
volumes:
- name: hazelcast-jet-storage
configMap:
name: hazelcast-jet-configuration
---
apiVersion: v1
kind: Service
metadata:
name: hazelcast-jet-service
spec:
selector:
app: hazelcast-jet
ports:
- protocol: TCP
port: 5701
After creating the hazelcast-jet.yaml file with the content above, run the following command to create the Service and the StatefulSet:
kubectl apply -f hazelcast-jet.yaml
After that, you can inspect the pod logs with the following command to verify that the members formed a cluster:
kubectl logs hazelcast-jet-0
It should output a log similar to the one below:
....
Nov 15, 2018 9:39:15 AM com.hazelcast.internal.cluster.ClusterService
INFO: [172.17.0.2]:5701 [dev] [0.7]
Members {size:2, ver:2} [
Member [172.17.0.2]:5701 - d935437e-143e-4e3f-81d6-f3ece16eb23e this
Member [172.17.0.6]:5701 - 88472d6c-1cae-4b0d-9681-f6da6199bc9c
]
....
For more details about deployment and information on how to submit the Hazelcast Jet jobs to your cluster, see the Kubernetes Integration code sample.
2.8. Deploy Using Helm
You can deploy Hazelcast Jet using the Helm charts. These charts provide an easy way to package and deploy applications on Kubernetes.
You can install the chart using the following command:
helm install stable/hazelcast-jet
See Hazelcast Jet Helm Chart for more details on configuration and the Troubleshooting in Kubernetes Environments section if you encounter any issues.
Apart from the official Helm charts for Hazelcast Jet, we also provide Helm charts for Hazelcast Jet with Hazelcast Jet Management Center and Hazelcast Jet Enterprise on the Hazelcast Enterprise Helm Chart Repository.
2.9. Scaling the Cluster in Kubernetes
A Hazelcast Jet cluster is easily scalable within Kubernetes. You can use the standard kubectl scale command to change the cluster size. See Scaling StatefulSets for more information.
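For example, to resize the StatefulSet defined in the previous section (named hazelcast-jet) to three members, you could run:
kubectl scale statefulset hazelcast-jet --replicas=3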
Note however that, by default, Hazelcast Jet is configured to TERMINATE on receiving the SIGTERM signal from Kubernetes, which means that a container stops quickly, but the cluster’s data safety relies on the backup stored by other Hazelcast Jet members. If you suddenly scale down by more than your backup-count property (1 by default), you may lose the cluster data.
The other option is to use the GRACEFUL shutdown, which triggers the partition migration before shutting down the Hazelcast Jet member. Note that it may take some time depending on your data size.
To use the graceful shutdown approach, set the following properties:
- terminationGracePeriodSeconds: in your StatefulSet (or Deployment) configuration; the value defines how long Kubernetes waits before terminating the pod and it should be high enough to cover the data migration process. The default value is 30 seconds.
- -Dhazelcast.shutdownhook.policy=GRACEFUL: in the JVM parameters (JAVA_OPTS environment variable).
- -Dhazelcast.graceful.shutdown.max.wait: in the JVM parameters (JAVA_OPTS environment variable); the value should be high enough to cover the data migration process. The default value is 600 seconds.
The graceful shutdown configuration is already included in Hazelcast Jet Helm Chart.
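If you manage the manifests yourself, here is an illustrative sketch of how those settings could look when applied to the StatefulSet from the previous section (the grace period shown is an arbitrary example value, not a recommendation):
# excerpt of the StatefulSet in hazelcast-jet.yaml (sketch)
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 180
      containers:
      - name: hazelcast-jet
        env:
        - name: JAVA_OPTS
          value: "-Dhazelcast.rest.enabled=true -Dhazelcast.config=/data/hazelcast-jet/hazelcast.yaml -Dhazelcast.shutdownhook.policy=GRACEFUL -Dhazelcast.graceful.shutdown.max.wait=600"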
With the Automatic Elasticity feature, your jobs will be automatically restarted from the latest known snapshot when you downscale your cluster. For more details about snapshotting and processing guarantees see the Fault Tolerance section.
3. Work With Jet
3.1. Start Jet
To create a Jet cluster, we simply start some Jet instances. Normally these would be started on separate machines, but for simple practice we can use the same JVM for two instances. Even though they are in the same JVM, they’ll communicate over the network interface.
JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();
These two instances should automatically discover each other using IP multicast and form a cluster. You should see a log output similar to the following:
Members [2] {
  Member [10.0.1.3]:5701 - f1e30062-e87e-4e97-83bc-6b4756ef6ea3
  Member [10.0.1.3]:5702 - d7b66a8c-5bc1-4476-a528-795a8a2d9d97 this
}
This means the members successfully formed a cluster. Since the Jet instances start their own threads, it is important to explicitly shut them down at the end of your program; otherwise the Java process will remain alive after the main() method completes:
public static void main(String[] args) {
try {
JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();
// work with Jet
} finally {
Jet.shutdownAll();
}
}
It is important to use at least 2 members when developing because otherwise Jet doesn’t serialize the stream items and serialization errors will not be discovered.
3.2. Build the Computation Pipeline
The general shape of any data processing pipeline is drawFromSource → transform → drainToSink and the natural way to build it is from source to sink. The Pipeline API follows this pattern. For example,
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("input"))
.map(String::toUpperCase)
.drainTo(Sinks.list("result"));
Refer to the chapter on the Pipeline API for full details.
3.3. Watch out for Capturing Lambdas
A typical Jet pipeline involves lambda expressions. Since the whole pipeline definition must be serialized to be sent to the cluster, the lambda expressions must be serializable as well. The Java standard provides an essential building block: if the static type of the lambda is a subtype of Serializable, you will automatically get a lambda instance that can serialize itself.
None of the functional interfaces in the JDK extend Serializable, so we had to mirror the entire java.util.function package in our own com.hazelcast.jet.function with all the interfaces subtyped and made Serializable. Each subtype has the name of the original with Ex appended. For example, a FunctionEx is just like Function, but implements Serializable. We use these types everywhere in the Pipeline API.
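For instance, simply giving a lambda a FunctionEx target type is enough to obtain a serializable instance (a trivial sketch):
FunctionEx<String, String> toUpperCase = String::toUpperCase; // com.hazelcast.jet.function.FunctionEx, serializable
Function<String, String> plain = String::toUpperCase;         // java.util.function.Function, not serializable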
As always with this kind of magic, auto-serializability of lambdas has its flipside: it is easy to overlook what’s going on.
If the lambda references a variable in the outer scope, the variable is captured and must also be serializable. If it references an instance variable of the enclosing class, it implicitly captures this, so the entire class will be serialized. For example, this will fail because JetJob1 doesn’t implement Serializable:
class JetJob1 {
private String instanceVar;
Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("input"))
.filter(item -> item.equals(instanceVar)); (1)
return p;
}
}
1 | Refers to instanceVar, capturing this, but JetJob1 is not Serializable so this call will fail. |
Just adding implements Serializable to JetJob1 would be a viable workaround here. However, consider something just a bit different:
class JetJob2 implements Serializable {
private String instanceVar;
private OutputStream fileOut; (1)
Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("input"))
.filter(item -> item.equals(instanceVar)); (2)
return p;
}
}
1 | A non-serializable field. |
2 | Refers to instanceVar, capturing this. JetJob2 is declared as Serializable, but has a non-serializable field and this fails. |
Even though we never refer to fileOut, we are still capturing the entire JetJob2 instance. We might mark fileOut as transient, but the sane approach is to avoid referring to instance variables of the surrounding class. We can simply achieve this by assigning to a local variable, then referring to that variable inside the lambda:
class JetJob3 {
private String instanceVar;
Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
String findMe = instanceVar; (1)
p.drawFrom(Sources.list("input"))
.filter(item -> item.equals(findMe)); (2)
return p;
}
}
1 | Declare a local variable that loads the value of the instance field. |
2 | By referring to the local variable findMe we avoid capturing this and the job runs fine. |
Another common pitfall is capturing an instance of DateTimeFormatter or a similar non-serializable class:
DateTimeFormatter formatter = DateTimeFormatter
.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault());
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.drawFrom(Sources.list("input"));
src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp))); (1)
1 | Captures the non-serializable formatter , so this fails. |
Sometimes we can get away with using one of the preconfigured formatters available in the JDK:
src.map((Long tstamp) -> DateTimeFormatter.ISO_LOCAL_TIME (1)
.format(Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault())));
1 | Accesses the static final field ISO_LOCAL_TIME. Static fields are not subject to lambda capture; they are dereferenced when the code runs on the target machine. |
This refers to a static final field in the JDK, so the instance is available on any JVM. If this is not available, you may create a static final field in your own class, but you can also use mapUsingContext(). In this case you provide a serializable factory that Jet will ask to create an object on the target member. The object it returns doesn’t have to be serializable. Here’s an example of that:
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.drawFrom(Sources.list("input"));
ContextFactory<DateTimeFormatter> contextFactory = ContextFactory.withCreateFn( (1)
x -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault()));
src.mapUsingContext(contextFactory, (2)
(formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp))); (3)
1 | Create a ContextFactory . |
2 | Supply it to mapUsingContext() . |
3 | Your mapping function now gets the object the factory created. |
3.4. Submit a Jet Job and Manage its Lifecycle
This is how you submit a Jet pipeline for execution:
Pipeline pipeline = buildPipeline();
jet.newJob(pipeline).join();
If you want to submit a Core API DAG, the syntax is identical:
DAG dag = buildDag();
jet.newJob(dag).join();
Job submission is a fire-and-forget action: once a client submits it, the job has a life of its own independent of the submitter. It can disconnect and any other client or Jet member can obtain its own Job instance that controls the same job.
You can use the same API to submit a job from either a client machine or directly on an instance of a Jet cluster member. The same Pipeline or DAG instance can be submitted for execution many times.
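For instance, here is a sketch of submitting from a separate client JVM; it assumes the default client configuration can reach the cluster and reuses the buildPipeline() helper from the snippets above:
JetInstance client = Jet.newJetClient();
try {
    Job job = client.newJob(buildPipeline());
    job.join();
} finally {
    client.shutdown();
}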
3.4.1. JobConfig
To gain more control over how Jet will run your job, you can pass in a JobConfig instance. For example, you can give your job a human-readable name:
JobConfig cfg = new JobConfig();
cfg.setName("my-job");
jet.newJob(pipeline, cfg);
Please note that you can have only a single active job (i.e., one that is running, suspended, or waiting to be run) with a given name in the cluster. You get a JobAlreadyExistsException if you make another submission. You can reuse the job name after the active job completes.
If you have a microservice deployment where each package contains a Jet member and makes the same job submission, you can ensure that the job runs only once by using the newJobIfAbsent() API. If a named job is submitted with this API, Jet does not create a second job if there is already an active job with the same name. Instead, it returns a reference to the active job, as shown in the following snippet.
Pipeline pipeline = buildPipeline();
JobConfig cfg = new JobConfig();
cfg.setName("my-job");
Job job1 = jet.newJobIfAbsent(pipeline, cfg);
Job job2 = jet.newJobIfAbsent(pipeline, cfg);
assert job1.getId() == job2.getId();
3.4.2. Remember that a Jet Job is Distributed
The API to submit a job to Jet is in a way deceptively simple: "just call a method". As long as you’re toying around with Jet instances started locally in a single JVM, everything will indeed work. However, as soon as you try to deploy to an actual cluster, you’ll face the consequences of the fact that your job definition must travel over the wire to reach remote members which don’t have your code on their classpath.
Your custom code must be packaged with the Jet job. For simple examples you can have everything in a single class and use code like this:
class JetExample {
static Job createJob(JetInstance jet) {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(JetExample.class);
return jet.newJob(buildPipeline(), jobConfig);
}
static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
// ...
return p;
}
}
If you forget to do this, or don’t add all the classes involved, you may get a quite confusing exception:
java.lang.ClassCastException:
cannot assign instance of java.lang.invoke.SerializedLambda
to field com.hazelcast.jet.core.ProcessorMetaSupplier$1.val$addressToSupplier
of type com.hazelcast.jet.function.FunctionEx
in instance of com.hazelcast.jet.core.ProcessorMetaSupplier$1
SerializedLambda actually declares readResolve(), which would normally transform it into an instance of the correct functional interface type. If this method throws an exception, Java doesn’t report it but keeps the SerializedLambda instance and continues the deserialization. Later in the process it will try to assign it to a field whose type is the target type of the lambda (FunctionEx in the example above) and at that point it will fail with the ClassCastException. If you see this kind of error, double-check the list of classes you have added to the Jet job.
For more complex jobs it will become more practical to first package the job in a JAR and then use a command-line utility to submit it, as explained next.
3.4.3. Submit a Job from the Command Line
Jet comes with the jet.sh script, which allows you to submit a Jet job packaged in a JAR file. You can find it in the Jet distribution zip file, in the bin directory. On Windows use jet.bat. To use it, follow these steps:
-
Write your main() method and your Jet code the usual way, but call JetBootstrap.getInstance() instead of Jet.newJetClient() to acquire a Jet client instance.
-
Create a runnable JAR which declares its Main-Class in MANIFEST.MF.
-
Run your JAR, but instead of java -jar jetjob.jar use jet.sh submit jetjob.jar.
-
The script will create a Jet client and configure it from hazelcast-client.xml located in the config directory of Jet’s distribution. Adjust that file to suit your needs.
For example, write a class like this:
class CustomJetJob {
public static void main(String[] args) {
JetInstance jet = JetBootstrap.getInstance();
jet.newJob(buildPipeline()).join();
}
static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
// ...
return p;
}
}
After building the JAR, submit the job:
$ jet.sh submit jetjob.jar
3.4.4. Manage a Submitted Job
jet.newJob() and jet.getJob(jobId) return a Job object, which you can use to monitor the job and change its status:
-
Job.suspend: the job will stop running, but its metadata will be kept and it can be resumed later. Use this for example if you want to restart the members one by one and you don’t want the job to restart multiple times in the meantime
-
Job.resume: resumes a previously suspended job
-
Job.restart: stops and restarts the job to make use of new members added to the cluster (if automatic scaling is disabled)
-
Job.cancel: the job will stop running and will be marked as completed. It cannot be restarted later
You can also get the job’s name, configuration, and submission time via job.getName(), job.getConfig(), and job.getSubmissionTime(). job.getStatus() will give you the current status of the job (running, failed, completed, etc.). You can also call job.getFuture() to block until the job completes and then get its final outcome (either success or failure).
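For example, here is a small sketch that looks up the latest job named my-job and suspends it for maintenance (jet is a JetInstance, as in the earlier snippets):
Job job = jet.getJob("my-job");
if (job != null && job.getStatus() == JobStatus.RUNNING) {
    job.suspend();
    // ... restart the members one by one, then ...
    job.resume();
}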
Jet does not support canceling the job with future.cancel(); instead you must call job.cancel(). This is due to the mismatch in semantics between future.cancel() on one side and job.cancel() plus job.getStatus() on the other: the future immediately transitions to “completed by cancellation”, but it will take some time until the actual job in the cluster changes to that state. So as not to confuse users with these differences, we decided to make future.cancel() fail with an exception.
3.4.5. Job Upgrade
Note: Job Upgrade is only available in Jet Enterprise
Jet allows you to upgrade a job while preserving its state. This is useful for bug fixing or to improve the application.
The new pipeline has to be compatible with the old one; for more details about what you can change in the pipeline, see Update a DAG Without Losing the State.
Job Upgrades use snapshots: the job state is exported to a snapshot and the new job version starts from that snapshot.
If you want to preserve the state of the cancelled job, use cancelAndExportSnapshot:
job.cancelAndExportSnapshot("foo-snapshot");
You can also export the state and keep the job running. Both the old and the new version can then run in parallel. This is useful, for example, for A/B testing.
job.exportSnapshot("foo-snapshot");
Then use the exported snapshot to submit a new job. The Job will use the state in the snapshot as its initial state:
Pipeline updatedPipeline = Pipeline.create();
// create the pipeline...
JobConfig jobConfig = new JobConfig();
jobConfig.setInitialSnapshotName("foo-snapshot");
instance.newJob(updatedPipeline, jobConfig);
The exported snapshots used for Job Upgrades differ from the snapshots used for Fault Tolerance. Exported snapshots aren’t deleted automatically by Jet, so you can also use an exported snapshot as a point of recovery. When you no longer need the exported snapshot, delete it.
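One way to do that, assuming the snapshot accessor on JetInstance, is sketched below:
JobStateSnapshot snapshot = jet.getJobStateSnapshot("foo-snapshot");
if (snapshot != null) {
    snapshot.destroy(); // deletes the exported snapshot
}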
3.4.6. Get a List of All Submitted Jobs
Jet keeps an inventory of all the jobs submitted to it, including those that have already completed. Access the full list with jet.getJobs(). You can use any Job instance from that list to monitor and manage a job, whether it was you or some other client that submitted it.
This example tells you what Jet has been up to in the last five minutes:
int total = 0;
int completed = 0;
int failed = 0;
int inProgress = 0;
long fiveMinutesAgo = System.currentTimeMillis() - MINUTES.toMillis(5);
for (Job job : jet.getJobs()) {
if (job.getSubmissionTime() < fiveMinutesAgo) {
continue;
}
total++;
switch (job.getStatus()) {
case COMPLETED:
completed++;
break;
case FAILED:
failed++;
break;
default:
inProgress++;
}
}
System.out.format(
"In the last five minutes %d jobs were submitted to Jet, of "
+ "which %d already completed, %d jobs failed, and %d jobs "
+ "are still running.%n",
total, completed, failed, inProgress);
To get only the jobs submitted with a particular name, you can call jet.getJobs(name), or jet.getJob(name) to get just the latest one. Here’s how you can check the statistics on a job named my-job:
List<Job> myJobs = jet.getJobs("my-job");
long failedCount = myJobs.stream().filter(j -> j.getStatus() == FAILED).count();
System.out.format("Jet ran 'my-job' %d times and it failed %d times.%n",
myJobs.size(), failedCount);
Note: data about completed jobs are evicted after 7 days.
3.5. Configure Fault Tolerance
Jet has features to allow an unbounded stream processing job to proceed correctly in the face of Jet members failing and leaving the cluster.
Jet takes snapshots of the entire state of the computation at regular intervals. It coordinates the snapshot across the cluster and synchronizes it with a checkpoint on the data source. The source must ensure that, in the case of a restart, it will be able to replay all the data it emitted after the last checkpoint. Each of the other components in the job will restore its processing state to exactly what it was at the time of the last snapshot. If a cluster member goes away, Jet will restart the job on the remaining members, rewind the sources to the last checkpoint, restore the state of processing from the last snapshot, and then seamlessly continue from that point.
3.5.1. Exactly-Once
"Exactly-once processing" means the output is consistent with processing each stream item exactly once. This is the ultimate guarantee you can ask for.
As of version 0.6, Hazelcast Jet supports exactly-once processing with the source being either a Hazelcast IMap journal or a Kafka topic, and the sink being a Hazelcast IMap.
If you configure Jet for exactly-once but use Kafka as the sink, after a job restart you may get duplicates in the output. As opposed to doubly processing an input item, this is more benign because it just means getting the exact same result twice.
3.5.2. At-Least-Once
"At-least-once processing" means the output is consistent with processing each stream item at least once. Some items may get processed again after a restart, as if they represented new events. This is a lesser guarantee that Jet can deliver at a higher throughput and lower latency. In some cases it is good enough.
In some other cases, however, duplicate processing of data items can have quite surprising consequences. There is more information about this in our Jet Concepts chapter.
3.5.3. Enable Fault Tolerance
Fault tolerance is off by default. To activate it for a job, create a JobConfig object and set the processing guarantee. You can also configure the snapshot interval.
JobConfig jobConfig = new JobConfig();
jobConfig.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
jobConfig.setSnapshotIntervalMillis(SECONDS.toMillis(10));
With less frequent snapshots, more data will have to be replayed and the temporary spike in the latency of the output will be greater. More frequent snapshots will reduce the throughput and introduce more latency variation during regular processing.
3.5.4. Automatic Elasticity
You can configure the behavior of what will happen when members are added or removed from the cluster.
-
If auto-scaling is enabled and a member is added or removed, the job will automatically restart. In case of member addition, it will restart after a delay.
-
If auto-scaling is disabled and a member is added, Jet takes no action and the job will not use the added member; you have to manually restart it. If a member is removed (after a shutdown or a failure), Jet suspends the job. You have to manually resume it.
By default, auto-scaling is enabled.
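To disable it, set the corresponding flag on the job configuration, for example:
JobConfig jobConfig = new JobConfig();
jobConfig.setAutoScaling(false); // membership changes now require a manual restart/resume
jet.newJob(pipeline, jobConfig);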
3.5.5. Level of Safety
Jet doesn’t delegate its fault tolerance to an outside system; it backs up the state to its own IMap objects. IMap is a replicated in-memory data structure, storing each key-value pair on a configurable number of cluster members. By default it will make a single backup copy, resulting in a system that tolerates the failure of a single member at a time. You can tweak this setting when starting Jet, for example to increase the backup count to two:
JetConfig config = new JetConfig();
config.getInstanceConfig().setBackupCount(2);
JetInstance instance = Jet.newJetInstance(config);
Note: if multiple members are lost simultaneously, some data from the backing IMaps can be lost. This is not currently checked and the job will restart with some state data from the snapshot missing, or it might fail if classpath resources were added and are missing. We plan to address this in future releases.
3.5.6. Split-Brain Protection
A specific kind of failure is the so-called "split brain". It happens on a network failure when a member or members think the other members have left the cluster, while in fact they are still running but cannot see each other over the network. Now we have two or more fully functioning Jet clusters where there was supposed to be one. Each one will recover and restart the same Jet job, making it run multiple times.
Hazelcast Jet offers a mechanism to reduce this hazard: split-brain protection. It works by ensuring that a job can be restarted only in a cluster whose size is more than half of what it was before the job was suspended. Enable split-brain protection like this:
jobConfig.setSplitBrainProtection(true);
If there’s an even number of members in your cluster, this may mean the job will not be able to restart at all if the cluster splits into two equally-sized parts. We recommend having an odd number of members.
Note also that you should ensure there is no split-brain condition at the moment you are introducing new members to the cluster. If that happens, both sub-clusters may grow to more than half of the previous size. This will defuse the split-brain protection mechanism.
3.6. Configure Lossless Cluster Restart (Enterprise Only)
The fault tolerance feature we described in the previous section is entirely RAM-based, the snapshotted state isn’t persisted. Given the redundancy present in the cluster, this is sufficient to maintain a running cluster across single-node failures (or multiple-node, depending on the backup count), but it doesn’t cover the case when the entire cluster must shut down.
Hazelcast IMDG Enterprise HD has the Hot Restart Persistence feature which Jet uses to allow you to gracefully shut down the cluster at any time and have the snapshot data of all the jobs preserved. After you restart the cluster, Jet automatically restores the data and resumes the jobs.
Hot Restart Persistence works by persisting the RAM-based snapshot data. Even with the fastest solid-state storage, this dramatically reduces the maximum snapshotting throughput available to Jet: while DRAM can take up to 50 GB/s, a high-end storage device caps at 2 GB/s. Keep in mind that this is throughput per member, so even on a minimal cluster of 3 machines, this is actually 6 GB/s available to Jet. Also, this throughput limit does not apply to the data going through the Jet pipeline, but only to the periodic saving of the state present in the pipeline. Typically the state scales with the number of distinct keys seen within the time window.
To use Lossless Cluster Restart, enable it in hazelcast-jet.xml:
<hazelcast-jet xmlns="http://www.hazelcast.com/schema/jet-config">
<instance>
<lossless-restart-enabled>true</lossless-restart-enabled>
</instance>
</hazelcast-jet>
To set the base directory where the data will be stored, configure Hot Restart in the IMDG configuration:
<hazelcast xmlns="http://www.hazelcast.com/schema/config">
<hot-restart-persistence enabled="true">
<base-dir>/mnt/hot-restart</base-dir>
<parallelism>2</parallelism>
</hot-restart-persistence>
</hazelcast>
Quick programmatic example:
JetConfig cfg = new JetConfig();
cfg.getInstanceConfig().setLosslessRestartEnabled(true);
cfg.getHazelcastConfig().getHotRestartPersistenceConfig()
.setEnabled(true)
.setBaseDir(new File("/mnt/hot-restart"))
.setParallelism(2);
To have the cluster shut down gracefully, as required for this feature to work, do not kill the Jet instances one by one. As you are killing them, the cluster starts working hard to rebalance the data on the remaining members. This usually leads to out-of-memory failures and the loss of all data.
The entire cluster can be shut down gracefully in several ways:
-
on the command line:
$ cluster.sh -o shutdown
-
using the Jet Management Center
-
programmatically:
jet.getCluster().shutdown();
The first two ways make use of Jet’s REST endpoint, which isn’t enabled by default. You can enable it by setting the property
hazelcast.rest.enabled=true
This can be either a system property (-Dhazelcast.rest.enabled=true) or a property in the Hazelcast Jet configuration:
<hazelcast-jet xmlns="http://www.hazelcast.com/schema/jet-config">
    <properties>
        <property name="hazelcast.rest.enabled">true</property>
    </properties>
</hazelcast-jet>
or
JetConfig config = new JetConfig();
config.getProperties().setProperty("hazelcast.rest.enabled", "true");
Since the Hot Restart data is saved locally on each member, all the members must be present after the restart for Jet to be able to reload the data. Beyond that, there’s no special action to take: as soon as the cluster re-forms, it will automatically reload the persisted snapshots and resume the jobs.
3.7. Performance Considerations
3.7.1. Standard Java Serialization is Slow
When it comes to serializing the description of a Jet job, performance is not critical. However, for the data passing through the pipeline, the cost of the serialize-deserialize cycle can easily dwarf the cost of actual data transfer, especially on high-end LANs typical for data centers. In this context the performance of Java serialization is so poor that it regularly becomes the bottleneck. This is due to its heavy usage of reflection, overheads in the serialized form, etc.
Since Hazelcast IMDG faced the same problem a long time ago, we have mature support for optimized custom serialization and in Jet you can use it for stream data. In essence, you must implement a StreamSerializer for the objects you emit from your processors and register it in the Jet configuration:
SerializerConfig serializerConfig = new SerializerConfig()
.setImplementation(new MyItemSerializer())
.setTypeClass(MyItem.class);
JetConfig config = new JetConfig();
config.getHazelcastConfig().getSerializationConfig()
.addSerializerConfig(serializerConfig);
JetInstance jet = Jet.newJetInstance(config);
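For completeness, here is a minimal sketch of what such a serializer might look like; MyItem, its accessors and the chosen type ID are illustrative assumptions, not part of the Jet API:
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;

class MyItemSerializer implements StreamSerializer<MyItem> {
    @Override
    public int getTypeId() {
        return 1000; // any positive ID that is unique among your custom serializers
    }

    @Override
    public void write(ObjectDataOutput out, MyItem item) throws IOException {
        out.writeLong(item.getId());   // assumes MyItem exposes a long id
        out.writeUTF(item.getName());  // ... and a String name
    }

    @Override
    public MyItem read(ObjectDataInput in) throws IOException {
        return new MyItem(in.readLong(), in.readUTF());
    }

    @Override
    public void destroy() {
    }
}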
Consult the chapter on custom serialization in Hazelcast IMDG’s reference manual for more details.
Note the limitation implied here: the serializers must be registered with Jet on startup because this is how it is supported in Hazelcast IMDG. There is a plan to improve this and allow serializers to be registered on individual Jet jobs.
3.7.2. Sharing Data Between Pipeline Stages
Jet only serializes an item when it actually sends it to another member. If the target processor is on the same member, it will receive the exact same instance. To catch (de)serialization issues early on, we recommend using a 2-member local Jet cluster for development and testing.
Sharing the same objects between stages is good for performance, but it places a higher degree of responsibility on the developer. You can follow these rules of thumb to avoid concurrency bugs:
-
don’t use an item after emitting it
-
don’t mutate an item you received
Working with immutable objects has its price: you incur allocation and GC pressure by creating the copies and you waste additional CPU time copying the data. This is why we also present fine-grained rules you must follow to stay safe while sharing the data between pipeline stages.
1. Never mutate an item you emitted.
It is never safe to mutate an item you emitted. The downstream stages will see your changes and lose the original data you emitted, at best. Since the changes are concurrent, they may also see completely broken data, if the object is not thread-safe.
One use case where it’s especially tempting to mutate the item after emitting is rolling aggregation. Consider this example where we use mapUsingContext to implement a custom rolling aggregation:
Pipeline p = Pipeline.create();
ContextFactory<List<String>> contextFactory =
ContextFactory.withCreateFn(procCtx -> new ArrayList<>());
p.drawFrom(source)
.mapUsingContext(contextFactory, (list, item) -> {
// Don't do this!
list.add(item);
return list; (1)
});
1 | You keep emitting the same list, changing it in-place |
The downstream stage receives the same object each time, but it’s supposed to see it in the state it was in when it was sent. In reality, the aggregating stage keeps changing the object concurrently while the downstream stage tries to observe it. In the best case it will crash with a ConcurrentModificationException, but it may also observe null or partially initialized objects in the list and a number of other bugs.
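One simple way out in this particular example is to emit a copy of the accumulated state instead of the live accumulator, trading the extra allocation for safety (a sketch):
p.drawFrom(source)
 .mapUsingContext(contextFactory, (list, item) -> {
     list.add(item);
     return new ArrayList<>(list); // the downstream stage gets its own snapshot of the state
 });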
Jet’s AggregateOperation has a primitive specifically devoted to avoiding this problem: the exportFn. It transforms the accumulator to a new object so the accumulator can be safely updated afterwards. By contrast, finishFn is allowed to return the accumulator itself and Jet calls it only when it knows the whole operation is done and the accumulator won’t be modified again.
There’s another case where a data race can sneak up on you: a many-to-many hash-join, where you enrich an item with several items matching the same join key. Refer to the hash-join section for more details and an example.
2. If you keep using the item you emitted (for reading), don’t mutate it downstream.
There’s little reason to keep reading an item after emitting it, but if you ever have that situation, make sure you don’t mutate it in any downstream stage.
3. If you create a fork in the pipeline (send the output of one stage to several others), no stage after the fork may mutate the data.
Here’s an example with a stream of Person items. We attach two mapping stages to it: one reads the name and the other modifies it. The resulting Jet pipeline will have a data race and behave erratically, sometimes seeing the original name and sometimes the modified one:
class Person {
private String name;
String name() {
return name;
}
Person addToName(String suffix) {
name += suffix;
return this;
}
}
Pipeline p = Pipeline.create();
BatchStage<Person> sourceStage = p.drawFrom(personSource);
BatchStage<String> names = sourceStage
.map(person -> person.name()); (1)
// don't do this!
BatchStage<Person> juniors = sourceStage
.map(person -> person.addToName(" Jr.")); (2)
1 | Get the person’s name |
2 | Modify the person’s name |
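A safe alternative is to derive new values instead of mutating the shared Person instances; for example, build the modified name as a new String (a sketch reusing the same classes):
BatchStage<String> juniorNames = sourceStage
        .map(person -> person.name() + " Jr."); // derives a new String, leaves the shared Person untouched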
3.7.3. Capacity of the Concurrent Queues
By default, Jet runs each internal DAG vertex, roughly equivalent to each step of the computation (such as map or aggregate), at maximum parallelism (equal to the number of CPU cores). This means that even a single Jet job uses quite a lot of parallel tasks. Since Jet’s cooperative tasklets are very cheap to switch between, there’s almost no overhead from this. However, every pair of tasklets that communicate uses a dedicated 1-to-1 concurrent queue, so the number of queues scales with the square of the number of CPU cores. The default queue capacity is 1024, which translates to 4-8 kilobytes of RAM overhead per tasklet pair and potentially a lot of data items in flight before the queues fill up.
If you experience RAM shortage on the Jet cluster, consider lowering the queue size. This is how you set the default queue size for the whole Jet cluster:
JetConfig cfg = new JetConfig();
cfg.getDefaultEdgeConfig().setQueueSize(128);
JetInstance jet = Jet.newJetInstance(cfg);
You can also set queue sizes individually on each Core API DAG edge. You must first convert your pipeline to the Core DAG, apply the configuration, and then submit the DAG for execution:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("a")).setName("source")
.map(String::toLowerCase)
.drainTo(Sinks.list("b"));
DAG dag = p.toDag();
dag.getOutboundEdges("source").get(0)
.setConfig(new EdgeConfig().setQueueSize(128));
jet.newJob(dag);
3.8. Monitor Execution and Diagnose Problems
3.8.1. Configure Logging
Jet, like Hazelcast IMDG, does not depend on a specific logging framework and has built-in adapters for a variety of logging frameworks. You can also write a new adapter to integrate with loggers Jet doesn’t natively support. To use one of the built-in adapters, set the hazelcast.logging.type property to one of the following:
-
jdk: java.util.logging (default)
-
log4j: Apache Log4j
-
log4j2: Apache Log4j 2
-
slf4j: SLF4J
-
none: turn off logging
For example, to configure Jet to use Log4j, you can do one of the following:
System.setProperty("hazelcast.logging.type", "log4j");
or
JetConfig config = new JetConfig();
config.getHazelcastConfig()
.setProperty("hazelcast.logging.type", "log4j");
For more detailed information about how to configure logging, please refer to the IMDG reference manual.
3.8.2. Inspect Output of Individual Stages
While debugging your pipeline you’ll want to see the output of an
individual stage. You can achieve it by using the
peek()
stage. For example:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("inputList"))
.flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.peek() (1)
.groupingKey(wholeItem())
.aggregate(counting())
.drainTo(Sinks.map("counts"));
1 | Logs all the word tokens emitted by the filtering stage |
If you run it like this:
JetInstance jet = Jet.newJetInstance();
try {
jet.getList("inputList")
.addAll(asList("The quick brown fox", "jumped over the lazy dog"));
jet.newJob(p).join();
} finally {
Jet.shutdownAll();
}
this is how your output may look:
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: quick
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#2
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: brown
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#0
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: the
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#4
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: dog
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#3
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: lazy
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#0
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: jumped
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#2
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: the
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: over
Mar 20, 2018 2:43:24 PM com.hazelcast.jet.impl.processor.PeekWrappedP.filter#3
INFO: [192.168.5.12]:5702 [jet] [0.7-SNAPSHOT] Output to ordinal 0: fox
The logger name of com.hazelcast.jet.impl.processor.PeekWrappedP.filter#1 consists of the following parts:
-
com.hazelcast.jet.impl.processor.PeekWrappedP: the type of the processor writing the log message
-
filter: the name of the vertex the processor belongs to
-
#1: the index of the processor within the vertex. The index is unique cluster-wide.
For more information about logging when using the Core API, see the Best Practices section.
3.8.3. Monitor Metrics
Jet exposes various metrics to facilitate monitoring of the cluster state and of running jobs.
Metrics have associated tags which describe which object the metric applies to. The tags for job metrics typically indicate the specific vertex and processor the metric belongs to.
Each metric instance provided belongs to a particular Jet cluster member, so different cluster members can have their own versions of the same metric with different values.
The metric collection runs in regular intervals on each member, but note that the metric collection on different cluster members happens at different moments in time. So if you try to correlate metrics from different members, they can be from different moments of time.
There are two broad categories of Jet metrics; for clarity we will group them based on the significant tags which define their granularity. In addition, each Jet member is also an IMDG member, so Jet exposes all the metrics available in IMDG as well.
Let’s look at these three broad categories of metrics in detail.
IMDG Metrics
There is a wide range of metrics and statistics provided by IMDG:
- statistics of distributed data structures (see the IMDG Reference Manual)
- executor statistics (see the IMDG Reference Manual)
- partition-related statistics (state, migration, replication)
- garbage collection statistics (see the IMDG API docs)
- memory statistics for the JVM to which the current IMDG member belongs (total physical/free OS memory, max/committed/used/free heap memory and max/committed/used/free native memory)
- network traffic related statistics (traffic and queue sizes)
- class loading related statistics
- thread count information (current, peak and daemon thread counts)
Jet Cluster metrics
Names | Main tags |
---|---|
blockingWorkerCount: The number of non-cooperative workers employed. | none |
iterationCount: The total number of iterations the driver of tasklets in cooperative thread N made. It should increase by at least 250 iterations/s. A lower value means some of the cooperative processors block for too long, which affects latency; a somewhat lower value is normal if there are many tasklets assigned to the thread. taskletCount: The number of tasklets assigned to cooperative thread N. | cooperativeWorker=<N> |
Jet Job metrics
All job-specific metrics have their job=<jobId>, exec=<executionId> and vertex=<vertexName> tags set. This means that all these metrics will have at least one instance for each vertex of each current job execution.
Additionally, if the vertex sourcing them is a data source or data sink, the source or sink tag will also be set to true.
Names | Main tags |
---|---|
distributedBytesIn: The total number of bytes received from remote members. These values are present only for distributed edges and account only for data actually transmitted over the network between members. The numbers include watermarks, snapshot barriers etc. | ordinal=<N> |
topObservedWm: Equal to the highest coalescedWm on any input edge of this processor. queuesCapacity: The total capacity of input queues. All input queues for all edges to the processor are summed in these metrics. If the queue size is close to the capacity, backpressure is applied and this processor is a bottleneck. Only input edges with equal priority are summed; if the processor has input edges with different priorities, only the edges with the highest priority are reflected, and once those are exhausted the edges with the next lower priority are reflected, and so on. | proc=<N>, ordinal=<not specified> |
topObservedWm: The highest watermark received from any input on edge N. emittedCount: The number of emitted items. This number includes watermarks, snapshot barriers etc. Unlike distributedItemsOut, it also includes items emitted to local processors. | proc=<N>, ordinal=<M> |
numInFlightOps: The number of pending (in-flight) operations when using asynchronous flat-mapping processors. totalKeys: The number of active keys being tracked by a session window processor. totalFrames: The number of active frames being tracked by a sliding window processor. lateEventsDropped: The number of late events dropped by various processors because the watermark had already passed their windows. | proc=<N>, procType=<set> |
Exposing metrics
The main method Jet has for exposing the metrics to the outside world is the JVM’s standard JMX interface. Since Jet 3.2 there is also an alternative to JMX for monitoring metrics, via the Job API, albeit only for the job-specific ones.
Over JMX
Jet exposes all of its metrics using the JVM’s standard JMX interface. You can use tools such as Java Mission Control or JConsole to display them. All Jet-related beans are stored under the com.hazelcast.jet/Metrics/<instanceName>/ node and the various tags they have form further sub-nodes in the resulting tree structure.
IMDG metrics are stored under the com.hazelcast/Metrics/<instanceName>/ node.
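If you want to inspect these beans programmatically, a minimal sketch using only the standard JMX API might look like the following (this assumes the beans are registered with the platform MBean server under the com.hazelcast.jet domain, as described above; it is not part of the Jet API itself):
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ListJetMetricBeans {
    public static void main(String[] args) throws Exception {
        // Query the local platform MBean server for all beans in the
        // com.hazelcast.jet domain and print their object names.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName name : server.queryNames(new ObjectName("com.hazelcast.jet:*"), null)) {
            System.out.println(name);
        }
    }
}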
Via Job API
The Job class has a getMetrics() method which returns a JobMetrics instance. It contains the latest known metric values for the job.
This functionality has been developed primarily to give access to the metrics of finished jobs, but it can in fact be used for jobs in any state.
While the job is running, the metric values are updated periodically (according to the configured collection interval), provided that both the generic metrics functionality and job metrics are enabled. Otherwise empty metrics will be returned.
When a job is restarted (or resumed after being previously suspended), the metrics are reset; their values will reflect only updates from the latest execution of the job.
Once a job stops executing (successfully, after a failure, after cancellation, or temporarily while suspended), the metrics will hold the values taken at the moment just before the job stopped, assuming that metrics storing was enabled. Otherwise, empty metrics will be returned.
For details on how to use and filter the metric values consult the JobMetrics API docs. A simple example for computing the number of data items emitted by a certain vertex (let’s call it vertexA), excluding items emitted to the snapshot, would look like this:
Predicate<Measurement> vertexOfInterest =
MeasurementPredicates.tagValueEquals(MetricTags.VERTEX, "vertexA");
Predicate<Measurement> notSnapshotEdge =
MeasurementPredicates.tagValueEquals(MetricTags.ORDINAL, "snapshot").negate();
Collection<Measurement> measurements = jobMetrics
.filter(vertexOfInterest.and(notSnapshotEdge))
.get(MetricNames.EMITTED_COUNT);
long totalCount = measurements.stream().mapToLong(Measurement::getValue).sum();
Configuration
The metrics collection is enabled by default. You can configure it
using the hazelcast-jet.xml
file:
<metrics enabled="true" jmxEnabled="true">
<!-- The number of seconds the metrics will be retained on
the instance -->
<retention-seconds>5</retention-seconds>
<!-- The metrics collection interval in seconds -->
<collection-interval-seconds>5</collection-interval-seconds>
<!-- whether metrics should be collected for data structures.
Metrics collection can have some overhead if there is a
large number of data structures -->
<metrics-for-data-structures>false</metrics-for-data-structures>
</metrics>
or using JetConfig
object:
JetConfig jetConfig = new JetConfig();
jetConfig.getMetricsConfig()
.setEnabled(true)
.setJmxEnabled(true)
.setRetentionSeconds(5)
.setCollectionIntervalSeconds(5)
.setMetricsForDataStructuresEnabled(false);
See MetricsConfig API docs for available methods.
3.9. Jet Command Line Tool
Jet comes with a command line tool, jet.sh, which you can use to submit jobs and do basic management of their lifecycle. It supports these commands:
Command | Description |
---|---|
cluster | Shows current cluster state and information about members |
list-jobs | Lists running jobs on the cluster |
submit | Submits a job to the cluster |
cancel | Cancels a running job |
suspend | Suspends a running job |
resume | Resumes a suspended job |
restart | Restarts a running job |
list-snapshots | Lists saved snapshots on the cluster |
save-snapshot | Saves a named snapshot from a job (Jet Enterprise only) |
delete-snapshot | Deletes a named snapshot |
The command line tool uses the Jet client under the hood and thus needs to be configured with connection information for the cluster. By default, it uses the hazelcast-client.xml found in the config path of the distribution. It’s also possible to specify connection parameters explicitly. For example, to list all the jobs running on the cluster you can use the following command:
bin/jet.sh -a 192.168.0.1:5701,192.168.0.1:5702 -g my-group list-jobs
This command tries to connect to either 192.168.0.1:5701 or 192.168.0.1:5702 using the group name my-group and lists all the running jobs on the cluster. If you want to use the tool with a different client.xml you can do it as follows:
bin/jet.sh -f client.xml list-jobs
For more details on how to use the command line tool, please refer to its own documentation, which you can retrieve with the command jet.sh --help.
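For example, assuming you have packaged your pipeline into a self-contained JAR whose main class builds and submits the job (the JAR name below is just a placeholder), submitting it would look like this:
bin/jet.sh submit my-pipeline.jar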
3.10. Management Center
Hazelcast Jet Management Center is a management and monitoring suite providing a live overview of the Hazelcast Jet cluster. Management Center includes a tool for diagnosing data flow within the running Hazelcast Jet job. It provides a visualization of the computational stages and allows you to peek into the stats across the data flow graph enabling you to diagnose bottlenecks.
Please refer to the Hazelcast Jet Management Center Reference Manual for installation and usage instructions.
4. The Pipeline API
4.1. The Shape of a Pipeline
The general shape of any data processing pipeline is drawFromSource → transform → drainToSink and the natural way to build it is from source to sink. The Pipeline API follows this pattern. For example,
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("input"))
.map(String::toUpperCase)
.drainTo(Sinks.list("result"));
In each step, such as drawFrom or drainTo, you create a pipeline stage. The stage resulting from a drainTo operation is called a sink stage and you can’t attach more stages to it. All others are called compute stages and expect you to attach further stages to them.
The API differentiates between batch (bounded) and stream (unbounded) sources and this is reflected in the naming: there is a BatchStage and a StreamStage, each offering the operations appropriate to its kind. In this section we’ll mostly use batch stages for simplicity, but the API of the operations common to both kinds is identical. We’ll explain later on how to apply windowing, which is necessary to aggregate over unbounded streams.
Your pipeline can consist of multiple sources, each starting its own pipeline branch, and you are allowed to mix both kinds of stages in the same pipeline. You can merge the branches with joining transforms. For example, the hash-join transform can join a stream stage with batch stages:
Pipeline p = Pipeline.create();
StreamStage<Trade> trades = p.drawFrom(tradeStream())
.withoutTimestamps();
BatchStage<Entry<Integer, Product>> products =
p.drawFrom(Sources.map("products"));
StreamStage<Tuple2<Trade, Product>> joined = trades.hashJoin(
products,
joinMapEntries(Trade::productId),
Tuple2::tuple2
);
Symmetrically, you can fork the output of a stage and send it to more than one destination:
Pipeline p = Pipeline.create();
BatchStage<String> src = p.drawFrom(Sources.list("src"));
src.map(String::toUpperCase)
.drainTo(Sinks.list("uppercase"));
src.map(String::toLowerCase)
.drainTo(Sinks.list("lowercase"));
4.2. Choose Your Data Sources and Sinks
Hazelcast Jet has support for these data sources and sinks:
- Hazelcast IMap and ICache, both as a batch source of just their contents and their event journal as an infinite source
- Hazelcast IList (batch)
- Hadoop Distributed File System (HDFS) (batch)
- Java Database Connectivity (JDBC) (batch)
- Java Messaging Services (JMS) queue and topic (infinite stream)
- Kafka topic (infinite stream)
- TCP socket (infinite stream)
- a directory on the filesystem, both as a batch source of the current file contents and an infinite source of append events to the files
- Apache Avro files (batch)
- Amazon AWS S3 (batch)
- any source/sink you create on your own by using the SourceBuilder and the SinkBuilder
You can access most of these via the Sources and Sinks utility classes. The Kafka, HDFS, S3 and Avro connectors live in their own separate modules. The source and sink builder factories are in their respective classes.
There’s a dedicated chapter that discusses the topic of data sources and sinks in more detail.
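As a quick illustration, here is a minimal sketch that draws from one of the built-in batch sources (a directory of text files; the path is only an example) and drains to the logger sink:
Pipeline p = Pipeline.create();
// Read every file in the directory as lines of text and log each line.
p.drawFrom(Sources.files("/path/to/input-dir"))
 .drainTo(Sinks.logger());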
4.3. Clean Up and Normalize Input Using Stateless Transforms
The simplest kind of transformation is one that can be done on each item individually and independently of other items. These are map, filter and flatMap.
We already saw them in use in the previous examples. map transforms each item to another item; filter discards items that don’t match its predicate; and flatMap transforms each item into zero or more output items. These transforms are most often used to perform simple cleanup of the input stream by discarding unneeded data, adapting it to your data model, flattening nested lists into the top-level stream, etc.
We won’t spend much time on these transforms here; you can refer to their Javadoc for finer detail.
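Still, for reference, here is a minimal sketch that chains all three in a typical cleanup step (it assumes an IList named "rawLines" holding lines of text, in the style of the earlier examples):
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("rawLines"))
 .map(String::toLowerCase)                            // adapt each item to the data model
 .filter(line -> !line.isEmpty())                     // discard unneeded data
 .flatMap(line -> traverseArray(line.split("\\W+")))  // flatten each line into words
 .drainTo(Sinks.list("words"));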
4.4. Detect Patterns Using Stateful Transforms
When you start with basic mapping and add to it an arbitrary object available over the entire lifetime of the processing job, you get a quite general and powerful tool: stateful mapping.
The major use case of stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in the input of any complexity.
Instead of keeping just one state object, you can define a grouping key and keep a separate object per distinct key. This is what you need for the above example of matching on correlation ID.
When you define a grouping key, you should also consider key expiration. Usually a given key will occur in the input stream only for a while and then become stale. Jet offers you a way to specify the time-to-live (TTL) for keys. If more than TTL milliseconds (in terms of event time) pass without observing the same key again, Jet evicts the state object from memory. You can also provide a function Jet will call when discarding the object. On the example of transaction tracking, you can detect stuck transactions here and react to them.
Here’s the code sample that implements pattern matching on a stream of transaction-start and transaction-end events:
StreamStage<Entry<String, Long>> transactionOutcomes = eventStream() (1)
.groupingKey(TransactionEvent::transactionId)
.mapStateful(
SECONDS.toMillis(2),
() -> new TransactionEvent[2], (2)
(startEnd, transactionId, transactionEvent) -> { (3)
switch (transactionEvent.type()) {
case TRANSACTION_START:
startEnd[0] = transactionEvent;
break;
case TRANSACTION_END:
startEnd[1] = transactionEvent;
break;
default:
}
return (startEnd[0] != null && startEnd[1] != null)
? entry(transactionId, startEnd[1].timestamp() - startEnd[0].timestamp())
: null;
},
(startEnd, transactionId, wm) -> (4)
(startEnd[0] == null || startEnd[1] == null)
? entry(transactionId, TIMED_OUT)
: null
);
1 | transactionOutcomes emits pairs
(transactionId, transactionDurationMillis) . Transaction duration can
also be the special value TIMED_OUT . |
2 | This function creates the state object: an array of
TransactionEvent s. The first slot is for the start event, the second
for the end event. |
3 | The mapping function takes the state, the grouping key, and the input item. It places the event into the appropriate array slot and emits the output if both slots are now occupied. |
4 | This function reacts to state eviction. It takes the state object being evicted, the key associated with it, and the current watermark (i.e., current event time). If it detects that a slot in the array was left unoccupied, it outputs "transaction timed out". |
Another thing you can do with stateful mapping is rolling aggregation, like accumulating the sum of a quantity across events, or its extreme values. You can also achieve these with the dedicated rolling aggregation transform to reuse Jet’s ready-made aggregate operations. However, if you can’t find an existing aggregate operation to reuse, it’s usually simpler to implement it with stateful mapping.
There is one major caveat to consider before solving your problem with stateful mapping: it’s sensitive to the order of the events in the stream. If the sequence of events in the stream can differ from the order of their occurrence in the real world, your state-updating logic will have to account for that, possibly leading to an explosion in complexity.
4.5. Suppress Duplicates
The distinct
operation suppresses the duplicates from a stream. If you
perform it after adding a grouping key, it emits a single item for every
distinct key. The operation works on both batch and stream stages. In
the latter case it emits distinct items within a window. Two different
windows are processed independently.
In this example we have a batch of Person
objects and we choose an
arbitrary one from each 5-year age bracket:
Pipeline p = Pipeline.create();
BatchSource<Person> personSource = Sources.list("people");
p.drawFrom(personSource)
.groupingKey(person -> person.getAge() / 5)
.distinct()
.drainTo(Sinks.list("sampleByAgeBracket"));
4.6. Merge Streams
The merge
operation combines two pipeline stages in the simplest
manner: it just emits all the items from both stages. In this example
we merge the trading events from the New York and Tokyo stock exchanges:
Pipeline p = Pipeline.create();
IMap<Long, Trade> tradesNewYorkMap = instance.getMap("trades-newyork");
IMap<Long, Trade> tradesTokyoMap = instance.getMap("trades-tokyo");
StreamStage<Trade> tradesNewYork = p.drawFrom(
Sources.mapJournal(tradesNewYorkMap, mapPutEvents(),
mapEventNewValue(), START_FROM_CURRENT))
.withNativeTimestamps(5_000);
StreamStage<Trade> tradesTokyo = p.drawFrom(
Sources.mapJournal(tradesTokyoMap, mapPutEvents(),
mapEventNewValue(), START_FROM_CURRENT))
.withNativeTimestamps(5_000);
StreamStage<Trade> merged = tradesNewYork.merge(tradesTokyo);
4.7. Enrich Your Stream
As the data comes in, before you perform any reasoning on it, you must look up and attach to each item all the knowledge you have about it. If the data item represents a trade event, you want the data on the valuable being traded, the buyer, seller, etc. We call this step data enrichment.
Jet offers two basic techniques to enrich your data stream:
- Hash-join the stream with one or more datasets that you ingest as bounded streams. If your enriching dataset doesn’t change for the duration of the Jet job, this is your best tool.
- Directly look up the enriching data for each item by contacting the system that stores it. If you’re enriching an infinite stream and want to observe updates to the enriching dataset, you should use this approach.
4.8. Hash-Join
Hash-join is a kind of join operation optimized for the use case of data enrichment. It is like a many-to-one SQL JOIN that matches a foreign key in the stream-to-be-enriched with the primary key in the enriching dataset. You can perform several such joins in one operation, enriching your stream from arbitrarily many sources. You can also apply a many-to-many join on a non-unique key (see below).
The stream-to-be-enriched (we’ll call it the primary stream for short) can be an unbounded data stream. The enriching streams must be bounded and Jet will consume them in full before starting to enrich the primary stream. It will store their contents in hash tables for fast lookup, which is why we call this the "hash-join".
For each enriching stream you specify a pair of key-extracting functions: one for the enriching item and one for the primary item. This means that you can define a different join key for each of the enriching streams. The following example shows a three-way hash-join between the primary stream of stock trade events and two enriching streams: products and brokers:
Pipeline p = Pipeline.create();
// The primary stream (stream to be enriched): trades
IMap<Long, Trade> tradesMap = instance.getMap("trades");
StreamStage<Trade> trades = p.drawFrom(
Sources.mapJournal(tradesMap, mapPutEvents(),
mapEventNewValue(), START_FROM_CURRENT))
.withoutTimestamps();
// The enriching streams: products and brokers
BatchStage<Entry<Integer, Product>> prodEntries =
p.drawFrom(Sources.map("products"));
BatchStage<Entry<Integer, Broker>> brokEntries =
p.drawFrom(Sources.map("brokers"));
// Join the trade stream with the product and broker streams
StreamStage<Tuple3<Trade, Product, Broker>> joined = trades.hashJoin2(
prodEntries, joinMapEntries(Trade::productId),
brokEntries, joinMapEntries(Trade::brokerId),
Tuple3::tuple3
);
Products are joined on Trade.productId and brokers on Trade.brokerId. joinMapEntries() returns a JoinClause, which is a holder of the three functions that specify how to perform a join:
- the key extractor for the primary stream’s item
- the key extractor for the enriching stream’s item
- the projection function that transforms the enriching stream’s item into the item that will be used for enrichment
Typically the enriching streams will be Map.Entrys coming from a key-value store, but you want just the entry value to appear as the enriching item. In that case you’ll specify Map.Entry::getValue as the projection function. This is what joinMapEntries() does for you: it takes just one function, the primary stream’s key extractor, and fills in Entry::getKey and Entry::getValue for the enriching stream’s key extractor and the projection function, respectively.
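For illustration, this is roughly what joinMapEntries(Trade::productId) stands for when written out with the general-purpose JoinClause factory methods (a sketch; in practice you would just call joinMapEntries()):
JoinClause<Integer, Trade, Entry<Integer, Product>, Product> byProductId =
        JoinClause.<Integer, Trade, Entry<Integer, Product>>onKeys(
                Trade::productId,             // key extractor for the primary stream's item
                Entry::getKey)                // key extractor for the enriching stream's item
        .projecting(Entry::getValue);         // projection of the enriching item

StreamStage<Tuple2<Trade, Product>> joined = trades.hashJoin(
        prodEntries, byProductId, Tuple2::tuple2);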
You can also achieve a many-to-many join because the key in the enriching stream doesn’t have to be unique. If a given item’s foreign key matches several enriching items, Jet will produce another item in the output for each of them. If you’re joining with two enriching streams, and in both of them you have several matches, Jet will produce another output item for each combination of the enriching items (M x N items if there are M enriching items from one stream and N from the other).
If you’re doing many-to-many enrichment, never mutate the enriched item, otherwise you would break the rule of not mutating an item you emitted (see Sharing Data Between Pipeline Stages). For example:
StreamStage<Trade> joined = trades.hashJoin(
prodEntries,
joinMapEntries(Trade::productId),
Trade::setProduct (1)
);
1 | Sets the product on the trade object, returns this . Broken when
there are several products for one trade! |
With code like this and a many-to-many relationship between trades and products, Jet will re-emit the same Trade object several times, each time setting a different product on it. This creates a data race with the downstream stage, which may observe the same product several times while missing the other ones.
The safe way to perform a many-to-many hash join is to either package the individual objects into a tuple (as in the earlier examples) or to create a new Trade object each time.
In the interest of performance Jet pulls the entire enriching dataset into each cluster member. That’s why this operation is also known as a replicated join. This is something to keep in mind when estimating the RAM requirements for a hash-join operation.
4.8.1. Hash-Join With Four or More Streams Using the Builder
You can hash-join a stream with up to two enriching streams using the API we demonstrated above. If you have more than two enriching streams, you’ll use the hash-join builder. For example, you may want to enrich a trade with its associated product, broker, and market:
Pipeline p = Pipeline.create();
// The stream to be enriched: trades
IMap<Long, Trade> tradesMap = instance.getMap("trades");
StreamStage<Trade> trades = p.drawFrom(
Sources.mapJournal(tradesMap, mapPutEvents(),
mapEventNewValue(), START_FROM_CURRENT))
.withoutTimestamps();
// The enriching streams: products, brokers and markets
BatchStage<Entry<Integer, Product>> prodEntries =
p.drawFrom(Sources.map("products"));
BatchStage<Entry<Integer, Broker>> brokEntries =
p.drawFrom(Sources.map("brokers"));
BatchStage<Entry<Integer, Market>> marketEntries =
p.drawFrom(Sources.map("markets"));
// Obtain a hash-join builder object from the stream to be enriched
StreamHashJoinBuilder<Trade> builder = trades.hashJoinBuilder();
// Add enriching streams to the builder
Tag<Product> productTag = builder.add(prodEntries,
joinMapEntries(Trade::productId));
Tag<Broker> brokerTag = builder.add(brokEntries,
joinMapEntries(Trade::brokerId));
Tag<Market> marketTag = builder.add(marketEntries,
joinMapEntries(Trade::marketId));
// Build the hash join pipeline
StreamStage<Tuple2<Trade, ItemsByTag>> joined = builder.build(Tuple2::tuple2);
The data type on the hash-joined stage is Tuple2<Trade, ItemsByTag>. The next snippet shows how to use it to access the primary and enriching items:
StreamStage<String> mapped = joined.map(
(Tuple2<Trade, ItemsByTag> tuple) -> {
Trade trade = tuple.f0();
ItemsByTag ibt = tuple.f1();
Product product = ibt.get(productTag);
Broker broker = ibt.get(brokerTag);
Market market = ibt.get(marketTag);
return trade + ": " + product + ", " + broker + ", " + market;
});
4.9. Enrich Using Direct Lookup
If you’re enriching an infinite stream, you most likely need to observe the changes that happen to the enriching dataset over the long timespan of the Jet job. In this case you can’t use the hash-join, which basically takes a snapshot of the enriching dataset at the beginning of the job. You may also encounter RAM shortage if your enriching dataset is very large.
The xUsingY transforms (such as filterUsingContext or mapUsingIMap) can enrich a stream by looking up from the original data source each time. There’s direct support for Hazelcast maps, and Jet exposes the underlying machinery as well so you can write your own code to join with an arbitrary external dataset.
4.9.1. Look Up from Hazelcast Map
Hazelcast Jet allows you to enrich your stream directly from a Hazelcast IMap or ReplicatedMap. Since it must look up the data again for each item, performance is lower than with a hash-join, but the data is kept fresh this way. This matters especially for unbounded streaming jobs, where a hash-join would use data frozen in time at the beginning of the job.
If you enrich from a Hazelcast map (IMap or ReplicatedMap) that is stored inside the Jet cluster, you can achieve data locality. For ReplicatedMap that’s trivial because its entire contents are present on every cluster member. IMap, on the other hand, is partitioned so a given member holds only a part of the data. You must give Jet the key-extracting function so it can do the following for each item in your stream:
- extract the lookup key
- find its partition ID
- send the item to the member holding the IMap data with that partition ID
- on the target member, use the lookup key again to fetch the enriching data
We didn’t completely avoid the network here, but we replaced the request-response cycle with just a one-way send. We eliminated the cost of the request-response latency and pay just the overhead of network transfer.
In this example we enrich a stream of trades with detailed stock info. The stock info (the enriching dataset) is stored in a Hazelcast IMap, so we use groupingKey() to let Jet partition our stream and use local IMap lookups:
IMap<String, StockInfo> stockMap = jet.getMap("stock-info"); (1)
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
.withoutTimestamps()
.groupingKey(Trade::ticker) (2)
.mapUsingIMap(stockMap, Trade::setStockInfo) (3)
.drainTo(Sinks.list("result"));
1 | Obtain the IMap from a Hazelcast Jet instance |
2 | Set the lookup key; this enables data locality |
3 | Enrich the stream by setting the looked-up StockInfo on Trade
items. This syntax works if the setter has fluent API and returns
this . |
In the example above the syntax looks very simple, but it hides a layer
of complexity: you can’t just fetch a Hazelcast map instance and add it
to the pipeline. It’s a proxy object that’s valid only in the JVM
process where you obtained it. mapUsingIMap
actually remembers
just the name of the map and will obtain a map with the same name from
the Jet cluster on which the job runs.
4.9.2. Look Up From an External System
Hazelcast Jet exposes the facility to look up from an external system.
In this case you must define a factory object that creates a client
instance and through which you will fetch the data from the remote
system. Jet will use the factory to create a context object for each
Processor
in the cluster that executes your transforming step.
The feature is available in two variants:
- asynchronous: the functions return a CompletableFuture. This version provides higher throughput because Jet can issue many concurrent requests without blocking a thread.
- synchronous: the functions return the result directly. Use it if your external service doesn’t provide an async API. It’s also useful if the context is, for example, a pre-loaded machine learning model which is only CPU-bound and doesn’t do any IO.
You can also specify a function that extracts a key from your items using groupingKey(). Even though Jet won’t do any lookup or grouping by that key, it will set up the job so that all the items with the same key get paired with the same context object.
For example, you may be fetching data from a remote Hazelcast cluster. To optimize performance, you can enable the near-cache on the client and you can save a lot of memory by specifying the key function: since Jet executes your enriching code in parallel, each worker has its own Hazelcast client instance, with its own near-cache. With the key function, each near-cache will hold a non-overlapping subset of the keys:
ContextFactory<IMap<String, StockInfo>> ctxFac = ContextFactory
.withCreateFn(x -> {
ClientConfig cc = new ClientConfig();
cc.getNearCacheConfigMap().put("stock-info",
new NearCacheConfig());
HazelcastInstance client = newHazelcastClient(cc);
IMap<String, StockInfo> map = client.getMap("stock-info");
return map;
})
.withLocalSharing();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
Pipeline p = Pipeline.create();
p.drawFrom(tradesSource)
.withoutTimestamps()
.groupingKey(Trade::ticker)
.mapUsingContextAsync(ctxFac,
(map, key, trade) -> toCompletableFuture(map.getAsync(key))
.thenApply(trade::setStockInfo))
.drainTo(Sinks.list("result"));
In a similar fashion you can integrate other external systems with a Jet pipeline.
4.9.3. Weak Consistency of Direct Lookup
When you use the xUsingY transform to enrich an infinite stream, your output will reflect the fact that the enriching dataset is changing. Two items with the same key may be enriched with different data. This is expected. However, the output may also behave in a surprising way. Say you update the enriching data for some key from A to B. You expect to see a sequence …A-A-B-B… in the output, but when you sort it by timestamp, you can observe any order, such as A-B-A-B or B-B-A-A. This is because Jet doesn’t enforce the processing order to strictly follow the timestamp order of events.
In effect, your lookups will be eventually consistent, but won’t have monotonic-read consistency. Currently there is no feature in Jet that would achieve monotonic reads while enriching an event stream from a changing dataset.
4.10. Group and Aggregate
Data aggregation is the cornerstone of distributed stream processing. It computes an aggregate function (simple examples: sum or average) over the data items. Typically you don’t want to aggregate all the items together, but classify them by some key and then aggregate over each group separately. This is the group-and-aggregate transformation. You can join several streams on a key and simultaneously group-aggregate them on the same key. This is the cogroup-and-aggregate transformation. If you’re processing an unbounded event stream, you must define a bounded window over the stream within which Jet will aggregate the data.
This is how a very simple batch aggregation without grouping may look
(the list named text
contains lines of text):
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("text"))
.aggregate(counting())
.drainTo(Sinks.list("result"));
We count all the lines and push the count to a list named “result”. Add this code to try it out:
JetInstance jet = Jet.newJetInstance();
jet.getList("text").addAll(asList("foo foo bar", "foo bar foo"));
jet.newJob(p).join();
jet.getList("result").forEach(System.out::println);
Jet.shutdownAll();
The program will print 2 because we added two items to the text list.
To get cleaner output, without Jet’s logging, add System.setProperty("hazelcast.logging.type", "none"); to the top.
Let’s count all the words instead:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
.flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
.aggregate(counting())
.drainTo(Sinks.list("result"));
We split up the lines into words using a regular expression. The program
prints 6
now because there are six words in the input.
Now let’s turn this into something more insightful: a word frequency histogram, giving us for each distinct word the number of its occurrences:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("text"))
.flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
.groupingKey(wholeItem())
.aggregate(counting())
.drainTo(Sinks.list("result"));
We added a grouping key, in this case the word itself (Functions.wholeItem() returns the identity function). Now the program will perform the counting aggregation on each group of equal words. It will print:
bar=2
foo=4
What we’ve just built up is the classical Word Count task.
The definition of the aggregate operation hides behind the counting()
method call. This is a static method in our
AggregateOperations
utility class, which provides you with some predefined aggregate
operations. You can also implement your own aggregate operations; refer
to the section dedicated to this.
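To give a feel for what hides behind counting(), here is a rough sketch of an equivalent operation built with the AggregateOperation builder (LongAccumulator comes from com.hazelcast.jet.accumulator; treat this as an illustration, not the actual implementation of counting()):
AggregateOperation1<Object, LongAccumulator, Long> myCounting =
        AggregateOperation
                .withCreate(LongAccumulator::new)                                 // create the accumulator
                .andAccumulate((LongAccumulator acc, Object item) -> acc.add(1))  // update it for each item
                .andCombine(LongAccumulator::add)                                 // merge two accumulators
                .andExportFinish(LongAccumulator::get);                           // extract the result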
4.11. Correlate and Join Streams
In Jet you can join any number of streams on a common key, which can be anything you can calculate from the data item. This works great for correlating the data from two or more streams. In the same step you apply an aggregate function to all the grouped items, separated by the originating stream. The aggregate function can produce a summary value like an average or linear trend, but it can also keep a list of all the items, effectively avoiding actual aggregation. You can achieve any kind of join: inner/outer (full, left and right). The constraint, as already noted, is that the join condition cannot be fully general, but must amount to matching a computed key.
We have a full code sample at our code samples repository. Here we’ll show a simple example where we calculate the ratio of page visits to add-to-cart actions for each user. We accumulate the event counts and then do the maths on them in the end:
(1)
JetInstance jet = Jet.newJetInstance();
IList<PageVisit> pageVisits = jet.getList("pageVisit");
IList<AddToCart> addToCarts = jet.getList("addToCart");
IList<Entry<Integer, Double>> results = jet.getList("result");
(2)
pageVisits.add(new PageVisit(1));
pageVisits.add(new PageVisit(1));
addToCarts.add(new AddToCart(1));
pageVisits.add(new PageVisit(2));
addToCarts.add(new AddToCart(2));
(3)
Pipeline p = Pipeline.create();
BatchStageWithKey<PageVisit, Integer> pageVisit =
p.drawFrom(Sources.list(pageVisits))
.groupingKey(pv -> pv.userId());
BatchStageWithKey<AddToCart, Integer> addToCart =
p.drawFrom(Sources.list(addToCarts))
.groupingKey(atc -> atc.userId());
BatchStage<Entry<Integer, Tuple2<Long, Long>>> coAggregated = pageVisit
.aggregate2(counting(), (4)
addToCart, counting()); (5)
coAggregated
.map(e -> { (6)
long visitCount = e.getValue().f0();
long addToCartCount = e.getValue().f1();
return entry(e.getKey(), (double) addToCartCount / visitCount);
})
.drainTo(Sinks.list(results));
(7)
jet.newJob(p).join();
results.forEach(System.out::println);
Jet.shutdownAll();
1 | Start Jet, acquire source and sink lists from it |
2 | Fill the source lists (the numbers 1 and 2 are user IDs) |
3 | Construct the pipeline |
4 | Supply the aggregate operation for pageVisits |
5 | Supply the aggregate operation for addToCarts |
6 | Compute the ratio of page visits to add-to-cart actions |
7 | Run the job, print the results |
The way we prepared the data, user 1 made two visits and added one item to the shopping cart; user 2 made one visit and added one item. Given that, we should expect to see the following output:
1=0.5
2=1.0
Note how we supplied a separate aggregate operation for each input stage. We calculate the overall result based on the results obtained for each input stage in isolation.
You also have the option of constructing a two-input aggregate operation that has immediate access to all the items. Refer to the section on AggregateOperation.
4.11.1. Join Without Aggregating
Let’s assume you have these input stages:
BatchStageWithKey<PageVisit, Integer> pageVisit =
p.drawFrom(Sources.<PageVisit>list("pageVisit"))
.groupingKey(PageVisit::userId);
BatchStageWithKey<AddToCart, Integer> addToCart =
p.drawFrom(Sources.<AddToCart>list("addToCart"))
.groupingKey(AddToCart::userId);
You may have expected to find a joining transform in the Pipeline API that outputs a stream of all matching pairs of items:
BatchStage<Tuple2<PageVisit, AddToCart>> joined = pageVisit.join(addToCart);
This would more closely match the semantics of an SQL JOIN, but in the context of a Java API it doesn’t work well. For M page visits joined with N add-to-carts, Jet would have to materialize M * N tuples and feed them into the next stage. It would also be forced to buffer all the data from one stream before receiving the other.
To allow you to get the best performance, Jet strongly couples joining with aggregation and encourages you to frame your solution in these terms.
This doesn’t stop you from getting the "exploded" output with all the joined data; it just makes it a less straightforward option. If your use case can’t be solved any other way than by keeping all the individual items, you can specify toList() as the aggregate operation and get all the items in lists:
BatchStage<Tuple2<List<PageVisit>, List<AddToCart>>> joinedLists =
pageVisit.aggregate2(toList(), addToCart, toList())
.map(Entry::getValue);
If you need something else than the full join, you can filter out some pairs of lists. In this example we create a LEFT OUTER JOIN by removing the pairs with empty left-hand list:
BatchStage<Tuple2<List<PageVisit>, List<AddToCart>>> leftOuterJoined =
joinedLists.filter(pair -> !pair.f0().isEmpty());
If you specifically need to get a stream of all the combinations of matched items, you can add a flatmapping stage:
BatchStage<Tuple2<PageVisit, AddToCart>> fullJoined = joinedLists
.flatMap(pair -> traverseStream(
nonEmptyStream(pair.f0())
.flatMap(pVisit -> nonEmptyStream(pair.f1())
.map(addCart -> tuple2(pVisit, addCart)))));
We used this helper method:
static <T> Stream<T> nonEmptyStream(List<T> input) {
return input.isEmpty() ? Stream.of((T) null) : input.stream();
}
4.11.2. Join Four or More Streams Using the Aggregate Builder
If you need to join more than three streams, you’ll have to use the builder object. For example, your goal may be correlating events coming from different systems, where all the systems serve the same user base. In an online store you may have separate event streams for product page visits, adding-to-cart events, payments, and deliveries. You want to correlate all the events associated with the same user. The example below calculates statistics per category for each user:
Pipeline p = Pipeline.create();
(1)
BatchStageWithKey<PageVisit, Integer> pageVisits =
p.drawFrom(Sources.<PageVisit>list("pageVisit"))
.groupingKey(PageVisit::userId);
BatchStageWithKey<AddToCart, Integer> addToCarts =
p.drawFrom(Sources.<AddToCart>list("addToCart"))
.groupingKey(AddToCart::userId);
BatchStageWithKey<Payment, Integer> payments =
p.drawFrom(Sources.<Payment>list("payment"))
.groupingKey(Payment::userId);
BatchStageWithKey<Delivery, Integer> deliveries =
p.drawFrom(Sources.<Delivery>list("delivery"))
.groupingKey(Delivery::userId);
(2)
GroupAggregateBuilder<Integer, List<PageVisit>> builder =
pageVisits.aggregateBuilder(toList());
(3)
Tag<List<PageVisit>> visitTag = builder.tag0();
Tag<List<AddToCart>> cartTag = builder.add(addToCarts, toList());
Tag<List<Payment>> payTag = builder.add(payments, toList());
Tag<List<Delivery>> deliveryTag = builder.add(deliveries, toList());
(4)
BatchStage<String> coGrouped = builder
.build()
.map(e -> {
ItemsByTag ibt = e.getValue();
return String.format("User ID %d: %d visits, %d add-to-carts," +
" %d payments, %d deliveries",
e.getKey(), ibt.get(visitTag).size(), ibt.get(cartTag).size(),
ibt.get(payTag).size(), ibt.get(deliveryTag).size());
});
1 | Create four source streams |
2 | Obtain a builder object for the co-group transform, specify the
aggregate operation for PageVisits |
3 | Add the co-grouped streams to the builder, specifying the aggregate operation to perform on each |
4 | Build the co-group transform, retrieve the individual aggregation
results using the tags you got in step 3 (ibt is an ItemsByTag ) |
4.12. Aggregate an Unbounded Stream over a Window
The process of data aggregation takes a finite batch of data and produces a result. We can make it work with an infinite stream if we break up the stream into finite chunks. This is called windowing and it’s almost always defined in terms of a range of event timestamps (a time window). For this reason Jet requires you to set up the policy of determining an event’s timestamp before you can apply windowing to your stream.
4.12.1. Add Timestamps to the Stream
The Pipeline API guides you to set up the timestamp policy right after you obtain a source stage. pipeline.drawFrom(someStreamSource) returns a SourceStreamStage which offers just these methods:
- withNativeTimestamps() declares that the stream will use the source’s native timestamps. This typically refers to the timestamps that the external source system sets on each event.
- withTimestamps(timestampFn) provides a function to the source that determines the timestamp of each event.
- withoutTimestamps() declares that the source stage has no timestamps. Use this if you don’t need them (i.e., your pipeline won’t perform windowed aggregation).
Exceptionally, you may need to call withoutTimestamps()
on the source
stage, then perform some transformations that determine the event
timestamps, and then call addTimestamps(timestampFn)
to instruct Jet
where to find them in the events. Some examples include an enrichment
stage that retrieves the timestamps from a side input or flat-mapping
the stream to unpack a series of events from each original item. If you
do this, however, it will no longer be the source that determines the
watermark.
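Here is a minimal sketch of that pattern. The tradeMessages() source and the parseTrade() step are hypothetical, and we assume Trade exposes a timestamp() getter; the point is only that addTimestamps() comes after the transform that makes the timestamp available:
StreamStage<Trade> trades = p
        .drawFrom(tradeMessages())                    // hypothetical source of raw messages
        .withoutTimestamps()                          // no timestamps known at the source
        .map(msg -> parseTrade(msg))                  // transform that produces the timestamped object
        .addTimestamps(Trade::timestamp, SECONDS.toMillis(5)); // tell Jet where to find the timestamp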
Prefer Assigning Timestamps at the Source
In some source implementations, especially partitioned ones like Kafka,
there is a risk of high event time skew occurring across partitions as
the Jet processor pulls the data from them in batches, in a round-robin
fashion. This problem is especially pronounced when Jet recovers from a
failure and restarts your job. In this case Jet must catch up with all
the events that arrived since taking the last snapshot. It will receive
these events at the maximum system throughput and thus each partition
will have a lot of data each time Jet polls it. This means that the
interleaving of data from different partitions will become much more
coarse-grained and there will be sudden jumps in event time at the
points of transition from one partition to the next. The jumps can
easily exceed the configured allowedLateness
and cause Jet to drop
whole swaths of events as late.
In order to mitigate this issue, Jet has special logic in its partitioned source implementations that keeps separate track of the timestamps within each partition and knows how to reconcile the temporary differences that occur between them.
This feature works only if you set the timestamping policy in the source using withTimestamps() or withNativeTimestamps().
4.12.2. Specify the Window
In order to explain windowing, let’s start with a simple batch job, the Word Count. If you have a batch of tweets you want to analyze for word frequencies, this is how the pipeline can look:
BatchStage<Tweet> tweets = p.drawFrom(Sources.list("tweets"));
tweets.flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.groupingKey(wholeItem())
.aggregate(counting())
.drainTo(Sinks.map("counts"));
To make the same kind of computation work on an infinite stream of tweets, we must add the event timestamps and the specification of the window:
StreamStage<Tweet> tweets = p.drawFrom(twitterStream())
.withNativeTimestamps(0); (1)
tweets.flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1))) (2)
.groupingKey(wholeItem())
.aggregate(counting())
.drainTo(Sinks.list("result"));
1 | use the source system’s native timestamps, don’t tolerate event time
disorder (allowedLateness parameter set to 0 ) |
2 | specify the window (one minute long, slides by one second) |
Now we’ve got a Jet job that does live tracking of words currently trending in tweets. The sliding window definition tells Jet to aggregate the events that occurred within the last minute and update this result every second.
Using native timestamps often works well, but it’s not the most precise way of determining the event time. It records the time when the event was submitted to the system that Jet uses as the source and that may happen quite a bit after the event originally occurred. The best approach is to provide Jet with the function that extracts the timestamp directly from the event object. In this case you must also deal with the fact that Jet can receive events in an order different from their timestamp order. We discuss these concerns in the Jet Concepts chapter.
To extract the event timestamp, create the source stage like this:
StreamStage<Tweet> tweets = p.drawFrom(twitterStream())
.withTimestamps(Tweet::timestamp, SECONDS.toMillis(5));
We specified two things: how to extract the timestamp and how much event "lateness" we want to tolerate. We said that any event we receive can be at most five seconds behind the highest timestamp we already received. If it’s older than that, we’ll have to ignore it. On the flip side, this means that we’ll have to wait for an event that occurred five seconds after the given window’s end before we can assume we have all the data to emit that result.
4.12.3. Get Early Results Before the Window is Complete
If you had to allow a lot of event lateness, or if you just use large time windows, you may want to track the progress of a window while it is still accumulating events. You can order Jet to give you, at regular intervals, the current status on all the windows it has some data for, but aren’t yet complete. On our running example of trending words, it may look like this:
StreamStage<KeyedWindowResult<String, Long>> result =
p.drawFrom(twitterStream())
.withTimestamps(Tweet::timestamp, SECONDS.toMillis(15))
.flatMap(tweet -> traverseArray(tweet.text().toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(1))
.setEarlyResultsPeriod(SECONDS.toMillis(1)))
.groupingKey(wholeItem())
.aggregate(counting());
In the above snippet we set up an aggregation stage whose window size is one minute and there’s an additional 15-second wait time for the late-coming events. This amounts to waiting up to 75 seconds from receiving a given event to getting the result it contributed to. Therefore we ask Jet to give us updates on the current progress every second.
The output of the windowing stage is in the form of KeyedWindowResult<String, Long>, where String is the word and Long is the frequency of the word in the given window. KeyedWindowResult also has an isEarly property that says whether the result is early or final.
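For example, a sketch of a follow-up stage that labels early and final results before logging them (this assumes the key(), result(), end() and isEarly() accessors of KeyedWindowResult):
result
    .map(wr -> String.format("%s result for window ending at %d: %s occurred %d times",
            wr.isEarly() ? "Early" : "Final", wr.end(), wr.key(), wr.result()))
    .drainTo(Sinks.logger());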
Generally, Jet doesn’t guarantee that a stage will receive the items in the same order its upstream stage emitted them. This can lead to a situation where your sink receives an early result after it has already received the final result.
4.13. Kinds of Windows
Jet supports these kinds of windows:
- tumbling window: a window of constant size that "tumbles" along the time axis: the consecutive positions of the window don’t overlap. If you use a window size of 1 second, Jet will group together all events that occur within the same second and you’ll get window results for intervals [0-1) seconds, then [1-2) seconds, and so on.
- sliding window: a window of constant size that slides along the time axis. It slides in discrete steps that are a fraction of the window’s length. A typical setting is to slide by 1% of the window size. Jet outputs the aggregation result each time the window moves on. If you use a window of size 1 second sliding by 10 milliseconds, Jet will output window results for intervals [0.00-1.00) seconds, then [0.01-1.01) seconds, and so on.
- session window: it captures a burst of events separated by periods of quiescence. You define the "session timeout", i.e., the length of the quiet period that causes the window to close. If you define a grouping key, there is a separate, independent session window for each key.
You can find a more in-depth explanation of Jet’s windows in the Jet Concepts chapter.
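In code, these correspond to the WindowDefinition factory methods used elsewhere in this manual. A sketch, where stage stands for any timestamped stream stage and all times are in milliseconds:
stage.window(WindowDefinition.tumbling(SECONDS.toMillis(1)));                      // tumbling, 1-second windows
stage.window(WindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)));  // 1-minute window sliding by 1 second
stage.window(WindowDefinition.session(SECONDS.toMillis(30)));                      // session window with a 30-second timeout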
4.14. Rolling Aggregation
Jet supports a way to aggregate an unbounded stream without windowing: for each input item you get the current aggregation value as output, as if this item were the last one to be aggregated. You use the same AggregateOperation implementations that work with Jet’s aggregate API. Note that Jet doesn’t enforce processing in the order of event time; what you get accounts for the items that Jet happens to have processed so far.
This kind of aggregation is useful in jobs that monitor a stream for extremes or totals. For example, if you have a stream of web server monitoring metrics, you can keep track of the worst latency ever experienced, the highest throughput seen, total number of transactions processed, and so on.
In the following example we process a stream of trading events and get the "largest" trade seen so far (with the highest worth in dollars). Note that the rolling aggregation outputs an item every time, not just after a new record-breaking trade.
Pipeline p = Pipeline.create();
StreamSource<Trade> tradesSource = Sources.mapJournal("trades",
mapPutEvents(), mapEventNewValue(), START_FROM_CURRENT);
StreamStage<Trade> currLargestTrade =
p.drawFrom(tradesSource)
.withoutTimestamps()
.rollingAggregate(maxBy(
ComparatorEx.comparing(Trade::worth)));
Rolling aggregation is just a special case of stateful mapping, equivalent to this:
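A minimal sketch of that equivalence for the maxBy example above (it assumes Trade::worth as before; the single-element array merely serves as the mutable state holder):
ComparatorEx<Trade> byWorth = ComparatorEx.comparing(Trade::worth);
StreamStage<Trade> currLargestTrade =
        p.drawFrom(tradesSource)
         .withoutTimestamps()
         .mapStateful(
                 () -> new Trade[1],                            // state: holder for the current maximum
                 (holder, trade) -> {
                     if (holder[0] == null || byWorth.compare(trade, holder[0]) > 0) {
                         holder[0] = trade;                     // new record-holder
                     }
                     return holder[0];                          // emit the current maximum for every item
                 });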
The main advantage of rolling aggregation over stateful mapping is that it’s the simplest way to reuse an existing AggregateOperation.
4.15. Apply a User-Defined Transformation
Once you move past the basics, you’ll notice it isn’t always practical to write out the full pipeline definition in a single long chained expression. You’ll want to apply the usual principles of modularization and code reuse.
One way to do it without any API support is writing a method that takes a pipeline stage, applies some transforms, and returns the resulting stage, like this:
static BatchStage<String> cleanUp(BatchStage<String> input) {
return input.map(String::toLowerCase)
.filter(s -> s.startsWith("success"));
}
You could use it like this:
BatchStage<String> stage = p.drawFrom(source);
BatchStage<String> cleanedUp = PipelineTransforms.cleanUp(stage);
BatchStage<Long> counted = cleanedUp.aggregate(counting());
This has the unfortunate consequence of breaking up the method chain and forcing you to introduce more local variables. To allow you to seamlessly integrate such method calls with the pipeline expression, Jet defines the apply transform:
BatchStage<Long> counted = p.drawFrom(source)
.apply(PipelineTransforms::cleanUp)
.aggregate(counting());
4.16. Developing and testing your pipeline
When developing your pipeline, it can be useful to work with some mock data sources to avoid integrating with a real data source. Similarly with sinks, for testing you may want to assert the output of a pipeline in a convenient way. Jet’s Pipeline API offers some tools to help testing a pipeline without having to integrate with a real data source or sink.
These APIs are currently marked as @Beta and are not covered by the backward-compatibility guarantees.
4.16.1. Test Sources
Jet provides some test and development sources which can be used both for batch and streaming jobs.
Batch
Jet provides test batch sources which emit the supplied items and then complete:
pipeline.drawFrom(TestSources.items(1, 2, 3, 4));
It’s also possible to use this source with a collection:
List<Integer> list = IntStream.range(0, 1000)
.boxed()
.collect(Collectors.toList());
pipeline.drawFrom(TestSources.items(list))
.drainTo(Sinks.logger());
These sources are non-distributed and do not participate in the fault-tolerance protocol.
Streaming
For streaming sources a mock source which generates infinite data is available. The source is by default throttled to emit at the specified rate of events per second. It also generates a sequence number and a timestamp with each event which can be used to create mock data:
pipeline.drawFrom(TestSources.itemStream(10,
(timestamp, sequence) -> new Trade(sequence, timestamp)))
.withNativeTimestamps(0)
.window(WindowDefinition.tumbling(1000))
.aggregate(AggregateOperations.counting())
.drainTo(Sinks.logger());
Similar to batch sources, these sources are non-distributed and do not support fault tolerance: after a job restart the source sequence will be reset to 0. The timestamp is the current system time.
4.16.2. Assertions
Jet also provides assertions which can be used for asserting the intermediate or final output of a pipeline. Assertions are implemented as sinks which collect incoming items and then run the desired assertion on the collected items.
Batch
Assertions in batch jobs are typically run only after all the items have been received.
An example is given below:
pipeline.drawFrom(TestSources.items(1, 2, 3, 4))
.apply(Assertions.assertCollected(list ->
assertEquals("4 items must be received", list.size(), 4))
)
.drainTo(Sinks.logger());
In the given example, the assertion will be run after the source
has finished emitting all the items. If the assertion fails, the job
will also fail with an AssertionError
. It’s possible to have the
assertion inline inside the pipeline and do further computations. This
creates a fork in the pipeline where the assertion acts as an intermediary
sink.
Streaming
For streaming assertions, we can’t rely on the source finishing its emission of items at some point. Instead, we rely on a timeout within which the given assertion must eventually pass.
Example:
int itemsPerSecond = 10;
pipeline.drawFrom(TestSources.itemStream(itemsPerSecond))
.withoutTimestamps()
.apply(Assertions.assertCollectedEventually(10, list ->
assertTrue("At least 20 items must be received", list.size() > 20))
)
.drainTo(Sinks.logger());
Contrary to the batch example above, this assertion will be run periodically until it passes. Once the assertion is proven correct, the job will be cancelled immediately with an AssertionCompletedException. If the assertion is still failing after the given timeout (10 seconds in the example above), the job will fail with an AssertionError instead. Because of this, if an eventual assertion is used in a job, no other assertions should be used in it, or the job may be cancelled before the other assertions pass.
You can see a sample of how a pipeline containing this assertion can be submitted and asserted below:
try {
jet.newJob(pipeline).join();
fail("Job should have completed with an AssertionCompletedException," +
" but instead completed normally"
);
} catch (CompletionException e) {
assertContains(e.toString(), AssertionCompletedException.class.getName());
}
4.17. Pipeline API Cheatsheet
The list below summarizes the transforms you can apply in the Pipeline API and what each one does.
- Map: apply a mapping function to each input item independently. You can also do filtering by mapping to null. Example: convert the input lines of text to lowercase.
- Filter: apply a filtering function to each input item to decide whether to pass it to the output. Example: remove all empty strings from the stream.
- Flatmap: map each item to arbitrarily many items using a function that returns a Traverser over the output items. Example: split the lines of text into individual words.
- Stateful map: transform input into output with access to persistent, mutable state. Useful for pattern matching and custom rolling aggregation; to reuse an existing AggregateOperation, see the Rolling Aggregation entry below. Example: process a stream of reports on requests completed by a cluster of servers and emit output whenever a request with record-breaking latency is detected; if a server is inactive for 2 minutes, emit the maximum latency for the server’s period of activity that has just ended and then reset the score for that server.
- Distinct: suppress duplicate items from a stream. If you apply a grouping key, two items mapping to the same key are duplicates. This operation applies primarily to batch streams, but also works on a windowed unbounded stream. Example: from a batch stage of strings, derive one stage with distinct strings and another where the four-character prefix of the strings is unique.
- Merge: merge the contents of two streams into one. The item type in the right-hand stage must be the same as, or a subtype of, the one in the left-hand stage. Example: merge the streams from the New York and Tokyo stock exchanges.
- Enrich by Many-to-One Join: perform a many-to-one join with arbitrarily many enriching streams; the stream on which you invoke hashJoin is the one being enriched. Example: enrich a stream of stock trades with detailed info on the stock involved.
- Enrich by Map Lookup: for each stream item, look up a value from a Hazelcast map and transform (enrich) the item using it. Similar to a hash-join with the map’s entry set, but values don’t get stale (at the expense of throughput). Example: enrich a stream of stock trades with detailed info on the stock involved.
- Aggregate: aggregate all the stream items with the aggregate operation you supply. Example: count the stream items.
- Group and Aggregate: group the items by key and perform an aggregate operation on each group. Example: calculate the number of occurrences of each word in the stream.
- Windowed Group and Aggregate: perform grouping and aggregation on an unbounded stream by splitting it into bounded windows. Example: calculate the number of occurrences of each word in a stream of tweets within the last second.
- Join on a Common Key: perform a many-to-many join of several streams on a common key and apply an aggregate operation on the co-grouped items. Example: join a "page visits" stream with a "payments" stream in a web shop application; for each user you get all the recorded page views and payments.
- Streaming Join on a Common Key: like the above, but also apply a window to the unbounded stream; it joins all the items belonging to the same window. Example: join two unbounded streams, "page visits" and "payments"; for each user you get all the page views and payments they performed within the last minute, with the result updated every second.
- Rolling Aggregation: keep performing the same aggregate operation forever, getting the current result after each item. Example: track the largest trade observed in a stream.
- Apply a User-Defined Transform: modularize and reuse your pipeline code by extracting transforms to methods and plugging them into the pipeline with apply().
- Transform with Custom Processor: add a stage with a custom Core API processor to the pipeline with customTransform. Note that the Core API is for advanced use cases and you’ll rarely need it.
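As a quick, hedged illustration of a few of the transforms above (map, filter, flatMap, group-and-aggregate), the sketch below counts word occurrences in an IList of text lines named "lines"; it assumes the same static imports (traverseArray, wholeItem, counting) used in the HDFS word-count example later in this manual:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>list("lines"))                     // assumed input: an IList of text lines
 .map(String::toLowerCase)                                    // Map
 .filter(line -> !line.isEmpty())                             // Filter
 .flatMap(line -> traverseArray(line.split("\\W+")))          // Flatmap
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())                                    // Group and Aggregate
 .aggregate(counting())
 .drainTo(Sinks.logger());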
4.18. Implement Your Aggregate Operation
The single most important kind of processing Jet does is aggregation. In
general it is a transformation of a set of input values into a single
output value. The function that does this transformation is called the
“aggregate function”. A basic example is sum
applied to a set of
integer numbers, but the result can also be a complex value, for example
a list of all the input items.
Jet’s library contains a range of
predefined aggregate functions,
but it also exposes an abstraction, called
AggregateOperation
,
that allows you to plug in your own. Since Jet does the aggregation in a
parallelized and distributed way, you can’t simply supply a piece of
Java code that implements the aggregate function; we need you to break
it down into several smaller pieces that fit into Jet’s processing
engine.
The ability to compute the aggregate function in parallel comes at a
cost: Jet must be able to give a slice of the total data set to each
processing unit and then combine the partial results from all the units.
The combining step is crucial: it will only make sense if we’re
combining the partial results of a commutative associative function
(CA for short). On the example of sum
this is trivial: we know from
elementary school that +
is a CA operation. If you have a stream of
numbers: {17, 37, 5, 11, 42}
, you can sum up {17, 5}
separately from
{42, 11, 37}
and then combine the partial sums (also note the
reordering of the elements).
If you need something more complex, like average
, it doesn’t by itself
have this property; however if you add one more ingredient, the finish
function, you can express it easily. Jet allows you to first compute
some CA function, whose partial results can be combined, and then at the
very end apply the finish
function on the fully combined result. To
compute the average, your CA function will output the pair (sum,
count)
. Two such pairs are trivial to combine by summing each
component. The finish
function will be sum / count
.
In addition to the mathematical side, there is also the practical one:
you have to provide Jet with a specific mutable object, called the
accumulator
, which will keep the “running score” of the operation in
progress. For the average
example, it would be something like
public class AvgAccumulator {
private long sum;
private long count;
public void accumulate(long value) {
sum += value;
count++;
}
public void combine(AvgAccumulator that) {
this.sum += that.sum;
this.count += that.count;
}
public double finish() {
return (double) sum / count;
}
}
This object will also have to be serializable, preferably with Hazelcast’s serialization instead of Java’s, because in a group-and-aggregate operation there is one accumulator per key and all of them have to be sent across the network to be combined and finished.
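For illustration, here is a hedged sketch of a Hazelcast StreamSerializer for the accumulator above. It assumes you add plain getSum()/getCount() accessors and a (sum, count) constructor to AvgAccumulator; the type ID is arbitrary and the serializer still has to be registered through a SerializerConfig:
public class AvgAccumulatorSerializer implements StreamSerializer<AvgAccumulator> {
    @Override
    public int getTypeId() {
        return 20_000; // any positive ID not used by another custom serializer
    }

    @Override
    public void write(ObjectDataOutput out, AvgAccumulator acc) throws IOException {
        out.writeLong(acc.getSum());    // assumes a getSum() accessor
        out.writeLong(acc.getCount());  // assumes a getCount() accessor
    }

    @Override
    public AvgAccumulator read(ObjectDataInput in) throws IOException {
        return new AvgAccumulator(in.readLong(), in.readLong()); // assumes a (sum, count) constructor
    }

    @Override
    public void destroy() {
    }
}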
Instead of requiring you to write a complete class from scratch, Jet
separates the concern of holding the accumulated state from that of the
computation performed on it. This means that you just need one
accumulator class for each kind of structure that holds the accumulated
data, as opposed to one for each aggregate operation. Jet’s library
offers in the
com.hazelcast.jet.accumulator
package several such classes, one of them being
LongLongAccumulator
,
which is a match for our average
function. You’ll just have to supply
the logic on top of it.
Specifically, you have to provide a set of six functions (we call them “primitives”):
- create a new accumulator object.
- accumulate the data of an item by mutating the accumulator’s state.
- combine the contents of the right-hand accumulator into the left-hand one.
- deduct the contents of the right-hand accumulator from the left-hand one (undo the effects of combine).
- finish accumulation by transforming the accumulator object into the final result.
- export the result of aggregation in a way that’s not destructive for the accumulator (used in rolling aggregations).
We already mentioned most of these above. The deduct
primitive is
optional and Jet can manage without it, but if you are computing a
sliding window over an infinite stream, this primitive can give a
significant performance boost because it allows Jet to reuse the results
of the previous calculations.
In a similar fashion Jet discerns between the export
and finish
primitives for optimization purposes. Every function that works as the
export
primitive will also work as finish
, but you can specify a
different finish
that reuses the state already allocated in the
accumulator. Jet applies finish
only if it will never again use that
accumulator.
If you happen to have a deeper familiarity with JDK’s java.util.stream
API, you’ll find AggregateOperation
quite similar to
Collector
,
which is also a holder of several functional primitives. Jet’s
definitions are slightly different, though, and there are the additional
optimizing primitives we just mentioned.
Let’s see how this works with our average
function. Using
LongLongAccumulator
we can express our accumulate
primitive as
(acc, n) -> {
acc.set1(acc.get1() + n);
acc.set2(acc.get2() + 1);
}
The export
/finish
primitive will be
acc -> (double) acc.get1() / acc.get2()
Now we have to define the other three primitives to match our main
logic. For create
we just refer to the constructor:
LongLongAccumulator::new
. The combine
primitive expects you to
update the left-hand accumulator with the contents of the right-hand
one, so:
(left, right) -> {
left.set1(left.get1() + right.get1());
left.set2(left.get2() + right.get2());
}
Deducting must undo the effect of a previous combine
:
(left, right) -> {
left.set1(left.get1() - right.get1());
left.set2(left.get2() - right.get2());
}
All put together, we can define our averaging operation as follows:
AggregateOperation1<Long, LongLongAccumulator, Double> aggrOp = AggregateOperation
.withCreate(LongLongAccumulator::new)
.<Long>andAccumulate((acc, n) -> {
acc.set1(acc.get1() + n);
acc.set2(acc.get2() + 1);
})
.andCombine((left, right) -> {
left.set1(left.get1() + right.get1());
left.set2(left.get2() + right.get2());
})
.andDeduct((left, right) -> {
left.set1(left.get1() - right.get1());
left.set2(left.get2() - right.get2());
})
.andExportFinish(acc -> (double) acc.get1() / acc.get2());
Let’s stop for a second to look at the type we got:
AggregateOperation1<Long, LongLongAccumulator, Double>
. Its type
parameters are:
- Long: the type of the input item
- LongLongAccumulator: the type of the accumulator
- Double: the type of the result
Specifically note the 1
at the end of the type’s name: it signifies
that it’s the specialization of the general AggregateOperation
to
exactly one input stream. In Hazelcast Jet you can also perform a
co-aggregating operation, aggregating several input streams
together. Since the number of input types is variable, the general
AggregateOperation
type cannot statically capture them and we need
separate subtypes. We decided to statically support up to three input
types; if you need more, you’ll have to resort to the less type-safe,
general AggregateOperation
.
4.18.1. Aggregating over multiple inputs
Hazelcast Jet can join several streams and simultaneously perform aggregation on all of them. You specify a separate aggregate operation for each input stream and have the opportunity to combine their results when done. You can use aggregate operations provided in the library (see the section on co-aggregation for an example).
If you cannot express your aggregation logic using this approach, you can also specify a custom multi-input aggregate operation that can combine the items into the accumulator immediately as it receives them.
We’ll present a simple example on how to build a custom multi-input aggregate operation. Note that the same logic can also be expressed using separate single-input operations; the point of the example is introducing the API.
Say we are interested in the behavior of users in an online shop application and want to gather the following statistics for each user:
- total load time of the visited product pages
- quantity of items added to the shopping cart
- amount paid for bought items
This data is dispersed among separate datasets: PageVisit
, AddToCart
and Payment
. Note that in each case we’re dealing with a simple sum
applied to a field in the input item. We can perform a
cogroup-and-aggregate transform with the following aggregate operation:
Pipeline p = Pipeline.create();
BatchStage<PageVisit> pageVisit = p.drawFrom(Sources.list("pageVisit"));
BatchStage<AddToCart> addToCart = p.drawFrom(Sources.list("addToCart"));
BatchStage<Payment> payment = p.drawFrom(Sources.list("payment"));
AggregateOperation3<PageVisit, AddToCart, Payment, LongAccumulator[], long[]>
aggrOp = AggregateOperation
.withCreate(() -> new LongAccumulator[] {
new LongAccumulator(),
new LongAccumulator(),
new LongAccumulator()
})
.<PageVisit>andAccumulate0((accs, pv) -> accs[0].add(pv.loadTime()))
.<AddToCart>andAccumulate1((accs, atc) -> accs[1].add(atc.quantity()))
.<Payment>andAccumulate2((accs, pm) -> accs[2].add(pm.amount()))
.andCombine((accs1, accs2) -> {
accs1[0].add(accs2[0]);
accs1[1].add(accs2[1]);
accs1[2].add(accs2[2]);
})
.andExportFinish(accs -> new long[] {
accs[0].get(),
accs[1].get(),
accs[2].get()
});
BatchStage<Entry<Integer, long[]>> coGrouped =
pageVisit.groupingKey(PageVisit::userId)
.aggregate3(
addToCart.groupingKey(AddToCart::userId),
payment.groupingKey(Payment::userId),
aggrOp);
Note how we got an AggregateOperation3
and how it captured each input
type. When we use it as an argument to a cogroup-and-aggregate
transform, the compiler will ensure that the ComputeStage
s we attach
it to have the correct type and are in the correct order.
On the other hand, if you use the co-aggregation
builder object, you’ll construct the aggregate operation by calling
andAccumulate(tag, accFn)
with all the tags you got from the
co-aggregation builder, and the static type will be just
AggregateOperation
. The compiler won’t be able to match up the inputs
to their treatment in the aggregate operation.
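To make this concrete, here is a rough sketch of the builder-based approach applied to the same statistics. Treat it as an outline rather than a verified sample: the builder and tag types come from the co-aggregation section of this manual, and the exact generic signatures may differ slightly.
GroupAggregateBuilder1<PageVisit, Integer> builder =
        pageVisit.groupingKey(PageVisit::userId).aggregateBuilder();
Tag<PageVisit> visitTag = builder.tag0();
Tag<AddToCart> cartTag = builder.add(addToCart.groupingKey(AddToCart::userId));
Tag<Payment> payTag = builder.add(payment.groupingKey(Payment::userId));

AggregateOperation<LongAccumulator[], long[]> aggrOp = AggregateOperation
        .withCreate(() -> new LongAccumulator[] {
                new LongAccumulator(), new LongAccumulator(), new LongAccumulator()
        })
        .andAccumulate(visitTag, (accs, pv) -> accs[0].add(pv.loadTime()))
        .andAccumulate(cartTag, (accs, atc) -> accs[1].add(atc.quantity()))
        .andAccumulate(payTag, (accs, pm) -> accs[2].add(pm.amount()))
        .andCombine((left, right) -> {
            left[0].add(right[0]);
            left[1].add(right[1]);
            left[2].add(right[2]);
        })
        .andExportFinish(accs -> new long[] {
                accs[0].get(), accs[1].get(), accs[2].get()
        });

BatchStage<Entry<Integer, long[]>> coGrouped = builder.build(aggrOp);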
5. Source and Sink Connectors
Jet accesses data sources and sinks via its connectors. They are a computation job’s point of contact with the outside world.
5.1. Concerns
Although the connectors do their best to unify the various kinds of resources under the same “data stream” paradigm, there are still many concerns that need your attention.
5.1.1. Is it Unbounded?
The first decision when building a Jet computation job is whether it will deal with bounded or unbounded data. A typical example of a bounded resource is a persistent storage system, whereas an unbounded one usually behaves like a FIFO queue that discards old data. This is true both for sources and sinks.
Bounded data is handled in batch jobs and there are fewer concerns to
deal with. Examples of finite resources are the Hazelcast IMap
/ICache
and the Hadoop Distributed File System (HDFS). In the unbounded category
the most popular choice is Kafka, but a Hazelcast IMap
/ICache
can
also be used as an infinite source of update events (via the Event
Journal feature). You can also set up an IMap
/ICache
as a sink for
an infinite amount of data, either by ensuring that the size of the
keyset will be finite or by allowing the eviction of old entries.
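For example, here is a hedged sketch (the map name and limits are illustrative) of configuring eviction on a sink IMap so that it can absorb an unbounded stream without growing forever, using the IMDG 3.x map configuration API:
JetConfig cfg = new JetConfig();
cfg.getHazelcastConfig()
   .getMapConfig("outputMap")
   .setEvictionPolicy(EvictionPolicy.LRU)   // evict the least recently used entries
   .setMaxSizeConfig(new MaxSizeConfig(100_000, MaxSizeConfig.MaxSizePolicy.PER_NODE)); // cap entries per member
JetInstance jet = Jet.newJetInstance(cfg);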
5.1.2. Is it Replayable?
Most finite data sources are replayable because they come from persistent storage. You can easily replay the whole dataset. However, an infinite data source may be of such nature that it can be consumed only once. An example is the TCP socket connector. Such sources are bad at fault tolerance: if anything goes wrong during the computation, it cannot be retried.
Does it Support Checkpointing?
You cannot retry to process an infinite data stream from the very beginning. You must save the complete state at regular intervals, then replay the input stream from the last saved position (checkpoint). Jet can create snapshots of its internal processing state, but for this to be useful the data source must have the ability to replay its data from the chosen point, discarding everything before it. Both Kafka and the Hazelcast Event Journal support this.
5.1.3. Is it Distributed?
A distributed computation engine prefers to work with distributed data
resources. If the resource is not distributed, all Jet members will have
to contend for access to a single endpoint. Kafka, HDFS, IMap
and
ICache
are all distributed. On the other hand, an IList
is not: it
resides on a single member. When used as a source, only one Jet member
pulls its data. When used as a sink, all Jet members send their data
to the one that holds it.
A file source/sink operating in local mode is a sort of a "manually distributed" resource, each member accessing its own local filesystem. You have to manually arrange the files so that on each member there is a subset of the full dataset. When used as a sink, you have to manually gather all the pieces that Jet created.
The file source/sink can also operate in shared mode, accessing a shared filesystem mounted as a local directory.
5.1.4. What about Data Locality?
If you’re looking to achieve record-breaking throughput for your application, you’ll have to think carefully how close you can deliver your data to the location where Jet will consume and process it. For example, if your source is HDFS, you should align the topologies of the Hadoop and Jet clusters so that each machine that hosts an HDFS member also hosts a Jet member. Jet will automatically figure this out and arrange for each member to consume only the slice of data stored locally.
If you’re using IMap
/ICache
as data sources, you have two basic
choices: have Jet connect to a Hazelcast IMDG cluster, or use Jet itself
to host the data (since a Jet cluster is at the same time a Hazelcast
IMDG cluster). In the second case Jet will automatically ensure a
data-local access pattern, but there’s a caveat: if the Jet job causes
an error of unrestricted scope, such as OutOfMemoryError
or
StackOverflowError
, it will have unpredictable consequences for the
state of the whole Jet member, jeopardizing the integrity of the data
stored on it.
5.2. Overview of Sources and Sinks
The table below gives you a high-level overview of the source and sink connectors we deliver with Jet. There are links to Javadoc and code samples. The sections following this one present each connector in more detail.
The Jet Connector Hub contains the complete connector list including the connectors that aren’t packaged with Jet.
Resource | Javadoc | Sample | Unbounded? | Replayable? | Checkpointing? | Distributed? | Data Locality
[Summary table: one row per connector, with links to its Javadoc and a code sample and a yes/no mark for each of the listed properties. The rows include the Hazelcast map/cache sources and sinks, the Event Journal of IMap and ICache (also in another cluster), IList (also in another cluster), the local and shared filesystem connectors, the JMS queue and topic sources and sinks, and the application log sink.]
5.3. Hazelcast IMap and ICache
Hazelcast IMDG’s IMap
and ICache
are very similar in the way Jet
uses them and are largely interchangeable. IMap has a few more features.
The simplest way to use them is as finite sources of their contents, but
if you enable the Event Journal on a map/cache, you’ll be able to use
it as a source of an infinite stream of update events
(see below).
The most basic usage is very simple; here are snippets that use IMap
and ICache
as a source and a sink:
Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> stage = p.drawFrom(Sources.map("myMap"));
stage.drainTo(Sinks.map("myMap"));
Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> stage = p.drawFrom(Sources.cache("inCache"));
stage.drainTo(Sinks.cache("outCache"));
In these snippets we draw from and drain to the same kind of structure, but you can use any combination.
5.3.1. Access an External Cluster
To access a Hazelcast IMDG cluster separate from the Jet cluster, you
have to provide Hazelcast client configuration for the connection. In
this simple example we use programmatic configuration to draw from and
drain to remote IMap
and ICache
. Just for variety, we funnel the
data from IMap
to ICache
and vice versa:
ClientConfig cfg = new ClientConfig();
cfg.getGroupConfig().setName("myGroup");
cfg.getNetworkConfig().addAddress("node1.mydomain.com", "node2.mydomain.com");
Pipeline p = Pipeline.create();
BatchStage<Entry<String, Long>> fromMap =
p.drawFrom(Sources.remoteMap("inputMap", cfg));
BatchStage<Entry<String, Long>> fromCache =
p.drawFrom(Sources.remoteCache("inputCache", cfg));
fromMap.drainTo(Sinks.remoteCache("outputCache", cfg));
fromCache.drainTo(Sinks.remoteMap("outputMap", cfg));
For a full discussion on how to configure your client connection, refer to the Hazelcast IMDG documentation on this topic.
5.3.2. Optimize Data Traffic at the Source
If your use case calls for some filtering and/or transformation of the data you retrieve, you can reduce the traffic volume by providing a filtering predicate and an arbitrary transformation function to the source connector itself; they are applied on the remote side, before the data is sent:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, String, Person>remoteMap(
"inputMap", clientConfig,
e -> e.getValue().getAge() > 21,
e -> e.getValue().getAge()));
The same optimization works on a local IMap
, too, but has less impact.
However, Hazelcast IMDG goes a step further in optimizing your filtering
and mapping to a degree that matters even locally. If you don’t need
fully general functions, but can express your predicate via
Predicates
or
PredicateBuilder
,
they will create a specialized predicate instance that can test the
object without deserializing it. Similarly, if the mapping you need is
of a constrained kind where you just extract one or more object fields
(attributes), you can specify a projection instead of a general
mapping lambda:
Projections.singleAttribute()
or
Projections.multiAttribute()
.
These will extract the listed attributes without deserializing the whole
object. For these optimizations to work, however, your objects must
employ Hazelcast’s portable serialization.
They are especially relevant if the volume of data you need in the Jet
job is significantly less than the volume of the stored data.
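Here is a hedged sketch of this approach; the Person value type with age and name attributes is assumed, and the value class is assumed to use portable serialization so the predicate and projection can run without deserializing it:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.remoteMap("inputMap", clientConfig,
        Predicates.greaterThan("age", 21),         // evaluated remotely, without deserializing the value
        Projections.singleAttribute("name")))      // extracts just the "name" attribute
 .drainTo(Sinks.logger());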
Note that the above feature is not available on ICache. It is, however, available on ICache’s event journal, which we introduce next.
5.3.3. Receive an Infinite Stream of Update Events
You can use IMap
/ICache
as sources of infinite event streams. For
this to work you have to enable the Event Journal on your data
structure. This is a feature you set in the Jet/IMDG instance
configuration, which means you cannot change it while the cluster is
running.
This is how you enable the Event Journal on an IMap
:
JetConfig cfg = new JetConfig();
cfg.getHazelcastConfig()
.getMapEventJournalConfig("inputMap")
.setEnabled(true)
.setCapacity(1000) // how many events to keep before evicting
.setTimeToLiveSeconds(10); // evict events older than this
JetInstance jet = Jet.newJetInstance(cfg);
The default journal capacity is 10,000 and the default time-to-live is 0 (which means “unlimited”). Since the entire event journal is kept in RAM, you should take care to adjust these values to match your use case.
The configuration API for ICache
is identical:
cfg.getHazelcastConfig()
.getCacheEventJournalConfig("inputCache")
.setEnabled(true)
.setCapacity(1000)
.setTimeToLiveSeconds(10);
Once properly configured, you use Event Journal sources like this:
Pipeline p = Pipeline.create();
StreamSourceStage<Entry<String, Long>> fromMap = p.drawFrom(
Sources.mapJournal("inputMap", START_FROM_CURRENT));
StreamSourceStage<Entry<String, Long>> fromCache = p.drawFrom(
Sources.cacheJournal("inputCache", START_FROM_CURRENT));
IMap
and ICache
are on an equal footing here. The second argument,
START_FROM_CURRENT
here, means "start receiving from events that occur
after the processing starts". If you specify START_FROM_OLDEST
, you’ll
get all the events still on record.
These versions of the methods emit only the ADDED and UPDATED event types, and they map the event object to a simple Map.Entry holding the key and the new value. To make a different choice, use the overloads that allow you to specify your own projection and filtering. For example, you can request all event types and the full event objects:
Pipeline p = Pipeline.create();
StreamSourceStage<EventJournalMapEvent<String, Long>> allFromMap = p.drawFrom(
Sources.mapJournal("inputMap",
alwaysTrue(), identity(), START_FROM_CURRENT));
StreamSourceStage<EventJournalCacheEvent<String, Long>> allFromCache = p.drawFrom(
Sources.cacheJournal("inputCache",
alwaysTrue(), identity(), START_FROM_CURRENT));
Note the type of the stream element: EventJournalMapEvent
and
EventJournalCacheEvent
. These are almost the same and have these
methods:
- getKey()
- getOldValue()
- getNewValue()
- getType()
The only difference is the return type of getType()
which is specific
to each kind of structure and gives detailed insight into what kind of
event it reports. Add, remove and update are the basic ones, but
there are also evict, clear, expire and some others.
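As an illustration, here is a hedged sketch, using the same overload as above, that keeps only removal events and emits just the affected keys (it assumes getType() returns the IMDG EntryEventType; adjust the generics to your key and value types):
Pipeline p = Pipeline.create();
p.drawFrom(Sources.mapJournal("inputMap",
        (EventJournalMapEvent<String, Long> event) ->
                event.getType() == EntryEventType.REMOVED,   // keep only removals
        EventJournalMapEvent::getKey,                        // emit just the affected key
        START_FROM_CURRENT))
 .withoutTimestamps()
 .drainTo(Sinks.logger());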
Finally, you can get all of the above from a map/cache in another cluster: just prepend remote to the source names and add a ClientConfig, for example:
Pipeline p = Pipeline.create();
StreamSourceStage<Entry<String, Long>> fromRemoteMap = p.drawFrom(
Sources.remoteMapJournal("inputMap",
someClientConfig, START_FROM_CURRENT));
StreamSourceStage<Entry<String, Long>> fromRemoteCache = p.drawFrom(
Sources.remoteCacheJournal("inputCache",
someClientConfig, START_FROM_CURRENT));
5.3.4. Update Entries in IMap
When you use an IMap
as a sink, instead of just pushing the data into
it you may have to merge the new with the existing data or delete the
existing data. Hazelcast Jet supports this with map-updating sinks which
rely on Hazelcast IMDG’s Entry Processor
feature. An entry processor allows you to atomically execute a piece of
code against a map entry, in a data-local manner.
The updating sinks come in three variants:
- mapWithMerging, where you provide a function that computes the map value from the stream item and a merging function that gets called if a value already exists in the map. Here’s an example that merges the new value with the existing one by summing them:

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.<String, Long>map("inMap"))
        .drainTo(Sinks.mapWithMerging("outMap",
                Entry::getKey,
                Entry::getValue,
                (oldValue, newValue) -> oldValue + newValue));

This operation is NOT lock-aware; it will process the entries regardless of whether they are locked or not. We significantly boost performance by applying the update function in batches, but this doesn’t respect the locks. If you use this method on locked entries, it will break the mutual exclusion contract. Use mapWithEntryProcessor if you need the proper locking behavior.

- mapWithUpdating, where you provide a single updating function that combines the roles of the two functions in mapWithMerging. It will be called with the stream item and the existing value, if any. Here’s an example that sums the new value with the existing one:

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.<String, Long>map("inMap"))
        .drainTo(Sinks.<Entry<String, Long>, String, Long>mapWithUpdating(
                "outMap", Entry::getKey,
                (oldV, item) -> (oldV != null ? oldV : 0L) + item.getValue()));

This operation is likewise NOT lock-aware; the same caveat about locked entries applies, so use mapWithEntryProcessor if you need the proper locking behavior.

- mapWithEntryProcessor, where you provide a function that returns a full-blown EntryProcessor instance that will be submitted to the map. This is the most general variant, but it can’t use the batching that the other variants do and thus has a higher cost per item. You should use it only if you need a specialized entry processor that can’t be expressed in terms of the other variants. This example takes the values of the map and submits an entry processor that increments them by 5:

Pipeline pipeline = Pipeline.create();
pipeline.drawFrom(Sources.<String, Integer>map("mymap"))
        .drainTo(Sinks.mapWithEntryProcessor("mymap",
                Entry::getKey,
                entry -> new IncrementEntryProcessor(5)));

class IncrementEntryProcessor implements EntryProcessor<String, Integer> {
    private int incrementBy;

    public IncrementEntryProcessor(int incrementBy) {
        this.incrementBy = incrementBy;
    }

    @Override
    public Object process(Entry<String, Integer> entry) {
        return entry.setValue(entry.getValue() + incrementBy);
    }

    @Override
    public EntryBackupProcessor<String, Integer> getBackupProcessor() {
        return null;
    }
}
5.4. Hazelcast IList
Whereas IMap
and ICache
are the recommended choice of data sources
and sinks in Jet jobs, Jet supports IList
purely for convenience
during prototyping, unit testing and similar non-production situations.
It is not a partitioned data structure and only one cluster member has
all the contents. In a distributed Jet job all the members will compete
for access to the single member holding it.
With that said, IList
is very simple to use. Here’s an example how to
fill it with test data, consume it in a Jet job, dump its results into
another list, and fetch the results (we assume you already have a Jet
instance in the variable jet
):
IList<Integer> inputList = jet.getList("inputList");
for (int i = 0; i < 10; i++) {
inputList.add(i);
}
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer>list("inputList"))
.map(i -> "item" + i)
.drainTo(Sinks.list("resultList"));
jet.newJob(p).join();
IList<String> resultList = jet.getList("resultList");
System.out.println("Results: " + new ArrayList<>(resultList));
You can access a list in an external cluster as well, by providing a
ClientConfig
object:
ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig()
.setName("myGroup");
clientConfig.getNetworkConfig()
.addAddress("node1.mydomain.com", "node2.mydomain.com");
Pipeline p = Pipeline.create();
p.drawFrom(Sources.remoteList("inputlist", clientConfig))
.drainTo(Sinks.remoteList("outputList", clientConfig));
5.5. Kafka
Apache Kafka is a production-worthy choice of both source and sink for infinite stream processing jobs. It supports fault tolerance and snapshotting. The basic paradigm is that of a distributed publish/subscribe message queue. Jet’s Kafka Source subscribes to a Kafka topic and the sink publishes events to a Kafka topic.
The following code will consume from topics t1
and t2
and then write
to t3
:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.serializer", IntegerSerializer.class.getCanonicalName());
props.setProperty("value.deserializer", IntegerDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(props, "t1", "t2"))
.withoutTimestamps()
.drainTo(KafkaSinks.kafka(props, "t3"));
5.5.1. Using Kafka as a Source
The Kafka source emits entries of type Map.Entry<Key,Value>
which can
be transformed using an optional mapping function. It never completes.
The job will end only if explicitly cancelled or aborted due to an
error.
Internally Jet creates one KafkaConsumer
per Processor
instance
using the supplied properties. Jet uses manual partition assignment to
arrange the available Kafka partitions among the available processors
and will ignore the group.id
property.
Currently there is a requirement that the global parallelism of the Kafka source be at most the number of partitions you are subscribing to. The local parallelism of the Kafka source is 2 and if your Jet cluster has 4 members, this means that a minimum of 8 Kafka partitions must be available.
If any new partitions are added while the job is running, Jet will automatically assign them to the existing processors and consume them from the beginning.
5.5.2. Processing Guarantees
The Kafka source supports snapshots. Upon each snapshot it saves the current offset for each partition. When the job is restarted from a snapshot, the source can continue reading from the saved offset.
If snapshots are disabled, the source will commit the offset of the last record it read to the Kafka cluster. Since the fact that the source read an item doesn’t mean that the whole Jet pipeline processed it, this doesn’t guarantee against data loss.
5.5.3. Using Kafka as a Sink
The Kafka sink creates one KafkaProducer per cluster member and shares it among all the sink processors on that member. You can provide a mapping function that transforms the items the sink receives into ProducerRecord instances.
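For example, here is a hedged sketch (reusing the props object configured earlier and an IMap named "inputMap", both assumptions) that maps each map entry to a ProducerRecord before publishing it to topic t3:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String, Integer>map("inputMap"))
 .drainTo(KafkaSinks.kafka(props,
         entry -> new ProducerRecord<>("t3", entry.getKey(), entry.getValue())));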
5.5.4. Limitations
Apache Kafka introduced client backward compatibility with version 1.0.0. The compatibility is two-way: new brokers support older clients and new clients support older brokers.
The Kafka sink and source are based on version 2.2.0, which means the Kafka connector works with any client and broker of version 1.0.0 or later.
5.6. HDFS
The Hadoop Distributed File System is a production-worthy choice for both a data source and a data sink in a batch computation job. It is a distributed, replicated storage system that handles these concerns automatically, exposing a simple unified view to the client.
The HDFS source and sink require a configuration object of type
JobConf
which supplies the input and output paths and formats. They don’t
actually create a MapReduce job; this config is simply used to describe
the required inputs and outputs. You can share the same JobConf
instance between several source/sink instances.
Here’s a configuration sample:
JobConf jobConfig = new JobConf();
jobConfig.setInputFormat(TextInputFormat.class);
jobConfig.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(jobConfig, new Path("input-path"));
TextOutputFormat.setOutputPath(jobConfig, new Path("output-path"));
The word count pipeline can then be expressed using HDFS as follows:
Pipeline p = Pipeline.create();
p.drawFrom(HdfsSources.hdfs(jobConfig, (k, v) -> v.toString()))
.flatMap(line -> traverseArray(line.toLowerCase().split("\\W+"))
.filter(w -> !w.isEmpty()))
.groupingKey(wholeItem())
.aggregate(counting())
.drainTo(HdfsSinks.hdfs(jobConfig));
5.6.1. Data Locality When Reading
Jet will split the input data across the cluster, with each processor instance reading a part of the input. If the Jet nodes are running along the HDFS datanodes, then Jet can make use of data locality by reading the blocks locally where possible. This can bring a significant increase in read speed.
5.6.2. Output
Each processor writes to a different file in the output folder, identified by its unique processor ID. The files remain in a temporary state until the job completes, at which point they are committed. For streaming jobs, they are committed when the job is cancelled. We plan to introduce a rolling sink for HDFS in the future for better streaming support.
5.6.3. Dealing with Writables
Hadoop types implement their own serialization mechanism through the use
of Writable.
Jet provides an adapter to register a Writable
for
Hazelcast serialization
without having to write additional serialization code. To use this
adapter, you can register your own Writable
types by extending
WritableSerializerHook
and
registering the hook.
5.6.4. Hadoop JARs and Classpath
When submitting JARs along with a job, avoid sending the Hadoop JARs; they should instead be present on the classpath of the running members. Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.
5.7. Files
Hazelcast Jet provides file and TCP/IP socket connectors that have limited production use, but are simple and can be very useful in an early rapid prototyping phase. They assume the data is in the form of plain text and emit/receive data items which represent individual lines of text.
These connectors are not fault-tolerant. On job restart they behave as if you started a new job. The sources don’t do snapshotting. The sinks don’t suppress duplicate data.
5.7.1. File Sources
The file sources are designed to work with both local and shared file systems. With the local file system, the sources expect to see on each member just the files that member should read; you can achieve the effect of a distributed source if you manually prepare a different set of files on each member. With a shared file system, the sources split the work so that each member reads a part of the files.
There are two flavors of the file source: bounded and unbounded.
Here’s an example with the bounded source:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.files("/home/jet/input"))
.drainTo(Sinks.logger());
This will log on each Jet member the contents of all the files in the specified directory. When the source reads all the files, the job completes. If the files change while the job is running, the behavior is undefined.
Here’s an example with the unbounded source:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.fileWatcher("/home/jet/input"))
.withoutTimestamps()
.drainTo(Sinks.logger());
It will watch the directory for changes. It will emit only new contents added after startup: both new files and new content appended to existing ones. Files must be updated in an append-only fashion; if the existing content changes, the behavior is undefined.
If you delete the watched directory, the job will complete.
5.7.2. File Sink
The file sink can work with either a local or a shared network file system. Each member will write to different file names. You can achieve the effect of a distributed sink if you manually collect all the output files on all members and combine their contents.
Here’s a small example of usage:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
.drainTo(Sinks.files("/home/jet/output"));
5.7.3. Avro
Hazelcast Jet provides an Apache Avro file source and sink for batch processing jobs. This is a specialized version of the file connector. The Avro connector assumes the data is in the Avro Object Container File format.
Avro File Source
The Avro file source reads the files in the specified directory using the given datum reader supplier and emits the records downstream.
Here’s an example with a reflect datum reader:
Pipeline p = Pipeline.create();
p.drawFrom(AvroSources.files("/home/jet/input", Person.class))
.drainTo(Sinks.logger());
This will log on each Jet member the records of all the files in the specified directory. When the source reads all the files, the job completes. If the files change while the job is running, the behavior is undefined.
Avro File Sink
The Avro file sink writes the records to files using the given datum writer and schema suppliers. The sink always overwrites existing files; it does not append.
Here is an example with a generic datum writer:
String schemaString = "{\"type\":\"record\",\"name\":\"Person\"," +
"\"namespace\":\"datamodel\",\"fields\":[{" +
"\"name\":\"id\",\"type\":\"int\"}]}";
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<GenericRecord>list("inputList"))
.drainTo(AvroSinks.files("/home/jet/output",
() -> new Schema.Parser().parse(schemaString)));
5.8. TCP/IP Socket
5.8.1. Socket Source
The socket source opens a blocking client TCP/IP socket and receives data over it. The data must be lines of plain text.
Each underlying worker of the Socket Source connector opens its own
client socket and asks for data from it. The user supplies the
host:port
connection details. The server side should ensure a
meaningful dispersion of data among all the connected clients, but
how it does it is outside of Jet’s control.
Here’s a simple example:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.socket("localhost", 8080, StandardCharsets.UTF_8))
.withoutTimestamps()
.drainTo(Sinks.logger());
You can study a comprehensive code sample including a sample socket server using Netty.
5.8.2. Socket Sink
The socket sink opens a blocking client TCP/IP socket and sends data over it. The data must be in the form of lines of plain text. To get meaningful behavior, the server side must collect and combine them from all the concurrently connected clients.
Here’s a simple example:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
.drainTo(Sinks.socket("localhost", 8080));
5.9. JDBC
Jet contains both a JDBC source and a JDBC sink. You can use them to read from or write to relational databases or any other store that supports the standard JDBC API.
The source is only a batch-style source: it reads records that are produced by one SQL query and once all records are emitted, the source completes. The sink can work in both batch and streaming jobs.
The connector is not fault-tolerant. On job restart it behaves as if you had started a new job. The source does not do snapshotting and the sink does not suppress duplicate data.
To use the JDBC connector, include a JDBC driver on the classpath.
5.9.1. Using the JDBC Source
The JDBC source comes in two versions:
- Parallel: you will need to provide a ToResultSetFunction that will create one query for each parallel worker to query part of the data.
- Non-parallel: there will be only one worker that will read all the records.
Example of parallel source
Here we use the modulo operator to select a part of the keys, but any set of queries returning non-overlapping subsets will do. Note that this might not actually perform better than the non-parallel version unless the underlying table is physically partitioned by this key.
Pipeline p = Pipeline.create();
p.drawFrom(Sources.jdbc(
() -> DriverManager.getConnection(DB_CONNECTION_URL),
(con, parallelism, index) -> {
PreparedStatement stmt = con.prepareStatement("SELECT * FROM PERSON WHERE MOD(id, ?) = ?");
stmt.setInt(1, parallelism);
stmt.setInt(2, index);
return stmt.executeQuery();
},
resultSet ->
new Person(resultSet.getInt(1), resultSet.getString(2))
)).drainTo(Sinks.logger());
Example of non-parallel source
A single worker of the source will fetch the whole result set with a single query. You need to provide just one SQL query.
Pipeline p = Pipeline.create();
p.drawFrom(Sources.jdbc(
DB_CONNECTION_URL,
"select * from PERSON",
resultSet ->
new Person(resultSet.getInt(1), resultSet.getString(2))
)).drainTo(Sinks.logger());
The output function receives the ResultSet and creates the desired output object. It is called for each row of the result set; you should not call ResultSet#next() or any other cursor-navigating method.
The source does not save any state to snapshot. If the job is restarted, it will re-emit all entries.
Any SQLException
will cause the job to fail. The default local
parallelism for this processor is 1.
5.9.2. Using the JDBC Sink
The JDBC sink connects to the specified database using the given connection supplier, prepares a statement using the given update query and inserts/updates the items.
The update query should be a parametrized query. The bind function will
receive a PreparedStatement
created for this query and should bind
parameters to it. It should not execute the query, call commit or any
other method.
The records are committed after each batch and, if the driver supports it, batch mode is used. Auto-commit is disabled on the connection.
In case of an SQLException
the processor will automatically try to
reconnect and the job won’t fail, except for the
SQLNonTransientException
subclass. The default local parallelism for
this sink is 1.
No state is saved to snapshot for this sink. After the job is restarted,
the items will likely be duplicated, providing an at-least-once
guarantee. For this reason you should not use an INSERT statement, which can fail on a duplicate primary key. Rather, use an insert-or-update statement that can tolerate duplicate writes.
The following code snippet shows writing the Person
objects to a
database table:
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Person>list("inputList"))
.drainTo(Sinks.jdbc(
"REPLACE INTO PERSON (id, name) values(?, ?)",
DB_CONNECTION_URL,
(stmt, item) -> {
stmt.setInt(1, item.id);
stmt.setString(2, item.name);
}));
5.10. JMS
The JMS (Java Message Service) connector can be used both as a source and as a sink for infinite stream processing. The connector is not fault-tolerant: on job restart it behaves as if you had started a new job. The source does not do snapshotting and the sink does not suppress duplicate data.
To use the JMS connector, include a JMS client on the classpath. IO failures are generally handled by the JMS client and do not cause the connector to fail. Most clients offer a configuration parameter to enable auto-reconnection; refer to the client documentation for details.
5.10.1. Using JMS as a Source
The JMS source opens a connection to the JMS server for each member. Then each underlying worker of the source creates a session and a message consumer using that connection. The user supplies necessary functions to create the connection, session and message consumer.
The JMS source uses a non-blocking API to receive messages and transforms each message into the desired output object using the supplied projection function. The source flushes the session after receiving each message, using the supplied flush function.
The following code snippets show streaming messages from a JMS queue and a JMS topic using ActiveMQ JMS Client.
Pipeline p = Pipeline.create();
p.drawFrom(Sources.jmsQueue(() -> new ActiveMQConnectionFactory(
"tcp://localhost:61616"), "queue"))
.withoutTimestamps()
.drainTo(Sinks.logger());
Pipeline p = Pipeline.create();
p.drawFrom(Sources.jmsTopic(() -> new ActiveMQConnectionFactory(
"tcp://localhost:61616"), "topic"))
.withoutTimestamps()
.drainTo(Sinks.logger());
The JMS topic is a non-distributed source: if messages are consumed by multiple consumers, all of them will get the same messages. Therefore the source operates on a single member with a local parallelism of 1. Setting the local parallelism to a value other than 1 causes an IllegalArgumentException.
5.10.2. Using JMS as a Sink
The JMS sink opens a connection to the JMS server for each member. Then each underlying worker of the sink creates a session and a message producer using that connection. The user supplies necessary functions and parameters to create the connection, session and message producer.
The JMS sink uses the supplied function to create a Message
object for
each input item and sends this message using the supplied send function.
After a batch of messages is sent, the sink flushes the session using the supplied flush function.
The following code snippets show writing to a JMS queue and a JMS topic using ActiveMQ JMS Client.
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
.drainTo(Sinks.jmsQueue(() -> new ActiveMQConnectionFactory(
"tcp://localhost:61616"), "queue"));
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("inputList"))
.drainTo(Sinks.jmsTopic(() -> new ActiveMQConnectionFactory(
"tcp://localhost:61616"), "topic"));
5.11. Amazon AWS S3
Amazon AWS S3 Object Storage is a production-worthy choice for both a data source and a data sink in a batch computation job.
The S3 source and sink use the Amazon S3 SDK to connect to the storage. The connectors expect the user to provide either an S3Client instance or credentials (access key ID, secret access key) to create the client. The source and sink assume the data is in the form of plain text and emit/receive data items which represent individual lines of text.
The connectors are not fault-tolerant. On job restart they behave as if you started a new job. The source does not do snapshotting. The sink always overwrites previously written objects.
5.11.1. S3 Source
The S3 source distributes the workload by splitting the objects among the processor instances. The source reads each object line by line and emits the desired output downstream using the provided transform function.
The example below logs, on each Jet member, the contents of all the objects in the specified bucket. When the source has read all the objects, the job completes. If the objects change while the job is running, the behavior is undefined.
String accessKeyId = "";
String accessKeySecret = "";
String prefix = "";
AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, accessKeySecret);
S3Client s3 = S3Client
.builder()
.credentialsProvider(StaticCredentialsProvider.create(credentials))
.build();
Pipeline p = Pipeline.create();
p.drawFrom(S3Sources.s3(singletonList("input-bucket"), prefix, () -> s3))
.drainTo(Sinks.logger());
5.11.2. S3 Sink
The S3 sink writes items to the specified bucket, transforming each item into a line using the given transform function. The sink creates an object in the bucket for each processor instance. The object name consists of a user-provided prefix (if defined) followed by the processor’s global index; for example, the processor with index 2 and prefix my-object- creates the object my-object-2.
Here’s a small example of usage:
String accessKeyId = "";
String accessKeySecret = "";
AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, accessKeySecret);
S3Client s3 = S3Client
.builder()
.credentialsProvider(StaticCredentialsProvider.create(credentials))
.build();
Pipeline p = Pipeline.create();
p.drawFrom(Sources.list("input-list"))
.drainTo(S3Sinks.s3("output-bucket", () -> s3));
5.11.3. Using HDFS as an S3 Connector
AWS S3 can be registered as a filesystem with HDFS, which can then be used as a source or sink via our Hadoop module. See the Hadoop-AWS module documentation for more information on the dependencies and configuration.
The example below reads from a bucket, transforms each line to uppercase and writes to another bucket using the Hadoop module.
JobConf jobConfig = new JobConf();
jobConfig.setInputFormat(TextInputFormat.class);
jobConfig.setOutputFormat(TextOutputFormat.class);
TextInputFormat.addInputPath(jobConfig, new Path("s3a://input-bucket"));
TextOutputFormat.setOutputPath(jobConfig, new Path("s3a://output-bucket"));
Pipeline p = Pipeline.create();
p.drawFrom(HdfsSources.<String, String>hdfs(jobConfig))
.map(e -> Util.entry(e.getKey(), e.getValue().toUpperCase()))
.drainTo(HdfsSinks.hdfs(jobConfig));
5.12. Source and Sink Builders
If Jet doesn’t natively support the data source/sink you need, you can
build a connector for it yourself by using the
SourceBuilder
and
SinkBuilder
.
5.12.1. Source Builder
To make your custom source connector you need two basic ingredients:
- a context object that holds all the resources and state you need to keep track of
- a stateless function, fillBufferFn, taking two parameters: the state object and a buffer object provided by Jet
Jet repeatedly calls fillBufferFn
whenever it needs more data items.
Optimally, the function will fill the buffer with the items it can
acquire without blocking. A hundred items at a time is enough to
eliminate any per-call overheads within Jet. The function may block as
well (it’s running inside a non-cooperative
processor), but taking longer than a second to complete can have
negative effects on the overall performance of the processing pipeline.
Build a Bounded (Batch) Source
In this example we build a source that emits the lines of a file:
BatchSource<String> fileSource = SourceBuilder
.batch("file-source", x -> (1)
new BufferedReader(new FileReader("input.txt")))
.<String>fillBufferFn((in, buf) -> { (2)
String line = in.readLine();
if (line != null) {
buf.add(line);
} else {
buf.close(); (3)
}
})
.destroyFn(BufferedReader::close)
.build();
Pipeline p = Pipeline.create();
BatchStage<String> srcStage = p.drawFrom(fileSource);
1 | here we pass createFn , the factory of state objects |
2 | the main logic goes to fillBufferFn |
3 | call buf.close() to mark the end of the bounded data stream |
The file must be available on all the members of the Jet cluster for this to work. Only one member will actually read it, but you can’t choose or predict which one.
The code above emits a single item per fillBufferFn
call. To ensure we
get the best performance, we can improve it to emit the data in chunks:
BatchSource<String> fileSource = SourceBuilder
.batch("file-source", x ->
new BufferedReader(new FileReader("input.txt")))
.<String>fillBufferFn((in, buf) -> {
for (int i = 0; i < 128; i++) {
String line = in.readLine();
if (line == null) {
buf.close();
return;
}
buf.add(line);
}
})
.destroyFn(BufferedReader::close)
.build();
Build an Unbounded (Stream) Source
Here’s how you can build a simple source that keeps polling a URL, emitting all the lines it gets in the response:
StreamSource<String> httpSource = SourceBuilder
.stream("http-source", ctx -> HttpClients.createDefault())
.<String>fillBufferFn((httpc, buf) -> {
HttpGet request = new HttpGet("localhost:8008");
InputStream content = httpc.execute(request)
.getEntity()
.getContent();
BufferedReader reader = new BufferedReader(
new InputStreamReader(content)
);
reader.lines().forEach(buf::add);
})
.destroyFn(CloseableHttpClient::close)
.build();
Pipeline p = Pipeline.create();
StreamSourceStage<String> srcStage = p.drawFrom(httpSource);
Our state object is an instance of the Apache HTTP Client. It maintains a connection pool so it will reuse the same connection for all our requests. Note that we’re making a blocking call here, but it’s expected to respond quickly. In a more serious production setting we could configure timeouts on the HTTP client to limit the blocking time. We’d also have to add error handling so we just retry on failure instead of causing the whole Jet job to fail.
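For example, one possible way to bound the blocking time, assuming Apache HttpClient 4.x as in the sample above, is to configure request timeouts on the client returned by createFn:
RequestConfig requestConfig = RequestConfig.custom()
        .setConnectTimeout(1_000)        // max millis to establish the connection
        .setSocketTimeout(1_000)         // max millis to wait for the next data packet
        .build();
CloseableHttpClient httpClient = HttpClients.custom()
        .setDefaultRequestConfig(requestConfig)
        .build();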
Build a Stream Source with Event Timestamps
To make the data usable for windowed aggregation, each event item must be timestamped. If the source doesn’t emit timestamped events, you’ll have to add them while building the processing pipeline. It’s more convenient and efficient to add the timestamps right at the source. Jet offers you an API variant specifically tailored to this need. Let’s say that in the above example the first 9 characters denote the event’s timestamp. We can extract them as follows:
StreamSource<String> httpSource = SourceBuilder
.timestampedStream("http-source", ctx -> HttpClients.createDefault())
.<String>fillBufferFn((httpc, buf) -> {
HttpGet request = new HttpGet("localhost:8008");
InputStream content = httpc.execute(request)
.getEntity()
.getContent();
BufferedReader reader = new BufferedReader(
new InputStreamReader(content)
);
reader.lines().forEach(item -> {
long timestamp = Long.valueOf(item.substring(0, 9));
buf.add(item.substring(9), timestamp);
});
})
.destroyFn(CloseableHttpClient::close)
.build();
Distributed Stream Source
In the examples we showed so far the source was non-distributed: Jet will create just a single processor in the whole cluster to serve all the data. This is an easy and obvious way to create a source connector.
If you want to create a distributed source, the challenge is
coordinating all the parallel instances to appear as a single, unified
source. Each instance must emit its unique slice of the whole data
stream. Jet passes the Processor.Context
to your createFn
and you can use it to identify each state object you
create. Consult the properties totalParallelism and globalProcessorIndex:
Jet will call createFn
exactly once with each globalProcessorIndex
from 0 to totalParallelism - 1
and you can use this to slice up the
data.
Here’s a rudimentary example that shows how such a scheme could work:
class SourceContext {
private final int limit;
private final int step;
private int currentValue;
SourceContext(Processor.Context ctx, int limit) {
this.limit = limit;
this.step = ctx.totalParallelism();
this.currentValue = ctx.globalProcessorIndex();
}
void addToBuffer(SourceBuffer<Integer> buffer) {
if (currentValue < limit) {
buffer.add(currentValue);
currentValue += step;
} else {
buffer.close();
}
}
}
BatchSource<Integer> sequenceSource = SourceBuilder
.batch("seq-source", procCtx -> new SourceContext(procCtx, 1_000))
.fillBufferFn(SourceContext::addToBuffer)
.distributed(2) (1)
.build();
1 | we request two parallel processors on each Jet member |
This is a distributed source that generates a bounded sequence of integers. If the Jet cluster has two members, the source will have four parallel generators. The first one will generate numbers 0, 4, 8, …, the second one 1, 5, 9, …, etc.
In the real world there are some fundamental differences. The data source is partitioned in advance and you cannot expect the number of partitions to perfectly match your topology. Instead you must write some logic that distributes, more or less evenly, M partitions over N processors.
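For illustration, here is a minimal sketch of one way to do that, assuming numPartitions is known in advance and ctx is the Processor.Context passed to createFn: each processor takes the partitions whose index maps to its globalProcessorIndex.
// hypothetical round-robin partition assignment inside createFn
List<Integer> myPartitions = new ArrayList<>();
for (int partition = 0; partition < numPartitions; partition++) {
    if (partition % ctx.totalParallelism() == ctx.globalProcessorIndex()) {
        myPartitions.add(partition);
    }
}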
There’s a particularly nasty problem hiding in this scheme: when we assign several partitions to a single processor and consume the data from each partition in chunks, the difference between the timestamp of the last event we fetch from partition i and the first item from partition j can be very large, much larger than the maximum lateness we configured. By the time it receives these events, Jet has already completed the processing of their window and moved on. The items must be dropped as "too late to process".
Jet has a mechanism that mitigates this issue by individually tracking
the highest timestamp from each partition and emitting the appropriate
watermark items that account for the lagging partitions. It is available
if you create a custom source processor using the Core API, but it’s not
currently exposed through the source builder. Refer to the section on
custom source vertices and to the Javadoc
of EventTimeMapper
.
Add Fault Tolerance to your Source
If you want your source to behave correctly within a streaming Jet job that has a processing guarantee configured (at-least-once or exactly-once), you must help Jet with saving the operational state of your context object to the snapshot storage. There are two functions you must supply:
-
createSnapshotFn returns a serializable object that has all the data you’ll need to restore the operational state
-
restoreSnapshotFn applies the previously saved snapshot to the current context object
While a job is running, Jet calls createSnapshotFn at regular intervals to save the current state. When Jet resumes a job, it will:
-
create your context object the usual way, by calling createFn
-
retrieve the latest snapshot object from its storage
-
pass the context and snapshot objects to restoreSnapshotFn
-
start calling fillBufferFn, which must start by emitting the same item it was about to emit when createSnapshotFn was called.
You’ll find that restoreSnapshotFn
, somewhat unexpectedly, accepts
not one but a list of snapshot objects. If you’re building a simple,
non-distributed source, this list will have just one member. However,
the same logic must work for distributed sources as well, and a
distributed source runs on many parallel processors at the same
time. Each of them will produce its own snapshot object. After a restart
the number of parallel processors may be different than before (because
you added a Jet cluster member, for example), so there’s no one-to-one
mapping between the processors before and after the restart. This is why
Jet passes all the snapshot objects to all the processors, and your
logic must work out which part of their data to use. See the Distributed Stream Source section for more information.
Here’s a brief example with a fault-tolerant streaming source that generates a sequence of integers:
StreamSource<Integer> faultTolerantSource = SourceBuilder
.stream("fault-tolerant-source", processorContext -> new int[1])
.<Integer>fillBufferFn((numToEmit, buffer) ->
buffer.add(numToEmit[0]++))
.createSnapshotFn(numToEmit -> numToEmit[0]) (1)
.restoreSnapshotFn(
(numToEmit, saved) -> numToEmit[0] = saved.get(0)) (2)
.build();
(1) the snapshotting function returns the current number to emit
(2) the restoring function sets the number from the snapshot to the current state
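To illustrate how a distributed source could use the whole snapshot list, here is a minimal sketch (all names, including PartitionedSourceContext, are hypothetical): the context tracks per-partition read offsets, and on restore it keeps only the offsets of the partitions it currently owns, assuming partition p is owned by the processor with index p % totalParallelism.
class PartitionedSourceContext {
    final int processorIndex;
    final int totalParallelism;
    final Map<Integer, Long> offsets = new HashMap<>();   // partition ID -> next offset to read

    PartitionedSourceContext(Processor.Context ctx) {
        this.processorIndex = ctx.globalProcessorIndex();
        this.totalParallelism = ctx.totalParallelism();
    }

    // snapshot object: a serializable copy of this processor's offsets
    HashMap<Integer, Long> createSnapshot() {
        return new HashMap<>(offsets);
    }

    // receives the snapshot objects of all parallel processors and keeps only
    // the entries for the partitions this processor now owns
    void restoreSnapshot(List<HashMap<Integer, Long>> snapshots) {
        for (Map<Integer, Long> snapshot : snapshots) {
            snapshot.forEach((partition, offset) -> {
                if (partition % totalParallelism == processorIndex) {
                    offsets.put(partition, offset);
                }
            });
        }
    }
}
In such a sketch you would plug these methods in through createSnapshotFn(PartitionedSourceContext::createSnapshot) and restoreSnapshotFn(PartitionedSourceContext::restoreSnapshot) on the source builder.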
5.12.2. Sink Builder
To make your custom sink connector you need two basic ingredients:
-
an object that will hold all the state you need to keep track of
-
a stateless function,
receiveFn
, taking two parameters: the state object and a data item sent to the sink
This is a simple example with a file sink:
Sink<Object> sink = sinkBuilder(
"file-sink", x -> new PrintWriter(new FileWriter("output.txt")))
.receiveFn((out, item) -> out.println(item.toString()))
.destroyFn(PrintWriter::close)
.build();
Pipeline p = Pipeline.create();
p.drawFrom(list("input"))
.drainTo(sink);
This will create the file output.txt
on each member, so the overall
job output consists of the contents of all these files put together.
Note that we’re using blocking IO here. Jet will use non-cooperative processors for the sink, so we’re allowed to do that. For good overall job performance we should still ensure that the call to receiveFn doesn’t take more than a second to complete.
In the above example our state object is a PrintWriter
, which has
internal buffering. Jet allows us to make buffering a first-class
concern and deal with it explicitly by taking an optional flushFn
which it will call at regular intervals. Here’s our example modified to
do explicit buffering and flushing:
Sink<Object> sink = sinkBuilder("file-sink", x -> new StringBuilder())
.receiveFn((buf, item) -> buf.append(item).append('\n'))
.flushFn(buf -> {
try (Writer out = new FileWriter("output.txt", true)) {
out.write(buf.toString());
buf.setLength(0);
}
})
.build();
In this case we don’t need the destroyFn
because we keep the file
closed while not appending to it. Through the use of buffering we
drown out the overhead of opening and closing the file each time and
we get an overall more robust solution.
Sink Parallelism
Jet builds a sink that is distributed: each member of the Jet cluster
has a processor running it. You can configure how many parallel
processors there are on each member (the local parallelism) by calling
SinkBuilder.preferredLocalParallelism()
. By default there will be one
processor per member.
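For example, here is a minimal sketch of a console-printing sink that requests two processors per member (the stage and names are illustrative, not part of Jet’s built-in sinks):
Sink<Object> sink = sinkBuilder("stdout-sink", ctx -> System.out)
        .<Object>receiveFn((out, item) -> out.println(item))
        .preferredLocalParallelism(2)   // two sink processors on each member
        .build();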
Fault Tolerance
The sink you get from the sink builder doesn’t participate in the fault tolerance protocol. You can’t preserve any internal state if a job fails and gets restarted. In a job with snapshotting enabled your sink will still receive every item at least once. If the system you’re storing the data into is idempotent, ignoring duplicate items, this will have the effect of the exactly-once guarantee.
6. Configuration
This chapter starts with reference tables describing all the options to configure Hazelcast Jet. Then it describes the ways of configuring Jet: programmatically, declaratively (XML or YAML) and within a Spring context.
6.1. Configuration Options
The following tables list and describe all the methods and/or properties used to configure Hazelcast Jet, grouped by the features they are used for. Some of the options listed below can also be configured declaratively. See the Declarative Configuration section for an example.
6.1.1. Jet Instance
Configuration API Method | Description
---|---
InstanceConfig.setCooperativeThreadCount() | The number of threads Jet creates in its cooperative multithreading pool. Its default value is the number of available processors (Runtime.getRuntime().availableProcessors()).
InstanceConfig.setFlowControlPeriodMs() | Jet uses a flow control mechanism between cluster members to prevent a slower vertex from getting overflowed with data from a faster upstream vertex. Each receiver regularly reports to each sender how much more data it may send over a given DAG edge. This method sets the duration (in milliseconds) of the interval between flow-control packets. Its default value is 100 milliseconds.
InstanceConfig.setBackupCount() | The number of synchronous backups to configure on the IMap that Jet needs internally to store job metadata and snapshots. The maximum allowed value is 6. Its default value is 1.
InstanceConfig.setScaleUpDelayMillis() | The delay after which the auto-scaled jobs restart if a new member is added to the cluster. It has no effect on jobs with auto scaling disabled. Its default value is 10000 milliseconds.
InstanceConfig.setLosslessRestartEnabled() | Specifies whether the lossless job restart feature is enabled. With this feature, you can restart the whole cluster without losing the jobs and their state. It is implemented on top of Hazelcast IMDG’s Hot Restart Store feature, which persists the data to disk. You need to have the Hazelcast Jet Enterprise edition and configure Hazelcast IMDG’s Hot Restart Store to use this feature. Its default value is false.
6.1.2. Fault Tolerance
Configuration API Method | Description
---|---
JobConfig.setProcessingGuarantee() | Processing guarantee for failures. If a cluster member leaves the cluster during a job execution and the job is restarted automatically, it defines the semantics of at which point in the stream the job is resumed from. Available values are NONE, AT_LEAST_ONCE and EXACTLY_ONCE. Its default value is NONE.
JobConfig.setSnapshotIntervalMillis() | The interval in milliseconds between the completion of the previous snapshot and the start of a new one. It must be a positive value and it is only relevant when the processing guarantee is set as either AT_LEAST_ONCE or EXACTLY_ONCE. Its default value is 10000 milliseconds.
JobConfig.setAutoScaling() | Specifies whether Jet scales the job up or down when a member is added or removed from the cluster. Its default value is true.
JobConfig.setSplitBrainProtection() | Specifies whether the split-brain protection feature is enabled. When enabled, Jet restarts the job after a topology change only if the cluster quorum is satisfied. See the Split-Brain Protection section.
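For example, here is a sketch of submitting a job with the exactly-once guarantee and a 10-second snapshot interval (jet is an assumed JetInstance and p an assumed Pipeline):
JobConfig jobConfig = new JobConfig()
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
        .setSnapshotIntervalMillis(10_000)
        .setAutoScaling(true);
jet.newJob(p, jobConfig);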
6.1.3. Performance
Configuration API Method | Description
---|---
EdgeConfig.setPacketSizeLimit() | For a distributed edge, data is sent to a remote member via Hazelcast network packets. Each packet is dedicated to the data of a single edge, but may contain any number of data items. This setting limits the size of the packet in bytes. Packets should be large enough to drown out any fixed overheads, but small enough to allow good interleaving with other packets. Its default value is 16384.
EdgeConfig.setQueueSize() | The capacity of processor-to-processor concurrent queues. The value is rounded upwards to the next power of 2. Its default value is 1024.
EdgeConfig.setReceiveWindowMultiplier() | The scaling factor used by the adaptive receive window sizing function. Its default value is 3.
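These settings can be applied cluster-wide as the default edge configuration, for example (a sketch that simply restates the default values):
JetConfig config = new JetConfig();
config.setDefaultEdgeConfig(new EdgeConfig()
        .setQueueSize(1024)
        .setPacketSizeLimit(16384)
        .setReceiveWindowMultiplier(3));
JetInstance jet = Jet.newJetInstance(config);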
6.1.4. Security
Configuration Element | Description
---|---
ssl | Allows you to configure TLS for all member to member and member to client communications, using properties such as keyStore, keyStorePassword, trustStore, trustStorePassword, protocol and mutualAuthentication. You need to have the Hazelcast Jet Enterprise edition to use this feature. See the Security chapter for more information.
6.1.5. Monitoring
Configuration Method/Property | Description
---|---
hazelcast.logging.type | The predefined types of built-in logging adapters. Available values are jdk, log4j, log4j2, slf4j and none.
MetricsConfig | Allows you to provide the configuration options for metrics collection: whether collection and JMX publishing are enabled, the collection interval, the retention period and whether metrics are collected for data structures. See the MetricsConfig Javadoc for more information.
You can also use Hazelcast Jet Management Center to monitor your Jet clusters. See the Hazelcast Jet Management Center Reference Manual for more information.
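As an illustration of the metrics options above, here is a programmatic sketch (assuming JetConfig.getMetricsConfig() and the fluent MetricsConfig setters):
JetConfig config = new JetConfig();
config.getMetricsConfig()
      .setEnabled(true)
      .setJmxEnabled(true)
      .setCollectionIntervalSeconds(5)
      .setRetentionSeconds(5)
      .setMetricsForDataStructuresEnabled(false);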
6.2. Ways of Configuration
You can configure Hazelcast Jet either programmatically or declaratively (XML or YAML).
6.2.1. Programmatic Configuration
Programmatic configuration is the simplest way to configure Jet. You
instantiate a JetConfig
and set
the desired properties. For example, the following will configure Jet to
use only two threads in its cooperative multithreading pool:
JetConfig config = new JetConfig();
config.getInstanceConfig().setCooperativeThreadCount(2);
JetInstance jet = Jet.newJetInstance(config);
If you want to specify your own configuration file to create JetConfig
,
Hazelcast Jet supports several ways including filesystem, classpath,
InputStream and in-memory string.
Building JetConfig from the XML declarative configuration:
-
JetConfig cfg = JetConfig.loadXmlFromStream(inputStream);
-
JetConfig cfg = JetConfig.loadFromClasspath(classloader, xmlFileName);
-
JetConfig cfg = JetConfig.loadFromFile(configFile);
-
JetConfig cfg = JetConfig.loadXmlFromString(xml);
Building JetConfig from the YAML declarative configuration:
-
JetConfig cfg = JetConfig.loadYamlFromStream(inputStream);
-
JetConfig cfg = JetConfig.loadFromClasspath(classloader, yamlFileName);
-
JetConfig cfg = JetConfig.loadFromFile(configFile);
-
JetConfig cfg = JetConfig.loadYamlFromString(yaml);
6.2.2. Declarative Configuration
If you don’t pass an explicit JetConfig
object when constructing a Jet
instance, it will perform the following lookup procedure:
-
Read the system property hazelcast.jet.config. If it starts with classpath:, treat it as a classpath resource, otherwise it’s a file pathname. If it’s defined but Jet can’t find the file it specifies, startup fails.
-
Look for hazelcast-jet.xml in the working directory.
-
Look for hazelcast-jet.xml in the classpath.
-
Look for hazelcast-jet.yaml in the working directory.
-
Look for hazelcast-jet.yaml in the classpath.
-
Load the default XML configuration packaged in Jet’s JAR.
The YAML files need to have a .yaml or .yml extension if they are passed in via the hazelcast.jet.config property.
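For example, you could point the lookup at a custom classpath resource programmatically before creating the instance (the file name my-hazelcast-jet.yaml is hypothetical):
System.setProperty("hazelcast.jet.config", "classpath:my-hazelcast-jet.yaml");
JetInstance jet = Jet.newJetInstance();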
An example hazelcast-jet.xml is shown below:
<hazelcast-jet>
<instance>
<cooperative-thread-count>8</cooperative-thread-count>
<flow-control-period>100</flow-control-period>
<temp-dir>/var/tmp/jet</temp-dir>
<backup-count>1</backup-count>
<scale-up-delay-millis>10000</scale-up-delay-millis>
</instance>
<properties>
<property name="custom.property">custom property</property>
</properties>
<edge-defaults>
<queue-size>1024</queue-size>
<packet-size-limit>16384</packet-size-limit>
<receive-window-multiplier>3</receive-window-multiplier>
</edge-defaults>
<metrics>
<retention-seconds>5</retention-seconds>
<collection-interval-seconds>5</collection-interval-seconds>
<metrics-for-data-structures>false</metrics-for-data-structures>
</metrics>
</hazelcast-jet>
The corresponding hazelcast-jet.yaml configuration is shown below:
hazelcast-jet:
instance:
cooperative-thread-count: 8
flow-control-period: 100
backup-count: 1
scale-up-delay-millis: 10000
lossless-restart-enabled: false
edge-defaults:
queue-size: 1024
packet-size-limit: 16384
receive-window-multiplier: 3
metrics:
enabled: true
jmx-enabled: true
collection-interval-seconds: 5
retention-seconds: 5
metrics-for-data-structures: false
Composing Declarative Configuration
Declarative configurations (either XML or YAML) support composing multiple configuration snippets into a single configuration. To compose a declarative configuration, you can import different declarative configuration files.
Let’s say you want to compose the declarative configuration for
Hazelcast Jet out of two XML configurations: development-metrics-config.xml
and development-instance-config.xml
. These two configurations are shown
below.
development-metrics-config.xml
:
<hazelcast-jet>
<metrics enabled="true" jmxEnabled="true">
<retention-seconds>5</retention-seconds>
<collection-interval-seconds>5</collection-interval-seconds>
<metrics-for-data-structures>true</metrics-for-data-structures>
</metrics>
</hazelcast-jet>
development-instance-config.xml
:
<hazelcast-jet>
<instance>
<flow-control-period>100</flow-control-period>
<backup-count>1</backup-count>
<scale-up-delay-millis>10000</scale-up-delay-millis>
<lossless-restart-enabled>false</lossless-restart-enabled>
</instance>
</hazelcast-jet>
To get your example Hazelcast Jet declarative configuration out of the
above two, use the <import/>
element as shown below.
<hazelcast-jet>
<import resource="development-metrics-config.xml"/>
<import resource="development-instance-config.xml"/>
</hazelcast-jet>
The above example using the YAML configuration files looks like the following:
development-metrics-config.yaml
:
hazelcast-jet:
metrics:
enabled: true
jmx-enabled: true
collection-interval-seconds: 5
retention-seconds: 5
metrics-for-data-structures: true
development-instance-config.yaml
:
hazelcast-jet:
instance:
flow-control-period: 100
backup-count: 1
scale-up-delay-millis: 10000
lossless-restart-enabled: false
To compose the above two YAML configuration files, import them as shown below.
hazelcast-jet:
import:
- development-metrics-config.yaml
- development-instance-config.yaml
Use the <import/> element and the import mapping at the top level of the XML and YAML hierarchies, respectively.
Hazelcast Jet and Hazelcast IMDG share the same syntax for declarative configuration composition. For more examples on importing the resources from the classpath and file system, or resources with variables in their names, refer to the relevant section in the Hazelcast IMDG Reference Manual.
6.2.3. Using Variables
In your Hazelcast Jet declarative configuration, you can use variables to set the values of the elements. A variable resolves to the value of a system property, which you can set either programmatically or on the command line.
For example, the following command sets a system property:
-Dcooperative.thread.count=16
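The same property could also be set programmatically before the configuration is loaded, for example:
System.setProperty("cooperative.thread.count", "16");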
Let’s get the value of this system property in the declarative configuration of Hazelcast Jet, as shown below.
In the XML configuration:
<hazelcast-jet>
<instance>
<cooperative-thread-count>${cooperative.thread.count}</cooperative-thread-count>
</instance>
</hazelcast-jet>
In the YAML configuration:
hazelcast-jet:
instance:
cooperative-thread-count: ${cooperative.thread.count}
If you do not want to rely on the system properties, you can use the
XmlJetConfigBuilder
or YamlJetConfigBuilder
and explicitly set a
Properties
instance, as shown below.
Properties properties = new Properties();
// fill the properties, e.g., from database/LDAP, etc.
XmlJetConfigBuilder builder = new XmlJetConfigBuilder();
builder.setProperties(properties);
JetConfig config = builder.build();
JetInstance hz = Jet.newJetInstance(config);
6.2.4. Variable Replacers
Hazelcast Jet supports variable replacers, which are used to replace custom strings while loading the configuration.
Hazelcast Jet and Hazelcast IMDG share the same syntax for variable replacers. You can configure your variable replacers in the Hazelcast Jet configuration via XML or YAML. Please see the Variable Replacers section in the Hazelcast IMDG Reference Manual for more information and examples on the variable replacers.
6.3. Configure the Underlying Hazelcast Instance
Each Jet member or client has its underlying Hazelcast member or client. Please refer to the Hazelcast IMDG Reference Manual for specific configuration options for Hazelcast IMDG.
6.3.1. Programmatic
You can configure the underlying Hazelcast IMDG member as follows:
JetConfig jetConfig = new JetConfig();
jetConfig.getHazelcastConfig().getGroupConfig().setName("test");
JetInstance jet = Jet.newJetInstance(jetConfig);
Similarly, you can also configure the underlying Hazelcast client:
ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig().setName("test");
JetInstance jet = Jet.newJetClient(clientConfig);
6.3.2. Declarative
Hazelcast IMDG can also be configured declaratively. Please refer to the Hazelcast IMDG Reference Manual for information on how to do this.
6.4. Spring Integration
You can configure and start a Hazelcast Jet instance as a component in
the Spring Application Context. You can use the plain bean
element
and define individual properties on a JetConfig
instance, but Jet
provides its own schema-based configuration which will make this much
less verbose.
6.4.1. Approach 1: Use the Plain bean Element
You can declare Hazelcast Jet objects using the default Spring beans namespace. Here is an example for a Hazelcast Jet Instance declaration:
<bean id="instance" class="com.hazelcast.jet.Jet" factory-method="newJetInstance">
<constructor-arg>
<bean class="com.hazelcast.jet.config.JetConfig">
<property name="hazelcastConfig">
<bean class="com.hazelcast.config.Config">
<!-- ... -->
</bean>
</property>
<property name="instanceConfig">
<bean class="com.hazelcast.jet.config.InstanceConfig">
<property name="cooperativeThreadCount" value="2"/>
</bean>
</property>
<property name="defaultEdgeConfig">
<bean class="com.hazelcast.jet.config.EdgeConfig">
<property name="queueSize" value="2048"/>
</bean>
</property>
<property name="properties">
<props>
<prop key="foo">bar</prop>
</props>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="my-map"/>
</bean>
6.4.2. Approach 2: Use jet:instance
Hazelcast Jet provides its own Spring config schema. Add the namespace declaration xmlns:jet="http://www.hazelcast.com/schema/jet-spring" to the beans element and then use the jet namespace prefix.
Make sure you added hazelcast-jet-spring.jar
to the classpath.
Here’s how your namespace and schema instance declarations may look:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jet="http://www.hazelcast.com/schema/jet-spring"
xmlns:hz="http://www.hazelcast.com/schema/spring"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.hazelcast.com/schema/spring
http://www.hazelcast.com/schema/spring/hazelcast-spring-3.12.xsd
http://www.hazelcast.com/schema/jet-spring
http://www.hazelcast.com/schema/jet-spring/hazelcast-jet-spring-3.2.xsd">
<!-- ... -->
</beans>
Configuring the Hazelcast Jet Instance
<jet:instance id="instance">
<hz:config>
<hz:spring-aware/>
<hz:group name="jet"/>
<hz:network port="5701" port-auto-increment="false">
<hz:join>
<hz:multicast enabled="false"/>
<hz:tcp-ip enabled="true">
<hz:member>127.0.0.1:5701</hz:member>
</hz:tcp-ip>
</hz:join>
</hz:network>
<hz:map name="map" backup-count="3">
</hz:map>
</hz:config>
    <jet:instance-config cooperative-thread-count="2"/>
<jet:default-edge-config queue-size="2048"/>
<jet:properties>
<hz:property name="foo">bar</hz:property>
</jet:properties>
</jet:instance>
Configure a Jet Client
<jet:client id="jet-client">
<jet:group name="jet"/>
<jet:network>
<hz:member>127.0.0.1:5701</hz:member>
</jet:network>
<jet:spring-aware/>
</jet:client>
Additional Bean Types Supported by the Jet Namespace
You can obtain the underlying HazelcastInstance
from the Jet instance
as a bean and use it to obtain these Hazelcast IMDG beans:
-
map
-
list
-
multiMap
-
replicatedMap
-
queue
-
topic
-
set
-
executorService
-
idGenerator
-
atomicLong
-
atomicReference
-
semaphore
-
countDownLatch
-
lock
Here are short examples for each of them:
<jet:hazelcast jet-instance-ref="jet-instance" id="hazelcast-instance"/>
<jet:map instance-ref="jet-instance" name="my-map" id="my-map-bean"/>
<jet:list instance-ref="jet-client" name="my-list" id="my-list-bean"/>
<hz:multiMap id="multiMap" instance-ref="hazelcast-instance" name="my-multiMap"/>
<hz:replicatedMap id="replicatedMap" instance-ref="hazelcast-instance" name="my-replicatedMap"/>
<hz:queue id="queue" instance-ref="hazelcast-instance" name="my-queue"/>
<hz:topic id="topic" instance-ref="hazelcast-instance" name="my-topic"/>
<hz:set id="set" instance-ref="hazelcast-instance" name="my-set"/>
<hz:executorService id="executorService" instance-ref="hazelcast-instance" name="my-executorService"/>
<hz:idGenerator id="idGenerator" instance-ref="hazelcast-instance" name="my-idGenerator"/>
<hz:atomicLong id="atomicLong" instance-ref="hazelcast-instance" name="my-atomicLong"/>
<hz:atomicReference id="atomicReference" instance-ref="hazelcast-instance" name="my-atomicReference"/>
<hz:semaphore id="semaphore" instance-ref="hazelcast-instance" name="my-semaphore"/>
<hz:countDownLatch id="countDownLatch" instance-ref="hazelcast-instance" name="my-countDownLatch"/>
<hz:lock id="lock" instance-ref="hazelcast-instance" name="my-lock"/>
Hazelcast Jet also supports lazy-init, scope and depends-on bean attributes.
<jet:instance id="instance" lazy-init="true" scope="singleton">
<!-- ... -->
</jet:instance>
<jet:client id="client" scope="prototype" depends-on="instance">
<!-- ... -->
</jet:client>
6.4.3. Annotation-Based Configuration
Annotation-Based Configuration does not require any XML definition.
Simply create a configuration class annotated with @Configuration
and
provide a JetInstance
as a bean by annotating the method with @Bean
.
@Configuration
public class AppConfig {
@Bean
public JetInstance instance() {
return Jet.newJetInstance();
}
}
6.4.4. Enabling SpringAware Objects
Hazelcast IMDG has a special annotation, @SpringAware, which enables you to initialize the object with the Spring context.
When a job is submitted to the cluster, processors are created by Hazelcast Jet on each member. By marking your processor with @SpringAware, you make the Spring context accessible to your processor, which gives you the ability:
-
to apply bean properties
-
to apply factory callbacks such as
ApplicationContextAware
,BeanNameAware
-
to apply bean post-processing annotations such as
InitializingBean
,@PostConstruct
You need to configure Hazelcast Jet with the <hz:spring-aware/> tag or set SpringManagedContext programmatically to enable Spring-aware objects. Code samples for declarative and annotation-based configurations are available at our Code Samples repo.
7. Security
It is possible to configure Jet to use TLS for all member to member and member to client communications. TLS requires the Hazelcast Jet Enterprise edition.
Once TLS is enabled on a member, all other members in the same cluster and all clients connecting to the cluster must also have TLS enabled.
The way TLS is configured is the same as in Hazelcast IMDG Enterprise. To configure TLS, you need a server certificate with a matching private key, and you need to do the necessary configuration on the member side.
The following is an example configuration snippet for hazelcast.xml
. The
configuration must be present on all the members.
<network>
<ssl enabled="true">
<factory-class-name>com.hazelcast.nio.ssl.BasicSSLContextFactory</factory-class-name>
<properties>
<property name="protocol">TLS</property>
<property name="keyStore">/opt/hazelcast.keystore</property>
<property name="keyStorePassword">password12345</property>
<property name="trustStore">/opt/hazelcast.truststore</property>
<property name="trustStorePassword">password12345</property>
<property name="keyManagerAlgorithm">SunX509</property>
<property name="trustManagerAlgorithm">SunX509</property>
<property name="mutualAuthentication">REQUIRED</property>
</properties>
</ssl>
</network>
And similarly, a matching snippet must be present on the clients
inside the hazelcast-client.xml
file:
<network>
<ssl enabled="true">
<factory-class-name>com.hazelcast.nio.ssl.BasicSSLContextFactory</factory-class-name>
<properties>
<property name="protocol">TLS</property>
<property name="trustStore">/opt/hazelcast-client.truststore</property>
<property name="trustStorePassword">secret.123456</property>
<property name="trustStoreType">JKS</property>
      <!-- The following properties are only needed when mutual authentication is used. -->
<property name="keyStore">/opt/hazelcast-client.keystore</property>
<property name="keyStorePassword">keystorePassword123</property>
<property name="keyStoreType">JKS</property>
<property name="mutualAuthentication">REQUIRED</property>
</properties>
</ssl>
</network>
A detailed list of TLS configuration options can be found in the Hazelcast IMDG Reference Manual.
7.1. Mutual Authentication
When a client connects to a node, the node can authenticate the client by using mutual authentication. In this mode, the client also has to present a certificate to the member upon connection. This requires some additional configuration. See the Mutual Authentication section in the IMDG Reference Manual for further details.
7.2. OpenSSL Configuration
By default Jet uses the Java implementation of SSL. For better performance, it is also possible to use Jet with OpenSSL instead. For configuring OpenSSL please refer to the Hazelcast IMDG Reference Manual.
7.3. TLS Impact on Performance
TLS can have some effect on performance when running jobs, as all node to node communications need to be encrypted. During an aggregation or another partitioned operation in a job, large amounts of data might be transferred from one node to another.
Jet makes use of several techniques that reduce the need for data to be transferred across the nodes; however, this is sometimes unavoidable. When using Java SSL, the performance impact can be between 5% and 30%, depending on how heavily the network is utilized. When using OpenSSL, we measured the impact to be minimal regardless of the amount of network traffic. Detailed benchmarks are available upon request.
7.4. Remote Hazelcast Sources and Sinks
Currently it’s not possible to connect to remote Hazelcast sources and sinks using TLS. This issue will be resolved in a future patch release.
8. Jet Concepts
In this chapter we will take a deep dive into the fundamentals of distributed computing and Jet’s specific take on it. You’ll find that having some intuition and insight into how distributed computing actually works in Jet makes a big difference when diagnosing your pipeline and improving its performance.
Jet performs high performance in-memory data processing by modeling the
computation as a Directed Acyclic Graph (DAG), where vertices
represent computation and edges represent data flows. A vertex receives
data from its inbound edges, performs a step in the computation, and
emits data to its outbound edges. Both the edge and the vertex are
distributed entities: there are many parallel instances of the
Processor
type that perform a single vertex’s computation work on
each cluster member. An edge between two vertices is implemented with
many data connections, both within a member (concurrent SPSC queues) and
between members (Hazelcast network connections).
One of the major reasons to divide the full computation task into several vertices is data partitioning: the ability to split the data stream into slices which can be processed in parallel, independently of each other. This is how Jet can parallelize and distribute the group-and-aggregate stream transformation, the major workhorse in distributed computing. To make this work, there must be a function which computes the partitioning key for each item and makes all related items map to the same key. Jet can then route all such items to the same processor instance, but has the freedom to route items with different keys to different processors.
Typically your computation job consists of a mapping vertex, where you pre-process the input data into a form that’s ready to be partitioned, followed by the grouping-and-aggregating vertex. The edge between them contains the partitioning logic.
8.1. Modeling the Computation as a DAG
We’ll take one specific problem, the Word Count, dissect it and explain how it gets computed in a Jet cluster. Let us first see the definition of the computation in the Pipeline API:
Pattern delimiter = Pattern.compile("\\W+");
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Long, String>map("book-lines"))
.flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
.filter(word -> !word.isEmpty())
.groupingKey(wholeItem())
.aggregate(AggregateOperations.counting())
.drainTo(Sinks.map("counts"));
Now let’s step back from this and start from the single-threaded Java
code that solves the problem for a basic data structure such as an
ArrayList
. If you have some familiarity with the java.util.stream
API, this is how you’d express it:
Map<String, Long> counts =
lines.stream()
.flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.collect(Collectors.groupingBy(word -> word, Collectors.counting()));
You can notice a strong similarity with the Pipeline API formulation, but the way it’s executed is radically different. Java will compute it in a single thread, basically running this code:
List<String> lines = someExistingList();
Map<String, Long> counts = new HashMap<>();
for (String line : lines) {
for (String word : line.toLowerCase().split("\\W+")) {
if (!word.isEmpty()) {
counts.merge(word, 1L, (count, one) -> count + one);
}
}
}
The j.u.s. formulation helps us see the steps taken to process each data item:
-
lines.stream()
: read the items (lines of text) from the data source (we’ll call this the “source” step). -
flatMap()
+filter()
: split each line into lowercase words, avoiding empty strings (the tokenizing step). -
collect()
: group equal words together and count them (the accumulating step).
Our next move is to express these steps as a DAG. We’ll start with a single-threaded model and then make several transformations to reach a parallelized, distributed one, discussing at each step the concerns that arise and how to meet them.
We can represent the steps outlined above in a directed acyclic graph (DAG):
The simplest, single-threaded code (shown above) deals with each item as it is produced: the outer loop reads the lines, the inner loop that runs for each line deals with the words on that line, and inside the inner loop we populate the result map with running counts.
However, just by modeling the computation as a DAG, we’ve split the work into isolated steps with clear data interfaces between them. We can perform the same computation by running a separate thread for each step. Roughly speaking, these are the snippets the threads would be executing:
// Source thread
for (String line : readLines()) {
emit(line);
}
// Tokenizer thread
for (String line : receive()) {
for (String word : line.toLowerCase().split("\\W+")) {
if (!word.isEmpty()) {
emit(word);
}
}
}
// Accumulator thread
Map<String, Long> counts = new HashMap<>();
for (String word : receive()) {
counts.merge(word, 1L, (count, one) -> count + one);
}
// finally, when done receiving:
for (Entry<String, Long> wordAndCount : counts.entrySet()) {
emit(wordAndCount);
}
The source loop feeds the tokenizer loop over a concurrent queue, the tokenizer feeds the accumulator loop, and after the accumulator is done receiving, it emits its results to the sink. Diagrammatically it looks like this:
This transformation brought us a pipelined architecture: while the tokenizer is busy with the regex work, the accumulator is updating the map using the data the tokenizer is done with; and the source and sink stages are pumping the data from/to the environment. Our design is now able to engage more than one CPU core and will complete that much sooner; however, we’re still limited by the number of vertices. We’ll be able to utilize only two or three cores regardless of how many are available. To move forward we must try to parallelize the work of each individual vertex.
Given that our input is an in-memory list of lines, the bottleneck occurs in the processing stages (tokenizing and accumulating). Let’s first attack the tokenizing stage: it is a so-called "embarrassingly parallelizable" task because the processing of each line is completely self-contained. At this point we have to make a clear distinction between the notions of vertex and processor: there can be several processors doing the work of a single vertex. Let’s add another tokenizing processor:
The input processor can now use all the available tokenizers as a pool and submit to any one whose queue has some room.
The next step is parallelizing the accumulator vertex, but this is trickier: accumulators count word occurrences so using them as a pool will result in each processor observing almost all distinct words (entries taking space in its hashtable), but the counts will be partial and will need combining. The common strategy to reduce memory usage is to ensure that all occurrences of the same word go to the same processor. This is called “data partitioning” and in Jet we’ll use a partitioned edge between the tokenizer and the accumulator:
As a word is emitted from the tokenizer, it goes through a “switchboard” stage where it’s routed to the correct downstream processor. To determine where a word should be routed, we can calculate its hashcode and use the lowest bit to address either accumulator 0 or accumulator 1.
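In code, that routing decision could look like this minimal sketch (Jet’s actual partitioned edges use a similar, though more elaborate, partitioning scheme):
// route the word to accumulator 0 or accumulator 1 based on the lowest bit of its hashcode
int accumulatorIndex = word.hashCode() & 1;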
At this point we have a blueprint for a fully functional parallelized computation job which can max out all the CPU cores given enough instances of tokenizing and accumulating processors. The next challenge is making this work across machines.
For starters, our input can no longer be a simple in-memory list because
that would mean each machine processes the same data. To exploit the
cluster as a unified computation device, each cluster member must
observe only a slice of the dataset. Given that a Jet instance is also a
fully functional Hazelcast IMDG instance and a Jet cluster is also a
Hazelcast IMDG cluster, the natural choice is to pre-load our data into
an IMap
, which will be automatically partitioned and distributed
across the members. Now each Jet member can just read the slice of data
that was stored locally on it.
When run in a cluster, Jet will instantiate a replica of the whole DAG on each member. On a two-member cluster there will be two source processors, four tokenizers, and so on. The trickiest part is the partitioned edge between tokenizer and accumulator: each accumulator is supposed to receive its own subset of words. That means that, for example, a word emitted from tokenizer 0 will have to travel across the network to reach accumulator 3, if that’s the one that happens to own it. On average we can expect every other word to need network transport, causing both serious network traffic and serialization/deserialization CPU load.
There is a simple trick we can employ to avoid most of this traffic, closely related to what we pointed out above as a source of problems when parallelizing locally: members of the cluster can be used as a pool, each doing its own partial word counts and then sending its results to a combining vertex. Note that this means sending only one item per distinct word. Here’s the rough equivalent of the code the combining vertex executes:
// Combining vertex
Map<String, Long> combined = new HashMap<>();
for (Entry<String, Long> wordAndCount : receive()) {
combined.merge(wordAndCount.getKey(), wordAndCount.getValue(),
(accCount, newCount) -> accCount + newCount);
}
// finally, when done receiving:
for (Entry<String, Long> wordAndCount : combined.entrySet()) {
emit(wordAndCount);
}
As noted above, such a scheme takes more memory due to more hashtable entries on each member, but it saves network traffic (an issue we didn’t have within a member). Given that memory costs scale with the number of distinct keys (English words in our case), the memory cost is more-or-less constant regardless of how much book material we process. On the other hand, network traffic scales with the total data size, so the more material we process, the more we save on network traffic.
Jet distinguishes between local and distributed edges, so we’ll use a local partitioned edge for tokenize → accumulate and a distributed partitioned edge for accumulate → combine. With this move we’ve finalized our DAG design, which can be illustrated by the following diagram:
8.2. Unbounded Stream Processing
So far we’ve worked with a bounded (finite) stream processing task. In general, you provide Jet with one or more pre-existing datasets and order it to mine them for interesting information. The most important workhorse in this area is the "join, group and aggregate" operation: you define a classifying function that computes a grouping key for each of the datasets and an aggregate operation that will be performed on all the items in each group, yielding one result item per distinct key. Jet can apply the same operation on unbounded data streams as well.
8.2.1. The Importance of “Right Now”
In batch jobs the data we process represents a point-in-time snapshot of our state of knowledge (for example, warehouse inventory where individual data items represent items on stock). We can recapitulate each business day by setting up regular snapshots and batch jobs. However, there is more value hiding in the freshest data — our business can win by reacting to minute-old or even second-old updates. To get there we must make a shift from the finite to the infinite: from the snapshot to a continuous influx of events that update our state of knowledge. For example, an event could pop up in our stream every time an item is checked in or out of the warehouse.
A single word that captures the above story is latency: we want our system to minimize the latency from observing an event to acting upon it.
8.2.2. Windowing
In an unbounded stream, the dimension of time is always there. Consider a batch job: it may process a dataset labeled “Wednesday”, but the computation itself doesn’t have to know this. Its results will be understood from the outside to be “about Wednesday”. An endless stream, on the other hand, delivers information about the reality as it is unfolding, in near-real time, and the computation itself must deal with time explicitly.
Another point: in a batch it is obvious when to stop aggregating and emit the results: when we have exhausted the whole dataset. However, with unbounded streams we need a policy on how to select bounded chunks whose aggregate results we are interested in. This is called windowing. We imagine the window as a time interval laid over the time axis. A given window contains only the events that belong to that interval.
A very basic type of window is the tumbling window, which can be imagined to advance by tumbling over each time. There is no overlap between the successive positions of the window. In other words, it splits the time-series data into batches delimited by points on the time axis. The result of this is very similar to running a sequence of batch jobs, one per time interval.
A more useful and powerful policy is the sliding window: instead of splitting the data at fixed boundaries, it lets it roll in incrementally, new data gradually displacing the old. The window (pseudo)continuously slides along the time axis.
Another popular policy is called the session window and it’s used to detect bursts of activity by correlating events bunched together on the time axis. In an analogy to a user’s session with a web application, the session window “closes” when the specified session timeout elapses with no further events.
8.2.3. Time Ordering and the Watermark
Usually the time of observing an event is explicitly written in a field of the stream item. There is no guarantee that items will occur in the stream ordered by the value of that field; in fact in many cases it is certain that they won’t. Consider events gathered from users of a mobile app: for all kinds of reasons the items will arrive to our datacenter out of order, even with significant delays due to connectivity issues.
This disorder in the event stream makes it more difficult to formally specify a rule that tells us at which point all the data for a given window has been gathered, allowing us to emit the aggregated result.
To approach these challenges we use the concept of the watermark. It is a timestamped item Jet inserts into the stream that says "from this point on there will be no more items with timestamp less than this". Unfortunately, we almost never know for sure when such a statement becomes true and there is always a chance some events will arrive even later. If we do observe such an offending item, we must categorize it as “too late” and just filter it out.
Note the tension in defining the “perfect” watermark for a given use case: both waiting longer and waiting less before emitting a given watermark have a downside. The more we wait, the higher the latency of getting the results of the computation; the less we wait, the worse their accuracy due to missed events.
For these reasons Jet cannot determine the watermark on its own; you must decide how much disorder to accept (and expect).
8.3. Sliding and Tumbling Window
Many quantities, like “the current rate of change of a price” require you to aggregate your data over some time period. This is what makes the sliding window so important: it tracks the value of such a quantity in real time.
Calculating a single sliding window result can be quite computationally intensive, but we also expect it to slide smoothly and give a new result often, even many times per second. This is why we gave special attention to optimizing this computation.
We optimize especially heavily for those aggregate operations that have a cheap way of combining partial results and even more so for those which can cheaply undo the combining. For cheap combining you have to express your operation in terms of a commutative and associative (CA for short) function; to undo a combine you need the notion of “negating” an argument to the function. A great many operations can be expressed through CA functions: average, variance, standard deviation and linear regression are some examples. All of these also support the undoing (which we call deduct). The computation of extreme values (min/max) is an example that has CA, but no good notion of negation and thus doesn’t support deducting.
This is the way we leverage the above properties: our sliding window actually “hops” in fixed-size steps. The length of the window is an integer multiple of the step size. Under such a definition, the tumbling window becomes just a special case with one step per window.
This allows us to divide the timestamp axis into frames of equal length and assign each event to its frame. Instead of keeping the event object, we immediately pass it to the aggregate operation’s accumulate primitive. To compute a sliding window, we take all the frames covered by it and combine them. Finally, to compute the next window, we just deduct the trailing frame and combine the leading frame into the existing result.
Even without deduct the above process is much cheaper than the most naïve approach where you’d keep all data and recompute everything from scratch each time. After accumulating an item just once, the rest of the process has fixed cost regardless of input size. With deduct, the fixed cost approaches zero.
8.3.1. Example: 30-second Window Sliding by 10 Seconds
We’ll now illustrate the above story with a specific example: we’ll construct a 30-second window which slides by 10 seconds (i.e., three steps per window). The aggregate operation is to simply count the number of events. In the diagrams we label the events as minutes:seconds. This is the outline of the process:
-
Throw each event into its “bucket” (the frame whose time interval it belongs to).
-
Instead of keeping the items in the frame, just keep the item count.
-
Combine the frames into three different positions of the sliding window, yielding the final result: the number of events that occurred within the window’s timespan.
This would be a useful interpretation of the results: "At the time 1:30, the 30-second running average was 8/30 = 0.27 events per second. Over the next 20 seconds it increased to 10/30 = 0.33 events per second."
Keep in mind that the whole diagram represents what happens on just one cluster member and for just one grouping key. The same process is going on simultaneously for all the keys on all the members.
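For reference, here is how such a 30-second window sliding by 10 seconds could be declared in the Pipeline API (a sketch; events is an assumed timestamped stream stage and event.key() a hypothetical key extractor):
events
    .groupingKey(event -> event.key())
    .window(WindowDefinition.sliding(30_000, 10_000))
    .aggregate(AggregateOperations.counting())
    .drainTo(Sinks.logger());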
8.3.2. Two-stage aggregation
The concept of frame combining helps us implement two-stage aggregation as well. In the first stage the individual members come up with their partial results by frame and send them over a distributed edge to the second stage, which combines the frames with the same timestamp. After having combined all the partial frames from members, it combines the results along the event time axis into the sliding window.
8.4. Session Window
In the abstract sense, the session window is a quite intuitive concept: it simply captures a burst of events. If no new events occur within the configured session timeout, the window closes. However, because the Jet processor encounters events out of their original order, this kind of window becomes quite tricky to compute.
The way Jet computes the session windows is easiest to explain in terms
of the event interval: the range
[eventTimestamp, eventTimestamp + sessionTimeout]
.
Initially an event causes a new session window to be created, covering
exactly the event interval.
A following event under the same key belongs to this window iff its interval overlaps it. The window is extended to cover the entire interval of the new event.
If the event intervals don’t overlap, Jet creates a new session window for the new event.
An event may happen to belong to two existing windows if its interval bridges the gap between them; in that case they are combined into one.
Once the watermark has passed the closing time of a session window, Jet can close it and emit the result of its aggregation.
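In the Pipeline API, a session window is declared by its timeout; a minimal sketch (events and event.key() are assumed as in the sliding-window sketch above, and the one-second timeout is arbitrary):
events
    .groupingKey(event -> event.key())
    .window(WindowDefinition.session(1_000))   // close the window after 1 second of inactivity
    .aggregate(AggregateOperations.counting())
    .drainTo(Sinks.logger());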
8.5. Fault Tolerance and Processing Guarantees
One less-than-obvious consequence of stepping up from finite to infinite streams is the difficulty of forever maintaining the continuity of the output, even in the face of changing cluster topology. A Jet node may leave the cluster due to an internal error, loss of networking, or deliberate shutdown for maintenance. This will cause the computation job to be suspended. Except for the obvious problem of new data pouring in while we’re down, we have a much more fiddly issue of restarting the computation in a differently laid-out cluster exactly where it left off and neither miss anything nor process it twice. The technical term for this is the "exactly-once processing guarantee".
Jet achieves fault tolerance in streaming jobs by making a snapshot of the internal processing state at regular intervals. If a member of the cluster fails while a job is running, Jet will detect this and restart the job on the new cluster topology. It will restore its internal state from the snapshot and tell the source to start sending data from the last “committed” position (where the snapshot was taken). The data source must have built-in support to replay the data from the given checkpoint. The sink must either support transactions or be idempotent, tolerating duplicate submission of data.
In a Jet cluster, one member is the coordinator. It tells other members what to do and they report to it any status changes. The coordinator may fail and the cluster will automatically re-elect another one. If any other member fails, the coordinator restarts the job on the remaining members.
8.6. Update a DAG Without Losing the State
If you need to make a change in the pipeline or DAG, you need to submit a new job: the old job has to be cancelled and a new one with the modified pipeline has to be submitted. To preserve the state, you have to export it before you cancel the old job and start the new job with the exported state.
There are two ways of exporting the state:
-
Job.cancelAndExportSnapshot(String name): exports a snapshot and cancels the job. This method is useful for starting the job anew with an updated pipeline.
-
Job.exportSnapshot(String name): exports a snapshot and keeps the job running. Useful for forking the job, for example in an A/B testing scenario, or for moving the state to a test environment.
JetInstance client = Jet.newJetClient();
Job job = client.getJob("foo-job");
// This call will block until the snapshot is written
job.cancelAndExportSnapshot("foo-state-name");
// Code for the new pipeline
Pipeline p = Pipeline.create();
JobConfig config = new JobConfig()
.setInitialSnapshotName("foo-state-name");
client.newJob(p, config);
// Now make sure the new job is running and restored from the
// state correctly. When you no longer need the snapshot, delete
// it. Jet never deletes exported snapshots automatically.
client.getJobStateSnapshot("foo-state-name").destroy();
8.6.1. State Compatibility
The state has to be compatible with the updated pipeline. The snapshot contains separate data for each vertex, identified by the vertex name. If a snapshot doesn’t contain data for a vertex, it will be restored with an empty state. If a snapshot contains data for a vertex that does not exist, that data will be ignored. If the vertex name matches, the kind of change that is allowed depends on the specific processor type.
From this follows:
-
you can add new stateful stages and remove existing ones without breaking compatibility. This includes adding/removing a source or sink or adding a new aggregation path from existing sources (see also Update a Job with Auto-Assigned Stage Names).
-
you can freely add, remove or change stateless stages, such as filter/map/flatMap stages, sinks and others
Recombining the Frames of Sliding Windows
Jet supports changing the sliding window size or the sliding step. In some situations, however, you can get incorrect data in the output.
Sliding windows in Jet accumulate input items into frames (see Sliding and Tumbling Window). The frame size is equal to the slide step of the window. The processor saves the aggregation’s accumulator to the snapshot, one for each key and frame.
If the slide step changes, the frames in the snapshot won’t match those needed for the new slide step. You can make this change, but the frames will be recombined into new frames based on their end timestamps.
As you can see in the figure, frame 1:10 will be put into frame 1:15. Frames 1:20 and 1:30 will be combined into frame 1:30. This means that events with timestamps between 1:10 and 1:15 will be incorrectly accumulated into the frame 1:15-1:30.
You can also extend the window size, but the first windows emitted after restoring will miss older events. The reason for this is that the frames were already purged; see the image:
In the example above, the 20-second window was extended to a 30-second one; the sliding step (i.e. the frame size) was unchanged. As you can see, the frames after the restore match the frames from when the snapshot was saved, so no recombining will occur. However, the next window to emit, at 1:30, will miss the events between 0:50 and 1:00, because the frame containing them was already purged: it wasn’t needed for the original 20-second window. The window at 1:40 will be correct. Reducing the window size has no such issue.
State Compatibility of Other Processors
Here are examples of other supported changes to the parameters of pipeline stages. For details, consult the Javadoc of each stage.
-
change session window timeout
-
change connection parameters of sources/sinks
-
change parameters of aggregate operation: for example, change the comparator of
AggregateOperations.minBy()
-
replace the aggregate operation, if the accumulator type is the same: for example, change
counting()
tosummingLong()
, if that makes sense -
any change to stateless stages
The following changes are not supported:
-
change a sliding window to a session window
-
replace aggregation operation for one with incompatible accumulator type
-
assign a different name to a stage
Update a Job with Auto-Assigned Stage Names
The Pipeline API allows you to set the name of a stage using
Stage.setName
,
but if you don’t set it, Jet generates the name based on the stage type.
If this would result in the same name for multiple stages, Jet appends
-N
to the name, where the N
is a sequence number. For example, if
you have 3 sliding window aggregations in the job, their names will be
sliding-window
, sliding-window-2
and sliding-window-3
. The number
sequence follows the order in which you added the stages to the
Pipeline
.
When Jet translates the pipeline into the Core API DAG, it uses the name of the stage to name the vertex that implements it. Since the vertex name identifies the data element in the state snapshot, it’s very important to keep it equal when you change the pipeline to a new version. For example, you can’t reorder the aggregations or remove the first one because that would change the names and incorrect state will be restored to the vertex. Therefore we recommend that you explicitly assign a unique name to every stage in the pipeline you intend to use and maintain over a long term.
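For example, here is a sketch of explicitly naming a windowed aggregation stage so that its vertex name stays stable across pipeline versions (events is an assumed timestamped stream stage and event.key() a hypothetical key extractor returning String):
StreamStage<KeyedWindowResult<String, Long>> counts = events
        .groupingKey(event -> event.key())
        .window(WindowDefinition.sliding(30_000, 10_000))
        .aggregate(AggregateOperations.counting());
counts.setName("counts-window");   // keep this name stable across pipeline versions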
Caveats When Changing the Grouping Key
You can change the key-extracting function for a groupingKey, but keep in mind that the snapshotted state has keys extracted with the old key extractor. If the new keys aren’t equal to the old ones, they will create new groups.
You can also change the key type. In this case, however, the job may
fail with a ClassCastException
because the downstream might expect a
different type than it will actually receive. For example, if you change
the key type from Integer
to Long
, the downstream stage of a window
aggregation will expect KeyedWindowResult<Long, V>
, but can actually
get KeyedWindowResult<Integer, V>
.
State Compatibility with Future Versions
Jet does not guarantee snapshot compatibility between major releases. A snapshot created in an older version might not work in a newer major version. Bugfix releases are compatible.
8.7. Jet’s Execution Model
At the heart of Jet is the TaskletExecutionService
.
It manages the threads that perform all the computation in a Jet job.
Although this class is not formally a part of Jet’s public API,
understanding how it schedules code for execution is essential if you
want to implement a cooperative processor.
8.7.1. Cooperative Multithreading
Cooperative multithreading is one of the core features of Jet and can be
roughly compared to
green threads.
It is purely a library-level feature and does not involve any low-level
system or JVM tricks; the Processor
API is simply designed in such a
way that the processor can do a small amount of work each time it is
invoked, then yield back to the Jet engine. The engine manages a thread
pool of fixed size and on each thread, the processors take their turn in
a round-robin fashion.
The point of cooperative multithreading is better performance. Several factors contribute to this:
-
The overhead of context switching between processors is much lower since the operating system’s thread scheduler is not involved.
-
The worker thread driving the processors stays on the same core for longer periods, preserving the CPU cache lines.
-
The worker thread has direct knowledge of the ability of a processor to make progress (by inspecting its input/output buffers).
8.7.2. Tasklet
The execution service doesn’t deal with processors directly; instead it
deals with tasklets.
Tasklet
is a very simple
functional interface derived from the standard Java Callable<ProgressState>
.
The execution service manages a pool of worker threads, each being
responsible for a list of tasklets. The worker thread simply invokes the
call()
methods on its tasklets in a round-robin fashion. The method’s
return value tells whether the tasklet made progress and whether it is
now done.
The most important tasklet is the one driving a processor
(ProcessorTasklet
); there are a few others that deal with network
sending/receiving and taking snapshots.
8.7.3. Exponential Backoff
If none of the worker’s tasklets report having made progress, the worker will go to a short sleep. If this happens again after it wakes up, it will sleep for twice as long. Once it reaches 1 ms sleep time, it will continue retrying once per millisecond to see if any tasklets can make progress.
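The following sketch illustrates the loop; it is not Jet’s actual code, and running, tasklets, and callTasklet() are hypothetical stand-ins for this example:
long sleepNanos = 25;                                    // hypothetical initial backoff
while (running) {
    boolean madeProgress = false;
    for (Tasklet tasklet : tasklets) {
        madeProgress |= callTasklet(tasklet);            // true if the tasklet made progress
    }
    if (madeProgress) {
        sleepNanos = 25;                                 // reset the backoff
    } else {
        LockSupport.parkNanos(sleepNanos);               // idle briefly
        sleepNanos = Math.min(sleepNanos * 2, TimeUnit.MILLISECONDS.toNanos(1));
    }
}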
8.7.4. ProcessorTasklet
ProcessorTasklet
is the one that drives a processor. It manages its inbox, outbox,
inbound/outbound concurrent queues, and tracks the current processor
state so it knows which of its callback methods to call.
During each tasklet.call()
, ProcessorTasklet
makes one call into
one of its processor’s callbacks. It determines the processor’s progress
status and reports it to the execution service.
8.7.5. Non-Cooperative Processor
If a processor declares itself as non-cooperative, the execution service will start a dedicated Java thread for its tasklet to run on.
Even if it’s non-cooperative, the processor’s callback methods must still make sure they don’t run for longer than a second or so at a time. Jet can’t start a snapshot until all the processors have yielded control back to it.
8.8. What Happens When You Submit a Job
When you submit a Job to the cluster, Jet replicates the DAG to the whole Jet
cluster and executes a copy of it on each member.
Jet executes the job on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread. Each worker thread has a list of tasklets it is in charge of and as tasklets complete at different rates, the remaining ones are moved between workers to keep the load balanced.
Each instance of a Processor
is wrapped in one tasklet which the
execution service repeatedly executes until it is done. A vertex with a
parallelism of 8 running on 4 members would have a total of 32 tasklets
running at the same time. Each member has the same number of tasklets
running.
When you make a request to execute a Job, the corresponding DAG and additional resources are deployed to the Jet cluster. Jet builds an execution plan for the DAG on each member, which creates the associated tasklets for each Vertex and connects them to their inputs and outputs.
Jet uses Single Producer/Single Consumer ringbuffers to transfer the data between processors on the same member. They are data-type agnostic, so any data type can be used to transfer the data between vertices.
Ringbuffers, being bounded queues, introduce natural backpressure into the system; if a consumer’s ringbuffer is full, the producer will have to back off until it can enqueue the next item. When data is sent to another member over the network, there is no natural backpressure, so Jet uses explicit signaling in the form of adaptive receive windows.
8.9. Distributed Snapshot
The technique Jet uses to achieve fault tolerance is called a “distributed snapshot”, described in a paper by Chandy and Lamport. At regular intervals, Jet raises a global flag that says "it’s time for another snapshot". All processors belonging to source vertices observe the flag, create a checkpoint on their source, emit a barrier item to the downstream processors and resume processing.
As the barrier item reaches a processor, it stops what it’s doing and emits its state to the snapshot storage. Once complete, it forwards the barrier item to its downstream processors.
Due to parallelism, in most cases a processor receives data from more than one upstream processor. It will receive the barrier item from each of them at separate times, but it must start taking a snapshot at a single point in time. There are two approaches it can take, as explained below.
8.9.1. Exactly-Once Snapshotting
With exactly-once configured, as soon as the processor gets a barrier item in any input stream (from any upstream processor), it must stop consuming it until it gets the same barrier item in all the streams:
-
At the barrier in stream X, but not Y. Must not accept any more X items.
-
At the barrier in both streams, taking a snapshot.
-
Snapshot done, barrier forwarded. Can resume consuming all streams.
8.9.2. At-Least-Once Snapshotting
With at-least-once configured, the processor can keep consuming all the streams until it gets all the barriers, at which point it will stop to take the snapshot:
-
At the barrier in stream X, but not Y. Carry on consuming all streams.
-
At the barrier in both streams, already consumed
x1
andx2
. Taking a snapshot. -
Snapshot done, barrier forwarded.
Even though x1
and x2
occur after the barrier, the processor
consumed and processed them, updating its state accordingly. If the
computation job stops and restarts, this state will be restored from the
snapshot and then the source will replay x1
and x2
. The processor
will think it got two new items.
8.10. Dropped Late Events
At times, you might encounter a line like the following in your logs:
Event dropped, late by 123ms. currentWatermark=14:23:10.256, eventTime=14:23:10.123, event=FooEvent
The most obvious cause to check: make sure that no source partition contains two
events E1 and E2 where E2 comes after E1 in the stream and
E1.timestamp - allowedLag > E2.timestamp.
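To make the arithmetic concrete, here is a tiny worked illustration that reproduces the log line above; the numbers are made up for the example:
// With a limitingLag-style policy the watermark trails the highest observed timestamp
// by allowedLag, so any event whose timestamp falls behind that watermark is dropped.
long allowedLag = 1_000;                                     // 1 second
long topObservedTimestamp = 10_000;                          // E1's timestamp
long currentWatermark = topObservedTimestamp - allowedLag;   // 9_000
long e2Timestamp = 8_877;                                    // E2 arrives after E1 in the stream
boolean dropped = e2Timestamp < currentWatermark;            // true: E2 is late by 123 ms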
Besides this obvious reason, there are a few more possible causes:
-
If timestamps are not assigned in the source, the source streams might be split and merged in an unpredictable way. See Prefer Assigning Timestamps at the Source. Map Journal and Cache Journal sources currently don’t coalesce watermarks in the source, so they fall into this category too.
-
Source partitions can be marked as idle. For the Pipeline API the idle timeout is hardcoded to DEFAULT_IDLE_TIMEOUT. If a partition isn’t really idle but merely stalled, events from it can be late after it resumes, because it was excluded from watermark coalescing while it was marked idle.
Note: items are not dropped unless there is a need to. For example, the
map
operation never drops items, even if they come late. Currently
only windowing aggregations drop events, because they purge data from
memory as the event time passes.
8.11. Stream Skew
We explained how we use the concept of watermark to impose order onto a disordered data stream. However, items arriving out of order aren’t our only challenge; modern stream sources like Kafka are partitioned and distributed so “the stream” is actually a set of independent substreams, moving on in parallel. Substantial time difference may arise between events being processed on each one, but our system must produce coherent output as if there was only one stream. We meet this challenge by coalescing watermarks: as the data travels over a partitioned/distributed edge, we make sure the downstream processor observes the correct watermark value, which is the least of watermarks received from the contributing substreams.
8.11.1. Rules of Watermark Propagation
Watermark objects are sent interleaved with other stream items, but are handled specially:
-
The value of the watermark a processor emits must be strictly increasing. Jet will throw an exception if it detects a non-increasing watermark.
-
When a processor receives a watermark, it should add it to the outbox. The processor is allowed to delay the watermark: for example, if the processor does async mapping, it should emit the watermark only after it has received responses for all items that came before it. Sink processors don’t have to emit watermarks.
-
The watermark item is always broadcast, regardless of the edge type. This means that all N upstream processors send their watermark to all M downstream processors.
-
The processor will observe only a watermark that was received from all upstream processors and from all upstream edges. This is called watermark coalescing.
Jet’s internal class
WatermarkCoalescer
manages watermarks received from multiple inputs. As it receives
watermark items from them, its duty is to decide when to forward the
watermark downstream. This happens at two levels:
-
between the multiple queues backing a single edge
-
between the multiple input edges of a single processor
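As a minimal illustration of the coalescing rule (this is not the WatermarkCoalescer implementation), the watermark that may be forwarded downstream is the least of the latest watermarks observed on the contributing inputs:
long[] lastWatermarkPerInput = {14_000, 12_500, 13_200};   // hypothetical per-input values

long coalesced = Long.MAX_VALUE;
for (long wm : lastWatermarkPerInput) {
    coalesced = Math.min(coalesced, wm);
}
// coalesced == 12_500: the downstream processor observes the least of the watermarks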
8.11.2. Idle Inputs
A source processor can emit a special object called the idle message when it sees no events for the configured idle timeout. This happens in practice when some external partitions have no events while others do.
When an idle message is received from an input, that input is excluded from watermark coalescing: Jet will not wait to receive a watermark from it, which allows the other, active inputs to be processed without further delay. If the idle timeout is disabled and some processor emits no watermarks (because it sees no events), the processing will stall indefinitely.
8.12. The Pitfalls of At-Least-Once Processing
In some cases at-least-once semantics can have consequences of quite an unexpected magnitude, as we discuss next.
8.12.1. Apparent Data Loss
Imagine a very simple kind of processor: it matches up the items that belong to a pair based on some rule. If it receives item A first, it remembers it. Later on, when it receives item B, it emits that fact to its outbound edge and forgets about the two items. It may also first receive B and wait for A.
Now imagine this sequence: A → BARRIER → B
. In at-least-once the
processor may observe both A and B, emit its output, and forget about
them, all before taking the snapshot. After the restart, item B will be
replayed because it occurred after the last barrier, but item A won’t.
Now the processor is stuck forever in a state where it’s expecting A and
has no idea it already got it and emitted that fact.
Problems similar to this may happen with any state the processor keeps until it has got enough information to emit the results and then forgets it. By the time it takes a snapshot, the post-barrier items will have caused it to forget facts about some pre-barrier items. After a restart it will behave as though it has never observed those pre-barrier items, resulting in behavior equivalent to data loss.
8.12.2. Non-Monotonic Watermark
One special case of the above story concerns watermark items. Thanks to watermark coalescing, processors are typically implemented against the invariant that the watermark value always increases. However, in at-least-once the post-barrier watermark items will advance the processor’s watermark value. After the job restarts and the state gets restored to the snapshotted point, the watermark will appear to have gone back, breaking the invariant. This can again lead to apparent data loss.
9. Expert Zone — The Core API
This section covers the Core API, Jet’s low-level API that directly exposes the computation engine’s raw features. If you are looking for the API to build your computation pipeline, please refer to the Pipeline API section.
Creating a Core API DAG requires expert-level familiarity with concepts
like partitioning schemes, vertex parallelism, distributed vs. local
edges, etc. Furthermore, this API offers no static type safety and it
is very easy to create a DAG that fails with a ClassCastException
when
executed. Even though it is possible to build DAGs by hand with this API, that is
not its intended use; rather, it offers the infrastructure on top of which to build
high-level DSLs and APIs that describe computation jobs.
Implementing a Core API Processor
requires even greater expertise than
building a DAG. Among other things, you have to be acquainted in detail
with Jet’s concept of cooperative multithreading. While we provide as
much convenience as we can for extending Jet with your custom
processors, we cannot remove the dangers of using these facilities
improperly.
9.1. DAG
The DAG-building API is centered around the DAG
class. This is a pure data class and can be instantiated on its own,
without a Jet instance. This makes it simple to separate the
job-describing code from the code that manages the lifecycle of Jet
instances.
You can "compile" a Jet pipeline into a DAG
:
DAG dag = pipeline.toDag();
Studying these DAGs may be a useful aid while learning the Core API.
To start building a DAG from scratch, write
DAG dag = new DAG();
A good practice is to structure the DAG-building code into the following sections:
-
Create all the vertices.
-
Configure the local parallelism of vertices.
-
Create the edges.
Example:
(1)
Vertex source = dag.newVertex("source",
SourceProcessors.readFilesP(".", UTF_8, "*", false, (file, line) -> line)
);
Vertex transform = dag.newVertex("transform", mapP(
(String line) -> entry(line, line.length())
));
Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("sinkMap"));
(2)
source.localParallelism(1);
(3)
dag.edge(between(source, transform));
dag.edge(between(transform, sink));
1 | Create the vertices |
2 | Configure local parallelism |
3 | Create the edges |
9.1.1. Creating a Vertex
The two mandatory elements of creating a vertex are its string identifier and the supplier of processors. The latter can be provided in three variants, differing in the degree of explicit control over the lifecycle management of the processors. From simple to complex they are:
-
SupplierEx<Processor>
directly returns processor instances from itsget()
method. It is expected to be stateless and return equivalent instances on each call. It doesn’t provide any initialization or cleanup code. -
ProcessorSupplier
returns in a single call all the processors that will run on a single cluster member. It may specialize each instance, for example to achieve local data partitioning. It is also in charge of the member-local lifecycle (initialization and destruction). -
ProcessorMetaSupplier
returns in a single call an object that will be in charge of creating all the processors for a vertex. Given a list of member addresses, the object it returns is aFunction<Address, ProcessorSupplier>
which will then be called with each of the addresses from the list to retrieve theProcessorSupplier
specialized for the given member.
ProcessorMetaSupplier
is the most fundamental facility. It is a
factory of ProcessorSupplier
s. SupplierEx<Processor>
exists purely as convenience over ProcessorSupplier
for the simplest
kinds of vertices.
You make the choice which of the three to use for a particular vertex
when you implement it. When you build a DAG from already implemented
vertices, you don’t have to care, or even know, which one it’s using.
You’ll call a factory method that returns one or the other and they will
integrate the same way into your newVertex()
calls.
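For illustration, here is a sketch of supplying the same hypothetical MyProcessor through each of the three variants; it assumes the of() convenience factories on ProcessorSupplier and ProcessorMetaSupplier:
Vertex v1 = dag.newVertex("simple", MyProcessor::new);            // SupplierEx<Processor>

Vertex v2 = dag.newVertex("per-member",                           // ProcessorSupplier
        ProcessorSupplier.of(MyProcessor::new));

Vertex v3 = dag.newVertex("per-cluster",                          // ProcessorMetaSupplier
        ProcessorMetaSupplier.of(ProcessorSupplier.of(MyProcessor::new)));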
9.1.2. Local and Global Parallelism of Vertex
The vertex is implemented at runtime by one or more instances of
Processor
on each member. Each vertex can specify how many of its
processors will run per cluster member using the localParallelism
property; every member will have the same number of processors. A new
Vertex
instance has this property set to -1
, which requests to use
the default value equal to the configured size of the cooperative thread
pool. The latter defaults to Runtime.availableProcessors()
and is
configurable via
InstanceConfig.setCooperativeThreadCount()
.
In most cases the only level of local parallelism that you’ll want to
explicitly configure is 1
for the cases where no parallelism is
desirable (e.g. on a source processor reading from a file).
The total parallelism of a vertex is the total number of its
processors running in the whole cluster. In Jet this is always equal to
local parallelism times cluster size because it allocates the same
number of processors on each member. This number can be critical to the
proper functioning of a vertex that represents a distributed,
partitioned data source. If you let the total parallelism of such a
vertex exceed the number of partitions in the data source, some
processors will end up with no partitions assigned to them. This has
serious consequences in an event time-based streaming job: Jet must
produce a unique value of the watermark that doesn’t overtake the
timestamps coming out of any processors. A processor with no source
partitions will emit no data and its local watermark will permanently
stay at the initial value of Long.MIN_VALUE
. This will keep the
global watermark from advancing until the idle partition detection
mechanism kicks in. Jet will produce no output until this happens and
the default "partition idle" timeout is 60 seconds.
9.1.3. Edge Ordinal
An edge is connected to a vertex at a given ordinal, which identifies it to the vertex and its processors. When a processor receives an item, it knows which ordinal it came from. Things are similar on the outbound side: the processor emits an item to a given ordinal, but also has the option to emit the same item to all ordinals. This is the most typical case and allows easy replication of a data stream across several edges.
When you use the
between(a, b)
edge factory, the edge will be connected at ordinal 0 at both ends. When
you need a different ordinal, use the
from(a, ord1).to(b, ord2)
form. There must be no gaps in ordinal assignment, which means a vertex
will have inbound edges with ordinals 0..N and outbound edges with
ordinals 0..M.
This example shows the usage of between()
and from().to()
forms to
build a DAG with one source feeding two computational vertices:
Vertex source = dag.newVertex("source",
SourceProcessors.readFilesP(".", UTF_8, "*", false, (file, line) -> line)
);
Vertex toUpper = dag.newVertex("toUpper", mapP((String in) -> in.toUpperCase()));
Vertex toLower = dag.newVertex("toLower", mapP((String in) -> in.toLowerCase()));
dag.edge(between(source, toUpper));
dag.edge(from(source, 1).to(toLower));
9.1.4. Local and Distributed Edge
A major choice to make in terms of routing the data coming out of a
processor is whether the candidate set of the target vertex’s processors
is unconstrained, encompassing all its processors across the cluster, or
constrained to just those running on the same cluster member. You
control this with the distributed
property of the edge. By default the
edge is local and calling the distributed()
method removes this restriction.
You can minimize network traffic by employing local edges. They are
implemented with the most efficient kind of concurrent queue:
single-producer, single-consumer array-backed queue. It employs
wait-free algorithms on both sides and avoids even the latency of
volatile
writes by using lazySet
.
A good example of employing a local-distributed edge combo is two-stage aggregation. Here’s how it looks on our Word Count example:
dag.edge(between(tokenize, accumulate)
.partitioned(wholeItem(), HASH_CODE))
.edge(between(accumulate, combine)
.distributed()
.partitioned(entryKey()));
Note that only the edge from accumulate
to combine
is distributed.
9.1.5. Routing Policies
The routing policy decides which of the processors in the candidate set to route each particular item to.
Unicast
This is the default routing policy, the one you get when you write
dag.edge(between(input, output));
For each item it chooses a single destination processor with no further restrictions on the choice. The only guarantee given by this policy is that exactly one processor will receive the item, but Jet also takes care to “spray” the items equally over all the reception candidates.
This choice makes sense when the data doesn’t have to be partitioned, usually implying a downstream vertex which can compute the result based on each item in isolation.
Isolated
This is a more restricted kind of unicast policy: any given downstream processor receives data from exactly one upstream processor. In some DAG setups you may need to apply selective backpressure to individual upstream processors; this policy lets you achieve that. If the connected vertices have equal parallelism, an isolated edge creates a one-to-one mapping between their processors, preserving both encounter order and partitioning.
Activate this policy by calling isolated()
on the edge:
dag.edge(between(input, output).isolated());
Broadcast
A broadcasting edge sends each item to all candidate receivers. Due to the redundancy it creates, this kind of edge isn’t appropriate for the main data stream. Its purpose is dispatching a small amount of data to all the processors of a vertex, typically as a part of setting up before processing the data stream. For this reason a broadcast edge is often high-priority as well. For example, in a hash-join one vertex creates the lookup hashtable and sends the same instance to all the processors of the next vertex, the one that processes the main data stream.
Activate this policy by calling broadcast()
on the edge:
dag.edge(between(input, output).broadcast());
Partitioned
A partitioned edge sends each item to the one processor responsible for the item’s partition ID. On a distributed edge, this processor will be unique across the whole cluster. On a local edge, each member will have its own processor for each partition ID.
Jet automatically assigns partitions to processors during job
initialization. The number of partitions is fixed for the lifetime of a
cluster and you configure it on the IMDG level with the system property
hazelcast.partition.count
. The number of partitions must not be less
than the highest global parallelism of any vertex, otherwise some
processors will get no partitions.
You can also refer to the Hazelcast Reference Manual for more details on partitioning in Hazelcast IMDG.
This is the default algorithm to determine the partition ID of an item:
-
Apply the key extractor function defined on the edge to retrieve the partitioning key.
-
Serialize the partitioning key to a byte array using Hazelcast serialization.
-
Apply Hazelcast’s standard
MurmurHash3
-based algorithm to get the key’s hash value. -
Partition ID is the hash value modulo the number of partitions.
The above procedure is quite CPU-intensive, but has the crucial property of giving repeatable results across all cluster members, which may be running on disparate JVM implementations.
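Expressed as a sketch (the serialize and murmurHash3 helpers are hypothetical stand-ins for Hazelcast’s internal implementations):
byte[] keyBytes = serialize(partitioningKey);          // hypothetical serialization step
int hash = murmurHash3(keyBytes);                      // hypothetical MurmurHash3 helper
int partitionId = Math.floorMod(hash, partitionCount); // always a value in [0, partitionCount)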
Another common choice is to use Java’s standard Object.hashCode()
. It
is often significantly faster and it’s safe to use on a local edge.
On a distributed edge it is not a safe strategy in general because
hashCode()
's contract does not require repeatable results across
JVMs or even different instances of the same JVM version. If a given
class’s Javadoc explicitly specifies the hashing function it uses, then
its instances are safe to partition with hashCode()
.
You can provide your own implementation of Partitioner
to gain full
control over the partitioning strategy.
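For example, here is a minimal sketch of a custom Partitioner applied to the extracted key; it assumes Partitioner can be written as a lambda over the key, so consult its Javadoc for the exact contract:
Partitioner<String> byFirstLetter =
        (word, partitionCount) ->
                Math.floorMod(Character.toLowerCase(word.charAt(0)), partitionCount);

dag.edge(between(tokenize, accumulate)
        .partitioned(wholeItem(), byFirstLetter));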
We use both partitioning strategies in the Word Count example:
dag.edge(between(tokenize, accumulate)
.partitioned(wholeItem(), Partitioner.HASH_CODE))
.edge(between(accumulate, combine)
.distributed()
.partitioned(entryKey()));
Note that the local-partitioned edge uses partitioning by hash code and the distributed edge uses the default Hazelcast partitioning. It turns out that in this particular example we could safely use the hash code both times, but we leave it like this for didactic purposes. Since much less data travels towards the combiner than towards the accumulator, the performance of the whole job is hardly affected by this choice.
All-To-One
The all-to-one routing policy is a special case of the partitioned
policy which assigns the same partition ID to all items. allToOne(key)
is semantically equivalent to partitioned(t → key)
, but the former is
slightly faster because Jet avoids recalculating the partition ID for
each stream item.
Two edges with an equal key will go to the same partition and will be processed by the same member. If you have multiple all-to-one edges in a job, or in multiple jobs, assign a different key to each of them so they don’t all run on the same member. On the other hand, if you have two all-to-one edges going to the same vertex, they should have the same key so that all the items are really processed by the same member.
You should always set the local parallelism of the target vertex to 1, otherwise there will be idle processors that never get any items.
On a local edge this policy doesn’t make sense since simply setting the local parallelism of the target vertex to 1 constrains the local choice to just one processor instance.
One case where the all-to-one routing is appropriate is global aggregation (without grouping). A single processor must see all the data. Here’s how we would set it between the first-stage and the second-stage aggregating vertex:
dag.edge(between(stage1, stage2).distributed().allToOne(123));
9.1.6. Priority
By default the processor receives items from all inbound edges as they arrive. However, there are important cases where an edge must be consumed in full to make the processor ready to accept data from other edges. A major example is a “hash join” which enriches the data stream with data from a lookup table. This can be modeled as a join of two data streams where the enriching stream contains the data for the lookup table and must be consumed in full before consuming the stream to be enriched.
The priority
property controls the order of consuming the edges. Edges
are sorted by their priority number (ascending) and consumed in that
order. Edges with the same priority are consumed without particular
ordering (as the data arrives).
We can see a prioritized edge in action in the TF-IDF example:
dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1));
The tokenize
vertex receives a set of stopwords and filters out
their occurrences from the input text. It must receive the entire set
before beginning to process the text.
A Fault Tolerance Caveat
As explained in the section on the Processor API, Jet takes regular snapshots of processor state when fault tolerance is enabled. A processor will get a special item in its input stream, called a barrier. When working in the exactly once mode, as soon as it receives it, it must stop pulling the data from that stream and continue pulling from all other streams until it receives the same barrier in all of them, and then emit its state to the snapshot storage. This is in direct contradiction with the contract of edge prioritization: the processor is not allowed to consume any other streams before having fully exhausted the prioritized ones.
For this reason Jet does not initiate a snapshot until all the higher-priority edges have been fully consumed.
Although strictly speaking this only applies to the exactly once mode, Jet postpones taking the snapshot in at least once mode as well. Even though the snapshot could begin early, it would still not be able to complete until the processor has consumed all the prioritized edges, started consuming non-prioritized ones, and received the barrier in all of them. The result would be many more items processed twice after the restart.
9.1.7. Fine-Tuning Edges
Edges can be configured with an
EdgeConfig
instance, which specifies additional fine-tuning parameters. For
example,
dag.edge(between(tickerSource, generateTrades)
.setConfig(new EdgeConfig().setQueueSize(512)));
Please refer to the Javadoc of
EdgeConfig
for details.
9.2. How to Build a DAG
9.2.1. Bounded Stream (Batch) DAG
Let’s use the Core API to build the Word Count DAG, already described in an earlier section:
We start by instantiating the DAG class and adding the source vertex:
DAG dag = new DAG();
Vertex source = dag.newVertex("source", SourceProcessors.readMapP("lines"));
Note how we can build the DAG outside the context of any running Jet instances: it is a pure POJO.
The source vertex will read the lines from the IMap
and emit items of
type Map.Entry<Integer, String>
to the next vertex. The key of the
entry is the line number, and the value is the line itself. The built-in
map-reading processor will do just what we want: on each member it will
read only the data local to that member.
The next vertex is the tokenizer, which does a simple "flat-mapping" operation (transforms one input item into zero or more output items). The low-level support for such a processor is part of Jet’s library; we just need to provide the mapping function:
// (lineNum, line) -> words
Pattern delimiter = Pattern.compile("\\W+");
Vertex tokenize = dag.newVertex("tokenize",
Processors.flatMapP((Entry<Integer, String> e) ->
traverseArray(delimiter.split(e.getValue().toLowerCase()))
.filter(word -> !word.isEmpty()))
);
This creates a processor that applies the given function to each
incoming item, obtaining zero or more output items, and emits them.
Specifically, our processor accepts items of type Entry<Integer,
String>
, splits the entry value into lowercase words, and emits all
non-empty words. The function must return a Traverser
, which is a
functional interface used to traverse a sequence of non-null items. Its
purpose is equivalent to the standard Java Iterator
, but avoids the
cumbersome two-method API. Since a lot of support for cooperative
multithreading in Hazelcast Jet deals with sequence traversal, this
abstraction simplifies many of its aspects.
The next vertex will do the actual word count. We can use the built-in
accumulateByKey
processor for this:
// word -> (word, count)
Vertex accumulate = dag.newVertex("accumulate",
Processors.accumulateByKeyP(singletonList(wholeItem()), counting())
);
This processor maintains a hashtable that maps each distinct key to its
accumulated value. We specify wholeItem()
as the key extractor
function: our input item is just the word, which is also the grouping
key. The second argument is the kind of aggregate operation we want to
perform: counting. We are relying on Jet’s out-of-the-box
definitions here, but it is easy to define your own aggregate operations
and key extractors. The processor emits nothing until it has received
all the input, and at that point it emits the hashtable as a stream of
Entry<String, Long>
.
Next is the combining step which computes the grand totals from individual members' contributions. This is the code:
// (word, count) -> (word, count)
Vertex combine = dag.newVertex("combine",
Processors.combineByKeyP(counting(), Util::entry)
);
combineByKey
is designed to be used downstream of accumulateByKey
,
which is why it doesn’t need an explicit key extractor. The aggregate
operation must be the same as on accumulateByKey
.
The final vertex is the sink; we want to store the output in
another IMap
:
Vertex sink = dag.newVertex("sink", SinkProcessors.writeMapP("counts"));
Now that we have all the vertices, we must connect them into a graph and specify the edge type as discussed in the previous section. Here’s the code:
dag.edge(between(source, tokenize))
.edge(between(tokenize, accumulate) (1)
.partitioned(wholeItem(), Partitioner.HASH_CODE))
.edge(between(accumulate, combine) (2)
.distributed()
.partitioned(entryKey()))
.edge(between(combine, sink));
1 | Here we chose a local partitioned edge. For each word, there will
be a processor responsible for it on each member so that no items must
travel across the network. In the partitioned() call we specify two
things: the function that extracts the partitioning key (wholeItem()
— same as the grouping key extractor), and the policy object that
decides how to compute the partition ID from the key. Here we use the
built-in HASH_CODE , which will derive the ID from Object.hashCode() .
As long as the definitions of equals()/hashCode() on the key object
match our expected notion of key equality, this policy is always safe
to use on a local edge. |
2 | is a distributed partitioned edge: for each word there is a single
combiner processor in the whole cluster responsible for it and items
will be sent over the network if needed. The partitioning key is again
the word, but here it is the key part of the Map.Entry<String, Long> .
We are using the default partitioning policy here (Hazelcast’s own
partitioning scheme). It is the slower-but-safe choice on a distributed
edge. Detailed inspection shows that hashcode-based partitioning would
be safe as well because all of String , Long , and Map.Entry have
the hash function specified in their Javadoc. |
You can access a full, self-contained Java program with the above DAG code at the Hazelcast Jet Code Samples repository.
9.2.2. Unbounded Stream DAG
For this example we’ll build a simple Jet job that monitors trading events on a stock market, categorizes the events by stock ticker, and reports the number of trades per time unit (the time window). In terms of DAG design, not much changes going from batch to streaming. This is how it looks:
We have the same cascade of source, two-stage aggregation, and sink. The
source is the event journal of a Hazelcast IMap (we assume some other
process continuously updates this map with trade events). On the sink
side there’s another mapping vertex, format-output
, that transforms
the window result items into lines of text. The sink
vertex writes
these lines to a file.
Here’s the DAG-building code in full:
ToLongFunctionEx<? super Trade> timestampFn = Trade::timestamp;
FunctionEx<? super Trade, ?> keyFn = Trade::productId;
SlidingWindowPolicy winPolicy = slidingWinPolicy(
SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS);
DAG dag = new DAG();
Vertex tradeSource = dag.newVertex("trade-source",
SourceProcessors.<Trade, Long, Trade>streamMapP(
TRADES_MAP_NAME,
alwaysTrue(), (1)
EventJournalMapEvent::getNewValue, (1)
JournalInitialPosition.START_FROM_OLDEST, (2)
eventTimePolicy(
timestampFn, (3)
limitingLag(SECONDS.toMillis(3)), (4)
winPolicy.frameSize(), (5)
winPolicy.frameOffset(),
SECONDS.toMillis(3) (6)
)));
Vertex slidingStage1 = dag.newVertex("sliding-stage-1",
Processors.accumulateByFrameP(
singletonList(keyFn),
singletonList(timestampFn),
TimestampKind.EVENT,
winPolicy, counting()
));
Vertex slidingStage2 = dag.newVertex("sliding-stage-2",
Processors.combineToSlidingWindowP(winPolicy, counting(),
KeyedWindowResult::new));
Vertex formatOutput = dag.newVertex("format-output", mapUsingContextP( (7)
ContextFactory.withCreateFn(x -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")),
(DateTimeFormatter timeFormat, KeyedWindowResult<String, Long> kwr) ->
String.format("%s %5s %4d",
timeFormat.format(Instant.ofEpochMilli(kwr.end())
.atZone(ZoneId.systemDefault())),
kwr.getKey(), kwr.getValue())
));
Vertex sink = dag.newVertex("sink", SinkProcessors.writeFileP(
OUTPUT_DIR_NAME, Object::toString, UTF_8, false));
tradeSource.localParallelism(1);
return dag
.edge(between(tradeSource, slidingStage1)
.partitioned(keyFn, HASH_CODE))
.edge(between(slidingStage1, slidingStage2)
.partitioned(entryKey(), HASH_CODE)
.distributed())
.edge(between(slidingStage2, formatOutput)
.isolated())
.edge(between(formatOutput, sink));
You can see quite a lot of code going into the setup of the streaming source. Let’s zoom in on it:
1 | filtering and mapping functions we supply directly to the source. Hazelcast IMDG will apply them before serializing and sending to Jet so this saves network traffic and CPU. |
2 | where to start from in map’s event journal: the oldest entry still available. |
3 | function to apply to the event object to get its timestamp. |
4 | watermark policy. Here we use the simplest
kind, limitingLag , which will make the watermark lag behind the top
observed event timestamp by the fixed amount we specified (3 seconds). |
5 | watermark emission policy that tells Jet when to actually send a watermark event. Since the sliding window processor ignores all watermark events that belong to the same frame, we configure a matching policy that emits only one watermark per frame. |
6 | partition idle timeout. This is a countermeasure to stalling problems that occur when some of the IMap partitions don’t receive any updates. Due to watermark coalescing this could stall the entire stream, but with this setting a partition will be marked as idle after 3 seconds of inactivity and then the rest of the system behaves as if it didn’t exist. |
7 | Here we use mapUsingContextP which allows us to create an object
available to the processor at a late point, after all the job
serialization-deserialization is done. In this case we need it because
the Java 8 DateTimeFormatter isn’t serializable. |
The full code of this sample is in StockExchangeCoreApi.java and running it you’ll get an endless stream of data accumulating on the disk. To spare your filesystem we’ve limited the execution time to 10 seconds.
9.2.3. Advanced Batch DAG — Inverted TF-IDF Index
In this tutorial we’ll explore what the Core API DAG model offers beyond the capabilities of the Pipeline API. Our DAG will feature splits, joins, broadcast, and prioritized edges. We’ll access data from the file system and show a simple technique to distribute file reading across Jet members. Several vertices we use can’t be implemented in terms of out-of-the-box processors, so we’ll also show you how to implement your own with minimum boilerplate.
The full code is available at the hazelcast-jet-code-samples
repository:
Let us first introduce the problem. The inverted index is a basic data structure in the domain of full-text search. First used in the 1950s, it is still at the core of modern information retrieval systems such as Lucene. The goal is to be able to quickly find the documents that contain a given set of search terms, and to sort them by relevance. To understand it we’ll need to throw in some terminology.
-
A document is treated as a list of words that has a unique ID. It is useful to define the notion of a document index which maps each document ID to the list of words it contains. We won’t build this index; it’s just for the sake of explanation.
-
The inverted index is the inverse of the document index: it maps each word to the list of documents that contain it. This is the fundamental building block in our search algorithm: it will allow us to find in O(1) time all documents relevant to a search term.
-
In the inverted index, each entry in the list is assigned a TF-IDF score which quantifies how relevant the document is to the search request.
-
Let DF (document frequency) be the length of the list: the number of documents that contain the word.
-
Let D be the total number of documents that were indexed.
-
IDF (inverse document frequency) is equal to
log(D/DF)
. -
TF (term frequency) is the number of occurrences of the word in the document.
-
TF-IDF score is simply the product of
TF * IDF
.
Note that IDF is a property of the word itself: it quantifies the relevance of each entered word to the search request as a whole. The list of entered words can be perceived as a list of filtering functions that we apply to the full set of documents. A more relevant word will apply a stronger filter. Specifically, common words like “the”, “it”, “on” act as pure "pass-through" filters and consequently have an IDF of zero, making them completely irrelevant to the search.
TF, on the other hand, is the property of the combination of word and document, and tells us how relevant the document is to the word, regardless of the relevance of the word itself.
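To make the formula concrete, here is a tiny worked example with made-up numbers:
double d = 100;                      // total number of indexed documents
double df = 10;                      // documents containing the word
double idf = Math.log(d / df);       // ≈ 2.303
double tf = 3;                       // occurrences of the word in one document
double tfIdf = tf * idf;             // ≈ 6.91, that document's score contribution for the word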
When the user enters a search phrase:
-
each individual term from the phrase is looked up in the inverted index;
-
an intersection is found of all the lists, resulting in the list of documents that contain all the words;
-
each document is scored by summing the TF-IDF contributions of each word;
-
the result list is sorted by score (descending) and presented to the user.
Let’s have a look at a specific search phrase:
the man in the black suit murdered the king
The list of documents that contain all the above words is quite long… how do we decide which are the most relevant? The TF-IDF logic will make those stand out that have an above-average occurrence of words that are generally rare across all documents. For example, “murdered” occurs in far fewer documents than “black”… so given two documents where one has the same number of “murdered” as the other one has of “black”, the one with “murdered” wins because its word is more salient in general. On the other hand, “suit” and “king” might have a similar IDF, so the document that simply contains more of both wins.
Also note the limitation of this technique: a phrase is treated as just the sum of its parts; a document may contain the exact phrase and this will not affect its score.
Building the Inverted Index with Java Streams
To warm up, let’s see what it takes to build the inverted index with just thread parallelism, without the ability to scale out across many machines. It is expressible in the Java Streams API without too much work. The full code is here.
We’ll start by preparing a Stream<Entry<Long, String>> docWords
: a
stream of all the words found in all the documents. We use Map.Entry
as
a holder of a pair of values (a 2-tuple) and here we have a pair of
Long docId
and String word
:
Stream<Entry<Long, String>> docWords = docId2Name
.entrySet()
.parallelStream()
.flatMap(TfIdfJdkStreams::docLines)
.flatMap(this::tokenize);
We know the number of all documents so we can compute double
logDocCount
, the logarithm of the document count:
final double logDocCount = Math.log(docId2Name.size());
Calculating TF is very easy, just count the number of occurrences of
each distinct pair and save the result in a Map<Entry<Long, String>,
Long>
:
Map<Entry<Long, String>, Long> tfMap = docWords
.parallel()
.collect(groupingBy(identity(), counting()));
And now we build the inverted index. We start from tfMap
, group by
word, and the list under each word already matches our final product:
the list of all the documents containing the word. We finish off by
applying a transformation to the list: currently it’s just the raw
entries from the tf
map, but we need pairs (docId, tfidfScore)
.
invertedIndex = tfMap
.entrySet()
.parallelStream()
.collect(groupingBy(
e -> e.getKey().getValue(),
collectingAndThen(
toList(),
entries -> {
double idf = logDocCount - Math.log(entries.size());
return entries.stream().map(e ->
tfidfEntry(e, idf)).collect(toList());
}
)
));
The search function can be implemented with another Streams expression, which you can review in the SearchGui class. You can also run the TfIdfJdkStreams class and take the inverted index for a spin, making actual searches.
There is one last concept in this model that we haven’t mentioned yet:
the stopword set. It contains those words that are known in advance to
be common enough to occur in every document. Without treatment, these
words are the worst case for the inverted index: the document list under
each such word is the longest possible, and the score of all documents
is zero due to zero IDF. They raise the index’s memory footprint without
providing any value. The cure is to prepare a file, stopwords.txt
,
which is read in advance into a Set<String>
and used to filter out the
words in the tokenization phase. The same set is used to cross out words
from the user’s search phrase, as if they weren’t entered. We’ll add this
feature to our DAG based model in the following section.
Translating to Jet DAG
Our DAG as a whole will look relatively complex, but it can be understood as a “backbone” (cascade of vertices) starting from a source and ending in a sink with several more vertices attached on the side. This is just the backbone:
-
The data source is a Hazelcast
IMap
which holds a mapping from document ID to its filename. The source vertex will emit all the map’s entries, but only a subset on each cluster member. -
doc-lines
opens each file named by the map entry and emits all its lines in the(docId, line)
format. -
tokenize
transforms each line into a sequence of its words, again paired with the document ID, so it emits(docId, word)
. -
tf
builds a set of all distinct pairs emitted fromtokenize
and maintains the count of each pair’s occurrences (its TF score). -
tf-idf
takes that set, groups the pairs by word, and calculates the TF-IDF scores. It emits the results to the sink, which saves them to a distributedIMap
.
Edge types follow the same pattern as in the word-counting job: after flatmapping there is first a local, then a distributed partitioned edge. The logic behind it is not the same, though: TF can actually compute the final TF scores by observing just the local data. This is because it treats each document separately (document ID is a part of the grouping key) and the source data is already partitioned by document ID. The TF-IDF vertex does something similar to word count’s combining, but there’s again a twist: it will group the TF entries by word, but instead of just merging them into a single result per word, it will keep them all in lists.
To this cascade we add a stopword-source
which reads the stopwords
file, parses it into a HashSet
, and sends the whole set as a single
item to the tokenize
vertex. We also add a vertex that takes the data
from doc-source
and simply counts its items; this is the total
document count used in the TF-IDF formula. We end up with this DAG:
The choice of edge types into and out of doc-count
may look
surprising, so let’s examine it. We start with the doc-source
vertex,
which emits one item per document, but its output is distributed across
the cluster. To get the full document count on each member, each
doc-count
processor must get all the items, and that’s just what the
distributed broadcast edge will achieve. We’ll configure doc-count
with local parallelism of 1, so there will be one processor on every
member, each observing all the doc-source
items. The output of
doc-count
must reach all tf-idf
processors on the same member, so we
use the local broadcast edge.
Another thing to note is the pair of flat-mapping vertices: doc-lines
and
tokenize
. From a purely semantic standpoint, composing flatmap with
flatmap yields just another flatmap. As we’ll see below, we’re using
custom code for these two processors… so why did we choose to separate
the logic this way? There are actually two good reasons. The first one
has to do with Jet’s cooperative multithreading model: doc-lines
makes
blocking file IO calls, so it must be declared non-cooperative;
tokenization is pure computation so it can be in a cooperative
processor. The second one is more general: the workload of doc-lines
is very uneven. It consists of waiting, then suddenly coming up with a
whole block of data. If we left tokenization there, performance would
suffer because first the CPU would be forced to sit idle, then we’d be
late in making the next IO call while tokenizing the input. The separate
vertex can proceed at full speed all the time.
Implementation Code
As we announced, some of the processors in our DAG will need custom
implementation code. Let’s start from the source vertex. It is easy,
just the standard IMap
reader:
dag.newVertex("doc-source", readMapP(DOCID_NAME));
The stopwords-producing processor has custom code, but it’s quite simple:
dag.newVertex("stopword-source", StopwordsP::new);
private static class StopwordsP extends AbstractProcessor {
@Override
public boolean complete() {
return tryEmit(docLines("stopwords.txt").collect(toSet()));
}
}
Since this is a source processor, all its action happens in
complete()
. It emits a single item: the HashSet
built directly from
the text file’s lines.
The doc-count
processor can be built from the primitives provided in
Jet’s library:
dag.newVertex("doc-count", Processors.aggregateP(counting()));
The doc-lines
processor is more of a mouthful, but still built from
existing primitives:
Vertex docLines = dag.newVertex("doc-lines", flatMapUsingContextP(
ContextFactory.withCreateFn(jet -> null).toNonCooperative(),
(Object ctx, Entry<Long, String> e) ->
traverseStream(docLines("books/" + e.getValue())
.map(line -> entry(e.getKey(), line)))));
Let’s break down this expression. flatMapUsingContextP returns a
standard processor that emits an arbitrary number of items for each
received item. We already saw a flat-mapping processor in the introductory
Word Count example; there we created a traverser from an array, here we
create it from a Java stream. We additionally apply the toNonCooperative()
wrapper to the context factory, which declares all the created processors
non-cooperative. We already explained why we do this: this processor makes
blocking I/O calls.
tokenize
is another custom vertex:
dag.newVertex("tokenize", TokenizeP::new);
private static class TokenizeP extends AbstractProcessor {
private Set<String> stopwords;
private final FlatMapper<Entry<Long, String>, Entry<Long, String>>
flatMapper = flatMapper(e -> traverseStream(
Arrays.stream(DELIMITER.split(e.getValue().toLowerCase()))
.filter(word -> !stopwords.contains(word))
.map(word -> entry(e.getKey(), word))));
@Override
@SuppressWarnings("unchecked")
protected boolean tryProcess0(@Nonnull Object item) {
stopwords = (Set<String>) item;
return true;
}
@Override
@SuppressWarnings("unchecked")
protected boolean tryProcess1(@Nonnull Object item) {
return flatMapper.tryProcess((Entry<Long, String>) item);
}
}
This is a processor that must deal with two different inbound edges. It
receives the stopword set over edge 0 and then it does a flatmapping
operation on edge 1. The logic presented here uses the same approach as
the implementation of the provided Processors.flatMap()
processor:
there is a single instance of FlatMapper
that holds the business logic
of the transformation, and tryProcess1()
directly delegates into it.
If FlatMapper
is done emitting the previous items, it will accept the
new item, apply the user-provided transformation, and start emitting the
output items. If the outbox refuses a pending item, it will return
false
, which will make the framework call the same tryProcess1()
method later, with the same input item.
Let’s show the code that creates the tokenize
's two inbound edges:
dag.edge(between(stopwordSource, tokenize).broadcast().priority(-1))
.edge(from(docLines).to(tokenize, 1));
Especially note the .priority(-1)
part: this ensures that there will
be no attempt to deliver any data coming from docLines
before all the
data from stopwordSource
is already delivered. The processor would
fail if it had to tokenize a line before it has its stopword set in
place.
tf
is another simple vertex, built purely from the provided
primitives:
dag.newVertex("tf", aggregateByKeyP(
singletonList(wholeItem()), counting(), Util::entry));
tf-idf
is the most complex processor:
dag.newVertex("tf-idf", TfIdfP::new);
private static class TfIdfP extends AbstractProcessor {
private double logDocCount;
private final Map<String, List<Entry<Long, Double>>> wordDocTf =
new HashMap<>();
private final Traverser<Entry<String, List<Entry<Long, Double>>>>
invertedIndexTraverser = lazy(() ->
traverseIterable(wordDocTf.entrySet())
.map(this::toInvertedIndexEntry));
@Override
protected boolean tryProcess0(@Nonnull Object item) {
logDocCount = Math.log((Long) item);
return true;
}
@Override
@SuppressWarnings("unchecked")
protected boolean tryProcess1(@Nonnull Object item) {
Entry<Entry<Long, String>, Long> e =
(Entry<Entry<Long, String>, Long>) item;
long docId = e.getKey().getKey();
String word = e.getKey().getValue();
long tf = e.getValue();
wordDocTf.computeIfAbsent(word, w -> new ArrayList<>())
.add(entry(docId, (double) tf));
return true;
}
@Override
public boolean complete() {
return emitFromTraverser(invertedIndexTraverser);
}
private Entry<String, List<Entry<Long, Double>>> toInvertedIndexEntry(
Entry<String, List<Entry<Long, Double>>> wordDocTf
) {
String word = wordDocTf.getKey();
List<Entry<Long, Double>> docidTfs = wordDocTf.getValue();
return entry(word, docScores(docidTfs));
}
private List<Entry<Long, Double>> docScores(
List<Entry<Long, Double>> docidTfs
) {
double logDf = Math.log(docidTfs.size());
return docidTfs.stream()
.map(tfe -> tfidfEntry(logDf, tfe))
.collect(toList());
}
private Entry<Long, Double> tfidfEntry(
double logDf, Entry<Long, Double> docidTf
) {
Long docId = docidTf.getKey();
double tf = docidTf.getValue();
double idf = logDocCount - logDf;
return entry(docId, tf * idf);
}
}
This is quite a lot of code, but each of the three pieces is not too difficult to follow:
-
tryProcess0()
accepts a single item, the total document count. -
tryProcess1()
performs a boilerplate group-and-aggregate operation, collecting a list of items under each key. -
complete()
outputs the accumulated results, also applying the final transformation on each one: replacing the TF score with the final TF-IDF score. It relies on a lazy traverser, which holds aSupplier<Traverser>
and will obtain the inner traverser from it the first timenext()
is called. This makes it very simple to write code that obtains a traverser from a map after it has been populated.
Finally, our DAG is terminated by a sink vertex:
dag.newVertex("sink", SinkProcessors.writeMapP(INVERTED_INDEX));
9.3. Processor
Processor
is the main type whose implementation is up to the user of the Core API:
it contains the code of the computation to be performed by a vertex.
There are a number of Processor building blocks in the Core API which
allow you to just specify the computation logic, while the provided code
handles the processor’s cooperative behavior. Please refer to the
AbstractProcessor section.
A processor’s work can be conceptually described as follows: "receive
data from zero or more input streams and emit data into zero or more
output streams." Each stream maps to a single DAG edge (either inbound
or outbound). There is no requirement on the correspondence between
input and output items; a processor can emit any data it sees fit,
including none at all. The same Processor
abstraction is used for all
kinds of vertices, including sources and sinks.
The implementation of a processor can be stateful and does not need to be thread-safe because Jet guarantees to use the processor instances from one thread at a time, although not necessarily always the same thread.
9.3.1. Cooperativeness
Processor
instances are cooperative by default. The processor can opt
out of cooperative multithreading by overriding isCooperative()
to
return false
. Jet will then start a dedicated thread for it.
To maintain an overall good throughput, a cooperative processor must take care not to hog the thread for too long (a rule of thumb is up to a millisecond at a time). Jet’s design strongly favors cooperative processors and most processors can and should be implemented to fit these requirements. The major exceptions are sources and sinks because they often have no choice but to call into blocking I/O APIs.
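As a minimal sketch, a processor that performs blocking calls can opt out like this (BlockingLineSourceP is a hypothetical name):
class BlockingLineSourceP extends AbstractProcessor {
    @Override
    public boolean isCooperative() {
        return false;      // Jet runs this processor's tasklet on a dedicated thread
    }

    @Override
    public boolean complete() {
        // blocking I/O is acceptable here, but still return within a second or so
        return true;
    }
}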
9.3.2. The Outbox
The processor sends its output items to its
Outbox
which has a separate bucket for each outbound edge. The buckets have
limited capacity and will refuse an item when full. A cooperative
processor should be implemented such that when the outbox refuses its
item, it saves its processing state and returns from the processing
method. The execution engine will then drain the outbox buckets.
9.3.3. Data Processing Callbacks
process(ordinal, inbox)
Jet passes the items received over a given edge to the processor by
calling
process(ordinal, inbox)
.
The inbox contains all the items received since the last process() call,
plus any items the processor didn’t remove in a previous
process()
call. There is a separate instance of Inbox
call. There is a separate instance of Inbox
for each
inbound edge, so any given process()
call involves items from only one
edge.
The processor must not remove an item from the inbox until it has fully
processed it. This is important with respect to the cooperative
behavior: the processor may not be allowed to emit all items
corresponding to a given input item and may need to return from the
process()
call early, saving its state. In such a case the item should
stay in the inbox so Jet knows the processor has more work to do even if
no new items are received.
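The following sketch shows the typical shape of a process() implementation; it assumes the Outbox was saved to an outbox field in init() and uses a plain toUpperCase() as a stand-in transformation:
@Override
public void process(int ordinal, Inbox inbox) {
    for (Object item; (item = inbox.peek()) != null; ) {
        if (!outbox.offer(item.toString().toUpperCase())) {
            return;        // outbox is full: keep the item in the inbox and yield
        }
        inbox.remove();    // only now is it safe to remove the fully processed item
    }
}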
tryProcessWatermark(watermark)
When a new highest watermark has been received from all input edges and
all upstream processor instances, Jet calls the
tryProcessWatermark(watermark)
method. The watermark value is always greater than in the
previous call.
The implementation may choose to process only partially and return
false
, in which case it will be called again later with the same
timestamp before any other processing method is called. When the method
returns true
, the watermark is forwarded to the downstream processors.
tryProcess()
If a processor’s inbox is empty, Jet will call its
tryProcess()
method instead. This allows the processor to perform work that is not
input data-driven. The method has a boolean
return value and if it
returns false
, it will be called again before any other methods are
called. This way it can retry emitting its output until the outbox
accepts it.
An important use case for this method is the emission of watermark
items. A job that processes an infinite data stream may experience
occasional lulls - periods with no items arriving. On the other
hand, a windowing processor is not allowed to act upon each item
immediately due to event skew; it must wait for a watermark item to
arrive. During a stream lull this becomes problematic because the
watermark itself is primarily data-driven and advances in response to
the observation of event timestamps. The watermark-inserting processor
must be able to advance the watermark even during a stream lull, based
on the passage of wall-clock time, and it can do it inside the
tryProcess()
method.
complete()
Jet calls
complete()
when all the input edges are exhausted. It is the last method to be
invoked on the processor before disposing of it. Typically this is where
a batch processor emits the results of an aggregating operation. If it
can’t emit everything in a given call, it should return false
and will
be called again later.
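For example, a trivial counting processor could accumulate in tryProcess() and emit its single result from complete(), returning false until the outbox accepts it. This is only a sketch (the class name is hypothetical; grouping and snapshotting are ignored):
class CountItemsP extends AbstractProcessor {
    private long count;

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        count++;
        return true;
    }

    @Override
    public boolean complete() {
        // tryEmit() returns false if the outbox refused the item;
        // in that case Jet will call complete() again later
        return tryEmit(count);
    }
}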
9.3.4. Snapshotting Callbacks
Hazelcast Jet supports fault-tolerant processing jobs by taking distributed snapshots. In regular time intervals each of the source vertices will perform a snapshot of its own state and then emit a special item to its output stream: a barrier. The downstream vertex that receives the barrier item makes its own snapshot and then forwards the barrier to its outbound edges, and so on towards the sinks.
At the level of the Processor
API the barrier items are not visible;
ProcessorTasklet
handles them internally and invokes the snapshotting
callback methods described below.
saveToSnapshot()
Jet will call
saveToSnapshot()
when it determines it’s time for the processor to save its state to the
current snapshot. Except for source vertices, this happens when the
processor has received the barrier item from all its inbound streams and
processed all the data items preceding it. The method must emit all its
state to the special snapshotting bucket in the Outbox, by calling
outbox.offerToSnapshot()
. If the outbox doesn’t accept all the data,
it must return false
to be called again later, after the outbox has
been flushed.
When this method returns true
, ProcessorTasklet
will forward the
barrier item to all the outbound edges.
restoreFromSnapshot()
When a Jet job is restarting after having been suspended, it will first
reload all the state from the last successful snapshot. Each processor
will get its data through the invocations of
restoreFromSnapshot()
.
Its parameter is the Inbox
filled with a batch of snapshot data. The
method will be called repeatedly until it consumes all the snapshot
data.
finishSnapshotRestore()
After it has delivered all the snapshot data to restoreFromSnapshot()
,
Jet will call
finishSnapshotRestore()
.
The processor may use it to initialize some transient state from the
restored state.
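To illustrate, here is the counting sketch from the complete() section again, extended with the snapshotting callbacks. It uses the AbstractProcessor conveniences described in the next section (tryEmitToSnapshot() and the key-value variant of restoreFromSnapshot()) and assumes a local parallelism of one, so a single hard-coded snapshot key is enough; a real implementation would key the state per processor:
class SnapshottableCountP extends AbstractProcessor {
    private long count;

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        count++;
        return true;
    }

    @Override
    public boolean saveToSnapshot() {
        // "count" is an arbitrary key chosen for this sketch
        return tryEmitToSnapshot("count", count);
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        count = (Long) value;
    }

    @Override
    public boolean complete() {
        return tryEmit(count);
    }
}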
9.3.5. Best Practice: Document At-Least-Once Behavior
As we discuss in the Jet Concepts chapter, the behavior of a processor under at-least-once semantics can deviate from correctness in extremely non-trivial and unexpected ways. Therefore the processor should always document its possible behaviors for that case.
9.4. AbstractProcessor
AbstractProcessor
is a convenience class designed to deal with most of the boilerplate in
implementing the full Processor
API.
9.4.1. Receiving items
On the reception side the first line of convenience are the
tryProcessN()
methods. While in the inbox the watermark and data items
are interleaved, these methods take care of the boilerplate needed to
filter out the watermarks. Additionally, they get one item at a time,
eliminating the need to write a suspendable loop over the input items.
There is a separate method specialized for each edge from 0 to 4
(tryProcess0
..tryProcess4
) and a catch-all method
tryProcess(ordinal, item)
. If the processor doesn’t need to
distinguish between the inbound edges, the latter method is a good
match; otherwise, it is simpler to implement one or more of the
ordinal-specific methods. The catch-all method is also the only way to
access inbound edges beyond ordinal 4, but such cases are very rare in
practice.
Paralleling the above there are tryProcessWm(ordinal, wm)
and
tryProcessWmN(wm)
methods that get just the watermark items.
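For example, a processor that receives strings and emits their uppercase form only needs to override tryProcess0(). The tryEmit() call it relies on is covered in the next section; the class name is hypothetical:
class ToUpperCaseP extends AbstractProcessor {
    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        // returning false tells Jet to call us again later
        // with the same item, once the outbox has room
        return tryEmit(((String) item).toUpperCase());
    }
}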
9.4.2. Emitting items
AbstractProcessor
has a private reference to its outbox and lets you
access all its functionality indirectly. The tryEmit()
methods offer
your items to the outbox. If you get a false
return value, you must
stop emitting items and return from the current callback method of the
processor. For example, if you called tryEmit()
from tryProcess0()
,
you should return false
so Jet will call tryProcess0()
again later,
when there’s more room in the outbox. Similar to these methods there are
tryEmitToSnapshot()
and emitFromTraverserToSnapshot()
, to be used
from the saveToSnapshot()
callback.
Implementing a processor to respect the above rule is quite tricky and error-prone. Things get especially tricky when there are several items to emit, such as:
-
when a single input item maps to many output items
-
when the processor performs a group-by-key operation and emits each group as a separate item
You can avoid most of the complexity if you wrap all your output in a
Traverser
. Then you can simply say return
emitFromTraverser(myTraverser)
. It will:
-
try to emit as many items as possible
-
return
false
if the outbox refuses an item
-
hold on to the refused item and continue from it when it’s called again with the same traverser.
There is one more layer of convenience relying on emitFromTraverser()
:
the nested class FlatMapper
, which makes it easy to implement a
flatmapping kind of transformation. It automatically handles the concern
of creating a new traverser when the previous one is exhausted and
reusing the previous one until exhausted.
9.4.3. Traverser
Traverser
is a very simple functional interface whose shape matches
that of a Supplier
, but with a contract specialized for the traversal
over a sequence of non-null items: each call to its next()
method
returns another item of the sequence until exhausted, then keeps
returning null
. A traverser may also represent an infinite,
non-blocking stream of items: it may return null
when no item is
currently available, then later return more items.
The point of this type is the ability to implement traversal over any kind of dataset or lazy sequence with minimum hassle: often just by providing a one-liner lambda expression. This makes it very easy to integrate with Jet’s convenience APIs for cooperative processors.
Traverser
also supports some default
methods that facilitate
building a simple transformation layer over the underlying sequence:
map
, filter
, flatMap
, etc.
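For instance, the following sketch builds a traverser over a fixed list and layers a filter and a map over it (traverseIterable() is a factory method in the Traversers utility class):
Traverser<Integer> t = Traversers.traverseIterable(asList(1, 2, 3, 4))
        .filter(i -> i % 2 == 0)   // keep only the even numbers
        .map(i -> i * 10);         // then scale them

// t.next() now returns 20, then 40, then keeps returning null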
The following example shows how you can implement a simple flatmapping processor:
class ItemAndSuccessorP extends AbstractProcessor {
private final FlatMapper<Integer, Integer> flatMapper =
flatMapper(i -> traverseIterable(asList(i, i + 1)));
@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) {
return flatMapper.tryProcess((int) item);
}
}
For each received Integer
this processor emits the number and its
successor. If the outbox refuses an item, flatMapper.tryProcess()
returns false
and stays ready to resume the next time it is invoked.
The fact that it returned false
signals Jet to invoke
ItemAndSuccessorP.tryProcess()
again with the same arguments.
9.5. Watermark Policy
As mentioned in the Time Ordering and the Watermark chapter, determining the watermark is somewhat of a black art; it’s about superimposing order over a disordered stream of events. We must decide at which point it stops making sense to wait even longer for data about past events to arrive. There’s a tension between two opposing forces here:
-
wait as long as possible to account for all the data;
-
get results as soon as possible.
While there are ways to (kind of) achieve both, there’s a significant associated cost in terms of complexity and overall performance. Hazelcast Jet takes a simple approach and strictly triages stream items into “still on time” and “late”, discarding the latter.
WatermarkPolicy
is the abstraction that computes the value of the watermark for a
(sub)stream of disordered data items. It takes as input the timestamp of
each observed item and outputs the current watermark value.
9.5.1. Predefined Watermark Policy
We provide a simple, data-agnostic watermark policy, the
limitingLag()
policy. This policy will maintain a watermark that lags behind the
highest observed event timestamp by a configured amount. In other words,
each time an event with the highest timestamp so far is encountered,
this policy advances the watermark to eventTimestamp - lag
. This puts
a limit on the spread between timestamps in the stream: all events whose
timestamp is more than the configured lag
behind the highest timestamp
are considered late.
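The logic of limitingLag() can be captured in a small conceptual sketch. This is not the library’s implementation; it just assumes the WatermarkPolicy shape described above, with one method that reports each observed event timestamp and one that returns the current watermark:
class LimitingLagPolicy implements WatermarkPolicy {
    private final long lag;
    private long wm = Long.MIN_VALUE;

    LimitingLagPolicy(long lag) {
        this.lag = lag;
    }

    @Override
    public void reportEvent(long timestamp) {
        // advance the watermark only when a new highest timestamp arrives
        wm = Math.max(wm, timestamp - lag);
    }

    @Override
    public long getCurrentWatermark() {
        return wm;
    }
}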
9.5.2. Watermark Throttling
The policy objects presented above will return the “ideal” watermark
value according to their logic; however it would be too much overhead to
insert a watermark item each time the ideal watermark advances
(typically every millisecond). The
EventTimePolicy.watermarkThrottlingFrameSize()
and
EventTimePolicy.watermarkThrottlingFrameOffset()
are the parameters that control the watermark interval.
Since the watermark is only needed for windowing aggregation, we can emit only those watermarks that will cause a new window to be emitted. In the case of sliding windows, the throttling frame size and offset should correspond to the sliding step and frame offset of the sliding window. Session windows can be emitted at any moment, so a large throttling frame size will increase latency because the event time only advances with the arrival of a newer watermark. For session windows in the Pipeline API we use a throttling frame size of 100 ms, or less if the session timeout is shorter.
9.6. Vertices in the Library
While formally there’s only one kind of vertex in Jet, in practice there is an important distinction between the following:
-
A source is a vertex with no inbound edges. It injects data from the environment into the Jet job.
-
A sink is a vertex with no outbound edges. It drains the output of the Jet job into the environment.
-
A computational vertex has both kinds of edges. It accepts some data from upstream vertices, performs some computation, and emits the results to downstream vertices. Typically it doesn’t interact with the environment.
The com.hazelcast.jet.processor
package contains static utility
classes with factory methods that return suppliers of processors, as
required by the dag.newVertex(name, procSupplier)
calls. There is a
convention in Jet that every module containing vertex implementations
contributes a utility class to the same package. Inspecting the
contents of this package in your IDE should allow you to discover all
vertex implementations available on the project’s classpath. For example,
there are modules that connect to 3rd party resources like Kafka and
Hadoop Distributed File System (HDFS). Each such module declares a class
in the same package, com.hazelcast.jet.processor
, exposing the
module’s source and sink definitions.
The main factory class for the source vertices provided by the Jet core
module is SourceProcessors
. It contains sources that ingest data from
Hazelcast IMDG structures like IMap
, ICache
, IList
, etc., as well as
some simple sources that get data from files and TCP sockets (readFiles
,
streamSocket
and some more).
Paralleling the sources there’s SinkProcessors
for the sink vertices,
supporting the same range of resources (IMDG, files, sockets). There’s
also a general writeBuffered
method that takes some boilerplate out of
writing custom sinks. The user must implement a few primitives: create a
new buffer, add an item to it, flush the buffer. The provided code takes
care of integrating these primitives into the Processor
API (draining
the inbox into the buffer and handling the general lifecycle).
Finally, the computational vertices are where the main action takes
place. The main class with factories for built-in computational
vertices is Processors
.
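For instance, here is a minimal sketch that wires library-provided vertices into a DAG. The map names and the uppercasing transform are arbitrary choices for illustration, and we assume the IMap source factory readMapP() alongside the mapP() and writeMapP() factories used elsewhere in this manual:
DAG dag = new DAG();
Vertex source = dag.newVertex("source",
        SourceProcessors.readMapP("inputMap"));
Vertex toUpper = dag.newVertex("to-upper", Processors.mapP(
        (Map.Entry<String, String> e) -> Util.entry(e.getKey(), e.getValue().toUpperCase())));
Vertex sink = dag.newVertex("sink",
        SinkProcessors.writeMapP("outputMap"));
dag.edge(Edge.between(source, toUpper));
dag.edge(Edge.between(toUpper, sink));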
9.7. Implement a Custom Source or Sink Vertex
The Hazelcast Jet distribution contains vertices for the sources and sinks exposed through the Pipeline API. You can extend Jet’s support for sources and sinks by writing your own vertex implementations.
One of the main concerns when implementing a source vertex is that the data source is typically distributed across multiple machines and partitions, and the work needs to be distributed across multiple members and processors.
9.7.1. How Jet Creates and Initializes a Job
These are the steps taken to create and initialize a Jet job:
-
The user builds the DAG and submits it to the local Jet client instance.
-
The client instance serializes the DAG and sends it to a member of the Jet cluster. This member becomes the coordinator for this Jet job.
-
The coordinator deserializes the DAG and builds an execution plan for each member.
-
The coordinator serializes the execution plans and distributes each to its target member.
-
Each member acts upon its execution plan by creating all the needed tasklets, concurrent queues, network senders/receivers, etc.
-
The coordinator sends the signal to all members to start job execution.
The most visible consequence of the above process is the
ProcessorMetaSupplier
type: one must be provided for each Vertex
. In Step 3, the coordinator
deserializes the meta-supplier as a constituent of the DAG
and asks it
to create ProcessorSupplier
instances which go into the execution
plans. A separate instance of ProcessorSupplier
is created
specifically for each member’s plan. In Step 4, the coordinator
serializes these and sends each to its member. In Step 5 each member
deserializes its ProcessorSupplier
and asks it to create as many
Processor
instances as configured by the vertex’s localParallelism
property.
This process is so involved because each Processor
instance may need
to be differently configured. This is especially relevant for processors
driving a source vertex: typically each one will emit only a slice of
the total data stream, as appropriate to the partitions it is in charge
of.
ProcessorMetaSupplier
The client serializes an instance of ProcessorMetaSupplier
as part of
each Vertex
in the DAG
. The coordinator member deserializes the
instance and uses it to create ProcessorSupplier
s by calling the
ProcessorMetaSupplier.get()
method. Before that, coordinator calls the
init()
method with a context object that you can use to get useful
information. The get()
method takes List<Address>
as a parameter,
which you should use to determine cluster members that will run the job,
if needed.
ProcessorSupplier
Usually this type will be custom-implemented in the same cases where the
meta-supplier is custom-implemented and will complete the logic of a
distributed data source’s partition assignment. It creates instances of
Processor
ready to start executing the vertex’s logic.
Another use is to open and close external resources, such as files or
connections. We provide CloseableProcessorSupplier
for this.
9.7.2. Example - Distributed Integer Generator
Let’s say we want to write a simple source that will generate numbers
from 0 to 1,000,000 (exclusive). It is trivial to write a single
Processor
which can do this using java.util.stream
and
Traverser
class GenerateNumbersP extends AbstractProcessor {
private final Traverser<Integer> traverser;
GenerateNumbersP(int upperBound) {
traverser = Traversers.traverseStream(range(0, upperBound).boxed());
}
@Override
public boolean complete() {
return emitFromTraverser(traverser);
}
}
Now we can build our DAG and execute it:
JetInstance jet = Jet.newJetInstance();
int upperBound = 10;
DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
() -> new GenerateNumbersP(upperBound));
Vertex logInput = dag.newVertex("log-input",
DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));
try {
jet.newJob(dag).join();
} finally {
Jet.shutdownAll();
}
When you run this code, you will see the output as below:
Received number: 4
Received number: 0
Received number: 3
Received number: 2
Received number: 2
Received number: 2
Since we are using the default parallelism setting on this vertex, several instances of the source processor were created, all of which generated the same sequence of values. Generally we want the ability to parallelize the source vertex, so we have to make each processor emit only a slice of the total data set.
So far we’ve used the simplest approach to creating processors: a
DistributedSupplier<Processor>
function that keeps returning equal
instances of processors. Now we’ll step up to Jet’s custom interface that
gives us the ability to provide a list of separately configured
processors: ProcessorSupplier
and its method get(int processorCount)
.
First we must decide on a partitioning policy: what subset will each
processor emit. In our simple example we can use a simple policy: we’ll
label each processor with its index in the list and have it emit only
those numbers n
that satisfy n % processorCount == processorIndex
.
Let’s write a new constructor for our processor which implements this
partitioning logic:
GenerateNumbersP(int upperBound, int processorCount, int processorIndex) {
traverser = Traversers.traverseStream(
range(0, upperBound)
.filter(n -> n % processorCount == processorIndex)
.boxed());
}
Given this preparation, implementing ProcessorSupplier
is easy:
class GenerateNumbersPSupplier implements ProcessorSupplier {
private final int upperBound;
GenerateNumbersPSupplier(int upperBound) {
this.upperBound = upperBound;
}
@Override @Nonnull
public List<? extends Processor> get(int processorCount) {
return
range(0, processorCount)
.mapToObj(index -> new GenerateNumbersP(
upperBound, processorCount, index))
.collect(toList());
}
}
Let’s use the custom processor supplier in our DAG-building code:
DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
new GenerateNumbersPSupplier(10));
Vertex logInput = dag.newVertex("log-input",
DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));
Now we can re-run our example and see that each number indeed occurs only once. However, note that we are still working with a single-member Jet cluster; let’s see what happens when we add another member:
JetInstance jet = Jet.newJetInstance();
Jet.newJetInstance();
int upperBound = 10;
DAG dag = new DAG();
// rest of the code same as above
Running after this change we’ll see that both members are generating the
same set of numbers. This is because ProcessorSupplier
is instantiated
independently for each member and asked for the same number of
processors, resulting in identical processors on all members. We have to
solve the same problem as we just did, but at the higher level of
cluster-wide parallelism. For that we’ll need the
ProcessorMetaSupplier
: an interface which acts as a factory of
ProcessorSupplier
s, one for each cluster member. Under the hood it is
actually always the meta-supplier that’s created by the DAG-building
code; the above examples are just implicit about it for the sake of
convenience. They result in a simple meta-supplier that reuses the
provided suppliers everywhere.
The meta-supplier is a bit trickier to implement because its method
takes a list of Jet member addresses instead of a simple count, and the
return value is a function from address to ProcessorSupplier
. In our
case we’ll treat the address as just an opaque ID and we’ll build a map
from address to a properly configured ProcessorSupplier
. Then we can
simply return map::get
as our function.
class GenerateNumbersPMetaSupplier implements ProcessorMetaSupplier {
private final int upperBound;
private transient int totalParallelism;
private transient int localParallelism;
GenerateNumbersPMetaSupplier(int upperBound) {
this.upperBound = upperBound;
}
@Override
public void init(@Nonnull Context context) {
totalParallelism = context.totalParallelism();
localParallelism = context.localParallelism();
}
@Override @Nonnull
public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
Map<Address, ProcessorSupplier> map = new HashMap<>();
for (int i = 0; i < addresses.size(); i++) {
// We'll calculate the global index of each processor in the cluster:
int globalIndexBase = localParallelism * i;
// Capture the value of the transient field for the lambdas below:
int divisor = totalParallelism;
// processorCount will be equal to localParallelism:
ProcessorSupplier supplier = processorCount ->
range(globalIndexBase, globalIndexBase + processorCount)
.mapToObj(globalIndex ->
new GenerateNumbersP(upperBound, divisor, globalIndex)
).collect(toList());
map.put(addresses.get(i), supplier);
}
return map::get;
}
}
We change our DAG-building code to use the meta-supplier:
DAG dag = new DAG();
Vertex generateNumbers = dag.newVertex("generate-numbers",
new GenerateNumbersPMetaSupplier(upperBound));
Vertex logInput = dag.newVertex("log-input",
DiagnosticProcessors.writeLoggerP(i -> "Received number: " + i));
dag.edge(Edge.between(generateNumbers, logInput));
After re-running with two Jet members, we should once again see each number generated just once.
9.7.3. Sinks
Like a source, a sink is just another kind of processor. It accepts
items from the inbox and pushes them into some system external to the
Jet job (Hazelcast IMap, files, databases, distributed queues, etc.). A
simple way to implement it is to extend
AbstractProcessor
and override tryProcess
, which deals with items one at a time.
However, sink processors must often explicitly deal with batching. In
this case directly implementing Processor
is better because its
process()
method gets the entire Inbox
which can be drained to a
buffer and flushed out.
9.7.4. Example: File Writer
In this example we’ll implement a vertex that writes the received items
to files. To avoid contention and conflicts, each processor must write
to its own file. Since we’ll be using a BufferedWriter
which takes
care of the buffering/batching concern, we can use the simpler approach
of extending AbstractProcessor
:
class WriteFileP extends AbstractProcessor implements Closeable {
private final String path;
private transient BufferedWriter writer;
WriteFileP(String path) {
this.path = path;
}
@Override
public boolean isCooperative() {
return false;
}
@Override
protected void init(@Nonnull Context context) throws Exception {
Path path = Paths.get(this.path, context.jetInstance().getName()
+ '-' + context.globalProcessorIndex());
writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
}
@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
writer.append(item.toString());
writer.newLine();
return true;
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
}
}
Some comments:
-
The processor declares itself non-cooperative (by overriding isCooperative()) because it will perform blocking IO operations.
-
init()
method finds a unique filename for each processor by relying on the information reachable from the Context
object.
-
Note the careful implementation of
close()
: it first checks if writer is null, which can happen if newBufferedWriter()
fails in init()
. This would make init()
fail as well, which would make the whole job fail and then our ProcessorSupplier
would call close()
to clean up.
Cleaning up on completion/failure is actually the only concern that we
need ProcessorSupplier
for: the other typical concern, specializing
processors to achieve data partitioning, was achieved directly from the
processor’s code. This is the supplier’s code:
class WriteFilePSupplier implements ProcessorSupplier {
private final String path;
private transient List<WriteFileP> processors;
WriteFilePSupplier(String path) {
this.path = path;
}
@Override
public void init(@Nonnull Context context) {
File homeDir = new File(path);
boolean success = homeDir.isDirectory() || homeDir.mkdirs();
if (!success) {
throw new JetException("Failed to create " + homeDir);
}
}
@Override @Nonnull
public List<WriteFileP> get(int count) {
processors = Stream.generate(() -> new WriteFileP(path))
.limit(count)
.collect(Collectors.toList());
return processors;
}
@Override
public void close(Throwable error) {
try {
for (WriteFileP p : processors) {
p.close();
}
} catch (IOException e) {
throw new JetException(e);
}
}
}
9.8. Best Practices
9.8.1. Inspecting Processor Input and Output
The structure of the DAG model is a very poor match for Java’s type system, which results in the lack of compile-time type safety between connected vertices. Developing a type-correct DAG therefore usually requires some trial and error. To facilitate this process, but also to allow many more kinds of diagnostics and debugging, Jet’s library offers ways to capture the input/output of a vertex and inspect it.
Peeking with Processor Wrappers
The first approach is to decorate a vertex declaration with a layer that
will log all the data traffic going through it. This support is present
in the
DiagnosticProcessors
factory class, which contains the following methods:
-
peekInput()
: logs items received at any edge ordinal.
-
peekOutput()
: logs items emitted to any ordinal. An item emitted to several ordinals is logged just once.
These methods take two optional parameters:
-
toStringF
returns the string representation of an item. The default is to use Object.toString()
.
-
shouldLogF
is a filtering function so you can focus your log output only on some specific items. The default is to log all items.
Example Usage
Suppose we have declared the second-stage vertex in a two-stage aggregation setup:
DAG dag = new DAG();
Vertex combine = dag.newVertex("combine",
combineByKeyP(counting(), Util::entry)
);
We’d like to see what exactly we’re getting from the first stage, so
we’ll wrap the processor supplier with peekInput()
:
Vertex combine = dag.newVertex("combine",
peekInputP(combineByKeyP(counting(), Util::entry))
);
Keep in mind that logging happens on the machine hosting the processor, so this technique is primarily targeted at Jet jobs the developer runs locally in their development environment.
Attaching a Sink Vertex
Since most vertices are implemented to emit the same data stream to all
attached edges, it is usually possible to attach a diagnostic sink to
any vertex. For example, Jet’s standard
writeFileP()
sink vertex can be very useful here.
Example Usage
In the example from the Word Count tutorial we can add the following declarations:
Vertex diagnose = dag
.newVertex("diagnose", writeFileP(
"tokenize-output", Object::toString, UTF_8, false))
.localParallelism(1);
dag.edge(from(tokenize, 1).to(diagnose));
This will create the directory tokenize-output
which will contain one
file per processor instance running on the machine. When running in a
cluster, you can inspect on each member the input seen on that member.
By specifying the allToOne()
routing policy you can also have the
output of all the processors on all the members saved on a single member
(although the choice of exactly which member will be arbitrary).
9.8.2. How to Unit-Test a Processor
We provide some utility classes to simplify writing unit tests for your custom processors. You can find them in the
com.hazelcast.jet.core.test
package. Using these utility classes you can unit test your processor by
passing it some input items and asserting the expected output.
Start by calling
TestSupport.verifyProcessor()
by passing it a processor supplier or a processor instance.
The test process does the following:
-
initialize the processor by calling
Processor.init()
-
do a snapshot+restore (optional, see below)
-
call
Processor.process(0, inbox)
. The inbox always contains one item from the input
parameter
-
every time the inbox gets empty, do a snapshot+restore
-
call
Processor.complete()
until it returns true
(optional)
-
do a final snapshot+restore after
complete()
is done
The optional snapshot+restore test procedure:
-
call
saveToSnapshot()
-
create a new processor instance and use it instead of the existing one
-
restore the snapshot using
restoreFromSnapshot()
-
call
finishSnapshotRestore()
The test optionally asserts that the processor made progress on each call to any processing method. To be judged as having made progress, the callback method must do at least one of these:
-
take something from the inbox
-
put something to the outbox
-
return
true
(applies only to boolean
-returning methods)
Cooperative Processors
The test will provide a 1-capacity outbox to cooperative processors. The
outbox will already be full on every other call to process()
. This
tests the edge case: process()
may be called even when the outbox is
full, giving the processor a chance to process the inbox without
emitting anything.
The test will also assert that the processor doesn’t spend more time in
any callback than the limit specified in cooperativeTimeout(long)
.
Cases Not Covered
This class does not cover these cases:
-
testing of processors which distinguish input or output edges by ordinal
-
checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier)
-
it never calls
Processor.tryProcess()
Example Usage
This will test one of the jet-provided processors:
TestSupport.verifyProcessor(mapP((String s) -> s.toUpperCase()))
.disableCompleteCall() (1)
.disableLogging() (1)
.disableProgressAssertion() (1)
.disableSnapshots() (1)
.cooperativeTimeout(2000) (2)
.outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER) (3)
.input(asList("foo", "bar")) (4)
.expectOutput(asList("FOO", "BAR"));
1 | Enabled by default |
2 | The default is 1000ms |
3 | The default is Objects::equals |
4 | The default is emptyList() |
Other Utility Classes
com.hazelcast.jet.core.test
contains these classes that you can use as
implementations of Jet interfaces in tests:
-
TestInbox
-
TestOutbox
-
TestProcessorContext
-
TestProcessorSupplierContext
-
TestProcessorMetaSupplierContext
The class JetAssert
contains a few of the assertX()
methods normally
found in JUnit’s Assert
class. We had to reimplement them to avoid a
dependency on JUnit from our production code.
9.9. Management Center Integration
Jet exposes metrics using the JVM’s standard JMX interface. See the Monitor Metrics section.
These metrics are also exposed in the Jet Management Center. Please refer to the Hazelcast Jet Management Center Reference Manual for more information.
Appendix A: Jet Version Compatibility
The following rules currently apply for Jet for compile time and runtime compatibility.
A.1. Semantic Versioning
Hazelcast Jet uses Semantic Versioning which can be summarized as follows:
-
MAJOR version when you make incompatible API changes,
-
MINOR version when you add functionality in a backwards-compatible manner, and
-
PATCH version when you make backwards-compatible bug fixes.
This means that a Jet job written using Pipeline API in a previous minor version should compile in later minor versions.
However some exceptions apply:
-
Classes in
com.hazelcast.jet.core
package, which form the Core API of Jet, provide only a PATCH-level compatibility guarantee.
-
Classes in
impl
packages do not provide any compatibility guarantees between versions.
A.2. Summary Table
The compatibility guarantees can be summarized as follows:
Type | Component | Guarantee |
---|---|---|
Compile Time |
Job API |
MINOR |
Compile Time |
Pipeline API |
MINOR |
Compile Time |
Core API |
PATCH |
Runtime |
Member to Member |
NONE |
Runtime |
Management Center to Member |
NONE |
Runtime |
Client to Member |
PATCH |
Runtime |
Job State |
PATCH |
Runtime |
Command Line Tools |
MINOR |
Runtime |
Configuration XML files |
PATCH |
Runtime |
Metrics (JMX) |
PATCH |
A.3. Runtime Compatibility
A.3.1. Members
Jet requires that all members in a cluster use the same PATCH version. When updating Jet to a newer PATCH version, the whole cluster must be shut down and restarted with the newer version at once.
A.3.2. Management Center
Management Center, like members, is only compatible with the same PATCH version. This means that Management Center and the cluster must have the exact same PATCH version to be compatible.
A.3.3. Clients
Jet clients are compatible with the members running on the same MINOR version. This means that a client using an older or newer PATCH version should be able to connect and work with a cluster that’s running a different PATCH version.
A.4. Job State Compatibility
Job state is only compatible across the same MINOR version and is only backwards-compatible, i.e. a newer PATCH version is able to understand the job state from a previous PATCH version.
This means that if you have a running job, you can use the job upgrade and lossless recovery features to upgrade the cluster to a newer PATCH version without losing the state of the running job.
A.5. Command Line Tools and Configuration Files
The command line tools provided such as jet.sh
and the configuration
XML files are backwards-compatible between MINOR versions. This means
that when upgrading a cluster to a new minor version, the XML configuration
for the previous version can be used without any modification.
A.6. Metrics
Jet currently provides metrics to Management Center and also through other means such as JMX. The metrics names may change between MINOR versions but not between PATCH versions.
Appendix B: System Properties
The table below lists the system properties supported by Jet. Jet also supports the system properties defined by Hazelcast IMDG, please see the System properties chapter in Hazelcast IMDG Reference Manual.
When you want to reconfigure a system property, you need to restart the members for which the property is modified. |
Property Name |
Default Value |
Type |
Description |
|
5000 |
long [milliseconds] |
Jet will periodically check for new jobs to start and perform cleanup of unused resources. This property configures how often this check and cleanup will be done. Value is in milliseconds. |
|
true |
boolean |
Whether a JVM shutdown hook is registered to shut down the node gracefully when the process is terminated. The shutdown hook will terminate all running jobs and then gracefully terminate the node, in a way that is equivalent to calling JetInstance.shutdown(). |
|
604800 (7 days) |
long [seconds] |
Maximum time in seconds the job results will be kept in the cluster. They will be automatically deleted after this period is reached. |
|
1000 |
integer |
Maximum number of job results to keep in the cluster; the oldest results will be automatically deleted after this size is reached. |
|
Jet installation path |
string |
Root of Jet installation. Used as default location for the lossless recovery store. By default it will be automatically set to the start of the Jet installation path. |
|
25 |
long [microseconds] |
The minimum time in microseconds the cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput. Note: the underlying |
|
500 |
long [microseconds] |
The maximum time in microseconds the cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput. Note: the underlying |
|
25 |
long [microseconds] |
The minimum time in microseconds the non-cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput. Note: the underlying |
|
5000 |
long [microseconds] |
The maximum time in microseconds the non-cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput. Note: the underlying |
Appendix C: Common Exceptions
You may see the following exceptions thrown when working with Jet:
-
JetException
: A general exception thrown if a job failure occurs. It has the original exception as its cause. -
TopologyChangedException
: Thrown when a member participating in a job leaves the cluster. If auto-restart is enabled, Jet will restart the job automatically, without throwing the exception to the user. -
JobNotFoundException
: Thrown when the coordinator node is not able to find the metadata for a given job.
There are also several Hazelcast exceptions that might be thrown when
interacting with JetInstance
. For a description of Hazelcast IMDG
exceptions, please refer to the
IMDG Reference manual.
Appendix D: Phone Homes
Hazelcast uses phone home data to learn about the usage of Hazelcast Jet.
Hazelcast Jet instances call our phone home server initially when they are started and then every 24 hours. This applies to all the instances joined to the cluster.
D.1. What is sent in?
The following information is sent in a phone home:
-
Hazelcast Jet version
-
Local Hazelcast Jet member UUID
-
Download ID
-
A hash value of the cluster ID
-
Cluster size bands for 5, 10, 20, 40, 60, 100, 150, 300, 600 and > 600
-
Number of connected clients bands of 5, 10, 20, 40, 60, 100, 150, 300, 600 and > 600
-
Cluster uptime
-
Member uptime
-
Environment Information:
-
Name of operating system
-
Kernel architecture (32-bit or 64-bit)
-
Version of operating system
-
Version of installed Java
-
Name of Java Virtual Machine
-
Hazelcast IMDG Enterprise specific:
-
Number of clients by language (Java, C++, C#)
-
Flag for Hazelcast Enterprise
-
Hash value of license key
-
Native memory usage
D.2. Phone Home Code
The phone home code itself is open source. Please see here.
D.3. Disabling Phone Homes
Set the hazelcast.phone.home.enabled
system property to false either
in the config or on the Java command line.
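For example, in an embedded setup you can set the property programmatically before creating the instance:
System.setProperty("hazelcast.phone.home.enabled", "false");
JetInstance jet = Jet.newJetInstance();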
Starting with Hazelcast Jet 0.5, you can also disable the phone home
using the environment variable HZ_PHONE_HOME_ENABLED
. Simply add the
following line to your .bash_profile
:
export HZ_PHONE_HOME_ENABLED=false
Appendix E: FAQ
You can refer to the FAQ page to see the answers to frequently asked questions related to topics such as the relationship and differences between Hazelcast Jet and Hazelcast IMDG, Jet’s APIs and roadmap.
Appendix F: License Questions
Hazelcast Jet is distributed under the Apache License 2; therefore, permissions are granted to use, reproduce, and distribute it along with any kind of open source and closed source applications.
Depending on the feature set used, Hazelcast Jet has certain runtime dependencies which might have different licenses. The following are those dependencies and their respective licenses.
F.1. Embedded Dependencies
Embedded dependencies are merged (shaded) with the Hazelcast Jet codebase at compile-time. These dependencies become an integral part of the Hazelcast Jet distribution.
For license files of embedded dependencies, please see the license
directory of the Hazelcast Jet distribution, available at our
download page.
F.1.1. minimal-json
minimal-json is a JSON parsing and generation library which is a part of the Hazelcast Jet distribution. It is used for communication between the Hazelcast Jet cluster and the Management Center.
minimal-json is distributed under the MIT license and offers the same rights to add, use, modify, and distribute the source code as the Apache License 2.0 that Hazelcast uses. However, some other restrictions might apply.
F.1.2. picocli
picocli is a command line parser which is used for the implementation of the
jet.sh
command line tool.
picocli is distributed under the terms of the Apache License 2.
F.1.3. Runtime Dependencies
Depending on the features used, additional dependencies might be added to the dependency set. Those runtime dependencies might have other licenses. See the following list of additional runtime dependencies.
F.1.4. Apache Hadoop
Hazelcast integrates with Apache Hadoop and can use it as a data sink or source. Jet has a dependency on the libraries required to read from and write to the Hadoop File System.
Apache Hadoop is distributed under the terms of the Apache License 2.
F.1.5. Apache Kafka
Hazelcast integrates with Apache Kafka and can make use of it as a data sink or source. Hazelcast has a dependency on Kafka client libraries.
Apache Kafka is distributed under the terms of the Apache License 2.
F.1.6. Spring
Hazelcast integrates with Spring and can be configured using Spring Context. Jet has a dependency on the libraries required to create a Spring context.
Spring is distributed under the terms of the Apache License 2.
Glossary
- Accumulation
-
The act of building up an intermediate result inside a mutable object (called the accumulator) as a part of performing an aggregate operation. After all accumulation is done, a finishing function is applied to the object to produce the result of the operation.
- Aggregate Operation
-
A set of functional primitives that instructs Jet how to calculate some aggregate function over one or more data sets. Used in the group-by, co-group and windowing transforms.
- Aggregation
-
The act of applying an aggregate function to a stream of items. The result of the function can be simple, like a sum or average, or complex, like a collection of all aggregated items.
- At-Least-Once Processing Guarantee
-
The system guarantees to process each item of the input stream(s), but doesn’t guarantee it will process it just once.
- Batch Processing
-
The act of processing a finite dataset, such as one stored in Hazelcast IMDG or HDFS.
- Client Server Topology
-
Hazelcast topology where members run outside the user application and are connected to clients using client libraries. The client library is installed in the user application.
- Co-Grouping
-
An operation that is a mix of an SQL JOIN and GROUP BY with specific restrictions. Sometimes called a “stream join”. Each item of each stream that is being joined must be mapped to its grouping key. All items with the same grouping key (from all streams) are aggregated together into a single result. However, the result can be structured and preserve all input items separated by their stream of origin. In that form the operation effectively becomes a pure JOIN with no aggregation.
- DAG
-
Directed Acyclic Graph which Hazelcast Jet uses to model the relationships between individual steps of the data processing.
- Edge
-
A DAG element which holds the logic on how to route the data from one vertex’s processing units to the next one’s.
- Embedded Topology
-
Hazelcast topology where the members are in-process with the user application and act as both client and server.
- Event time
-
A data item in an infinite stream typically contains a timestamp data field. This is its event time. As the stream items go by, the event time passes as the items' timestamps increase. A typical distributed stream has a certain amount of event time disorder (items aren’t strictly ordered by their timestamp) so the “passage of event time” is a somewhat fuzzy concept. Jet uses the watermark to superimpose order over the disordered stream.
- Exactly-Once Processing Guarantee
-
The system guarantees that it will process each item of the input stream(s) and will never process an item more than once.
- Fault Tolerance
-
The property of a distributed computation system that gives it resilience to changes in the topology of the cluster running the computation. If a member leaves the cluster, the system adapts to the change and resumes the computation without loss.
- Hash-Join
-
A special-purpose stream join optimized for the use case of data enrichment. Each item of the primary stream is joined with one item from each of the enriching streams. Items are matched by the join key. The name "hash-join" stems from the fact that the contents of the enriching streams are held in hashtables for fast lookup. Hashtables are replicated on each cluster member, which is why this operation is also known as a “replicated join”.
- Hazelcast IMDG
-
An In-Memory Data grid (IMDG) is a data structure that resides entirely in memory, and is distributed among many machines in a single location (and possibly replicated across different locations). IMDGs can support millions of in-memory data updates per second, and they can be clustered and scaled in ways that support large quantities of data. Hazelcast IMDG is the in-memory data grid offered by Hazelcast.
- HDFS
-
Hadoop Distributed File System. Hazelcast Jet can use it both as a data source and a sink.
- Jet Job
-
A unit of distributed computation that Jet executes. One job has one DAG specifying what to do. A distributed array of Jet processors performs the computation.
- Kafka
-
Apache Kafka is a product that offers a distributed publish-subscribe message queue with guarantees of delivery and message persistence. The most commonly used component over which heterogeneous distributed systems exchange data.
- Latency
-
The time that passes from the occurrence of an event that triggers some response to the occurrence of the response. In the case of Hazelcast Jet’s stream processing, latency refers to the time that passes from the point in time the last item that belongs to a window enters the system to the point where the result for that window appears in the output.
- Member
-
A Hazelcast Jet instance (node) that is a member of a cluster. A single JVM can host one or more Jet members, but in production there should be one member per physical machine.
- Partition (Data)
-
To guarantee that all items with the same grouping key are processed by the same processor, Hazelcast Jet uses a total surjective function to map each data item to the ID of its partition and assigns to each processor its unique subset of all partition IDs. A partitioned edge then routes all items with the same partition ID to the same processor.
- Partition (Network)
-
A malfunction in network connectivity that splits the cluster into two or more parts that are mutually unreachable, but the connections among nodes within each part remain intact. May cause each of the parts to behave as if it was “the” cluster that lost the other members. Also known as “split brain”.
- Pipeline
-
Hazelcast Jet’s name for the high-level description of a computation job constructed using the Pipeline API. Topologically it is a DAG, but the vertices have different semantics than the Core API vertices and are called pipeline stages. Edges are implicit and not expressed in the API. Each stage (except for source/sink stages) has an associated transform that it performs on its input data.
- Processor
-
The unit which contains the code of the computation to be performed by a vertex. Each vertex’s computation is implemented by a processor. On each Jet cluster member there are one or more instances of the processor running in parallel for a single vertex.
- Session Window
-
A window that groups an infinite stream’s items by their timestamp. It groups together bursts of events closely spaced in time (by less than the configured session timeout).
- Sliding Window
-
A window that groups an infinite stream’s items by their timestamp. It groups together events that belong to a segment of fixed size on the timeline. As the time passes, the segment slides along, always extending from the present into the recent past. In Jet, the window doesn’t literally slide, but hops in steps of user-defined size. (“Time” here refers to the stream’s own notion of time, i.e., event time.)
- Source
-
A resource present in a Jet job’s environment that delivers a data stream to it. Hazelcast Jet uses a source connector to access the resource. Alternatively, source may refer to the DAG vertex that hosts the connector.
- Sink
-
A resource present in a Jet job’s environment that accepts its output data. Hazelcast Jet uses a sink connector to access the resource. Alternatively, sink may refer to the vertex that hosts the connector.
- Skew
-
A generalization of the term “clock skew” applied to distributed stream processing. In this context it refers to the deviation in event time as opposed to wall-clock time in the classical usage. Several substreams of a distributed stream may at the same time emit events with timestamps differing by some delta, due to various lags that accumulate in the delivery pipeline for each substream. This is called stream skew. Event skew refers to the disorder within a substream, where data items appear out of order with respect to their timestamps.
- Split Brain
-
A popular name for a network partition; see Partition (Network) above.
- Stream Processing
-
The act of processing an infinite stream of data, typically implying that the data is processed as soon as it appears. Such a processing job must explicitly deal with the notion of time in order to make sense of the data. It achieves this with the concept of windowing.
- Throughput
-
A measure for the volume of data a system is capable of processing per unit of time. Typical ways to express it for Hazelcast Jet are in terms of events per second and megabytes per second.
- Tumbling Window
-
A window that groups an infinite stream’s items by their timestamp. It groups together events that belong to a segment of fixed size on the timeline. As the time passes, the segment “tumbles” along, never covering the same point in time twice. This means that each event belongs to just one tumbling window position. (“Time” here refers to the stream’s own notion of time, i.e., event time.)
- Vertex
-
The DAG element that performs a step in the overall computation. It receives data from its inbound edges and sends the results of its computation to its outbound edges. There are three kinds of vertices: source (has only outbound edges), sink (has only inbound edges) and computational (has both kinds of edges).
- Watermark
-
A concept that superimposes order over a disordered underlying data stream. An infinite data stream’s items represent timestamped events, but they don’t occur in the stream ordered by the timestamp. The value of the watermark at a certain location in the processing pipeline denotes the lowest value of the timestamp that is expected to occur in the upcoming items. Items that don’t meet this criterion are discarded because they arrived too late to be processed.
- Windowing
-
The act of splitting an infinite stream’s data into windows according to some rule, most typically one that involves the item’s timestamps. Each window becomes the target of an aggregate function, which outputs one data item per window (and per grouping key).