This manual is for an old version of Hazelcast Jet, use the latest stable version.

Remember that a Jet Job is Distributed

The API to submit a job to Jet is in a way deceptively simple: "just call a method." As long as you're toying around with Jet instances started locally in a single JVM, everything will indeed work. However, as soon as you try to deploy to an actual cluster, you'll face the consequences of the fact that your job definition must travel over the wire to reach remote members which don't have your code on their classpath.

Your custom code must be packaged with the Jet job. For simple examples you can have everything in a single class and use code like this:

class JetExample {
    static Job createJob(JetInstance jet) {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(JetExample.class);
        return jet.newJob(createPipeline(), jobConfig);
    }
    ...
}

For more complex jobs it will become more practical to first package the job in a JAR and then use a command-line utility to submit it, as explained next.

Submit a Job from the Command Line

Jet comes with the submit-job script, which allows you to submit a Jet job packaged in a JAR file. You can find it in the Jet distribution zipfile, in the bin directory. On Windows use submit-jet.bat. To use it, follow these steps:

  • Write your main() method and your Jet code the usual way, except for calling JetBootstrap.getInstance() to acquire a Jet client instance (instead of Jet.newJetClient()).

  • Create a runnable JAR which declares its Main-Class in

  • MANIFEST.MF.

  • Run your JAR, but instead of java -jar jetjob.jar use submit-jet.sh jetjob.jar.

  • The script will create a Jet client and configure it from hazelcast-client.xml located in the config directory of Jet's distribution. Adjust that file to suit your needs.

For example, write a class like this:

public class CustomJetJob {
    public static void main(String[] args) {
        JetInstance jet = JetBootstrap.getInstance();
        jet.newJob(buildPipeline()).join();
    }

    static Pipeline buildPipeline() {
        // ...
    }
}

After building the JAR, submit the job:

$ submit-jet.sh jetjob.jar

Watch out for Capturing Lambdas

A typical Jet pipeline involves lambda expressions. Since the whole pipeline definition must be serialized to be sent to the cluster, the lambda expressions must be serializable as well. The Java standard provides an essential building block: if the static type of the lambda is a subtype of Serializable, you will automatically get a lambda instance that can serialize itself.

None of the functional interfaces in the JDK extend Serializable, so we had to mirror the entire java.util.function package in our own com.hazelcast.jet.function with all the interfaces subtyped and made Serializable. Each subtype has the name of the original with Distributed prepended. For example, a DistributedFunction is just like Function, but implements Serializable. We use these types everywhere in the Pipeline API.

As always with this kind of magic, auto-serializability of lambdas has its flipside: it is easy to overlook what's going on.

If the lambda references a variable in the outer scope, the variable is captured and must also be serializable. If it references an instance variable of the enclosing class, it implicitly captures this so the entire class will be serialized. For example, this will fail because JetJob doesn't implement Serializable:

class JetJob {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(readList("input"))
         // Refers to instanceVar, capturing "this", but JetJob is not
         // Serializable so this call will fail.
         .filter(item -> item.equals(instanceVar));
        return p;
    }
}

Just adding implements Serializable to JetJob would be a viable workaround here. However, consider something just a bit different:

class JetJob {
    private String instanceVar;
    private OutputStream fileOut; // a non-serializable field

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.drawFrom(readList("input"))
         // Refers to instanceVar, capturing "this". JetJob is declared
         // Serializable, but has a non-serializable field and this fails.
         .filter(item -> item.equals(instanceVar));
        return p;
    }
}

Even though we never refer to fileOut, we are still capturing the entire JetJob instance. We might mark fileOut as transient, but the sane approach is to avoid referring to instance variables of the surrounding class. This can be simply achieved by assigning to a local variable, then referring to that variable inside the lambda:

class JetJob {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        String findMe = instanceVar;
        p.drawFrom(readList("input"))
         // By referring to the local variable "findMe" we avoid
         // capturing "this" and the job runs fine.
         .filter(item -> item.equals(findMe));
        return p;
    }
}

Another common pitfall is capturing an instance of DateTimeFormatter or a similar non-serializable class:

DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                         .withZone(ZoneId.systemDefault());
Pipeline p = Pipeline.create();
ComputeStage<Long> src = p.drawFrom(readList("input"));
// Captures the non-serializable formatter, so this fails
src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));

Sometimes we can get away by using one of the preconfigured formatters available in the JDK:

// Accesses the static final field ISO_LOCAL_TIME. Static fields are
// not subject to lambda capture, they are dereferenced when the code
// runs on the target machine.
src.map((Long tstamp) ->
    DateTimeFormatter.ISO_LOCAL_TIME.format(
        Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault())));

This refers to a static final field in the JDK, so the instance is available on any JVM. A similar approach is to declare our own static final field; however in that case we must add the declaring class as a job resource:

class JetJob {

    // Our own static field
    private static final DateTimeFormatter formatter =
            DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                             .withZone(ZoneId.systemDefault());

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        ComputeStage<Long> src = p.drawFrom(readList("input"));
        src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));
        return p;
    }

    // The job will fail unless we attach the JetJob class as a
    // resource, making the formatter instance available at the
    // target machine.
    void runJob(JetInstance jet) throws Exception {
        JobConfig c = new JobConfig();
        c.addClass(JetJob.class);
        jet.newJob(buildPipeline(), c).join();
    }
}

Standard Java Serialization is Slow

When it comes to serializing the description of a Jet job, performance is not critical. However, for the data passing through the pipeline, the cost of the serialize-deserialize cycle can easily dwarf the cost of actual data transfer, especially on high-end LANs typical for data centers. In this context the performance of Java serialization is so poor that it regularly becomes the bottleneck. This is due to its heavy usage of reflection, overheads in the serialized form, etc.

Since Hazelcast IMDG faced the same problem a long time ago, we have mature support for optimized custom serialization and in Jet you can use it for stream data. In essence, you must implement a StreamSerializer for the objects you emit from your processors and register it in Jet configuration:

SerializerConfig serializerConfig = new SerializerConfig()
        .setImplementation(new MyItemSerializer())
        .setTypeClass(MyItem.class);
JetConfig config = new JetConfig();
config.getHazelcastConfig().getSerializationConfig()
      .addSerializerConfig(serializerConfig);
JetInstance jet = Jet.newJetInstance(config);

Consult the chapter on custom serialization in Hazelcast IMDG's reference manual for more details.

Note the limitation implied here: the serializers must be registered with Jet on startup because this is how it is supported in Hazelcast IMDG. There is a plan to improve this and allow serializers to be registered on individual Jet jobs.