Interface Ringbuffer<E>
- Type Parameters:
E
- The type of the elements that the Ringbuffer contains
- All Superinterfaces:
DistributedObject
The ringbuffer has 2 always incrementing sequences:
-
tailSequence()
: this is the side where the youngest item is found. So the tail is the side of the ringbuffer where items are added to. -
headSequence()
: this is the side where the oldest items are found. So the head is the side where items get discarded.
If data is read from a ringbuffer with a sequence that is smaller than the
headSequence, it means that the data is not available anymore and a
StaleSequenceException
is thrown.
A Ringbuffer currently is a replicated, but not partitioned data structure.
So all data is stored in a single partition, similarly to the IQueue
implementation.
A Ringbuffer can be used in a way similar to the IQueue, but one of the key
differences is that a queue.take
is destructive, meaning that only 1
thread is able to take an item. A ringbuffer.read
is not
destructive, so you can have multiple threads reading the same item multiple
times.
The Ringbuffer is the backing data structure for the reliable
ITopic
implementation. See
ReliableTopicConfig
.
A Ringbuffer can be configured to be backed by a
RingbufferStore
. All write methods will delegate
to the store to persist the items, while reader methods will try to read
items from the store if not found in the in-memory Ringbuffer.
When a Ringbuffer is constructed with a backing store, head and tail sequences are set to the following
tailSequence
:lastStoreSequence
headSequence
:lastStoreSequence
+ 1
lastStoreSequence
is the sequence of the previously last
stored item.
Supports split brain protection SplitBrainProtectionConfig
since 3.10 in
cluster versions 3.10 and higher.
Asynchronous methods
Asynchronous methods return a CompletionStage
that can be used to
chain further computation stages. Alternatively, a CompletableFuture
can be obtained via CompletionStage.toCompletableFuture()
to wait
for the operation to complete in a blocking way.
Actions supplied for dependent completions of default non-async methods and async methods
without an explicit Executor
argument are performed
by the ForkJoinPool.commonPool()
(unless it does not
support a parallelism level of at least 2, in which case a new Thread
is
created per task).
- Since:
- 3.5
-
Method Summary
Modifier and TypeMethodDescriptionlong
Adds an item to the tail of the Ringbuffer.addAllAsync
(Collection<? extends E> collection, OverflowPolicy overflowPolicy) Adds all the items of a collection to the tail of the Ringbuffer.addAsync
(E item, OverflowPolicy overflowPolicy) Asynchronously writes an item with a configurableOverflowPolicy
.long
capacity()
Returns the capacity of this Ringbuffer.long
Returns the sequence of the head.readManyAsync
(long startSequence, int minCount, int maxCount, IFunction<E, Boolean> filter) Reads a batch of items from the Ringbuffer.readOne
(long sequence) Reads one item from the Ringbuffer.long
Returns the remaining capacity of the ringbuffer.long
size()
Returns number of items in the Ringbuffer.long
Returns the sequence of the tail.Methods inherited from interface com.hazelcast.core.DistributedObject
destroy, getDestroyContextForTenant, getName, getPartitionKey, getServiceName
-
Method Details
-
capacity
long capacity()Returns the capacity of this Ringbuffer.- Returns:
- the capacity.
-
size
long size()Returns number of items in the Ringbuffer.If no TTL is set, the size will always be equal to capacity after the head completed the first loop around the ring. This is because no items are being removed.
- Returns:
- the size.
-
tailSequence
long tailSequence()Returns the sequence of the tail. The tail is the side of the Ringbuffer where the items are added to.The initial value of the tail is -1 if the Ringbuffer is not backed by a store, otherwise tail sequence will be set to the sequence of the previously last stored item.
- Returns:
- the sequence of the tail.
-
headSequence
long headSequence()Returns the sequence of the head. The head is the side of the Ringbuffer where the oldest items in the Ringbuffer are found.If the RingBuffer is empty, the head will be one more than the tail.
The initial value of the head is 0 if the Ringbuffer is not backed by a store, otherwise head sequence will be set to the sequence of the previously last stored item + 1. In both cases head sequence is 1 more than the tail sequence.
- Returns:
- the sequence of the head.
-
remainingCapacity
long remainingCapacity()Returns the remaining capacity of the ringbuffer. If TTL is enabled, then the returned capacity is equal to the total capacity of the ringbuffer minus the number of used slots in the ringbuffer which have not yet been marked as expired and cleaned up. Keep in mind that some slots could have expired items that have not yet been cleaned up and that the returned value could be stale as soon as it is returned.If TTL is disabled, the remaining capacity is equal to the total ringbuffer capacity.
- Returns:
- the remaining capacity
- See Also:
-
add
Adds an item to the tail of the Ringbuffer. If there is no space in the Ringbuffer, the add will overwrite the oldest item in the ringbuffer no matter what the TTL is. For more control on this behavior, check theaddAsync(Object, OverflowPolicy)
and theOverflowPolicy
.The returned value is the sequence of the added item. Using this sequence you can read the added item.
Using the sequence as ID
This sequence will always be unique for this Ringbuffer instance, so it can be used as a unique ID generator if you are publishing items on this Ringbuffer. However, you need to take care of correctly determining an initial ID when any node uses the Ringbuffer for the first time. The most reliable way to do that is to write a dummy item into the Ringbuffer and use the returned sequence as initial ID. On the reading side, this dummy item should be discarded. Please keep in mind that this ID is not the sequence of the item you are about to publish but from a previously published item. So it can't be used to find that item.If the Ringbuffer is backed by a
RingbufferStore
, the item gets persisted by the underlying store viaRingbufferStore.store(long, Object)
. Note that in case an exception is thrown by the store, it prevents the item from being added to the Ringbuffer, keeping the store, primary and the backups consistent.- Parameters:
item
- the item to add.- Returns:
- the sequence of the added item.
- Throws:
NullPointerException
- if item is null.- See Also:
-
addAsync
Asynchronously writes an item with a configurableOverflowPolicy
.If there is space in the Ringbuffer, the call will return the sequence of the written item. If there is no space, it depends on the overflow policy what happens:
OverflowPolicy.OVERWRITE
: we just overwrite the oldest item in the Ringbuffer and we violate the TTLOverflowPolicy.FAIL
: we return -1
The reason that FAIL exist is to give the opportunity to obey the TTL. If blocking behavior is required, this can be implemented using retrying in combination with an exponential backoff. Example:
long sleepMs = 100; for (; ; ) { long result = ringbuffer.addAsync(item, FAIL).toCompletableFuture().get(); if (result != -1) { break; } TimeUnit.MILLISECONDS.sleep(sleepMs); sleepMs = min(5000, sleepMs * 2); }
If the Ringbuffer is backed by a
RingbufferStore
, the item gets persisted by the underlying store viaRingbufferStore.store(long, Object)
. Note that in case an exception is thrown by the store, it prevents the item from being added to the Ringbuffer, keeping the store, primary and the backups consistent.- Parameters:
item
- the item to addoverflowPolicy
- the OverflowPolicy to use.- Returns:
- the sequenceId of the added item, or -1 if the add failed.
- Throws:
NullPointerException
- if item or overflowPolicy is null.
-
readOne
Reads one item from the Ringbuffer.If the sequence is one beyond the current tail, this call blocks until an item is added. This means that the ringbuffer can be processed using the following idiom:
Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb"); long seq = ringbuffer.headSequence(); while(true){ String item = ringbuffer.readOne(seq); seq++; ... process item }
This method is not destructive unlike e.g. a
BaseQueue.take()
. So the same item can be read by multiple readers, or it can be read multiple times by the same reader.Currently, it isn't possible to control how long this call is going to block. In the future we could add e.g.
tryReadOne(long sequence, long timeout, TimeUnit unit)
.If the item is not in the Ringbuffer an attempt is made to read it from the underlying
RingbufferStore
viaRingbufferStore.load(long)
if store is configured for the Ringbuffer. These cases may increase the execution time significantly depending on the implementation of the store. Note that exceptions thrown by the store are propagated to the caller.- Parameters:
sequence
- the sequence of the item to read.- Returns:
- the read item
- Throws:
StaleSequenceException
- if the sequence is smaller thanheadSequence()
. Because a Ringbuffer won't store all event indefinitely, it can be that the data for the given sequence doesn't exist anymore and theStaleSequenceException
is thrown. It is up to the caller to deal with this particular situation, e.g. throw an Exception or restart from the last known head. That is why the StaleSequenceException contains the last known head.IllegalArgumentException
- if sequence is smaller than 0 or larger thantailSequence()
+1.InterruptedException
- if the call is interrupted while blocking.
-
addAllAsync
CompletionStage<Long> addAllAsync(@Nonnull Collection<? extends E> collection, @Nonnull OverflowPolicy overflowPolicy) Adds all the items of a collection to the tail of the Ringbuffer.An addAll is likely to outperform multiple calls to
add(Object)
due to better io utilization and a reduced number of executed operations. If the batch is empty, the call is ignored.When the collection is not empty, the content is copied into a different data-structure. This means that:
- after this call completes, the collection can be re-used.
- the collection doesn't need to be serializable
If the collection is larger than the capacity of the Ringbuffer, then the items that were written first will be overwritten. Therefore, this call will not block.
The items are inserted in the order of the Iterator of the collection. If an addAll is executed concurrently with an add or addAll, no guarantee is given that items are contiguous.
The result of the future contains the sequenceId of the last written item.
If the Ringbuffer is backed by a
RingbufferStore
, the items are persisted by the underlying store viaRingbufferStore.storeAll(long, Object[])
. Note that in case an exception is thrown by the store, it makes the Ringbuffer not adding any of the items to the primary and the backups. Keeping the store consistent with the primary and the backups is the responsibility of the store.- Parameters:
collection
- the batch of items to add.- Returns:
- the CompletionStage to synchronize on completion.
- Throws:
NullPointerException
- if batch is null, or if an item in this batch is null or if overflowPolicy is nullIllegalArgumentException
- if collection is empty
-
readManyAsync
CompletionStage<ReadResultSet<E>> readManyAsync(long startSequence, int minCount, int maxCount, @Nullable IFunction<E, Boolean> filter) Reads a batch of items from the Ringbuffer. If the number of available items after the first read item is smaller than themaxCount
, these items are returned. So it could be the number of items read is smaller than themaxCount
.If there are fewer items available than
minCount
, then this call blocks.Warning:
These blocking calls consume server memory and if there are many calls, it can be possible to see leaking memory or OOME.
Reading a batch of items is likely to perform better because less overhead is involved.
A filter can be provided to only select items that need to be read. If the filter is null, all items are read. If the filter is not null, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement.
For each item not available in the Ringbuffer an attempt is made to read it from the underlying
RingbufferStore
via multiple invocations ofRingbufferStore.load(long)
, if store is configured for the Ringbuffer. These cases may increase the execution time significantly depending on the implementation of the store. Note that exceptions thrown by the store are propagated to the caller.If the startSequence is smaller than the smallest sequence still available in the Ringbuffer (
headSequence()
, then the smallest available sequence will be used as the start sequence and the minimum/maximum number of items will be attempted to be read from there on.If the startSequence is bigger than the last available sequence in the Ringbuffer (
tailSequence()
), then the last available sequence plus one will be used as the start sequence and the call will block until further items become available, and it can read at least the minimum number of items.- Parameters:
startSequence
- the startSequence of the first item to read.minCount
- the minimum number of items to read.maxCount
- the maximum number of items to read.filter
- the filter. Filter is allowed to be null, indicating there is no filter.- Returns:
- a future containing the items read.
- Throws:
IllegalArgumentException
- if startSequence is smaller than 0 or if minCount smaller than 0 or if minCount larger than maxCount, or if maxCount larger than the capacity of the ringbuffer or if maxCount larger than 1000 (to prevent overload)
-