Class TestOutbox

java.lang.Object
com.hazelcast.jet.core.test.TestOutbox
All Implemented Interfaces:
Outbox, com.hazelcast.jet.impl.execution.OutboxInternal

public final class TestOutbox extends Object implements com.hazelcast.jet.impl.execution.OutboxInternal
Outbox implementation suitable to be used in tests.
Since:
Jet 3.0
  • Constructor Summary

    Constructors
    Constructor
    Description
    TestOutbox(int... capacities)
     
    TestOutbox(int[] edgeCapacities, int snapshotCapacity)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    block()
    Blocks the outbox so it allows the caller only to offer the current unfinished item.
    int
    bucketCount()
    Returns the number of buckets in this outbox.
    <T> void
    drainQueueAndReset(int queueOrdinal, Collection<T> target, boolean logItems)
    Move all items from the queue to the target collection and make the outbox available to accept more items.
    <T> void
    drainQueuesAndReset(List<? extends Collection<T>> target, boolean logItems)
    Move all items from all queues (except the snapshot queue) to the target list of collections.
    <K, V> void
    drainSnapshotQueueAndReset(Collection<? super Map.Entry<K,V>> target, boolean logItems)
    Deserialize and move all items from the snapshot queue to the target collection and make the outbox available to accept more items.
    boolean
    hasUnfinishedItem()
    Returns true if this outbox has an unfinished item and the same item must be offered again.
    long
    lastForwardedWm(byte key)
    Returns the timestamp of the last forwarded watermark for the given key.
    boolean
    offer(int[] ordinals, Object item)
    Offers the item to all supplied edge ordinals.
    boolean
    offer(int ordinal, Object item)
    Offers the supplied item to the bucket with the supplied ordinal.
    boolean
    offer(Object item)
    Offers the item to all edges.
    boolean
    offerToSnapshot(Object key, Object value)
    Offers the given key and value pair to the processor's snapshot storage.
    <T> Queue<T>
    queue(int ordinal)
    Exposes individual output queues to the testing code.
    void
    reset()
    Resets the counter that prevents adding more than batchSize items until this method is called again.
    Queue<Map.Entry<Object,Object>>
    snapshotQueue()
    Returns the queue to which the snapshot is written.
    String
    toString()
     
    void
    unblock()
    Removes the effect of a previous OutboxInternal.block() call (if any).

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
  • Constructor Details

    • TestOutbox

      public TestOutbox(int... capacities)
      Parameters:
      capacities - Capacities of individual buckets. Number of buckets is determined by the number of provided capacities. There is no snapshot bucket.
    • TestOutbox

      public TestOutbox(int[] edgeCapacities, int snapshotCapacity)
      Parameters:
      edgeCapacities - Capacities of individual buckets. Number of buckets is determined by the number of provided capacities.
      snapshotCapacity - Capacity of snapshot bucket. If 0, snapshot queue is not present.
  • Method Details

    • bucketCount

      public int bucketCount()
      Description copied from interface: Outbox
      Returns the number of buckets in this outbox. This is equal to the number of output edges of the vertex and does not include the snapshot bucket.
      Specified by:
      bucketCount in interface Outbox
    • offer

      public boolean offer(int ordinal, @Nonnull Object item)
      Description copied from interface: Outbox
      Offers the supplied item to the bucket with the supplied ordinal.

      Items offered to the outbox should not be mutated afterwards, because the same instance might be used by a downstream processor in a different thread, causing concurrent access.

      Outbox is not thread safe, see Thread safety in its class javadoc.

      Specified by:
      offer in interface Outbox
      Parameters:
      ordinal - output ordinal number or -1 to offer to all ordinals
      Returns:
      true if the outbox accepted the item
    • offer

      public boolean offer(@Nonnull Object item)
      Description copied from interface: Outbox
      Offers the item to all edges. See Outbox.offer(int, Object) for more details.

      Outbox is not thread safe, see Thread safety in its class javadoc.

      Specified by:
      offer in interface Outbox
      Returns:
      true if the outbox accepted the item
    • offer

      public boolean offer(@Nonnull int[] ordinals, @Nonnull Object item)
      Description copied from interface: Outbox
      Offers the item to all supplied edge ordinals. See Outbox.offer(int, Object) for more details.

      Outbox is not thread safe, see Thread safety in its class javadoc.

      Specified by:
      offer in interface Outbox
      Returns:
      true if the outbox accepted the item
    • offerToSnapshot

      public boolean offerToSnapshot(@Nonnull Object key, @Nonnull Object value)
      Description copied from interface: Outbox
      Offers the given key and value pair to the processor's snapshot storage.

      The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type BroadcastKey, the entry will be restored to all processor instances. Otherwise, the key will be distributed according to default partitioning and only a single processor instance will receive the key.

      This method must only be called from the Processor.saveToSnapshot() or Processor.snapshotCommitPrepare() methods.

      Keys and values offered to snapshot are serialized and can be further mutated as soon as this method returns.

      Outbox is not thread safe, see Thread safety in its class javadoc.

      Specified by:
      offerToSnapshot in interface Outbox
      Returns:
      true if the outbox accepted the item
    • queue

      public <T> Queue<T> queue(int ordinal)
      Exposes individual output queues to the testing code.
      Parameters:
      ordinal - ordinal of the bucket
    • snapshotQueue

      public Queue<Map.Entry<Object,Object>> snapshotQueue()
      Returns the queue to which the snapshot is written. It contains serialized data. If you need deserialized data, use drainSnapshotQueueAndReset(Collection, boolean) instead.
    • drainQueueAndReset

      public <T> void drainQueueAndReset(int queueOrdinal, Collection<T> target, boolean logItems)
      Move all items from the queue to the target collection and make the outbox available to accept more items. Also calls reset(). If you have a limited capacity outbox, you need to call this method regularly.
      Parameters:
      queueOrdinal - the queue from Outbox to drain
      target - target collection
      logItems - whether to log drained items to System.out
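
      The advice to drain a limited-capacity outbox regularly can be illustrated with a small plain-Java model. This is only a sketch: DrainLoopSketch, its offer and drainAndReset methods, and the emitAll helper are hypothetical stand-ins written for this example, not Hazelcast classes, and the real TestOutbox may differ in detail. It shows the typical test loop: offer an item, and when the bounded queue rejects it, drain the queue into the target collection and try again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;

// Hypothetical single-queue model of a bounded outbox; NOT Hazelcast code.
final class DrainLoopSketch {
    private final Queue<Object> queue = new ArrayDeque<>();
    private final int capacity;

    DrainLoopSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Accepts the item only while the queue has free capacity.
    boolean offer(Object item) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.add(item);
        return true;
    }

    // Moves all queued items to the target, which frees the whole capacity.
    <T> void drainAndReset(Collection<T> target) {
        for (Object o; (o = queue.poll()) != null; ) {
            @SuppressWarnings("unchecked") T t = (T) o;
            target.add(t);
        }
    }

    // Emits 0..count-1 through a bounded outbox, draining whenever it is full.
    static List<Integer> emitAll(int count, int capacity) {
        DrainLoopSketch outbox = new DrainLoopSketch(capacity);
        List<Integer> collected = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            while (!outbox.offer(i)) {
                outbox.drainAndReset(collected);
            }
        }
        outbox.drainAndReset(collected); // pick up the items still queued
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(emitAll(7, 3)); // prints [0, 1, 2, 3, 4, 5, 6]
    }
}
```

      Without the drain inside the loop, the fourth offer would be rejected forever. In a real test the same role is played by drainQueueAndReset, which also calls reset().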
    • drainQueuesAndReset

      public <T> void drainQueuesAndReset(List<? extends Collection<T>> target, boolean logItems)
      Move all items from all queues (except the snapshot queue) to the target list of collections. Queue N is moved to the collection at index N of the target list. Also calls reset(). If you have a limited capacity outbox, you need to call this method regularly.
      Parameters:
      target - list of target collections
      logItems - whether to log drained items to System.out
    • drainSnapshotQueueAndReset

      public <K, V> void drainSnapshotQueueAndReset(Collection<? super Map.Entry<K,V>> target, boolean logItems)
      Deserialize and move all items from the snapshot queue to the target collection and make the outbox available to accept more items. Also calls reset(). If you have a limited capacity outbox, you need to call this method regularly.
      Parameters:
      target - target collection
      logItems - whether to log drained items to System.out
    • reset

      public void reset()
      Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
      Resets the counter that prevents adding more than batchSize items until this method is called again. Note that the counter may jump to the "full" state even before the outbox accepted that many items.
      Specified by:
      reset in interface com.hazelcast.jet.impl.execution.OutboxInternal
    • hasUnfinishedItem

      public boolean hasUnfinishedItem()
      Description copied from interface: Outbox
      Returns true if this outbox has an unfinished item and the same item must be offered again. If it returns false, it is safe to offer a new item.

      Outbox is not thread safe, see Thread safety in its class javadoc.

      Specified by:
      hasUnfinishedItem in interface Outbox
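
      The re-offer contract can be sketched with a simplified plain-Java model. OfferRetrySketch and all of its members are hypothetical names invented for this illustration. It is not the Hazelcast implementation, and the real outbox also involves reset(), which this model leaves out. The sketch assumes that an item offered to all buckets is remembered as unfinished when at least one bucket is full, and that buckets which already accepted it are skipped when the same item is offered again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Queue;

// Hypothetical model of an outbox with bounded buckets; NOT Hazelcast code.
final class OfferRetrySketch {
    private final List<Queue<Object>> buckets = new ArrayList<>();
    private final int[] capacities;
    private final BitSet accepted = new BitSet(); // buckets that already hold the unfinished item
    private Object unfinishedItem;

    OfferRetrySketch(int... capacities) {
        this.capacities = capacities.clone();
        for (int i = 0; i < capacities.length; i++) {
            buckets.add(new ArrayDeque<>());
        }
    }

    // Offers the item to every bucket; returns true once all buckets have it.
    boolean offer(Object item) {
        if (unfinishedItem != null && !unfinishedItem.equals(item)) {
            throw new IllegalStateException("the unfinished item must be offered again first");
        }
        boolean done = true;
        for (int i = 0; i < buckets.size(); i++) {
            if (accepted.get(i)) {
                continue; // this bucket took the item on an earlier attempt
            }
            if (buckets.get(i).size() < capacities[i]) {
                buckets.get(i).add(item);
                accepted.set(i);
            } else {
                done = false;
            }
        }
        if (done) {
            accepted.clear();
            unfinishedItem = null;
        } else {
            unfinishedItem = item;
        }
        return done;
    }

    boolean hasUnfinishedItem() {
        return unfinishedItem != null;
    }

    // Moves one bucket's items to the target, freeing that bucket's capacity.
    void drain(int ordinal, List<Object> target) {
        target.addAll(buckets.get(ordinal));
        buckets.get(ordinal).clear();
    }

    int size(int ordinal) {
        return buckets.get(ordinal).size();
    }

    public static void main(String[] args) {
        OfferRetrySketch outbox = new OfferRetrySketch(1, 2);
        System.out.println(outbox.offer("a"));           // true: both buckets had room
        System.out.println(outbox.offer("b"));           // false: bucket 0 is full
        System.out.println(outbox.hasUnfinishedItem());  // true: "b" must be offered again
        outbox.drain(0, new ArrayList<>());
        System.out.println(outbox.offer("b"));           // true: bucket 0 accepts, bucket 1 is skipped
        System.out.println(outbox.size(1));              // 2: "b" was not duplicated in bucket 1
    }
}
```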
    • block

      public void block()
      Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
      Blocks the outbox so it allows the caller only to offer the current unfinished item. If there is no unfinished item, the outbox will reject all items until you call OutboxInternal.unblock().
      Specified by:
      block in interface com.hazelcast.jet.impl.execution.OutboxInternal
    • unblock

      public void unblock()
      Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
      Removes the effect of a previous OutboxInternal.block() call (if any).
      Specified by:
      unblock in interface com.hazelcast.jet.impl.execution.OutboxInternal
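
      The interplay of block() and unblock() with an unfinished item can be sketched as follows. BlockingSketch is a hypothetical single-bucket model written for this illustration, not Hazelcast code. It assumes only what is described above: while blocked, the outbox still accepts the current unfinished item, but rejects everything else until unblock() is called.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-bucket model of the block()/unblock() contract; NOT Hazelcast code.
final class BlockingSketch {
    private final Queue<Object> queue = new ArrayDeque<>();
    private final int capacity;
    private Object unfinishedItem; // item rejected earlier that must be offered again
    private boolean blocked;

    BlockingSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(Object item) {
        if (blocked && unfinishedItem == null) {
            return false; // blocked and nothing pending: reject every item
        }
        if (queue.size() >= capacity) {
            unfinishedItem = item; // remember it so it can be offered again
            return false;
        }
        queue.add(item);
        unfinishedItem = null;
        return true;
    }

    void block() {
        blocked = true;
    }

    void unblock() {
        blocked = false;
    }

    Object poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        BlockingSketch outbox = new BlockingSketch(1);
        System.out.println(outbox.offer("a")); // true
        System.out.println(outbox.offer("b")); // false: full, "b" becomes the unfinished item
        outbox.block();
        outbox.poll();                         // free the single slot
        System.out.println(outbox.offer("b")); // true: the unfinished item is still allowed
        System.out.println(outbox.offer("c")); // false: blocked and nothing is unfinished
        outbox.unblock();
        outbox.poll();
        System.out.println(outbox.offer("c")); // true
    }
}
```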
    • lastForwardedWm

      public long lastForwardedWm(byte key)
      Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
      Returns the timestamp of the last forwarded watermark for the given key.

      If there was no watermark added, it returns Long.MIN_VALUE. Can be called from a concurrent thread.

      Specified by:
      lastForwardedWm in interface com.hazelcast.jet.impl.execution.OutboxInternal
    • toString

      public String toString()
      Overrides:
      toString in class Object