public final class TestSupport extends Object
The test process does the following:
Processor.init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context)
Processor.process(int, com.hazelcast.jet.core.Inbox)
, in two scenarios:Processor.complete()
until it returns true
or until the
output matches (for streaming
sources)
complete()
method returned false
and made a progress
init()
and close()
methods of ProcessorSupplier
and ProcessorMetaSupplier
are called if you call
the verifyProcessor(com.hazelcast.function.SupplierEx<com.hazelcast.jet.core.Processor>)
using one of these.
optional
snapshot+restore test procedure:
saveToSnapshot()
is called. If we are not doing restore, this
is the last step.
restoreFromSnapshot()
finishSnapshotRestore()
is called
Watermark
s. They will be passed to the
Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.
optional
). The processor must do at least one
of these:
true
is
considered as making progress
process()
. This will test the edge case: the
process()
method is called even when the outbox is full to give
the processor a chance to process the inbox. The snapshot bucket will
also have capacity of 1.
cooperativeTimeout(long)
.
Processor.tryProcess()
.
TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase()))
.disableCompleteCall() // enabled by default
.disableLogging() // enabled by default
.disableProgressAssertion() // enabled by default
.disableSnapshots() // enabled by default
.cooperativeTimeout(<timeoutInMs>) // default is 1000
.outputChecker(<function>) // default is `Objects::equal`
.input(asList("foo", "bar")) // default is `emptyList()`
.expectOutput(asList("FOO", "BAR"));
Modifier and Type | Class and Description |
---|---|
static class |
TestSupport.TestMode
Describes the current test mode.
|
Modifier and Type | Field and Description |
---|---|
static BiPredicate<List<?>,List<?>> |
SAME_ITEMS_ANY_ORDER
An output checker that will claim actual and expected object lists as
equal if they both contain the same items, in any order.
|
Modifier and Type | Method and Description |
---|---|
void |
assertOutput(int outputOrdinalCount,
BiConsumer<TestSupport.TestMode,List<List<Object>>> assertFn)
Runs the test with the specified custom assertion.
|
TestSupport |
cooperativeTimeout(long timeout)
If
timeout > 0 , the test will fail if any call to processing
method in a cooperative processor exceeds this timeout. |
TestSupport |
disableCompleteCall()
Disables calling
complete() method during the test. |
TestSupport |
disableLogging()
Disables logging of input and output objects.
|
TestSupport |
disableProgressAssertion()
Disables checking of progress of processing methods (see
class javadoc for information on what is "progress"). |
TestSupport |
disableSnapshots()
Disable snapshot save and restore before first item and after each
process() and complete() call. |
TestSupport |
executeBeforeEachRun(Runnable runnable)
Execute test before each test run
|
void |
expectOutput(List<?> expectedOutput)
Sets the expected output and runs the test.
|
void |
expectOutputs(List<List<?>> expectedOutputs)
Specifies the expected outputs and runs the test.
|
TestSupport |
globalProcessorIndex(int globalProcessorIndex)
Sets the globalProcessorIndex for the Processor
|
TestSupport |
input(List<?> input)
Sets the input objects for processor.
|
TestSupport |
inputs(List<List<?>> inputs)
Sets the input objects for the processor on multiple input ordinals.
|
TestSupport |
inputs(List<List<?>> inputs,
int[] priorities)
Sets the input objects for the processor on multiple input ordinals.
|
TestSupport |
jetInstance(JetInstance jetInstance)
Use the given instance for
ProcessorMetaSupplier.Context.jetInstance() |
TestSupport |
jobConfig(JobConfig jobConfig)
Use the given instance for
ProcessorMetaSupplier.Context.jobConfig() |
TestSupport |
localParallelism(int localParallelism)
Sets the localParallelism for the Processor
|
TestSupport |
localProcessorIndex(int localProcessorIndex)
Sets the localProcessorIndex for the Processor
|
TestSupport |
outputChecker(BiPredicate<? super List<?>,? super List<?>> outputChecker)
Predicate to compare expected and actual output.
|
TestSupport |
runUntilOutputMatches(long timeoutMillis,
long extraTimeMillis)
Normally, the
complete() method is run repeatedly until it
returns true . |
static Supplier<Processor> |
supplierFrom(ProcessorMetaSupplier supplier)
Wraps the provided
ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorMetaSupplier supplier,
ProcessorSupplier.Context context)
Wraps the provided
ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorSupplier supplier)
Wraps the provided
ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorSupplier supplier,
ProcessorSupplier.Context context)
Wraps the provided
ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it. |
TestSupport |
totalParallelism(int totalParallelism)
Sets the totalParallelism for the Processor
|
static TestSupport |
verifyProcessor(ProcessorMetaSupplier supplier) |
static TestSupport |
verifyProcessor(ProcessorSupplier supplier) |
static TestSupport |
verifyProcessor(SupplierEx<Processor> supplier) |
public static final BiPredicate<List<?>,List<?>> SAME_ITEMS_ANY_ORDER
Use as an argument for outputChecker(BiPredicate)
.
public static TestSupport verifyProcessor(@Nonnull SupplierEx<Processor> supplier)
supplier
- a processor supplier create processor instancespublic static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier)
supplier
- a processor supplier create processor instancespublic static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier)
supplier
- a processor supplier create processor instancespublic TestSupport input(@Nonnull List<?> input)
The input
can contain Watermark
s;
they will be delivered to the Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.
Defaults to empty list.
this
instance for fluent APIpublic TestSupport inputs(@Nonnull List<List<?>> inputs)
See also:
input(List)
- if you have just one input ordinal
inputs(List, int[])
- if you want to specify input
priorities
inputs
- one list of input items for each input edgethis
instance for fluent APIpublic TestSupport inputs(@Nonnull List<List<?>> inputs, int[] priorities)
See also:
input(List)
- if you have just one input ordinal
inputs(List)
- if all inputs are of equal priority
inputs
- one list of input items for each input edgethis
instance for fluent APIpublic void expectOutput(@Nonnull List<?> expectedOutput)
The expectedOutput
can contain Watermark
s. Each Watermark in the input will be
found in the output, as well as other watermarks the processor emits.
AssertionError
- If some assertion does not hold.public void expectOutputs(@Nonnull List<List<?>> expectedOutputs)
The expectedOutput
can contain Watermark
s to assert the
watermarks emitted by the processor.
expectedOutputs
- one list for each output ordinalAssertionError
- if some assertion does not holdpublic void assertOutput(int outputOrdinalCount, BiConsumer<TestSupport.TestMode,List<List<Object>>> assertFn)
The consumer takes a list of collected output and the current test mode which can be used in the assertion message.
outputOrdinalCount
- how many output ordinals should be createdassertFn
- an assertion function which takes the current mode and the collected outputpublic TestSupport disableProgressAssertion()
class javadoc
for information on what is "progress").this
instance for fluent APIpublic TestSupport runUntilOutputMatches(long timeoutMillis, long extraTimeMillis)
complete()
method is run repeatedly until it
returns true
. This is suitable for processors processing the
input or for batch sources. However, if you test a streaming source, the
complete()
method never returns true
. To be able to test
such processors, this method allows you to change the behavior to run
complete()
until the output matches.
The extraTimeMillis
parameter specifies an extra time to call
complete()
after the output matches. It can be used to ensure
that no more items are produced after the output matches.
Has no effect if calling complete()
is disabled.
timeoutMillis
- maximum time to wait for the output to matchextraTimeMillis
- for how long to call complete()
after the output matchesthis
instance for fluent APIpublic TestSupport disableSnapshots()
process()
and complete()
call.this
instance for fluent APIpublic TestSupport disableLogging()
this
instance for fluent APIpublic TestSupport disableCompleteCall()
complete()
method during the test. Suitable for
testing of streaming processors to make sure that the flushing code in
complete()
method is not executed.this
instance for fluent APIpublic TestSupport cooperativeTimeout(long timeout)
timeout > 0
, the test will fail if any call to processing
method in a cooperative processor exceeds this timeout. Has no effect
for non-cooperative processors.
Default value is COOPERATIVE_TIME_LIMIT_MS_FAIL
ms. Useful to
set to 0 during debugging.
this
instance for fluent APIpublic TestSupport localProcessorIndex(int localProcessorIndex)
localProcessorIndex
- localProcessorIndex, defaults to 0public TestSupport globalProcessorIndex(int globalProcessorIndex)
globalProcessorIndex
- globalProcessorIndex, default to 0public TestSupport localParallelism(int localParallelism)
localParallelism
- localParallelism, defaults to 1public TestSupport totalParallelism(int totalParallelism)
totalParallelism
- totalParallelism, defaults to 1public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>,? super List<?>> outputChecker)
BiPredicate
are the list of expected items and the list of actual
processor output.
Defaults to Objects::equals
, which will pass, if both lists
contain equal objects in the same order. If the ordering doesn't matter,
you can use SAME_ITEMS_ANY_ORDER
.
this
instance for fluent APIpublic TestSupport jetInstance(@Nonnull JetInstance jetInstance)
ProcessorMetaSupplier.Context.jetInstance()
this
instance for fluent APIpublic TestSupport jobConfig(JobConfig jobConfig)
ProcessorMetaSupplier.Context.jobConfig()
this
instance for fluent APIpublic TestSupport executeBeforeEachRun(Runnable runnable)
runnable
- runnable to be executed before each test runthis
instance for fluent APIpublic static Supplier<Processor> supplierFrom(ProcessorSupplier supplier)
ProcessorSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context)
ProcessorSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier)
ProcessorMetaSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context)
ProcessorMetaSupplier
with a Supplier<Processor>
that returns processors obtained from it.Copyright © 2021 Hazelcast, Inc.. All rights reserved.