This manual is for an old version of Hazelcast Jet. Use the latest stable version.

A Job is the unit of work that Jet executes. It is described by a DAG, which models the computation to be performed together with its inputs and outputs.

Job is a handle to the execution of a DAG. To create a job, supply the DAG to a previously created JetInstance as shown below:

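JetInstance jet = Jet.newJetInstance(); // or Jet.newJetClient();
DAG dag = new DAG();
dag.newVertex(..); // add the vertices and edges that describe the computation
jet.newJob(dag).execute().get(); // submit the job and block until it completes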

As hinted in the code example, the job submission API is identical whether you use it from a client machine or directly on an instance of a Jet cluster member. This works because the Job instance is serializable and the client can send it over the network when submitting the job. The same Job instance can be submitted for execution many times.

Job execution is asynchronous. The execute() call returns as soon as the Jet cluster has been contacted and the serialized job has been sent to it. The caller receives a Future that can be inspected or waited on to learn the outcome of the job. The Future is also cancelable: canceling it sends a cancelation command to the Jet cluster.

Note that the Future only signals the status of the job; it does not contain the result of the computation. The DAG explicitly models the storing of results via its sink vertices. Typically the results will be in a Hazelcast map or another structure and have to be accessed through that structure's own API after the job is done.
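
The split between job status and job results can be illustrated with a minimal sketch that uses only the JDK. It is not Jet code: the class FutureVersusSinkSketch, its execute method and the word-count workload are hypothetical, and a ConcurrentHashMap stands in for the Hazelcast map that a sink vertex would write to.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-JDK model, not Jet API: the "job" writes its output to a sink map
// and completes a future that carries no result.
class FutureVersusSinkSketch {

    // Starts the "job" asynchronously and returns a result-free future.
    // The sink parameter stands in for a Hazelcast map (assumption).
    static CompletableFuture<Void> execute(String text, Map<String, Integer> sink) {
        return CompletableFuture.runAsync(() -> {
            for (String word : text.split("\\s+")) {
                sink.merge(word, 1, Integer::sum);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, Integer> sink = new ConcurrentHashMap<>();
        CompletableFuture<Void> future = execute("to be or not to be", sink);

        // Waiting on the future only tells us the job is done.
        Void outcome = future.join();
        System.out.println("value held by the future: " + outcome);

        // The actual results are read from the sink through its own API.
        System.out.println("count of 'to' read from the sink: " + sink.get("to"));
    }
}
```

The future in this sketch is typed with Void on purpose, so the only way to obtain the counts is to query the sink after completion.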

Deploying the Resources

If the Jet cluster has not been started with all the job's computation code already on the classpath, you have to deploy the code together with the Job instance:

JobConfig config = new JobConfig();
config.addJar("..");
jet.newJob(dag, config).execute().get();

When reading and writing data to the underlying Hazelcast IMDG instance, keep in mind that the deployed code is available only within the scope of the executing Jet job.

Creating and Initializing Jobs

These are the steps taken to create and initialize a Jet job:

  1. The user builds the DAG and submits it to the local Jet client instance.
  2. The client instance serializes the DAG and sends it to a member of the Jet cluster. This member becomes the coordinator for this Jet job.
  3. The coordinator deserializes the DAG and builds an execution plan for each member.
  4. The coordinator serializes the execution plans and distributes each to its target member.
  5. Each member acts upon its execution plan by creating all the needed tasklets, concurrent queues, network senders/receivers, etc.
  6. The coordinator sends the signal to all members to start job execution.
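
Steps 2 to 4 depend on objects surviving a trip over the network. The sketch below shows such a round trip with standard java.io serialization as a stand-in; the wire format Jet actually uses is not shown here, and the ExecutionPlan class with its two fields is a hypothetical simplification.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-JDK model, not Jet API: a per-member plan is turned into bytes by
// the sender and rebuilt as a separate object by the receiver.
class SerializedPlanSketch {

    // Hypothetical, heavily simplified execution plan.
    static final class ExecutionPlan implements Serializable {
        private static final long serialVersionUID = 1L;
        final int memberIndex;
        final int processorCount;

        ExecutionPlan(int memberIndex, int processorCount) {
            this.memberIndex = memberIndex;
            this.processorCount = processorCount;
        }
    }

    // Sender side: convert the plan to bytes.
    static byte[] serialize(ExecutionPlan plan) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(plan);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Receiver side: rebuild the plan from the bytes.
    static ExecutionPlan deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (ExecutionPlan) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutionPlan sent = new ExecutionPlan(2, 4);
        ExecutionPlan received = deserialize(serialize(sent));
        System.out.println("same object: " + (sent == received));
        System.out.println("member " + received.memberIndex
                + " creates " + received.processorCount + " processors");
    }
}
```

The receiver ends up with its own copy of the plan, which is why everything placed in a DAG or an execution plan has to be serializable and cannot rely on state that exists only on the submitting machine.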

The most visible consequence of the above process is the ProcessorMetaSupplier type: one must be provided for each Vertex. In Step 3, the coordinator deserializes the meta-supplier as a constituent of the DAG and asks it to create ProcessorSupplier instances, which go into the execution plans. A separate instance of ProcessorSupplier is created specifically for each member's plan. In Step 4, the coordinator serializes these and sends each to its member. In Step 5, each member deserializes its ProcessorSupplier and asks it to create as many Processor instances as configured by the vertex's localParallelism property.

This process is so involved because each Processor instance may need to be differently configured. This is especially relevant for processors driving a source vertex: typically each one will emit only a slice of the total data stream, as appropriate to the partitions it is in charge of.
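
The three-level chain described above can be modeled in a few lines of plain Java. This is a sketch under simplifying assumptions: the nested interfaces only borrow the names of the Jet types and do not reproduce their real signatures, the methods forMember, get, describe, initialize and labelling are hypothetical, and everything runs in a single JVM with no serialization.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model, not Jet API: meta-supplier -> supplier -> processor.
class SupplierChainSketch {

    // Simplified stand-ins; the real Jet interfaces look different.
    interface Processor { String describe(); }

    interface ProcessorSupplier { List<Processor> get(int count); }

    interface ProcessorMetaSupplier { ProcessorSupplier forMember(int memberIndex); }

    // Walks through Steps 3 to 5 and returns what each created processor reports.
    static List<String> initialize(ProcessorMetaSupplier meta, int memberCount, int localParallelism) {
        List<String> created = new ArrayList<>();
        for (int member = 0; member < memberCount; member++) {
            // Step 3: the coordinator asks the meta-supplier for this member's supplier.
            ProcessorSupplier supplier = meta.forMember(member);
            // Step 5: the member asks its supplier for localParallelism processors.
            for (Processor processor : supplier.get(localParallelism)) {
                created.add(processor.describe());
            }
        }
        return created;
    }

    // An example meta-supplier that gives every processor its own configuration.
    static ProcessorMetaSupplier labelling() {
        return member -> count -> {
            List<Processor> processors = new ArrayList<>();
            for (int local = 0; local < count; local++) {
                String label = "member " + member + ", processor " + local;
                processors.add(() -> label);
            }
            return processors;
        };
    }

    public static void main(String[] args) {
        // Two members with localParallelism 3 yield six distinct processors.
        initialize(labelling(), 2, 3).forEach(System.out::println);
    }
}
```

Each level narrows the scope: the meta-supplier sees the whole cluster, a supplier sees one member, and a processor sees only its own configuration.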

ProcessorMetaSupplier

This type is designed to be implemented by the user, but the Processors utility class provides implementations covering most cases. You may need custom meta-suppliers primarily to implement a custom data source or sink. Instances of this type are serialized and transferred as a part of each Vertex instance in a DAG. The coordinator member deserializes it to retrieve ProcessorSuppliers. Before being asked for ProcessorSuppliers, the meta-supplier is given access to the Hazelcast instance so it can find out the parameters of the cluster the job will run on. Most typically, the meta-supplier in the source vertex will use the cluster size to control the assignment of data partitions to each member.
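
As an illustration of how cluster size can drive partition assignment, the following sketch spreads partition IDs over members round-robin. The round-robin policy, the class PartitionAssignmentSketch and the assign method are assumptions made for this example; a real meta-supplier would obtain the member list from the Hazelcast instance it is given and may apply a different policy.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model, not Jet API: decide which member reads which partitions.
class PartitionAssignmentSketch {

    // Returns, for each member index, the partition IDs assigned to that member.
    // Round-robin assignment is an assumption of this sketch.
    static List<List<Integer>> assign(int partitionCount, int memberCount) {
        List<List<Integer>> perMember = new ArrayList<>();
        for (int member = 0; member < memberCount; member++) {
            perMember.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            perMember.get(partition % memberCount).add(partition);
        }
        return perMember;
    }

    public static void main(String[] args) {
        List<List<Integer>> plan = assign(7, 3);
        for (int member = 0; member < plan.size(); member++) {
            System.out.println("member " + member + " -> partitions " + plan.get(member));
        }
    }
}
```

In this model, each inner list is what the meta-supplier would hand to the ProcessorSupplier destined for the corresponding member.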

ProcessorSupplier

Usually this type is custom-implemented in the same cases as its meta-supplier, and it completes the logic of a distributed data source's partition assignment. It supplies instances of Processor ready to start executing the vertex's logic.

Please see the Implementing Custom Sources and Sinks section for more guidance on how these interfaces can be implemented.