public final class TestSupport extends Object
The test process does the following:
Processor.init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context)
Processor.process(int, com.hazelcast.jet.core.Inbox)
, in two scenarios:optionally
calls Processor.complete()
until it returns true
or calls it until specified timeout
elapses (for streaming sources)
complete()
method returned false
and made a progress
init()
and complete()
methods of ProcessorSupplier
and ProcessorMetaSupplier
are called if you call
the verifyProcessor(com.hazelcast.jet.function.DistributedSupplier<com.hazelcast.jet.core.Processor>)
using one of these.
optional
snapshot+restore test procedure:
saveToSnapshot()
is called. If we are not doing restore, this
is the last step.
restoreFromSnapshot()
finishSnapshotRestore()
is called
Watermark
s. They will be passed to the
Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method and can be asserted in the
output.
optional
). The processor must do at least one
of these:
true
is
considered as making progress
process()
. This will test the edge case: the
process()
method is called even when the outbox is full to give
the processor a chance to process the inbox. The snapshot bucket will
also have capacity of 1.
cooperativeTimeout(long)
.
Processor.tryProcess()
.
TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase()))
.disableCompleteCall() // enabled by default
.disableLogging() // enabled by default
.disableProgressAssertion() // enabled by default
.disableSnapshots() // enabled by default
.cooperativeTimeout(<timeoutInMs>) // default is 1000
.outputChecker(<function>) // default is `Objects::equal`
.input(asList("foo", "bar")) // default is `emptyList()`
.expectOutput(asList("FOO", "BAR"));
Modifier and Type | Field and Description |
---|---|
static BiPredicate<List<?>,List<?>> |
SAME_ITEMS_ANY_ORDER
An output checker that will claim actual and expected object lists as
equal if they both contain the same items, in any order.
|
Modifier and Type | Method and Description |
---|---|
TestSupport |
cooperativeTimeout(long timeout)
If
timeout > 0 , the test will fail if any call to processing
method in a cooperative processor exceeds this timeout. |
TestSupport |
disableCompleteCall()
Disables calling
complete() method during the test. |
TestSupport |
disableLogging()
Disables logging of input and output objects.
|
TestSupport |
disableProgressAssertion()
Disables checking of progress of processing methods (see
class javadoc for information on what is "progress"). |
TestSupport |
disableRunUntilCompleted(long timeoutMillis)
Normally, the
complete() method is run repeatedly until it
returns true . |
TestSupport |
disableSnapshots()
Disable snapshot save and restore before first item and after each
process() and complete() call. |
void |
expectOutput(List<?> expectedOutput)
Sets the expected output and runs the test.
|
void |
expectOutputs(List<List<?>> expectedOutputs)
Sets the expected outputs and runs the test.
|
TestSupport |
input(List<?> input)
Sets the input objects for processor.
|
TestSupport |
inputs(List<List<?>> inputs)
Sets the input objects for the processor on multiple input ordinals.
|
TestSupport |
inputs(List<List<?>> inputs,
int[] priorities)
Sets the input objects for the processor on multiple input ordinals.
|
TestSupport |
jetInstance(JetInstance jetInstance)
Use the given instance for
ProcessorMetaSupplier.Context.jetInstance() |
static String |
listToString(List<?> list)
Converts a list to a string putting
toString() of each element
on separate line. |
TestSupport |
outputChecker(BiPredicate<? super List<?>,? super List<?>> outputChecker)
Predicate to compare expected and actual output.
|
static Supplier<Processor> |
supplierFrom(ProcessorMetaSupplier supplier)
Wraps the provided
ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it. |
static Supplier<Processor> |
supplierFrom(ProcessorSupplier supplier)
Wraps the provided
ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it. |
static TestSupport |
verifyProcessor(DistributedSupplier<Processor> supplier) |
static TestSupport |
verifyProcessor(ProcessorMetaSupplier supplier) |
static TestSupport |
verifyProcessor(ProcessorSupplier supplier) |
public static final BiPredicate<List<?>,List<?>> SAME_ITEMS_ANY_ORDER
Use as an argument for outputChecker(BiPredicate)
.
public static TestSupport verifyProcessor(@Nonnull DistributedSupplier<Processor> supplier)
supplier
- a processor supplier create processor instancespublic static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier)
supplier
- a processor supplier create processor instancespublic static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier)
supplier
- a processor supplier create processor instancespublic TestSupport input(@Nonnull List<?> input)
The input
can contain Watermark
s;
they will be delivered to the Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark)
method.
Defaults to empty list.
this
instance for fluent APIpublic TestSupport inputs(@Nonnull List<List<?>> inputs)
See also:
input(List)
- if you have just one input ordinal
inputs(List, int[])
- if you want to specify input
priorities
inputs
- one list of input items for each input edgethis
instance for fluent APIpublic TestSupport inputs(@Nonnull List<List<?>> inputs, int[] priorities)
See also:
input(List)
- if you have just one input ordinal
inputs(List)
- if all inputs are of equal priority
inputs
- one list of input items for each input edgethis
instance for fluent APIpublic void expectOutput(@Nonnull List<?> expectedOutput)
The expectedOutput
can contain Watermark
s. Each Watermark in the input will be
found in the output, as well as other watermarks the processor emits.
AssertionError
- If some assertion does not hold.public void expectOutputs(@Nonnull List<List<?>> expectedOutputs)
The expectedOutput
can contain Watermark
s. Each Watermark in the input will be
found in the output, as well as other watermarks the processor emits.
expectedOutputs
- one list for each out output ordinalAssertionError
- if some assertion does not holdpublic TestSupport disableProgressAssertion()
class javadoc
for information on what is "progress").this
instance for fluent APIpublic TestSupport disableRunUntilCompleted(long timeoutMillis)
complete()
method is run repeatedly until it
returns true
. But in infinite source processors the method never
returns true
. To be able test such processors, this method
allows you to disable the "run until completed" behavior and instead run
the complete()
for a specified time.
If the timeout > 0, the complete()
method is called repeatedly
until the timeout elapses. After that, the output is compared using the
output checker
. The complete()
method is also not allowed to return true
in this
case.
If the timeout is <= 0 (the default), complete()
method is
called until it returns true
, after which the output is checked.
Has no effect if complete()
call is disabled
.
timeoutMillis
- how long to wait until outputs matchthis
instance for fluent APIpublic TestSupport disableSnapshots()
process()
and complete()
call.this
instance for fluent APIpublic TestSupport disableLogging()
this
instance for fluent APIpublic TestSupport disableCompleteCall()
complete()
method during the test. Suitable for
testing of streaming processors to make sure that the flushing code in
complete()
method is not executed.this
instance for fluent APIpublic TestSupport cooperativeTimeout(long timeout)
timeout > 0
, the test will fail if any call to processing
method in a cooperative processor exceeds this timeout. Has no effect
for non-cooperative processors.
Default value is COOPERATIVE_TIME_LIMIT_MS_FAIL
ms. Useful to
set to 0 during debugging.
this
instance for fluent APIpublic TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>,? super List<?>> outputChecker)
BiPredicate
are the list of expected items and the list of actual
processor output.
Defaults to Objects::equals
, which will pass, if both lists
contain equal objects in the same order. If the ordering doesn't matter,
you can use SAME_ITEMS_ANY_ORDER
.
this
instance for fluent APIpublic TestSupport jetInstance(@Nonnull JetInstance jetInstance)
ProcessorMetaSupplier.Context.jetInstance()
this
instance for fluent APIpublic static Supplier<Processor> supplierFrom(ProcessorSupplier supplier)
ProcessorSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier)
ProcessorMetaSupplier
with a Supplier<Processor>
that returns processors obtained from it.public static String listToString(List<?> list)
toString()
of each element
on separate line. It is useful to transform list inputs to assertEquals()
: the exception will show the entire collections instead
of just non-equal sizes or the first non-equal element.list
- Input listCopyright © 2018 Hazelcast, Inc.. All rights reserved.