Class TestSupport
- java.lang.Object
-
- com.hazelcast.jet.core.test.TestSupport
-
public final class TestSupport extends java.lang.Object
A utility to test processors. It will initialize the processor instance, pass input items to it and assert the outbox contents.The test process does the following:
- initializes the processor by calling
Processor.init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context)
- does snapshot or snapshot+restore (optional, see below)
- calls
Processor.process(int, com.hazelcast.jet.core.Inbox)
, in two scenarios:- the inbox contains one input item
- the inbox contains all input items (if snapshots are not restored)
- every time the inbox gets empty it does snapshot or snapshot+restore
- optionally calls
Processor.complete()
until it returnstrue
or until the output matches (for streaming sources) - does snapshot or snapshot+restore each time the
complete()
method returnedfalse
and made a progress
The
init()
andclose()
methods ofProcessorSupplier
andProcessorMetaSupplier
are called if you call theverifyProcessor(com.hazelcast.function.SupplierEx<com.hazelcast.jet.core.Processor>)
using one of these.Snapshot & restore
Theoptional
snapshot+restore test procedure:saveToSnapshot()
is called. If we are not doing restore, this is the last step.- new processor instance is created, from now on only this instance will be used
- snapshot is restored using
restoreFromSnapshot()
finishSnapshotRestore()
is called
Watermark handling
The input can containWatermark
s. They will be passed to theProcessor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.Progress assertion
For each call to any processing method the progress is asserted (optional
). The processor must do at least one of these:- take something from inbox
- put something to outbox
- for boolean-returning methods, returning
true
is considered as making progress
Outbox rejection
A 1-capacity outbox will be provided, which will additionally be full in every other call toprocess()
. This will test the edge case: theprocess()
method is called even when the outbox is full to give the processor a chance to process the inbox. The snapshot bucket will also have capacity of 1.Cooperative processors
For cooperative processors, time spent in each call to processing method must not exceedcooperativeTimeout(long)
.Non-covered cases
This class does not cover these cases:- Checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier).
- This utility never calls
Processor.tryProcess()
.
Example usage
This will test one of the jet-provided processors:TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase(Locale.ROOT))) .disableCompleteCall() // enabled by default .disableLogging() // enabled by default .disableProgressAssertion() // enabled by default .disableSnapshots() // enabled by default .cooperativeTimeout(<timeoutInMs>) // default is 1000 .outputChecker(<function>) // default is `Objects::equal` .input(asList("foo", "bar")) // default is `emptyList()` .expectOutput(asList("FOO", "BAR"));
- Since:
- Jet 3.0
- initializes the processor by calling
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
TestSupport.TestContext
static interface
TestSupport.TestEvent
static class
TestSupport.TestMode
Describes the current test mode.
-
Field Summary
Fields Modifier and Type Field Description static java.util.function.BiPredicate<java.util.List<?>,java.util.List<?>>
SAME_ITEMS_ANY_ORDER
An output checker that will claim actual and expected object lists as equal if they both contain the same items, in any order.static java.lang.ThreadLocal<TestSupport.TestContext>
TEST_CONTEXT
A context various methods can probe to find context information about the current test.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description void
assertOutput(int outputOrdinalCount, java.util.function.BiConsumer<TestSupport.TestMode,java.util.List<java.util.List<java.lang.Object>>> assertFn)
Runs the test with the specified custom assertion.TestSupport
cooperativeTimeout(long timeout)
Iftimeout > 0
, the test will fail if any call to processing method in a cooperative processor exceeds this timeout.TestSupport
disableCompleteCall()
Disables callingcomplete()
method during the test.TestSupport
disableLogging()
Disables logging of input and output objects.TestSupport
disableProgressAssertion()
Disables checking of progress of processing methods (seeclass javadoc
for information on what is "progress").TestSupport
disableSnapshots()
Disable snapshot save and restore before first item and after eachprocess()
andcomplete()
call.TestSupport
executeBeforeEachRun(java.lang.Runnable runnable)
Action to execute before each test scenario.void
expectExactOutput(TestSupport.TestEvent... testEvents)
Runs the test and expects an exact sequence of input and output items.void
expectOutput(java.util.List<?> expectedOutput)
Sets the expected output and runs the test.void
expectOutputs(java.util.List<java.util.List<?>> expectedOutputs)
Specifies the expected outputs and runs the test.TestSupport
globalProcessorIndex(int globalProcessorIndex)
Sets the globalProcessorIndex for the ProcessorTestSupport
hazelcastInstance(HazelcastInstance hazelcastInstance)
Use the given instance forProcessorMetaSupplier.Context.hazelcastInstance()
static TestSupport.TestEvent
in(int ordinal, java.lang.Object item)
Create a test event with an input item on the given ordinal.static TestSupport.TestEvent
in(java.lang.Object item)
Create a test event with an input item.TestSupport
input(java.util.List<?> input)
Sets the input objects for processor.TestSupport
inputs(java.util.List<java.util.List<?>> inputs)
Sets the input objects for the processor on multiple input ordinals.TestSupport
inputs(java.util.List<java.util.List<?>> inputs, int[] priorities)
Sets the input objects for the processor on multiple input ordinals.TestSupport
jobConfig(JobConfig jobConfig)
Use the given instance forProcessorMetaSupplier.Context.jobConfig()
TestSupport
localParallelism(int localParallelism)
Sets the localParallelism for the ProcessorTestSupport
localProcessorIndex(int localProcessorIndex)
Sets the localProcessorIndex for the Processorstatic TestSupport.TestEvent
out(int ordinal, java.lang.Object item)
Create a test event with an output item on the given ordinal.static TestSupport.TestEvent
out(java.lang.Object item)
Create a test event with an output item.TestSupport
outputChecker(java.util.function.BiPredicate<? super java.util.List<?>,? super java.util.List<?>> outputChecker)
Predicate to compare expected and actual output.static <T extends Processor>
TestSupport.TestEventprocessorAssertion(java.util.function.Consumer<T> assertion)
Create a test event with a processor assertion.TestSupport
runUntilOutputMatches(long timeoutMillis, long extraTimeMillis)
Normally, thecomplete()
method is run repeatedly until it returnstrue
.static java.util.function.Supplier<Processor>
supplierFrom(ProcessorMetaSupplier supplier)
Wraps the providedProcessorMetaSupplier
with aSupplier<Processor>
that returns processors obtained from it.static java.util.function.Supplier<Processor>
supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context)
Wraps the providedProcessorMetaSupplier
with aSupplier<Processor>
that returns processors obtained from it.static java.util.function.Supplier<Processor>
supplierFrom(ProcessorSupplier supplier)
Wraps the providedProcessorSupplier
with aSupplier<Processor>
that returns processors obtained from it.static java.util.function.Supplier<Processor>
supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context)
Wraps the providedProcessorSupplier
with aSupplier<Processor>
that returns processors obtained from it.TestSupport
totalParallelism(int totalParallelism)
Sets the totalParallelism for the Processorstatic TestSupport
verifyProcessor(SupplierEx<Processor> supplier)
static TestSupport
verifyProcessor(ProcessorMetaSupplier supplier)
static TestSupport
verifyProcessor(ProcessorSupplier supplier)
-
-
-
Field Detail
-
SAME_ITEMS_ANY_ORDER
public static final java.util.function.BiPredicate<java.util.List<?>,java.util.List<?>> SAME_ITEMS_ANY_ORDER
An output checker that will claim actual and expected object lists as equal if they both contain the same items, in any order. If some item is expected multiple times, it must also be present the same number of times in the actual output.Use as an argument for
outputChecker(BiPredicate)
.
-
TEST_CONTEXT
public static final java.lang.ThreadLocal<TestSupport.TestContext> TEST_CONTEXT
A context various methods can probe to find context information about the current test. For example, theoutput checker
can check differently in various modes.
-
-
Method Detail
-
verifyProcessor
public static TestSupport verifyProcessor(@Nonnull SupplierEx<Processor> supplier)
- Parameters:
supplier
- a processor supplier create processor instances
-
verifyProcessor
public static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier)
- Parameters:
supplier
- a processor supplier create processor instances
-
verifyProcessor
public static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier)
- Parameters:
supplier
- a processor supplier create processor instances
-
input
public TestSupport input(@Nonnull java.util.List<?> input)
Sets the input objects for processor.The
input
can containWatermark
s; they will be delivered to theProcessor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.Defaults to empty list.
- Returns:
this
instance for fluent API
-
inputs
public TestSupport inputs(@Nonnull java.util.List<java.util.List<?>> inputs)
Sets the input objects for the processor on multiple input ordinals. Items will be passed to the processor in round-robin fashion: item0 from input0, item0 from input1, item1 from input0 etc.See also:
input(List)
- if you have just one input ordinalinputs(List, int[])
- if you want to specify input priorities
- Parameters:
inputs
- one list of input items for each input edge- Returns:
this
instance for fluent API
-
inputs
public TestSupport inputs(@Nonnull java.util.List<java.util.List<?>> inputs, int[] priorities)
Sets the input objects for the processor on multiple input ordinals. Items will be passed to the processor according to priority: lower is higher. If two inputs have the same priority, they will be passed in round-robin fashion.See also:
input(List)
- if you have just one input ordinalinputs(List)
- if all inputs are of equal priority
- Parameters:
inputs
- one list of input items for each input edge- Returns:
this
instance for fluent API
-
expectOutput
public void expectOutput(@Nonnull java.util.List<?> expectedOutput)
Sets the expected output and runs the test.The
expectedOutput
can containWatermark
s. Each Watermark in the input will be found in the output, as well as other watermarks the processor emits.- Throws:
java.lang.AssertionError
- If some assertion does not hold.
-
expectOutputs
public void expectOutputs(@Nonnull java.util.List<java.util.List<?>> expectedOutputs)
Specifies the expected outputs and runs the test.The
expectedOutput
can containWatermark
s to assert the watermarks emitted by the processor.- Parameters:
expectedOutputs
- one list for each output ordinal- Throws:
java.lang.AssertionError
- if some assertion does not hold
-
expectExactOutput
public void expectExactOutput(TestSupport.TestEvent... testEvents)
Runs the test and expects an exact sequence of input and output items. The output must occur in the expected order given by thetestEvents
parameter, that is particular output items must occur after particular input items. If the output happens at other time, the test fails.Additionally, the
testEvents
can contain processor assertions that give the test a chance to assert the internal processor state. For example, the test can assert that the internal processor buffers contain or don't contain particular data, or that the internal watermark is at a certain value etc. Processor assertions must not be immediately followed by output items, they must occur before an input item, or at the end of test events - this is an implementation restriction and might be lifted in the future; the test will fail in this case.To create `ItemWithOrdinal` instances, use the
in(java.lang.Object)
andout(java.lang.Object)
static factory methods.The output after the last input item is asserted after the `complete()` method calls, not immediately after processing of the last input item.
The number of input and output edges of the processor will be equal to the maximum ordinal found for input and output, plus one. If there's no input or output item, the processor will have zero input or output ordinals. Use a dummy
null
item if you want to increase the number of ordinals in that case, this item will be ignored, except for using its ordinal.- Parameters:
testEvents
- a sequence of input items, output items and processor assertions- See Also:
in(Object)
,in(int, Object)
,out(Object)
,out(int, Object)
,processorAssertion(Consumer)
-
assertOutput
public void assertOutput(int outputOrdinalCount, java.util.function.BiConsumer<TestSupport.TestMode,java.util.List<java.util.List<java.lang.Object>>> assertFn)
Runs the test with the specified custom assertion.The consumer takes a list of collected output and the current test mode which can be used in the assertion message.
- Parameters:
outputOrdinalCount
- how many output ordinals should be createdassertFn
- an assertion function which takes the current mode and the collected output
-
disableProgressAssertion
public TestSupport disableProgressAssertion()
Disables checking of progress of processing methods (seeclass javadoc
for information on what is "progress").- Returns:
this
instance for fluent API
-
runUntilOutputMatches
public TestSupport runUntilOutputMatches(long timeoutMillis, long extraTimeMillis)
Normally, thecomplete()
method is run repeatedly until it returnstrue
. This is suitable for processors processing the input or for batch sources. However, if you test a streaming source, thecomplete()
method never returnstrue
. To be able to test such processors, this method allows you to change the behavior to runcomplete()
until the output matches.The
extraTimeMillis
parameter specifies an extra time to callcomplete()
after the output matches. It can be used to ensure that no more items are produced after the output matches.Has no effect if calling
complete()
is disabled.- Parameters:
timeoutMillis
- maximum time to wait for the output to matchextraTimeMillis
- for how long to callcomplete()
after the output matches- Returns:
this
instance for fluent API
-
disableSnapshots
public TestSupport disableSnapshots()
Disable snapshot save and restore before first item and after eachprocess()
andcomplete()
call.- Returns:
this
instance for fluent API
-
disableLogging
public TestSupport disableLogging()
Disables logging of input and output objects. Normally they are logged as they are processed to standard output.- Returns:
this
instance for fluent API
-
disableCompleteCall
public TestSupport disableCompleteCall()
Disables callingcomplete()
method during the test. Suitable for testing of streaming processors to make sure that the flushing code incomplete()
method is not executed.- Returns:
this
instance for fluent API
-
cooperativeTimeout
public TestSupport cooperativeTimeout(long timeout)
Iftimeout > 0
, the test will fail if any call to processing method in a cooperative processor exceeds this timeout. Has no effect for non-cooperative processors.Default value is
COOPERATIVE_TIME_LIMIT_MS_FAIL
ms. Useful to set to 0 during debugging.- Returns:
this
instance for fluent API
-
localProcessorIndex
public TestSupport localProcessorIndex(int localProcessorIndex)
Sets the localProcessorIndex for the Processor- Parameters:
localProcessorIndex
- localProcessorIndex, defaults to 0
-
globalProcessorIndex
public TestSupport globalProcessorIndex(int globalProcessorIndex)
Sets the globalProcessorIndex for the Processor- Parameters:
globalProcessorIndex
- globalProcessorIndex, default to 0
-
localParallelism
public TestSupport localParallelism(int localParallelism)
Sets the localParallelism for the Processor- Parameters:
localParallelism
- localParallelism, defaults to 1
-
totalParallelism
public TestSupport totalParallelism(int totalParallelism)
Sets the totalParallelism for the Processor- Parameters:
totalParallelism
- totalParallelism, defaults to 1
-
outputChecker
public TestSupport outputChecker(@Nonnull java.util.function.BiPredicate<? super java.util.List<?>,? super java.util.List<?>> outputChecker)
Predicate to compare expected and actual output. Parameters to theBiPredicate
are the list of expected items and the list of actual processor output.Defaults to
Objects::equals
, which will pass, if both lists contain equal objects in the same order. If the ordering doesn't matter, you can useSAME_ITEMS_ANY_ORDER
.- Returns:
this
instance for fluent API
-
hazelcastInstance
public TestSupport hazelcastInstance(@Nonnull HazelcastInstance hazelcastInstance)
Use the given instance forProcessorMetaSupplier.Context.hazelcastInstance()
- Returns:
this
instance for fluent API
-
jobConfig
public TestSupport jobConfig(JobConfig jobConfig)
Use the given instance forProcessorMetaSupplier.Context.jobConfig()
- Returns:
this
instance for fluent API
-
executeBeforeEachRun
public TestSupport executeBeforeEachRun(java.lang.Runnable runnable)
Action to execute before each test scenario.- Returns:
this
instance for fluent API
-
supplierFrom
public static java.util.function.Supplier<Processor> supplierFrom(ProcessorSupplier supplier)
Wraps the providedProcessorSupplier
with aSupplier<Processor>
that returns processors obtained from it.
-
supplierFrom
public static java.util.function.Supplier<Processor> supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context)
Wraps the providedProcessorSupplier
with aSupplier<Processor>
that returns processors obtained from it.
-
supplierFrom
public static java.util.function.Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier)
Wraps the providedProcessorMetaSupplier
with aSupplier<Processor>
that returns processors obtained from it.
-
supplierFrom
public static java.util.function.Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context)
Wraps the providedProcessorMetaSupplier
with aSupplier<Processor>
that returns processors obtained from it.
-
in
public static TestSupport.TestEvent in(java.lang.Object item)
Create a test event with an input item. Equivalent toin(0, item)
.- Returns:
- the new test event
- See Also:
expectExactOutput(TestEvent...)
-
in
public static TestSupport.TestEvent in(int ordinal, java.lang.Object item)
Create a test event with an input item on the given ordinal.- Returns:
- the new test event
- See Also:
expectExactOutput(TestEvent...)
-
out
public static TestSupport.TestEvent out(java.lang.Object item)
Create a test event with an output item. Equivalent toout(0, item)
.- Returns:
- the new test event
- See Also:
expectExactOutput(TestEvent...)
-
out
public static TestSupport.TestEvent out(int ordinal, java.lang.Object item)
Create a test event with an output item on the given ordinal.- Returns:
- the new test event
- See Also:
expectExactOutput(TestEvent...)
-
processorAssertion
public static <T extends Processor> TestSupport.TestEvent processorAssertion(java.util.function.Consumer<T> assertion)
Create a test event with a processor assertion. The assertion must not be followed by an output item event.- Returns:
- the new test event
- See Also:
expectExactOutput(TestEvent...)
-
-