public final class TestSupport extends Object
The test process does the following:
Processor.init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context)
Processor.process(int, com.hazelcast.jet.core.Inbox)
, in two scenarios:Processor.complete()
until it returns true
or until the
output matches (for streaming
sources)
complete()
method returned false
and made a progress
The init()
and close()
methods of ProcessorSupplier
and ProcessorMetaSupplier
are called if you call
the verifyProcessor(com.hazelcast.function.SupplierEx<com.hazelcast.jet.core.Processor>)
using one of these.
optional
snapshot+restore test procedure:
saveToSnapshot()
is called. If we are not doing restore, this
is the last step.
restoreFromSnapshot()
finishSnapshotRestore()
is called
Watermark
s. They will be passed to the
Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.
optional
). The processor must do at least one
of these:
true
is
considered as making progress
process()
. This will test the edge case: the
process()
method is called even when the outbox is full to give
the processor a chance to process the inbox. The snapshot bucket will
also have capacity of 1.
cooperativeTimeout(long)
.
Processor.tryProcess()
.
TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase(Locale.ROOT)))
.disableCompleteCall() // enabled by default
.disableLogging() // enabled by default
.disableProgressAssertion() // enabled by default
.disableSnapshots() // enabled by default
.cooperativeTimeout(<timeoutInMs>) // default is 1000
.outputChecker(<function>) // default is `Objects::equal`
.input(asList("foo", "bar")) // default is `emptyList()`
.expectOutput(asList("FOO", "BAR"));
Modifier and Type | Class and Description |
---|---|
static interface |
TestSupport.TestEvent |
static class |
TestSupport.TestMode
Describes the current test mode.
|
Modifier and Type | Field and Description |
---|---|
static BiPredicate<List<?>,List<?>> |
SAME_ITEMS_ANY_ORDER
An output checker that will claim actual and expected object lists as
equal if they both contain the same items, in any order.
|
Modifier and Type | Method and Description |
---|---|
void |
assertOutput(int outputOrdinalCount,
BiConsumer<TestSupport.TestMode,List<List<Object>>> assertFn)
Runs the test with the specified custom assertion.
|
TestSupport |
cooperativeTimeout(long timeout)
If
timeout > 0 , the test will fail if any call to processing
method in a cooperative processor exceeds this timeout. |
TestSupport |
disableCompleteCall()
Disables calling
complete() method during the test. |
TestSupport |
disableLogging()
Disables logging of input and output objects.
|
TestSupport |
disableProgressAssertion()
Disables checking of progress of processing methods (see
class javadoc for information on what is "progress"). |
TestSupport |
disableSnapshots()
Disable snapshot save and restore before first item and after each
process() and complete() call. |
TestSupport |
executeBeforeEachRun(Runnable runnable)
Action to execute before each test scenario.
|
void |
expectExactOutput(TestSupport.TestEvent... testEvents)
Runs the test and expects an exact sequence of input and output items.
|
void |
expectOutput(List<?> expectedOutput)
Sets the expected output and runs the test.
|
void |
expectOutputs(List<List<?>> expectedOutputs)
Specifies the expected outputs and runs the test.
|
TestSupport |
globalProcessorIndex(int globalProcessorIndex)
Sets the globalProcessorIndex for the Processor
|
TestSupport |
hazelcastInstance(HazelcastInstance hazelcastInstance)
Use the given instance for
ProcessorMetaSupplier.Context.hazelcastInstance() |
static TestSupport.TestEvent |
in(int ordinal,
Object item)
Create a test event with an input item on the given ordinal.
|
static TestSupport.TestEvent |
in(Object item)
Create a test event with an input item.
|
TestSupport |
input(List<?> input)
Sets the input objects for processor.
|
TestSupport |
inputs(List<List<?>> inputs)
Sets the input objects for the processor on multiple input ordinals.
|
TestSupport |
inputs(List<List<?>> inputs,
int[] priorities)
Sets the input objects for the processor on multiple input ordinals.
|
TestSupport |
jobConfig(JobConfig jobConfig)
Use the given instance for
ProcessorMetaSupplier.Context.jobConfig() |
TestSupport |
localParallelism(int localParallelism)
Sets the localParallelism for the Processor
|
TestSupport |
localProcessorIndex(int localProcessorIndex)
Sets the localProcessorIndex for the Processor
|
static TestSupport.TestEvent |
out(int ordinal,
Object item)
Create a test event with an output item on the given ordinal.
|
static TestSupport.TestEvent |
out(Object item)
Create a test event with an output item.
|
TestSupport |
outputChecker(BiPredicate<? super List<?>,? super List<?>> outputChecker)
Predicate to compare expected and actual output.
|
static <T extends Processor> |
processorAssertion(Consumer<T> assertion)
Create a test event with a processor assertion.
|
TestSupport |
runUntilOutputMatches(long timeoutMillis,
long extraTimeMillis)
Normally, the
complete() method is run repeatedly until it
returns true . |
static Supplier<Processor> |
supplierFrom(ProcessorMetaSupplier supplier)
Wraps the provided
ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorMetaSupplier supplier,
ProcessorSupplier.Context context)
Wraps the provided
ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorSupplier supplier)
Wraps the provided
ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorSupplier supplier,
ProcessorSupplier.Context context)
Wraps the provided
ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it. |
TestSupport |
totalParallelism(int totalParallelism)
Sets the totalParallelism for the Processor
|
static TestSupport |
verifyProcessor(ProcessorMetaSupplier supplier) |
static TestSupport |
verifyProcessor(ProcessorSupplier supplier) |
static TestSupport |
verifyProcessor(SupplierEx<Processor> supplier) |
public static final BiPredicate<List<?>,List<?>> SAME_ITEMS_ANY_ORDER
Use as an argument for outputChecker(BiPredicate)
.
public static TestSupport verifyProcessor(@Nonnull SupplierEx<Processor> supplier)
supplier
- a processor supplier create processor instancespublic static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier)
supplier
- a processor supplier create processor instancespublic static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier)
supplier
- a processor supplier create processor instancespublic TestSupport input(@Nonnull List<?> input)
The input
can contain Watermark
s;
they will be delivered to the Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.
Defaults to empty list.
this
instance for fluent APIpublic TestSupport inputs(@Nonnull List<List<?>> inputs)
See also:
input(List)
- if you have just one input ordinal
inputs(List, int[])
- if you want to specify input
priorities
inputs
- one list of input items for each input edgethis
instance for fluent APIpublic TestSupport inputs(@Nonnull List<List<?>> inputs, int[] priorities)
See also:
input(List)
- if you have just one input ordinal
inputs(List)
- if all inputs are of equal priority
inputs
- one list of input items for each input edgethis
instance for fluent APIpublic void expectOutput(@Nonnull List<?> expectedOutput)
The expectedOutput
can contain Watermark
s. Each Watermark in the input will be
found in the output, as well as other watermarks the processor emits.
AssertionError
- If some assertion does not hold.public void expectOutputs(@Nonnull List<List<?>> expectedOutputs)
The expectedOutput
can contain Watermark
s to assert the
watermarks emitted by the processor.
expectedOutputs
- one list for each output ordinalAssertionError
- if some assertion does not holdpublic void expectExactOutput(TestSupport.TestEvent... testEvents)
testEvents
parameter, that is particular output items must occur after
particular input items. If the output happens at other time, the test
fails.
Additionally, the testEvents
can contain processor assertions that give the test a
chance to assert the internal processor state. For example, the test can
assert that the internal processor buffers contain or don't contain
particular data, or that the internal watermark is at a certain value
etc. Processor assertions must not be immediately followed by output
items, they must occur before an input item, or at the end of test events
- this is an implementation restriction and might be lifted in the
future; the test will fail in this case.
To create `ItemWithOrdinal` instances, use the in(java.lang.Object)
and out(java.lang.Object)
static factory methods.
The output after the last input item is asserted after the `complete()` method calls, not immediately after processing of the last input item.
The number of input and output edges of the processor will be equal to
the maximum ordinal found for input and output, plus one. If there's no
input or output item, the processor will have zero input or output
ordinals. Use a dummy null
item if you want to increase the
number of ordinals in that case, this item will be ignored, except for
using its ordinal.
testEvents
- a sequence of input items, output items and
processor assertionsin(Object)
,
in(int, Object)
,
out(Object)
,
out(int, Object)
,
processorAssertion(Consumer)
public void assertOutput(int outputOrdinalCount, BiConsumer<TestSupport.TestMode,List<List<Object>>> assertFn)
The consumer takes a list of collected output and the current test mode which can be used in the assertion message.
outputOrdinalCount
- how many output ordinals should be createdassertFn
- an assertion function which takes the current mode and the collected outputpublic TestSupport disableProgressAssertion()
class javadoc
for information on what is "progress").this
instance for fluent APIpublic TestSupport runUntilOutputMatches(long timeoutMillis, long extraTimeMillis)
complete()
method is run repeatedly until it
returns true
. This is suitable for processors processing the
input or for batch sources. However, if you test a streaming source, the
complete()
method never returns true
. To be able to test
such processors, this method allows you to change the behavior to run
complete()
until the output matches.
The extraTimeMillis
parameter specifies an extra time to call
complete()
after the output matches. It can be used to ensure
that no more items are produced after the output matches.
Has no effect if calling complete()
is disabled.
timeoutMillis
- maximum time to wait for the output to matchextraTimeMillis
- for how long to call complete()
after the output matchesthis
instance for fluent APIpublic TestSupport disableSnapshots()
process()
and complete()
call.this
instance for fluent APIpublic TestSupport disableLogging()
this
instance for fluent APIpublic TestSupport disableCompleteCall()
complete()
method during the test. Suitable for
testing of streaming processors to make sure that the flushing code in
complete()
method is not executed.this
instance for fluent APIpublic TestSupport cooperativeTimeout(long timeout)
timeout > 0
, the test will fail if any call to processing
method in a cooperative processor exceeds this timeout. Has no effect
for non-cooperative processors.
Default value is COOPERATIVE_TIME_LIMIT_MS_FAIL
ms. Useful to
set to 0 during debugging.
this
instance for fluent APIpublic TestSupport localProcessorIndex(int localProcessorIndex)
localProcessorIndex
- localProcessorIndex, defaults to 0public TestSupport globalProcessorIndex(int globalProcessorIndex)
globalProcessorIndex
- globalProcessorIndex, default to 0public TestSupport localParallelism(int localParallelism)
localParallelism
- localParallelism, defaults to 1public TestSupport totalParallelism(int totalParallelism)
totalParallelism
- totalParallelism, defaults to 1public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>,? super List<?>> outputChecker)
BiPredicate
are the list of expected items and the list of actual
processor output.
Defaults to Objects::equals
, which will pass, if both lists
contain equal objects in the same order. If the ordering doesn't matter,
you can use SAME_ITEMS_ANY_ORDER
.
this
instance for fluent APIpublic TestSupport hazelcastInstance(@Nonnull HazelcastInstance hazelcastInstance)
ProcessorMetaSupplier.Context.hazelcastInstance()
this
instance for fluent APIpublic TestSupport jobConfig(JobConfig jobConfig)
ProcessorMetaSupplier.Context.jobConfig()
this
instance for fluent APIpublic TestSupport executeBeforeEachRun(Runnable runnable)
this
instance for fluent APIpublic static Supplier<Processor> supplierFrom(ProcessorSupplier supplier)
ProcessorSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context)
ProcessorSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier)
ProcessorMetaSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context)
ProcessorMetaSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static TestSupport.TestEvent in(Object item)
in(0, item)
.expectExactOutput(TestEvent...)
public static TestSupport.TestEvent in(int ordinal, Object item)
expectExactOutput(TestEvent...)
public static TestSupport.TestEvent out(Object item)
out(0, item)
.expectExactOutput(TestEvent...)
public static TestSupport.TestEvent out(int ordinal, Object item)
expectExactOutput(TestEvent...)
public static <T extends Processor> TestSupport.TestEvent processorAssertion(Consumer<T> assertion)
expectExactOutput(TestEvent...)
Copyright © 2024 Hazelcast, Inc.. All rights reserved.