Class TestOutbox
- All Implemented Interfaces:
Outbox
,com.hazelcast.jet.impl.execution.OutboxInternal
Outbox
implementation suitable to be used in tests.- Since:
- Jet 3.0
-
Constructor Summary
ConstructorDescriptionTestOutbox
(int... capacities) TestOutbox
(int[] edgeCapacities, int snapshotCapacity) -
Method Summary
Modifier and TypeMethodDescriptionvoid
block()
Blocks the outbox so it allows the caller only to offer the current unfinished item.int
Returns the number of buckets in this outbox.<T> void
drainQueueAndReset
(int queueOrdinal, Collection<T> target, boolean logItems) Move all items from the queue to thetarget
collection and make the outbox available to accept more items.<T> void
drainQueuesAndReset
(List<? extends Collection<T>> target, boolean logItems) Move all items from all queues (except the snapshot queue) to thetarget
list of collections.<K,
V> void drainSnapshotQueueAndReset
(Collection<? super Map.Entry<K, V>> target, boolean logItems) Deserialize and move all items from the snapshot queue to thetarget
collection and make the outbox available to accept more items.boolean
Returns true if this outbox has an unfinished item and the same item must be offered again.long
lastForwardedWm
(byte key) Returns the timestamp of the last forwarded watermark for the given key.boolean
Offers the item to all supplied edge ordinals.boolean
Offers the supplied item to the bucket with the supplied ordinal.boolean
Offers the item to all edges.boolean
offerToSnapshot
(Object key, Object value) Offers the given key and value pair to the processor's snapshot storage.<T> Queue<T>
queue
(int ordinal) Exposes individual output queues to the testing code.void
reset()
Resets the counter that prevents adding more thanbatchSize
items until this method is called again.Returns the queue to which snapshot is written.toString()
void
unblock()
Removes the effect of a previousOutboxInternal.block()
call (if any).
-
Constructor Details
-
TestOutbox
public TestOutbox(int... capacities) - Parameters:
capacities
- Capacities of individual buckets. Number of buckets is determined by the number of provided capacities. There is no snapshot bucket.
-
TestOutbox
public TestOutbox(int[] edgeCapacities, int snapshotCapacity) - Parameters:
edgeCapacities
- Capacities of individual buckets. Number of buckets is determined by the number of provided capacities.snapshotCapacity
- Capacity of snapshot bucket. If 0, snapshot queue is not present.
-
-
Method Details
-
bucketCount
public int bucketCount()Description copied from interface:Outbox
Returns the number of buckets in this outbox. This is equal to the number of output edges of the vertex and does not include the snapshot bucket.- Specified by:
bucketCount
in interfaceOutbox
-
offer
Description copied from interface:Outbox
Offers the supplied item to the bucket with the supplied ordinal.Items offered to outbox should not be subsequently mutated because the same instance might be used by a downstream processor in different thread, causing concurrent access.
Outbox is not thread safe, see
Thread safety
in its class javadoc. -
offer
Description copied from interface:Outbox
Offers the item to all edges. SeeOutbox.offer(int, Object)
for more details.Outbox is not thread safe, see
Thread safety
in its class javadoc. -
offer
Description copied from interface:Outbox
Offers the item to all supplied edge ordinals. SeeOutbox.offer(int, Object)
for more details.Outbox is not thread safe, see
Thread safety
in its class javadoc. -
offerToSnapshot
Description copied from interface:Outbox
Offers the given key and value pair to the processor's snapshot storage.The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type
BroadcastKey
, the entry will be restored to all processor instances. Otherwise the key will be distributed according to default partitioning and only a single processor instance will receive the key.This method must only be called from the
Processor.saveToSnapshot()
orProcessor.snapshotCommitPrepare()
methods.Keys and values offered to snapshot are serialized and can be further mutated as soon as this method returns.
Outbox is not thread safe, see
Thread safety
in its class javadoc.- Specified by:
offerToSnapshot
in interfaceOutbox
- Returns:
true
if the outbox accepted the item
-
queue
Exposes individual output queues to the testing code.- Parameters:
ordinal
- ordinal of the bucket
-
snapshotQueue
Returns the queue to which snapshot is written. It contains serialized data, if you need deserialized data, you might prefer to usedrainSnapshotQueueAndReset(java.util.Collection<? super java.util.Map.Entry<K, V>>, boolean)
. -
drainQueueAndReset
Move all items from the queue to thetarget
collection and make the outbox available to accept more items. Also callsreset()
. If you have a limited capacity outbox, you need to call this method regularly.- Parameters:
queueOrdinal
- the queue from Outbox to draintarget
- target collectionlogItems
- whether to log drained items toSystem.out
-
drainQueuesAndReset
Move all items from all queues (except the snapshot queue) to thetarget
list of collections. Queue N is moved to collection at target N etc. Also callsreset()
. If you have a limited capacity outbox, you need to call this method regularly.- Parameters:
target
- list of target collectionslogItems
- whether to log drained items toSystem.out
-
drainSnapshotQueueAndReset
public <K,V> void drainSnapshotQueueAndReset(Collection<? super Map.Entry<K, V>> target, boolean logItems) Deserialize and move all items from the snapshot queue to thetarget
collection and make the outbox available to accept more items. Also callsreset()
. If you have a limited capacity outbox, you need to call this method regularly.- Parameters:
target
- target listlogItems
- whether to log drained items toSystem.out
-
reset
public void reset()Description copied from interface:com.hazelcast.jet.impl.execution.OutboxInternal
Resets the counter that prevents adding more thanbatchSize
items until this method is called again. Note that the counter may jump to the "full" state even before the outbox accepted that many items.- Specified by:
reset
in interfacecom.hazelcast.jet.impl.execution.OutboxInternal
-
hasUnfinishedItem
public boolean hasUnfinishedItem()Description copied from interface:Outbox
Returns true if this outbox has an unfinished item and the same item must be offered again. If it returns false, it is safe to offer a new item.Outbox is not thread safe, see
Thread safety
in its class javadoc.- Specified by:
hasUnfinishedItem
in interfaceOutbox
-
block
public void block()Description copied from interface:com.hazelcast.jet.impl.execution.OutboxInternal
Blocks the outbox so it allows the caller only to offer the current unfinished item. If there is no unfinished item, the outbox will reject all items until you callOutboxInternal.unblock()
.- Specified by:
block
in interfacecom.hazelcast.jet.impl.execution.OutboxInternal
-
unblock
public void unblock()Description copied from interface:com.hazelcast.jet.impl.execution.OutboxInternal
Removes the effect of a previousOutboxInternal.block()
call (if any).- Specified by:
unblock
in interfacecom.hazelcast.jet.impl.execution.OutboxInternal
-
lastForwardedWm
public long lastForwardedWm(byte key) Description copied from interface:com.hazelcast.jet.impl.execution.OutboxInternal
Returns the timestamp of the last forwarded watermark for the given key.If there was no watermark added, it returns
Long.MIN_VALUE
. Can be called from a concurrent thread.- Specified by:
lastForwardedWm
in interfacecom.hazelcast.jet.impl.execution.OutboxInternal
-
toString
-