Inspecting Processor Input and Output
The structure of the DAG model is a very poor match for Java's type system, which results in the lack of compile-time type safety between connected vertices. Developing a type-correct DAG therefore usually requires some trial and error. To facilitate this process, but also to allow many more kinds of diagnostics and debugging, Jet's library offers ways to capture the input/output of a vertex and inspect it.
Peeking with Processor Wrappers
The first approach is to decorate a vertex declaration with a layer that
will log all the data traffic going through it. This support is present
factory class, which contains the following methods:
peekInput(): logs items received at any edge ordinal.
peekOutput(): logs items emitted to any ordinal. An item emitted to several ordinals is logged just once.
These methods take two optional parameters:
toStringFreturns the string representation of an item. The default is to use
shouldLogFis a filtering function so you can focus your log output only on some specific items. The default is to log all items.
Suppose we have declared the second-stage vertex in a two-stage aggregation setup:
Vertex combine = dag.newVertex("combine", combineByKey(counting()));
We'd like to see what exactly we're getting from the first stage, so
we'll wrap the processor supplier with
Vertex combine = dag.newVertex("combine", peekInput(combineByKey(counting())));
Keep in mind that logging happens on the machine running hosting the processor, so this technique is primarily targeted to Jet jobs the developer runs locally in his development environment.
Attaching a Sink Vertex
Since most vertices are implemented to emit the same data stream to all
attached edges, it is usually possible to attach a diagnostic sink to
any vertex. For example, Jet's standard
sink vertex can be very useful here.
In the example from the Word Count tutorial we can add the following declarations:
Vertex diagnose = dag.newVertex("diagnose", Sinks.writeFile("tokenize-output")) .localParallelism(1); dag.edge(from(tokenize, 1).to(diagnose));
This will create the directory
tokenize-output which will contain one
file per processor instance running on the machine. When running in a
cluster, you can inspect on each member the input seen on that member.
By specifying the
allToOne() routing policy you can also have the
output of all the processors on all the members saved on a single member
(although the choice of exactly which member will be arbitrary).
How to Unit-Test a Processor
We provide some utility classes to simplify writing unit tests for your custom processors. You can find them in the
package. Using these utility classes you can unit test your processor by
passing it some input items and asserting the expected output.
Start by calling
by passing it a processor supplier or a processor instance.
The test process does the following:
- initialize the processor by calling
- do a snapshot+restore (optional, see below)
Processor.process(0, inbox). The inbox always contains one item from the
- every time the inbox gets empty, do a snapshot+restore
Processor.complete()until it returns
- do a final snapshot+restore after
The optional snapshot+restore test procedure:
- create a new processor instance and use it instead of the existing one
- restore the snapshot using
The test optionally asserts that the processor made progress on each call to any processing method. To be judged as having made progress, the callback method must do at least one of these:
- take something from the inbox
- put something to the outbox
true(applies only to
The test will provide a 1-capacity outbox to cooperative processors. The
outbox will already be full on every other call to
tests the edge case:
process() may be called even when the outbox is
full, giving the processor a chance to process the inbox without
The test will also assert that the processor doesn't spend more time in
any callback than the limit specified in
Cases Not Covered
This class does not cover these cases:
- testing of processors which distinguish input or output edges by ordinal
- checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier)
- it never calls
This will test one of the jet-provided processors:
TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase())) .disableCompleteCall() // enabled by default .disableLogging() // enabled by default .disableProgressAssertion() // enabled by default .disableSnapshots() // enabled by default .cooperativeTimeout(<timeoutInMs>) // default is 1000 .outputChecker(<function>) // default is `Objects::equal` .input(asList("foo", "bar")) // default is `emptyList()` .expectOutput(asList("FOO", "BAR"));
Other Utility Classes
com.hazelcast.jet.test contains these classes that you can use as
implementations of Jet interfaces in tests:
JetAssert contains a few of the
assertX() methods normally
found in JUnit's
Assert class. We had to reimplement them to avoid a
dependency on JUnit from our production code.