Jobs

- All the code and state needed for the Jet job must be declared in the classes that become a part of the job's definition through JobConfig.addClass() or addJar().
- If you have a client connecting to your Jet cluster, the Jet job should never have to refer to ClientConfig. Create a separate DagBuilder class containing the buildDag() method; this class should not have any references to the JobHelper class (a sketch of this separation follows the list).
- Keep careful control over the object graph that is submitted with the Jet job. Be aware that inner classes and lambdas may inadvertently capture their enclosing classes, which will cause serialization errors.
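A minimal sketch of that separation (the class bodies are illustrative, not a prescribed API; only the names DagBuilder, buildDag() and JobHelper come from the guideline above):
// Builds the DAG only; knows nothing about clients or cluster config.
public class DagBuilder {
    public static DAG buildDag() {
        DAG dag = new DAG();
        // ... declare vertices and edges ...
        return dag;
    }
}

// Owns the client configuration and submits the job; the job's
// definition never refers back to this class.
public class JobHelper {
    public static void main(String[] args) throws Exception {
        ClientConfig clientConfig = new ClientConfig();
        JetInstance jet = Jet.newJetClient(clientConfig);
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(DagBuilder.class);
        jet.newJob(DagBuilder.buildDag(), jobConfig).execute().get();
    }
}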
Packaging the Job
One way to easily submit the job to a Jet cluster is by using the submit-jet.sh script (submit-jet.bat on Windows).
The main issue with achieving this is that the JAR must be attached as a resource to the job being submitted, so the Jet cluster will be able to load and use its classes. However, from within a running main() method it is not trivial to find out the filename of the JAR containing it.
To use the submit-jet script, follow these steps:
- Write your main() method and your Jet code the usual way, except for calling JetBootstrap.getInstance() to acquire a Jet client instance (instead of Jet.newJetClient()).
- Create a runnable JAR with your entry point declared as the Main-Class in MANIFEST.MF.
- Run your JAR, but instead of java -jar jetjob.jar use submit-jet.sh jetjob.jar. The script is found in the Jet distribution zipfile, in the bin directory. On Windows use submit-jet.bat.
- The Jet client will be configured from hazelcast-client.xml found in the config directory in Jet's distribution directory structure. Adjust that file to suit your needs.
For example, write a class like this:
public class CustomJetJob {
    public static void main(String[] args) throws Exception {
        JetInstance jet = JetBootstrap.getInstance();
        jet.newJob(buildDag()).execute().get();
    }

    public static DAG buildDag() {
        // ...
    }
}
After building the JAR, submit the job:
$ submit-jet.sh jetjob.jar
Inspecting Processor Input and Output
The structure of the DAG model is a very poor match for Java's type system, which results in the lack of compile-time type safety between connected vertices. Developing a type-correct DAG therefore usually requires some trial and error. To facilitate this process, but also to allow many more kinds of diagnostics and debugging, Jet's library offers ways to capture the input/output of a vertex and inspect it.
Peeking with processor wrappers
The first approach is to decorate a vertex declaration with a layer that
will log all the data traffic going through it. This support is present
in the DiagnosticProcessors
factory class, which contains the
following methods:
- peekInput(): logs items received at any edge ordinal.
- peekOutput(): logs items emitted to any ordinal. An item emitted to several ordinals is logged just once.
These methods take two optional parameters:
- toStringF returns the string representation of an item. The default is to use Object.toString().
- shouldLogF is a filtering function so you can focus your log output only on some specific items. The default is to log all items.
Example usage
Suppose we have declared the second-stage vertex in a two-stage aggregation setup:
Vertex combine = dag.newVertex("combine",
        combineByKey(counting()));
We'd like to see what exactly we're getting from the first stage, so
we'll wrap the processor supplier with peekInput()
:
Vertex combine = dag.newVertex("combine",
        peekInput(combineByKey(counting())));
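If the unfiltered traffic is too noisy, the two optional parameters can be supplied as well. A sketch, assuming the overload that takes toStringF and shouldLogF ahead of the wrapped supplier:
Vertex combine = dag.newVertex("combine",
        peekInput(
                // toStringF: how each logged item is rendered
                Object::toString,
                // shouldLogF: log only the items we care about
                item -> item.toString().startsWith("jet"),
                combineByKey(counting())));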
Keep in mind that logging happens on the machine hosting the processor, so this technique is primarily targeted at Jet jobs the developer runs locally in the development environment.
Attaching a sink vertex
Since most vertices are implemented to emit the same data stream to all
attached edges, it is usually possible to attach a diagnostic sink to
any vertex. For example, Jet's standard writeFile()
sink can be very
useful here.
Example usage
In the example from the Word Count tutorial we can add the following declarations:
Vertex diagnose = dag.newVertex("diagnose",
        Sinks.writeFile("tokenize-output"))
        .localParallelism(1);
dag.edge(from(tokenize, 1).to(diagnose));
This will create the directory tokenize-output
which will contain one
file per processor instance running on the machine. When running in a
cluster, you can inspect on each member the input seen on that member.
By specifying the allToOne()
routing policy you can also have the
output of all the processors on all the members saved on a single member
(although the choice of exactly which member will be arbitrary).
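A sketch of that variant, reusing the vertices from the example above (note that the edge must also be distributed so the items can leave their local member):
dag.edge(from(tokenize, 1).to(diagnose)
        .allToOne()
        .distributed());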
How to Unit-Test a Processor
Utility classes for unit testing are provided as part of the core API inside the com.hazelcast.jet.test package. Using these utility classes,
you can unit test custom processors by passing them input items and
asserting the expected output.
A TestSupport.testProcessor()
set of methods is provided for the
typical case.
For cooperative processors a 1-capacity outbox will be provided, which will additionally be full on every other processing method call. This will test edge cases in cooperative processors.
This method does the following:
- initializes the processor by calling Processor.init()
- calls Processor.process(0, inbox), where the inbox contains all items from the input parameter
- asserts the progress of the process() call: that something was taken from the inbox or put to the outbox
- calls Processor.complete() until it returns true
- asserts the progress of the complete() call if it returned false: something must have been put to the outbox

Note that this method never calls Processor.tryProcess().
This class does not cover these cases:
- testing of processors which distinguish input or output edges by ordinal
- checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards)
Example usage. This will test one of the Jet-provided processors:
TestSupport.testProcessor(
        Processors.map((String s) -> s.toUpperCase()),
        asList("foo", "bar"),
        asList("FOO", "BAR")
);
Serialization Caveats
Creating a DAG for Jet usually involves writing lambda expressions.
Because the DAG is sent to the cluster members in serialized form,
the lambda expressions must be serializable. To somewhat alleviate the
inconvenience of this requirement, Jet declares the package com.hazelcast.jet.function with all the functional interfaces from java.util.function subtyped and made Serializable. Each subtype has the name of the original with Distributed prepended. For example, a DistributedFunction is just like Function, but implements Serializable. Java has explicit support for lambda target types that implement Serializable. There are several caveats, however.
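As a small illustration (a sketch; the vertex name and the dag variable are just placeholders), the same method reference becomes serializable simply by giving it a Distributed target type:
// Typed as java.util.function.Function: the target type is not
// Serializable, so this instance cannot travel with the job.
Function<String, String> plain = String::toUpperCase;

// Typed as DistributedFunction: the same lambda, now Serializable,
// so it can be baked into a vertex declaration.
DistributedFunction<String, String> toUpper = String::toUpperCase;
dag.newVertex("to-upper", Processors.map(toUpper));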
Lambda variable capture
If the lambda references a variable in the outer scope, the variable is
captured and must also be serializable. If it references an instance
variable of the enclosing class, it implicitly captures this
so the
entire class will be serialized. For example, this will fail because JetJob doesn't implement Serializable:
public class JetJob {
    private String instanceVar;

    public DAG buildDag() {
        DAG dag = new DAG();
        // Refers to instanceVar, capturing "this", but JetJob is not
        // Serializable so this call will fail.
        dag.newVertex("filter", filter(item -> instanceVar.equals(item)));
        return dag;
    }
}
Just adding implements Serializable
to JetJob
would be a viable workaround here. However, consider something just a bit different:
public class JetJob implements Serializable {
    private String instanceVar;
    private OutputStream fileOut;

    public DAG buildDag() {
        DAG dag = new DAG();
        // Refers to instanceVar, capturing "this". JetJob is declared
        // Serializable, but has a non-serializable field and this fails.
        dag.newVertex("filter", filter(item -> instanceVar.equals(item)));
        return dag;
    }
}
Even though we never refer to fileOut
, we are still capturing the
entire JetJob
instance. We might mark fileOut
as transient
, but
the sane approach is to avoid referring to instance variables of the
surrounding class. This can be simply achieved by assigning to a local
variable, then referring to that variable inside the lambda:
public class JetJob {
    private String instanceVar;

    public DAG buildDag() {
        DAG dag = new DAG();
        String findMe = instanceVar;
        // By referring to the local variable "findMe" we avoid
        // capturing "this" and the job runs fine.
        dag.newVertex("filter", filter(item -> findMe.equals(item)));
        return dag;
    }
}
Another common pitfall is capturing an instance of DateTimeFormatter
or a similar non-serializable class:
DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                         .withZone(ZoneId.systemDefault());

// Captures the non-serializable formatter, so this fails
dag.newVertex("map", map((Long tstamp) ->
        formatter.format(Instant.ofEpochMilli(tstamp))));
Sometimes we can get away with using one of the preconfigured formatters available in the JDK:
// Accesses the static final field ISO_LOCAL_TIME. Static fields are
// not subject to lambda capture; they are dereferenced when the code
// runs on the target machine.
dag.newVertex("map", map((Long tstamp) ->
        DateTimeFormatter.ISO_LOCAL_TIME.format(
                Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault()))));
This refers to a static final
field in the JDK, so the instance is available on any JVM. A similar approach is to declare our own static final
field; however in that case we must add the declaring class as a job resource:
public class JetJob {
    // Our own static field
    private static final DateTimeFormatter formatter =
            DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                             .withZone(ZoneId.systemDefault());

    DAG buildDag() {
        DAG dag = new DAG();
        dag.newVertex("map", Processors.map((Long tstamp) ->
                formatter.format(Instant.ofEpochMilli(tstamp))));
        return dag;
    }

    // The job will fail unless we attach the JetJob class as a
    // resource, making the formatter instance available at the
    // target machine.
    void runJob(JetInstance jet) throws Exception {
        JobConfig c = new JobConfig();
        c.addClass(JetJob.class);
        jet.newJob(buildDag(), c).execute().get();
    }
}
An approach that is self-contained is to instantiate the non-serializable class just in time, inside the processor supplier:
// This lambda captures nothing and creates its own formatter when
// executed.
dag.newVertex("map", () -> {
DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault());
return Processors.map((Long tstamp) ->
formatter.format(Instant.ofEpochMilli(tstamp))).get();
});
Note the .get()
at the end: this retrieves the processor from the Jet-provided processor supplier and returns it from our custom-declared supplier.
Serialization performance
When it comes to serializing the description of a Jet job, performance is not critical. However, when the job executes, every distributed edge in the DAG will cause stream items to be serialized and sent over the network. In this context the performance of Java serialization is so poor that it regularly becomes the bottleneck. This is due to its heavy usage of reflection, overheads in the serialized form, etc.
Since Hazelcast IMDG faced the same problem a long time ago, we have
mature support for optimized custom serialization and in Jet you can
use it for stream data. In essence, you must implement a
StreamSerializer
for the objects you emit from your processors and
register it in Jet configuration:
SerializerConfig serializerConfig = new SerializerConfig()
        .setImplementation(new MyItemSerializer())
        .setTypeClass(MyItem.class);
JetConfig config = new JetConfig();
config.getHazelcastConfig().getSerializationConfig()
      .addSerializerConfig(serializerConfig);
JetInstance jet = Jet.newJetInstance(config);
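MyItemSerializer above is not something Jet provides; a minimal sketch of what it could look like, assuming MyItem is a simple value class carrying a single String field:
// Hypothetical stream item; the single String field is an assumption.
class MyItem {
    final String value;

    MyItem(String value) {
        this.value = value;
    }
}

class MyItemSerializer implements StreamSerializer<MyItem> {
    @Override
    public int getTypeId() {
        // Any positive number that is unique among your custom serializers
        return 1;
    }

    @Override
    public void write(ObjectDataOutput out, MyItem item) throws IOException {
        out.writeUTF(item.value);
    }

    @Override
    public MyItem read(ObjectDataInput in) throws IOException {
        return new MyItem(in.readUTF());
    }

    @Override
    public void destroy() {
    }
}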
Consult the chapter on custom serialization in Hazelcast IMDG's reference manual for more details.
Note the limitation implied here: the serializers must be registered with Jet on startup because this is how it is supported in Hazelcast IMDG. There is a plan to improve this and allow serializers to be registered on individual Jet jobs.