Class TestSupport

java.lang.Object
com.hazelcast.jet.core.test.TestSupport

public final class TestSupport extends Object
A utility to test processors. It will initialize the processor instance, pass input items to it and assert the outbox contents.

The test process does the following:

The init() and close() methods of ProcessorSupplier and ProcessorMetaSupplier are called if you call the verifyProcessor(com.hazelcast.function.SupplierEx<com.hazelcast.jet.core.Processor>) using one of these.

Snapshot & restore

The optional snapshot+restore test procedure:
  • saveToSnapshot() is called. If we are not doing restore, this is the last step.
  • new processor instance is created, from now on only this instance will be used
  • snapshot is restored using restoreFromSnapshot()
  • finishSnapshotRestore() is called

Watermark handling

The input can contain Watermarks. They will be passed to the Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark) method.

Progress assertion

For each call to any processing method the progress is asserted (optional). The processor must do at least one of these:
  • take something from inbox
  • put something to outbox
  • for boolean-returning methods, returning true is considered as making progress

Outbox rejection

A 1-capacity outbox will be provided, which will additionally be full in every other call to process(). This will test the edge case: the process() method is called even when the outbox is full to give the processor a chance to process the inbox. The snapshot bucket will also have capacity of 1.

Cooperative processors

For cooperative processors, time spent in each call to processing method must not exceed cooperativeTimeout(long).

Non-covered cases

This class does not cover these cases:
  • Checking that the state of a stateful processor is empty at the end (you can do that yourself afterwards with the last instance returned from your supplier).
  • This utility never calls Processor.tryProcess().

Example usage

This will test one of the jet-provided processors:

 TestSupport.verifyProcessor(Processors.map((String s) -> s.toUpperCase(Locale.ROOT)))
            .disableCompleteCall()             // enabled by default
            .disableLogging()                  // enabled by default
            .disableProgressAssertion()        // enabled by default
            .disableSnapshots()                // enabled by default
            .cooperativeTimeout(<timeoutInMs>) // default is 1000
            .outputChecker(<function>)         // default is `Objects::equal`
            .input(asList("foo", "bar"))       // default is `emptyList()`
            .expectOutput(asList("FOO", "BAR"));
 
Since:
Jet 3.0
  • Field Details

    • SAME_ITEMS_ANY_ORDER

      public static final BiPredicate<List<?>,List<?>> SAME_ITEMS_ANY_ORDER
      An output checker that will claim actual and expected object lists as equal if they both contain the same items, in any order. If some item is expected multiple times, it must also be present the same number of times in the actual output.

      Use as an argument for outputChecker(BiPredicate).

    • TEST_CONTEXT

      public static final ThreadLocal<TestSupport.TestContext> TEST_CONTEXT
      A context various methods can probe to find context information about the current test. For example, the output checker can check differently in various modes.
  • Method Details

    • verifyProcessor

      public static TestSupport verifyProcessor(@Nonnull SupplierEx<Processor> supplier)
      Parameters:
      supplier - a processor supplier create processor instances
    • verifyProcessor

      public static TestSupport verifyProcessor(@Nonnull ProcessorSupplier supplier)
      Parameters:
      supplier - a processor supplier create processor instances
    • verifyProcessor

      public static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier supplier)
      Parameters:
      supplier - a processor supplier create processor instances
    • input

      public TestSupport input(@Nonnull List<?> input)
      Sets the input objects for processor.

      The input can contain Watermarks; they will be delivered to the Processor.tryProcessWatermark(com.hazelcast.jet.core.Watermark) method.

      Defaults to empty list.

      Returns:
      this instance for fluent API
    • inputs

      public TestSupport inputs(@Nonnull List<List<?>> inputs)
      Sets the input objects for the processor on multiple input ordinals. Items will be passed to the processor in round-robin fashion: item0 from input0, item0 from input1, item1 from input0 etc.

      See also:

      Parameters:
      inputs - one list of input items for each input edge
      Returns:
      this instance for fluent API
    • inputs

      public TestSupport inputs(@Nonnull List<List<?>> inputs, int[] priorities)
      Sets the input objects for the processor on multiple input ordinals. Items will be passed to the processor according to priority: lower is higher. If two inputs have the same priority, they will be passed in round-robin fashion.

      See also:

      Parameters:
      inputs - one list of input items for each input edge
      Returns:
      this instance for fluent API
    • expectOutput

      public void expectOutput(@Nonnull List<?> expectedOutput)
      Sets the expected output and runs the test.

      The expectedOutput can contain Watermarks. Each Watermark in the input will be found in the output, as well as other watermarks the processor emits.

      Throws:
      AssertionError - If some assertion does not hold.
    • expectOutputs

      public void expectOutputs(@Nonnull List<List<?>> expectedOutputs)
      Specifies the expected outputs and runs the test.

      The expectedOutput can contain Watermarks to assert the watermarks emitted by the processor.

      Parameters:
      expectedOutputs - one list for each output ordinal
      Throws:
      AssertionError - if some assertion does not hold
    • expectExactOutput

      public void expectExactOutput(TestSupport.TestEvent... testEvents)
      Runs the test and expects an exact sequence of input and output items. The output must occur in the expected order given by the testEvents parameter, that is particular output items must occur after particular input items. If the output happens at other time, the test fails.

      Additionally, the testEvents can contain processor assertions that give the test a chance to assert the internal processor state. For example, the test can assert that the internal processor buffers contain or don't contain particular data, or that the internal watermark is at a certain value etc. Processor assertions must not be immediately followed by output items, they must occur before an input item, or at the end of test events - this is an implementation restriction and might be lifted in the future; the test will fail in this case.

      To create `ItemWithOrdinal` instances, use the in(java.lang.Object) and out(java.lang.Object) static factory methods.

      The output after the last input item is asserted after the `complete()` method calls, not immediately after processing of the last input item.

      The number of input and output edges of the processor will be equal to the maximum ordinal found for input and output, plus one. If there's no input or output item, the processor will have zero input or output ordinals. Use a dummy null item if you want to increase the number of ordinals in that case, this item will be ignored, except for using its ordinal.

      Parameters:
      testEvents - a sequence of input items, output items and processor assertions
      See Also:
    • assertOutput

      public void assertOutput(int outputOrdinalCount, BiConsumer<TestSupport.TestMode,List<List<Object>>> assertFn)
      Runs the test with the specified custom assertion.

      The consumer takes a list of collected output and the current test mode which can be used in the assertion message.

      Parameters:
      outputOrdinalCount - how many output ordinals should be created
      assertFn - an assertion function which takes the current mode and the collected output
    • disableProgressAssertion

      public TestSupport disableProgressAssertion()
      Disables checking of progress of processing methods (see class javadoc for information on what is "progress").
      Returns:
      this instance for fluent API
    • runUntilOutputMatches

      public TestSupport runUntilOutputMatches(long timeoutMillis, long extraTimeMillis)
      Normally, the complete() method is run repeatedly until it returns true. This is suitable for processors processing the input or for batch sources. However, if you test a streaming source, the complete() method never returns true. To be able to test such processors, this method allows you to change the behavior to run complete() until the output matches.

      The extraTimeMillis parameter specifies an extra time to call complete() after the output matches. It can be used to ensure that no more items are produced after the output matches.

      Has no effect if calling complete() is disabled.

      Parameters:
      timeoutMillis - maximum time to wait for the output to match
      extraTimeMillis - for how long to call complete() after the output matches
      Returns:
      this instance for fluent API
    • disableSnapshots

      public TestSupport disableSnapshots()
      Disable snapshot save and restore before first item and after each process() and complete() call.
      Returns:
      this instance for fluent API
    • disableLogging

      public TestSupport disableLogging()
      Disables logging of input and output objects. Normally they are logged as they are processed to standard output.
      Returns:
      this instance for fluent API
    • disableCompleteCall

      public TestSupport disableCompleteCall()
      Disables calling complete() method during the test. Suitable for testing of streaming processors to make sure that the flushing code in complete() method is not executed.
      Returns:
      this instance for fluent API
    • cooperativeTimeout

      public TestSupport cooperativeTimeout(long timeout)
      If timeout > 0, the test will fail if any call to processing method in a cooperative processor exceeds this timeout. Has no effect for non-cooperative processors.

      Default value is COOPERATIVE_TIME_LIMIT_MS_FAIL ms. Useful to set to 0 during debugging.

      Returns:
      this instance for fluent API
    • localProcessorIndex

      public TestSupport localProcessorIndex(int localProcessorIndex)
      Sets the localProcessorIndex for the Processor
      Parameters:
      localProcessorIndex - localProcessorIndex, defaults to 0
    • globalProcessorIndex

      public TestSupport globalProcessorIndex(int globalProcessorIndex)
      Sets the globalProcessorIndex for the Processor
      Parameters:
      globalProcessorIndex - globalProcessorIndex, default to 0
    • localParallelism

      public TestSupport localParallelism(int localParallelism)
      Sets the localParallelism for the Processor
      Parameters:
      localParallelism - localParallelism, defaults to 1
    • totalParallelism

      public TestSupport totalParallelism(int totalParallelism)
      Sets the totalParallelism for the Processor
      Parameters:
      totalParallelism - totalParallelism, defaults to 1
    • outputChecker

      public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>,? super List<?>> outputChecker)
      Predicate to compare expected and actual output. Parameters to the BiPredicate are the list of expected items and the list of actual processor output.

      Defaults to Objects::equals, which will pass, if both lists contain equal objects in the same order. If the ordering doesn't matter, you can use SAME_ITEMS_ANY_ORDER.

      Returns:
      this instance for fluent API
    • hazelcastInstance

      public TestSupport hazelcastInstance(@Nonnull HazelcastInstance hazelcastInstance)
      Returns:
      this instance for fluent API
    • jobConfig

      public TestSupport jobConfig(JobConfig jobConfig)
      Returns:
      this instance for fluent API
    • executeBeforeEachRun

      public TestSupport executeBeforeEachRun(Runnable runnable)
      Action to execute before each test scenario.
      Returns:
      this instance for fluent API
    • supplierFrom

      public static Supplier<Processor> supplierFrom(ProcessorSupplier supplier)
      Wraps the provided ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it.
    • supplierFrom

      public static Supplier<Processor> supplierFrom(ProcessorSupplier supplier, ProcessorSupplier.Context context)
      Wraps the provided ProcessorSupplier with a Supplier<Processor> that returns processors obtained from it.
    • supplierFrom

      public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier)
      Wraps the provided ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it.
    • supplierFrom

      public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier supplier, ProcessorSupplier.Context context)
      Wraps the provided ProcessorMetaSupplier with a Supplier<Processor> that returns processors obtained from it.
    • in

      public static TestSupport.TestEvent in(Object item)
      Create a test event with an input item. Equivalent to in(0, item).
      Returns:
      the new test event
      See Also:
    • in

      public static TestSupport.TestEvent in(int ordinal, Object item)
      Create a test event with an input item on the given ordinal.
      Returns:
      the new test event
      See Also:
    • out

      public static TestSupport.TestEvent out(Object item)
      Create a test event with an output item. Equivalent to out(0, item).
      Returns:
      the new test event
      See Also:
    • out

      public static TestSupport.TestEvent out(int ordinal, Object item)
      Create a test event with an output item on the given ordinal.
      Returns:
      the new test event
      See Also:
    • processorAssertion

      public static <T extends Processor> TestSupport.TestEvent processorAssertion(Consumer<T> assertion)
      Create a test event with a processor assertion. The assertion must not be followed by an output item event.
      Returns:
      the new test event
      See Also: