public class ConcurrentConveyor<E> extends Object
submit() calls fail with an exception. This mechanism supports building an implementation that is both starvation-safe and uses bounded queues with blocking queue submission.
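
The failing blocking submit described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: the class `FailingSubmitSketch`, its fields, and the use of `IllegalStateException` in place of the library's `ConcurrentConveyorException` are hypothetical and are not taken from Hazelcast's implementation.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport;

// Simplified, hypothetical model of a blocking submit that fails once the
// drainer is gone. Not Hazelcast's code.
class FailingSubmitSketch {
    private final Queue<String> queue = new ArrayBlockingQueue<>(1); // bounded queue
    private volatile boolean drainerGone;

    // The drainer calls this when it stops draining.
    void drainerDone() {
        drainerGone = true;
    }

    // Retries until the item fits; throws instead of waiting forever once
    // the drainer has left.
    void submit(String item) {
        while (!queue.offer(item)) {
            if (drainerGone) {
                throw new IllegalStateException("drainer has left, rejected: " + item);
            }
            LockSupport.parkNanos(1_000); // brief pause before retrying
        }
    }

    // Fills the queue, lets the drainer leave, then attempts another submit.
    static boolean secondSubmitFails() {
        FailingSubmitSketch conveyor = new FailingSubmitSketch();
        conveyor.submit("first");  // fits into the empty queue
        conveyor.drainerDone();    // nobody will make room again
        try {
            conveyor.submit("second");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second submit failed: " + secondSubmitFails());
    }
}
```

Without the drainer-gone check, the second submit would wait on a full bounded queue indefinitely, which is the starvation the text refers to.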

The drainer can also apply immediate backpressure to the submitters by invoking backpressureOn(). This makes submit() invocations block after they have successfully submitted their item, until the drainer calls backpressureOff() or fails. The drainer can thus apply backpressure and keep draining the queue, so every submitter gets past submitting its item before it blocks. Such an arrangement eliminates a class of deadlock patterns in which a submitter blocks while trying to submit the very item that would have made the drainer remove backpressure.
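
The "insert first, then wait" ordering described above can be illustrated with a minimal model. This is a sketch only: `BackpressureSketch` is a hypothetical class, it uses an unbounded queue for brevity, and it leaves out the "drainer fails" exit path. It does not reproduce Hazelcast's implementation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Simplified, hypothetical model of the backpressure flag. Not Hazelcast's code.
class BackpressureSketch {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private volatile boolean backpressure;

    void backpressureOn() { backpressure = true; }
    void backpressureOff() { backpressure = false; }

    // The item is inserted before the flag is checked, so the drainer can
    // still receive it while the submitter is held back.
    void submit(String item) {
        queue.add(item);
        while (backpressure) {
            LockSupport.parkNanos(100_000);
        }
    }

    String poll() { return queue.poll(); }

    // Returns "<drained item>:<submitter was still blocked>:<submitter returned>".
    static String demo() {
        BackpressureSketch conveyor = new BackpressureSketch();
        AtomicBoolean returned = new AtomicBoolean();
        conveyor.backpressureOn();
        Thread submitter = new Thread(() -> {
            conveyor.submit("item");
            returned.set(true);
        });
        submitter.start();

        // The drainer keeps draining while backpressure is on.
        String drained;
        while ((drained = conveyor.poll()) == null) {
            Thread.yield();
        }
        // The flag has been raised the whole time, so submit() cannot have returned.
        boolean blockedAfterInsert = !returned.get();

        conveyor.backpressureOff();
        try {
            submitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return drained + ":" + blockedAfterInsert + ":" + returned.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the item reaches the queue before the submitter blocks, the drainer never waits for an item that a blocked submitter is still holding.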

This class does not manage drainer threads. There should be only one drainer thread at a time.

    // 1. Set up the concurrent conveyor
    final int queueCapacity = 128;
    final Runnable doneItem = new Runnable() { public void run() {} };
    final QueuedPipe[] qs = new QueuedPipe[2];
    qs[0] = new OneToOneConcurrentArrayQueue(queueCapacity);
    qs[1] = new OneToOneConcurrentArrayQueue(queueCapacity);
    final ConcurrentConveyor conveyor = concurrentConveyor(doneItem, qs);

    // 2. Set up the drainer thread
    final Thread drainer = new Thread(new Runnable() {
        private int submitterGoneCount;

Modifier and Type | Field and Description |
---|---|
static IdleStrategy | SUBMIT_IDLER: Idling strategy used by the submit() methods. |
static long | SUBMIT_MAX_PARK_MICROS: Max park microseconds while waiting to submit to the work queue. |
static int | SUBMIT_SPIN_COUNT: How many times to busy-spin while waiting to submit to the work queue. |
static int | SUBMIT_YIELD_COUNT: How many times to yield while waiting to submit to the work queue. |
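
The spin, yield, and park constants above suggest a staged backoff while a submitter waits. The sketch below shows one way such an idler could work. It is an assumption-laden illustration: the class `BackoffIdlerSketch`, the constant values, the minimum park time, and the doubling of the park time are all hypothetical and are not read from Hazelcast's `IdleStrategy` implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical staged backoff: busy-spin first, then yield, then park.
// All values are illustrative, not the library's defaults.
class BackoffIdlerSketch {
    static final int SPIN_COUNT = 20;
    static final int YIELD_COUNT = 200;
    static final long MIN_PARK_NANOS = 1_000;
    static final long MAX_PARK_MICROS = 200;

    enum Phase { SPIN, YIELD, PARK }

    // Maps the number of idle rounds so far to the current backoff phase.
    static Phase phaseOf(long n) {
        if (n < SPIN_COUNT) {
            return Phase.SPIN;
        }
        if (n < SPIN_COUNT + YIELD_COUNT) {
            return Phase.YIELD;
        }
        return Phase.PARK;
    }

    // Park time for round n: zero before the park phase, then doubling per
    // round and capped at MAX_PARK_MICROS.
    static long parkNanosFor(long n) {
        long parkRound = n - SPIN_COUNT - YIELD_COUNT;
        if (parkRound < 0) {
            return 0;
        }
        int shift = (int) Math.min(parkRound, 30); // keeps the shift from overflowing
        long maxNanos = TimeUnit.MICROSECONDS.toNanos(MAX_PARK_MICROS);
        return Math.min(MIN_PARK_NANOS << shift, maxNanos);
    }

    // Called once per failed attempt; n counts the attempts so far.
    static void idle(long n) {
        switch (phaseOf(n)) {
            case SPIN:
                break; // busy-spin: return immediately and retry
            case YIELD:
                Thread.yield();
                break;
            default:
                LockSupport.parkNanos(parkNanosFor(n));
        }
    }

    public static void main(String[] args) {
        for (long n : new long[] {0, 20, 220, 221, 1_000}) {
            System.out.println(n + " -> " + phaseOf(n) + ", park nanos " + parkNanosFor(n));
        }
    }
}
```

A submit loop would call `idle(n)` with an increasing `n` after each failed offer, so short waits stay cheap in latency and long waits stay cheap in CPU.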
Modifier and Type | Method and Description |
---|---|
void | awaitDrainerGone(): Blocks until the drainer thread leaves. |
void | backpressureOff(): Lowers the backpressure flag. |
void | backpressureOn(): Raises the backpressure flag, which makes callers of submit block until the flag is lowered. |
void | checkDrainerGone(): Checks whether the drainer thread has left and throws an exception if it has. |
static <E1> ConcurrentConveyor<E1> | concurrentConveyor(E1 submitterGoneItem, QueuedPipe<E1>... queues): Creates a new concurrent conveyor. |
int | drain(int queueIndex, Predicate<? super E> itemHandler): Drains a batch of items from the queue at the supplied index to the supplied itemHandler. |
void | drainerArrived(): Called by the drainer thread to signal that it has started draining the queue. |
void | drainerDone(): Called by the drainer thread to signal that it is done draining the queue. |
void | drainerFailed(Throwable t): Called by the drainer thread to signal that it has failed and will drain no more items from the queue. |
int | drainTo(Collection<? super E> drain): Drains a batch of items from the default queue into the supplied collection. |
int | drainTo(Collection<? super E> drain, int limit): Drains no more than limit items from the default queue into the supplied collection. |
int | drainTo(int queueIndex, Collection<? super E> drain): Drains a batch of items from the queue at the supplied index into the supplied collection. |
int | drainTo(int queueIndex, Collection<? super E> drain, int limit): Drains no more than limit items from the queue at the supplied index into the supplied collection. |
boolean | isDrainerGone() |
int | liveQueueCount(): Returns the number of remaining live queues, i.e., queueCount() minus the number of queues nulled out by calling removeQueue(int). |
boolean | offer(int queueIndex, E item): Offers an item to the queue at the given index. |
boolean | offer(Queue<E> queue, E item): Offers an item to the given queue. |
QueuedPipe<E> | queue(int index) |
int | queueCount(): Returns the size of the array holding the concurrent queues. |
boolean | removeQueue(int index) |
void | submit(Queue<E> queue, E item): Blocks until successfully inserting the given item into the given queue. |
E | submitterGoneItem() |
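
public static final int SUBMIT_SPIN_COUNT
How many times to busy-spin while waiting to submit to the work queue.

public static final int SUBMIT_YIELD_COUNT
How many times to yield while waiting to submit to the work queue.

public static final long SUBMIT_MAX_PARK_MICROS
Max park microseconds while waiting to submit to the work queue.

public static final IdleStrategy SUBMIT_IDLER
Idling strategy used by the submit() methods.

public static <E1> ConcurrentConveyor<E1> concurrentConveyor(E1 submitterGoneItem, QueuedPipe<E1>... queues)
Creates a new concurrent conveyor.
Parameters:
submitterGoneItem - the object that a submitter thread can use to signal it's done submitting
queues - the concurrent queues the conveyor will manage

public final E submitterGoneItem()
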
public final int queueCount()
Returns the size of the array holding the concurrent queues. Queues may be nulled out by calling removeQueue(int), but this method will keep reporting the same number. The intended use case for this method is giving the upper bound for a loop that iterates over all queues. Since queue indices never change, this number must stay the same.

public final int liveQueueCount()
Returns the number of remaining live queues, i.e., queueCount() minus the number of queues nulled out by calling removeQueue(int).

public final QueuedPipe<E> queue(int index)

public final boolean removeQueue(int index)

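
The relationship between queueCount(), liveQueueCount(), and removeQueue(int) can be sketched with a fixed set of slots. This is a hypothetical, single-threaded model: the class `QueueSlotsSketch` and its helper `totalSize()` are invented for illustration, and the meaning given to the boolean returned by `removeQueue` (true when the slot still held a queue) is an assumption, since the text does not describe it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of fixed queue slots that can be nulled out.
// Not Hazelcast's code.
class QueueSlotsSketch {
    private final List<Queue<String>> slots = new ArrayList<>();
    private int liveQueueCount;

    QueueSlotsSketch(int count) {
        for (int i = 0; i < count; i++) {
            slots.add(new ArrayDeque<>());
        }
        liveQueueCount = count;
    }

    // Number of slots; never changes, so queue indices stay stable.
    int queueCount() { return slots.size(); }

    // Number of slots that still hold a queue.
    int liveQueueCount() { return liveQueueCount; }

    // Returns the queue at the index, or null if it was removed.
    Queue<String> queue(int index) { return slots.get(index); }

    // Nulls out the slot. ASSUMPTION: returns true only if a queue was there.
    boolean removeQueue(int index) {
        boolean didRemove = slots.set(index, null) != null;
        if (didRemove) {
            liveQueueCount--;
        }
        return didRemove;
    }

    // Iterates over all slots using queueCount() as the upper bound,
    // skipping the ones that were nulled out.
    int totalSize() {
        int total = 0;
        for (int i = 0; i < queueCount(); i++) {
            Queue<String> q = queue(i);
            if (q != null) {
                total += q.size();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        QueueSlotsSketch conveyor = new QueueSlotsSketch(3);
        conveyor.queue(0).add("a");
        conveyor.queue(2).add("c");
        conveyor.removeQueue(1);
        System.out.println(conveyor.queueCount() + " slots, "
                + conveyor.liveQueueCount() + " live, "
                + conveyor.totalSize() + " items");
    }
}
```

A loop bounded by queueCount() has to tolerate empty slots, as `totalSize()` does with its null check.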
public final boolean offer(int queueIndex, E item)
Offers an item to the queue at the given index.

public final boolean offer(Queue<E> queue, E item) throws ConcurrentConveyorException
Offers an item to the given queue.
Throws:
ConcurrentConveyorException - if the draining thread has already left

public final void submit(Queue<E> queue, E item) throws ConcurrentConveyorException
Blocks until successfully inserting the given item into the given queue. If the backpressure flag is raised on this conveyor at the time the item has been submitted, further blocks until the flag is lowered.
Throws:
ConcurrentConveyorException - if the current thread is interrupted or the draining thread has already left

public final int drainTo(Collection<? super E> drain)
Drains a batch of items from the default queue into the supplied collection.

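public final int drainTo(int queueIndex, Collection<? super E> drain)
Drains a batch of items from the queue at the supplied index into the supplied collection.

public final int drain(int queueIndex, Predicate<? super E> itemHandler)
Drains a batch of items from the queue at the supplied index to the supplied itemHandler. Stops draining after the itemHandler returns false.

public final int drainTo(Collection<? super E> drain, int limit)
Drains no more than limit items from the default queue into the supplied collection.

public final int drainTo(int queueIndex, Collection<? super E> drain, int limit)
Drains no more than limit items from the queue at the supplied index into the supplied collection.

public final void drainerArrived()
Called by the drainer thread to signal that it has started draining the queue.
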
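
The two draining styles above, handler-driven and limit-driven, can be sketched against a plain `java.util.Queue`. This is an illustration under assumptions: `DrainSketch` and its static helpers are hypothetical, and counting the item for which the handler returned false as drained is a guess, since the text only says that draining stops after that point.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical helpers that mimic the two draining styles. Not Hazelcast's code.
class DrainSketch {

    // Passes items to the handler until the queue is empty or the handler
    // returns false. ASSUMPTION: the item that made the handler return false
    // has already been removed and is counted as drained.
    static <E> int drain(Queue<E> queue, Predicate<? super E> itemHandler) {
        int drained = 0;
        E item;
        while ((item = queue.poll()) != null) {
            drained++;
            if (!itemHandler.test(item)) {
                break;
            }
        }
        return drained;
    }

    // Moves at most limit items into the target collection.
    static <E> int drainTo(Queue<E> queue, Collection<? super E> target, int limit) {
        int drained = 0;
        while (drained < limit) {
            E item = queue.poll();
            if (item == null) {
                break;
            }
            target.add(item);
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> seen = new ArrayList<>();
        // The handler records each item and asks to stop once it sees 3.
        int handled = drain(queue, item -> {
            seen.add(item);
            return item != 3;
        });
        List<Integer> batch = new ArrayList<>();
        int moved = drainTo(queue, batch, 1);
        System.out.println(handled + " handled " + seen + ", " + moved
                + " moved " + batch + ", left " + queue);
    }
}
```

The handler form lets the drainer stop early on a sentinel such as the submitter-gone item, while the limit form bounds how much work one drain call takes on.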
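public final void drainerFailed(Throwable t)
Called by the drainer thread to signal that it has failed and will drain no more items from the queue.
Parameters:
t - the drainer's failure

public final void drainerDone()
Called by the drainer thread to signal that it is done draining the queue.

public final boolean isDrainerGone()

public final void checkDrainerGone()
Checks whether the drainer thread has left and throws an exception if it has.

public final void awaitDrainerGone()
Blocks until the drainer thread leaves.

public final void backpressureOn()
Raises the backpressure flag, which makes callers of submit block until the flag is lowered.

public final void backpressureOff()
Lowers the backpressure flag.

Copyright © 2022 Hazelcast, Inc. All Rights Reserved.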