Class TestSupport
The test process does the following:
- initializes the processor by calling Processor.init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context)
- does snapshot or snapshot+restore (optional, see below)
- calls Processor.process(int, com.hazelcast.jet.core.Inbox), in two scenarios:- the inbox contains one input item
- the inbox contains all input items (if snapshots are not restored)
 
- every time the inbox gets empty it does snapshot or snapshot+restore
- optionally calls Processor.complete()until it returnstrueor until the output matches (for streaming sources)
- does snapshot or snapshot+restore each time the complete()method returnedfalseand made a progress
 The init() and close() methods of ProcessorSupplier and ProcessorMetaSupplier are called if you call
 the verifyProcessor(com.hazelcast.function.SupplierEx<com.hazelcast.jet.core.Processor>) using one of these.
 
Snapshot & restore
Theoptional snapshot+restore test procedure:
 - saveToSnapshot()is called. If we are not doing restore, this is the last step.
- new processor instance is created, from now on only this instance will be used
- snapshot is restored using restoreFromSnapshot()
- finishSnapshotRestore()is called
Watermark handling
The input can containWatermarks. They will be passed to the
 Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark) method.
 Progress assertion
For each call to any processing method the progress is asserted (optional). The processor must do at least one
 of these:
 - take something from inbox
- put something to outbox
- for boolean-returning methods, returning trueis considered as making progress
Outbox rejection
A 1-capacity outbox will be provided, which will additionally be full in every other call toprocess(). This will test the edge case: the
 process() method is called even when the outbox is full to give
 the processor a chance to process the inbox. The snapshot bucket will
 also have capacity of 1.
 Cooperative processors
For cooperative processors, time spent in each call to processing method must not exceedcooperativeTimeout(long).
 Non-covered cases
This class does not cover these cases:- Checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier).
- This utility never calls Processor.tryProcess().
Example usage
This will test one of the jet-provided processors:
 TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase(Locale.ROOT)))
            .disableCompleteCall()             // enabled by default
            .disableLogging()                  // enabled by default
            .disableProgressAssertion()        // enabled by default
            .disableSnapshots()                // enabled by default
            .cooperativeTimeout(<timeoutInMs>) // default is 1000
            .outputChecker(<function>)         // default is `Objects::equal`
            .input(asList("foo", "bar"))       // default is `emptyList()`
            .expectOutput(asList("FOO", "BAR"));
 - Since:
- Jet 3.0
- 
Nested Class SummaryNested ClassesModifier and TypeClassDescriptionstatic final classstatic interfacestatic final classDescribes the current test mode.
- 
Field SummaryFieldsModifier and TypeFieldDescriptionstatic final BiPredicate<List<?>,List<?>> An output checker that will claim actual and expected object lists as equal if they both contain the same items, in any order.static final ThreadLocal<TestSupport.TestContext>A context various methods can probe to find context information about the current test.
- 
Method SummaryModifier and TypeMethodDescriptionvoidassertOutput(int outputOrdinalCount, BiConsumer<TestSupport.TestMode, List<List<Object>>> assertFn) Runs the test with the specified custom assertion.cooperativeTimeout(long timeout) Iftimeout > 0, the test will fail if any call to processing method in a cooperative processor exceeds this timeout.Disables callingcomplete()method during the test.Disables logging of input and output objects.Disables checking of progress of processing methods (seeclass javadocfor information on what is "progress").Disable snapshot save and restore before first item and after eachprocess()andcomplete()call.executeBeforeEachRun(Runnable runnable) Action to execute before each test scenario.voidexpectExactOutput(TestSupport.TestEvent... testEvents) Runs the test and expects an exact sequence of input and output items.voidexpectOutput(List<?> expectedOutput) Sets the expected output and runs the test.voidexpectOutputs(List<List<?>> expectedOutputs) Specifies the expected outputs and runs the test.globalProcessorIndex(int globalProcessorIndex) Sets the globalProcessorIndex for the ProcessorhazelcastInstance(HazelcastInstance hazelcastInstance) Use the given instance forProcessorMetaSupplier.Context.hazelcastInstance()static TestSupport.TestEventCreate a test event with an input item on the given ordinal.static TestSupport.TestEventCreate a test event with an input item.Sets the input objects for processor.Sets the input objects for the processor on multiple input ordinals.Sets the input objects for the processor on multiple input ordinals.Use the given instance forProcessorMetaSupplier.Context.jobConfig()localParallelism(int localParallelism) Sets the localParallelism for the ProcessorlocalProcessorIndex(int localProcessorIndex) Sets the localProcessorIndex for the Processorstatic TestSupport.TestEventCreate a test event with an output item on the given ordinal.static TestSupport.TestEventCreate a test event with an output item.outputChecker(BiPredicate<? super List<?>, ? super List<?>> outputChecker) Predicate to compare expected and actual output.static <T extends Processor>
 TestSupport.TestEventprocessorAssertion(Consumer<T> assertion) Create a test event with a processor assertion.runUntilOutputMatches(long timeoutMillis, long extraTimeMillis) Normally, thecomplete()method is run repeatedly until it returnstrue.supplierFrom(ProcessorMetaSupplier supplier) Wraps the providedProcessorMetaSupplierwith aSupplier<Processor>that returns processors obtained from it.supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context) Wraps the providedProcessorMetaSupplierwith aSupplier<Processor>that returns processors obtained from it.supplierFrom(ProcessorSupplier supplier) Wraps the providedProcessorSupplierwith aSupplier<Processor>that returns processors obtained from it.supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context) Wraps the providedProcessorSupplierwith aSupplier<Processor>that returns processors obtained from it.totalParallelism(int totalParallelism) Sets the totalParallelism for the Processorstatic TestSupportverifyProcessor(SupplierEx<Processor> supplier) static TestSupportverifyProcessor(ProcessorMetaSupplier supplier) static TestSupportverifyProcessor(ProcessorSupplier supplier) 
- 
Field Details- 
SAME_ITEMS_ANY_ORDERAn output checker that will claim actual and expected object lists as equal if they both contain the same items, in any order. If some item is expected multiple times, it must also be present the same number of times in the actual output.Use as an argument for outputChecker(BiPredicate).
- 
TEST_CONTEXTA context various methods can probe to find context information about the current test. For example, theoutput checkercan check differently in various modes.
 
- 
- 
Method Details- 
verifyProcessor- Parameters:
- supplier- a processor supplier create processor instances
 
- 
verifyProcessor- Parameters:
- supplier- a processor supplier create processor instances
 
- 
verifyProcessor- Parameters:
- supplier- a processor supplier create processor instances
 
- 
inputSets the input objects for processor.The inputcan containWatermarks; they will be delivered to theProcessor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)method.Defaults to empty list. - Returns:
- thisinstance for fluent API
 
- 
inputsSets the input objects for the processor on multiple input ordinals. Items will be passed to the processor in round-robin fashion: item0 from input0, item0 from input1, item1 from input0 etc.See also: - input(List)- if you have just one input ordinal
- inputs(List, int[])- if you want to specify input priorities
 - Parameters:
- inputs- one list of input items for each input edge
- Returns:
- thisinstance for fluent API
 
- 
inputsSets the input objects for the processor on multiple input ordinals. Items will be passed to the processor according to priority: lower is higher. If two inputs have the same priority, they will be passed in round-robin fashion.See also: - input(List)- if you have just one input ordinal
- inputs(List)- if all inputs are of equal priority
 - Parameters:
- inputs- one list of input items for each input edge
- Returns:
- thisinstance for fluent API
 
- 
expectOutputSets the expected output and runs the test.The expectedOutputcan containWatermarks. Each Watermark in the input will be found in the output, as well as other watermarks the processor emits.- Throws:
- AssertionError- If some assertion does not hold.
 
- 
expectOutputsSpecifies the expected outputs and runs the test.The expectedOutputcan containWatermarks to assert the watermarks emitted by the processor.- Parameters:
- expectedOutputs- one list for each output ordinal
- Throws:
- AssertionError- if some assertion does not hold
 
- 
expectExactOutputRuns the test and expects an exact sequence of input and output items. The output must occur in the expected order given by thetestEventsparameter, that is particular output items must occur after particular input items. If the output happens at other time, the test fails.Additionally, the testEventscan contain processor assertions that give the test a chance to assert the internal processor state. For example, the test can assert that the internal processor buffers contain or don't contain particular data, or that the internal watermark is at a certain value etc. Processor assertions must not be immediately followed by output items, they must occur before an input item, or at the end of test events - this is an implementation restriction and might be lifted in the future; the test will fail in this case.To create `ItemWithOrdinal` instances, use the in(java.lang.Object)andout(java.lang.Object)static factory methods.The output after the last input item is asserted after the `complete()` method calls, not immediately after processing of the last input item. The number of input and output edges of the processor will be equal to the maximum ordinal found for input and output, plus one. If there's no input or output item, the processor will have zero input or output ordinals. Use a dummy nullitem if you want to increase the number of ordinals in that case, this item will be ignored, except for using its ordinal.- Parameters:
- testEvents- a sequence of input items, output items and processor assertions
- See Also:
 
- 
assertOutputpublic void assertOutput(int outputOrdinalCount, BiConsumer<TestSupport.TestMode, List<List<Object>>> assertFn) Runs the test with the specified custom assertion.The consumer takes a list of collected output and the current test mode which can be used in the assertion message. - Parameters:
- outputOrdinalCount- how many output ordinals should be created
- assertFn- an assertion function which takes the current mode and the collected output
 
- 
disableProgressAssertionDisables checking of progress of processing methods (seeclass javadocfor information on what is "progress").- Returns:
- thisinstance for fluent API
 
- 
runUntilOutputMatchesNormally, thecomplete()method is run repeatedly until it returnstrue. This is suitable for processors processing the input or for batch sources. However, if you test a streaming source, thecomplete()method never returnstrue. To be able to test such processors, this method allows you to change the behavior to runcomplete()until the output matches.The extraTimeMillisparameter specifies an extra time to callcomplete()after the output matches. It can be used to ensure that no more items are produced after the output matches.Has no effect if calling complete()is disabled.- Parameters:
- timeoutMillis- maximum time to wait for the output to match
- extraTimeMillis- for how long to call- complete()after the output matches
- Returns:
- thisinstance for fluent API
 
- 
disableSnapshotsDisable snapshot save and restore before first item and after eachprocess()andcomplete()call.- Returns:
- thisinstance for fluent API
 
- 
disableLoggingDisables logging of input and output objects. Normally they are logged as they are processed to standard output.- Returns:
- thisinstance for fluent API
 
- 
disableCompleteCallDisables callingcomplete()method during the test. Suitable for testing of streaming processors to make sure that the flushing code incomplete()method is not executed.- Returns:
- thisinstance for fluent API
 
- 
cooperativeTimeoutIftimeout > 0, the test will fail if any call to processing method in a cooperative processor exceeds this timeout. Has no effect for non-cooperative processors.Default value is COOPERATIVE_TIME_LIMIT_MS_FAILms. Useful to set to 0 during debugging.- Returns:
- thisinstance for fluent API
 
- 
localProcessorIndexSets the localProcessorIndex for the Processor- Parameters:
- localProcessorIndex- localProcessorIndex, defaults to 0
 
- 
globalProcessorIndexSets the globalProcessorIndex for the Processor- Parameters:
- globalProcessorIndex- globalProcessorIndex, default to 0
 
- 
localParallelismSets the localParallelism for the Processor- Parameters:
- localParallelism- localParallelism, defaults to 1
 
- 
totalParallelismSets the totalParallelism for the Processor- Parameters:
- totalParallelism- totalParallelism, defaults to 1
 
- 
outputCheckerpublic TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>, ? super List<?>> outputChecker) Predicate to compare expected and actual output. Parameters to theBiPredicateare the list of expected items and the list of actual processor output.Defaults to Objects::equals, which will pass, if both lists contain equal objects in the same order. If the ordering doesn't matter, you can useSAME_ITEMS_ANY_ORDER.- Returns:
- thisinstance for fluent API
 
- 
hazelcastInstanceUse the given instance forProcessorMetaSupplier.Context.hazelcastInstance()- Returns:
- thisinstance for fluent API
 
- 
jobConfigUse the given instance forProcessorMetaSupplier.Context.jobConfig()- Returns:
- thisinstance for fluent API
 
- 
executeBeforeEachRunAction to execute before each test scenario.- Returns:
- thisinstance for fluent API
 
- 
supplierFromWraps the providedProcessorSupplierwith aSupplier<Processor>that returns processors obtained from it.
- 
supplierFrompublic static Supplier<Processor> supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context) Wraps the providedProcessorSupplierwith aSupplier<Processor>that returns processors obtained from it.
- 
supplierFromWraps the providedProcessorMetaSupplierwith aSupplier<Processor>that returns processors obtained from it.
- 
supplierFrompublic static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context) Wraps the providedProcessorMetaSupplierwith aSupplier<Processor>that returns processors obtained from it.
- 
inCreate a test event with an input item. Equivalent toin(0, item).- Returns:
- the new test event
- See Also:
 
- 
inCreate a test event with an input item on the given ordinal.- Returns:
- the new test event
- See Also:
 
- 
outCreate a test event with an output item. Equivalent toout(0, item).- Returns:
- the new test event
- See Also:
 
- 
outCreate a test event with an output item on the given ordinal.- Returns:
- the new test event
- See Also:
 
- 
processorAssertionCreate a test event with a processor assertion. The assertion must not be followed by an output item event.- Returns:
- the new test event
- See Also:
 
 
-