Class TestOutbox
All Implemented Interfaces:
Outbox, com.hazelcast.jet.impl.execution.OutboxInternal
Outbox implementation suitable to be used in tests.
Since:
Jet 3.0
-
Constructor Summary
Constructors
Constructor | Description
TestOutbox(int... capacities) |
TestOutbox(int[] edgeCapacities, int snapshotCapacity) |
-
Method Summary
Modifier and Type | Method | Description
void | block() | Blocks the outbox so it allows the caller only to offer the current unfinished item.
int | bucketCount() | Returns the number of buckets in this outbox.
<T> void | drainQueueAndReset(int queueOrdinal, Collection<T> target, boolean logItems) | Move all items from the queue to the target collection and make the outbox available to accept more items.
<T> void | drainQueuesAndReset(List<? extends Collection<T>> target, boolean logItems) | Move all items from all queues (except the snapshot queue) to the target list of collections.
<K,V> void | drainSnapshotQueueAndReset(Collection<? super Map.Entry<K, V>> target, boolean logItems) | Deserialize and move all items from the snapshot queue to the target collection and make the outbox available to accept more items.
boolean | hasUnfinishedItem() | Returns true if this outbox has an unfinished item and the same item must be offered again.
long | lastForwardedWm(byte key) | Returns the timestamp of the last forwarded watermark for the given key.
boolean | offer(int[], Object) | Offers the item to all supplied edge ordinals.
boolean | offer(int, Object) | Offers the supplied item to the bucket with the supplied ordinal.
boolean | offer(Object) | Offers the item to all edges.
boolean | offerToSnapshot(Object key, Object value) | Offers the given key and value pair to the processor's snapshot storage.
<T> Queue<T> | queue(int ordinal) | Exposes individual output queues to the testing code.
void | reset() | Resets the counter that prevents adding more than batchSize items until this method is called again.
 | snapshotQueue() | Returns the queue to which snapshot is written.
String | toString() |
void | unblock() | Removes the effect of a previous OutboxInternal.block() call (if any).
-
Constructor Details
-
TestOutbox
public TestOutbox(int... capacities)
Parameters:
capacities - Capacities of individual buckets. Number of buckets is determined by the number of provided capacities. There is no snapshot bucket.
-
TestOutbox
public TestOutbox(int[] edgeCapacities, int snapshotCapacity)
Parameters:
edgeCapacities - Capacities of individual buckets. Number of buckets is determined by the number of provided capacities.
snapshotCapacity - Capacity of snapshot bucket. If 0, snapshot queue is not present.
-
-
Method Details
-
bucketCount
public int bucketCount()
Description copied from interface: Outbox
Returns the number of buckets in this outbox. This is equal to the number of output edges of the vertex and does not include the snapshot bucket.
Specified by:
bucketCount in interface Outbox
-
offer
Description copied from interface: Outbox
Offers the supplied item to the bucket with the supplied ordinal.
Items offered to the outbox should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.
Outbox is not thread safe, see Thread safety in its class javadoc.
-
offer
Description copied from interface: Outbox
Offers the item to all edges. See Outbox.offer(int, Object) for more details.
Outbox is not thread safe, see Thread safety in its class javadoc.
-
offer
Description copied from interface: Outbox
Offers the item to all supplied edge ordinals. See Outbox.offer(int, Object) for more details.
Outbox is not thread safe, see Thread safety in its class javadoc.
-
offerToSnapshot
Description copied from interface: Outbox
Offers the given key and value pair to the processor's snapshot storage.
The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type BroadcastKey, the entry will be restored to all processor instances. Otherwise, the key will be distributed according to default partitioning and only a single processor instance will receive the key.
This method must only be called from the Processor.saveToSnapshot() or Processor.snapshotCommitPrepare() methods.
Keys and values offered to snapshot are serialized and can be further mutated as soon as this method returns.
Outbox is not thread safe, see Thread safety in its class javadoc.
Specified by:
offerToSnapshot in interface Outbox
Returns:
true if the outbox accepted the item
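The statement that offered keys and values are serialized, and may therefore be mutated once the call returns, can be illustrated with a minimal sketch. It uses only the JDK: `SnapshotQueueSketch`, `drainSnapshotQueue` and the use of Java serialization are assumptions made for illustration and are not Hazelcast code or its serialization mechanism.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of a snapshot queue that stores serialized entries. NOT Hazelcast code.
final class SnapshotQueueSketch {
    private final Queue<Map.Entry<byte[], byte[]>> snapshotQueue = new ArrayDeque<>();

    // Serializes key and value immediately, so later mutation cannot affect the stored entry.
    boolean offerToSnapshot(Object key, Object value) {
        snapshotQueue.add(new AbstractMap.SimpleImmutableEntry<>(toBytes(key), toBytes(value)));
        return true;
    }

    // Deserializes every stored entry and moves it to the target collection.
    <K, V> void drainSnapshotQueue(Collection<? super Map.Entry<K, V>> target) {
        Map.Entry<byte[], byte[]> raw;
        while ((raw = snapshotQueue.poll()) != null) {
            @SuppressWarnings("unchecked")
            K key = (K) fromBytes(raw.getKey());
            @SuppressWarnings("unchecked")
            V value = (V) fromBytes(raw.getValue());
            Map.Entry<K, V> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);
            target.add(entry);
        }
    }

    private static byte[] toBytes(Object o) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SnapshotQueueSketch outbox = new SnapshotQueueSketch();
        List<Integer> value = new ArrayList<>(List.of(1, 2));
        outbox.offerToSnapshot("key", value);
        value.add(3); // mutation after the offer; the queue holds a serialized copy

        List<Map.Entry<String, List<Integer>>> restored = new ArrayList<>();
        outbox.drainSnapshotQueue(restored);
        System.out.println(restored); // the restored value does not contain 3
    }
}
```

In this model the restored entry reflects the state of the value at the time of the offer, which is the property the description above relies on.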
-
queue
Exposes individual output queues to the testing code.
Parameters:
ordinal - ordinal of the bucket
-
snapshotQueue
Returns the queue to which the snapshot is written. It contains serialized data; if you need deserialized data, you might prefer to use drainSnapshotQueueAndReset(java.util.Collection<? super java.util.Map.Entry<K, V>>, boolean).
-
drainQueueAndReset
Move all items from the queue to the target collection and make the outbox available to accept more items. Also calls reset(). If you have a limited capacity outbox, you need to call this method regularly.
Parameters:
queueOrdinal - the queue from Outbox to drain
target - target collection
logItems - whether to log drained items to System.out
-
drainQueuesAndReset
Move all items from all queues (except the snapshot queue) to the target list of collections. Queue N is moved to the collection at index N of target. Also calls reset(). If you have a limited capacity outbox, you need to call this method regularly.
Parameters:
target - list of target collections
logItems - whether to log drained items to System.out
-
drainSnapshotQueueAndReset
public <K,V> void drainSnapshotQueueAndReset(Collection<? super Map.Entry<K, V>> target, boolean logItems)
Deserialize and move all items from the snapshot queue to the target collection and make the outbox available to accept more items. Also calls reset(). If you have a limited capacity outbox, you need to call this method regularly.
Parameters:
target - target list
logItems - whether to log drained items to System.out
-
reset
public void reset()
Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
Resets the counter that prevents adding more than batchSize items until this method is called again. Note that the counter may jump to the "full" state even before the outbox accepted that many items.
Specified by:
reset in interface com.hazelcast.jet.impl.execution.OutboxInternal
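The interplay between the batch counter, reset() and the drain-and-reset methods can be sketched with a small JDK-only model. `BatchLimitSketch` and its single unbounded queue are hypothetical simplifications; the sketch assumes the counter simply counts accepted items and is not Hazelcast code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a batch-size counter guarding an output queue. NOT Hazelcast code.
final class BatchLimitSketch {
    private final Queue<Object> queue = new ArrayDeque<>();
    private final int batchSize;
    private int acceptedSinceReset; // number of items accepted since the last reset()

    BatchLimitSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Rejects the item once batchSize items were accepted since the last reset().
    boolean offer(Object item) {
        if (acceptedSinceReset >= batchSize) {
            return false;
        }
        queue.add(item);
        acceptedSinceReset++;
        return true;
    }

    // Clears the counter so that the next batch of items can be accepted.
    void reset() {
        acceptedSinceReset = 0;
    }

    // Moves all queued items to target and then resets the counter.
    void drainQueueAndReset(Collection<Object> target) {
        target.addAll(queue);
        queue.clear();
        reset();
    }

    public static void main(String[] args) {
        BatchLimitSketch outbox = new BatchLimitSketch(2);
        System.out.println(outbox.offer(1)); // accepted
        System.out.println(outbox.offer(2)); // accepted, the batch is now full
        System.out.println(outbox.offer(3)); // rejected until the counter is reset

        List<Object> drained = new ArrayList<>();
        outbox.drainQueueAndReset(drained);
        System.out.println(outbox.offer(3)); // accepted again after the reset
    }
}
```

Test code written against a limited outbox would follow the same rhythm: offer until rejected, drain and reset, then retry.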
-
hasUnfinishedItem
public boolean hasUnfinishedItem()
Description copied from interface: Outbox
Returns true if this outbox has an unfinished item and the same item must be offered again. If it returns false, it is safe to offer a new item.
Outbox is not thread safe, see Thread safety in its class javadoc.
Specified by:
hasUnfinishedItem in interface Outbox
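The re-offer contract described here can be illustrated with a minimal JDK-only sketch. `BoundedOutboxSketch`, its `drain` method and its bookkeeping are hypothetical and only model the documented behaviour: an offer that some bucket rejects leaves an unfinished item, and the same instance must be offered again once capacity is available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for an outbox with bounded buckets. NOT Hazelcast code.
final class BoundedOutboxSketch {
    private final List<Queue<Object>> buckets = new ArrayList<>();
    private final int[] capacities;
    private boolean[] accepted;    // buckets that already took the unfinished item
    private Object unfinishedItem; // item whose offer has not completed yet

    BoundedOutboxSketch(int... capacities) {
        this.capacities = capacities.clone();
        this.accepted = new boolean[capacities.length];
        for (int i = 0; i < capacities.length; i++) {
            buckets.add(new ArrayDeque<>());
        }
    }

    // Offers the item to every bucket; returns false if any bucket was full.
    boolean offer(Object item) {
        if (unfinishedItem != null && unfinishedItem != item) {
            throw new IllegalStateException("offer the unfinished item again first");
        }
        boolean done = true;
        for (int i = 0; i < buckets.size(); i++) {
            if (accepted[i]) {
                continue; // taken during an earlier attempt, do not duplicate
            }
            if (buckets.get(i).size() < capacities[i]) {
                buckets.get(i).add(item);
                accepted[i] = true;
            } else {
                done = false;
            }
        }
        unfinishedItem = done ? null : item;
        if (done) {
            accepted = new boolean[capacities.length];
        }
        return done;
    }

    boolean hasUnfinishedItem() {
        return unfinishedItem != null;
    }

    // Moves all items of one bucket into target, which frees its capacity.
    void drain(int ordinal, List<Object> target) {
        target.addAll(buckets.get(ordinal));
        buckets.get(ordinal).clear();
    }

    public static void main(String[] args) {
        BoundedOutboxSketch outbox = new BoundedOutboxSketch(1, 2);
        String a = "a";
        String b = "b";
        System.out.println(outbox.offer(a));            // both buckets accept
        System.out.println(outbox.offer(b));            // bucket 0 is full
        System.out.println(outbox.hasUnfinishedItem()); // b is still pending
        outbox.drain(0, new ArrayList<>());
        System.out.println(outbox.offer(b));            // retry with the same item
    }
}
```

Tracking which buckets already accepted the item is what lets the retry complete without delivering the item twice to the same bucket.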
-
block
public void block()
Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
Blocks the outbox so it allows the caller only to offer the current unfinished item. If there is no unfinished item, the outbox will reject all items until you call OutboxInternal.unblock().
Specified by:
block in interface com.hazelcast.jet.impl.execution.OutboxInternal
-
unblock
public void unblock()
Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
Removes the effect of a previous OutboxInternal.block() call (if any).
Specified by:
unblock in interface com.hazelcast.jet.impl.execution.OutboxInternal
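The documented effect of block() and unblock() can be modelled in a few lines. The sketch below is JDK-only and hypothetical: `BlockableOutboxSketch`, its single bucket and its `drain` method are assumptions for illustration, not Hazelcast code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-bucket model of block()/unblock(). NOT Hazelcast code.
final class BlockableOutboxSketch {
    private final Queue<Object> bucket = new ArrayDeque<>();
    private final int capacity;
    private Object unfinishedItem; // item that was rejected because the bucket was full
    private boolean blocked;

    BlockableOutboxSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(Object item) {
        if (blocked && unfinishedItem == null) {
            return false; // blocked with nothing pending: every item is rejected
        }
        if (unfinishedItem != null && unfinishedItem != item) {
            throw new IllegalStateException("offer the unfinished item again first");
        }
        if (bucket.size() >= capacity) {
            unfinishedItem = item; // remember it, the caller must retry with it
            return false;
        }
        bucket.add(item);
        unfinishedItem = null;
        return true;
    }

    void block() {
        blocked = true;
    }

    void unblock() {
        blocked = false;
    }

    // Moves all items into target, which frees the bucket's capacity.
    void drain(List<Object> target) {
        target.addAll(bucket);
        bucket.clear();
    }

    public static void main(String[] args) {
        BlockableOutboxSketch outbox = new BlockableOutboxSketch(1);
        String a = "a";
        String b = "b";
        String c = "c";
        outbox.offer(a);                     // fills the bucket
        outbox.offer(b);                     // rejected, b becomes the unfinished item
        outbox.block();
        outbox.drain(new ArrayList<>());
        System.out.println(outbox.offer(b)); // the unfinished item may still complete
        System.out.println(outbox.offer(c)); // a new item is rejected while blocked
        outbox.unblock();
        outbox.drain(new ArrayList<>());
        System.out.println(outbox.offer(c)); // accepted once unblocked
    }
}
```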
-
lastForwardedWm
public long lastForwardedWm(byte key)
Description copied from interface: com.hazelcast.jet.impl.execution.OutboxInternal
Returns the timestamp of the last forwarded watermark for the given key.
If there was no watermark added, it returns Long.MIN_VALUE. Can be called from a concurrent thread.
Specified by:
lastForwardedWm in interface com.hazelcast.jet.impl.execution.OutboxInternal
-
toString
-