E
- public interface Ringbuffer<E> extends DistributedObject
StaleSequenceException
is thrown.
A Ringbuffer currently is not a distributed data-structure. So all data is stored in a single partition; comparable to the
IQueue implementation. But we'll provide an option to partition the data in the near future.
A Ringbuffer can be used in a similar way as a queue, but one of the key differences is that a queue.take is destructive,
meaning that only 1 thread is able to take an item. A ringbuffer.read is not destructive, so you can have multiple threads
reading the same item multiple times.
The Ringbuffer is the backing data-structure for the reliable ITopic
implementation. See
ReliableTopicConfig
.Modifier and Type | Method and Description |
---|---|
long |
add(E item)
Adds an item to the tail of the Ringbuffer.
|
ICompletableFuture<Long> |
addAllAsync(Collection<? extends E> collection,
OverflowPolicy overflowPolicy)
Adds all the items of a collection to the tail of the Ringbuffer.
|
ICompletableFuture<Long> |
addAsync(E item,
OverflowPolicy overflowPolicy)
Asynchronously writes an item with a configurable
OverflowPolicy . |
long |
capacity()
Returns the capacity of this Ringbuffer.
|
long |
headSequence()
Returns the sequence of the head.
|
ICompletableFuture<ReadResultSet<E>> |
readManyAsync(long startSequence,
int minCount,
int maxCount,
IFunction<E,Boolean> filter)
Reads a batch of items from the Ringbuffer.
|
E |
readOne(long sequence)
Reads one item from the Ringbuffer.
|
long |
remainingCapacity()
Returns the remaining capacity of the ringbuffer.
|
long |
size()
Returns number of items in the ringbuffer.
|
long |
tailSequence()
Returns the sequence of the tail.
|
destroy, getName, getPartitionKey, getServiceName
long capacity()
long size()
long tailSequence()
long headSequence()
long remainingCapacity()
long add(E item)
addAsync(Object, OverflowPolicy)
and the OverflowPolicy
.
The returned value is the sequence of the added item. Using this sequence you can read the added item.
item
- the item to add.NullPointerException
- if item is null.addAsync(Object, OverflowPolicy)
ICompletableFuture<Long> addAsync(E item, OverflowPolicy overflowPolicy)
OverflowPolicy
.
If there is space in the ringbuffer, the call will return the sequence of the written item.
If there is no space, it depends on the overflow policy what happens:
OverflowPolicy.OVERWRITE
: we just overwrite the oldest item in the ringbuffer and we violate
the ttlOverflowPolicy.FAIL
: we return -1
long sleepMs = 100;
for (; ; ) {
long result = ringbuffer.addAsync(item, FAIL).get();
if (result != -1) {
break;
}
TimeUnit.MILLISECONDS.sleep(sleepMs);
sleepMs = min(5000, sleepMs * 2);
}
item
- the item to addoverflowPolicy
- the OverflowPolicy to use.NullPointerException
- if item or overflowPolicy is null.E readOne(long sequence) throws InterruptedException
Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb");
long seq = ringbuffer.headSequence();
while(true){
String item = ringbuffer.readOne(seq);
seq++;
... process item
}
This method is not destructive unlike e.g. a queue.take. So the same item can be read by multiple readers or it can be
read multiple times by the same reader.
Currently it isn't possible to control how long this call is going to block. In the future we could add e.g.
tryReadOne(long sequence, long timeout, TimeUnit unit).sequence
- the sequence of the item to read.StaleSequenceException
- if the sequence is smaller then headSequence()
. Because a
Ringbuffer won't store all event indefinitely, it can be that the data for the
given sequence doesn't exist anymore and the StaleSequenceException
is thrown. It is up to the caller to deal with this particular situation, e.g.
throw an Exception or restart from the last known head. That is why the
StaleSequenceException contains the last known head.IllegalArgumentException
- if sequence is smaller than 0 or larger than tailSequence()
+1.InterruptedException
- if the call is interrupted while blocking.ICompletableFuture<Long> addAllAsync(Collection<? extends E> collection, OverflowPolicy overflowPolicy)
add(Object)
due to better io utilization and a reduced number
of executed operations.
If the batch is empty, the call is ignored.
When the collection is not empty, the content is copied into a different data-structure. This means that:
collection
- the batch of items to add.NullPointerException
- if batch is null,
or if an item in this batch is null
or if overflowPolicy is nullIllegalArgumentException
- if collection is emptyICompletableFuture<ReadResultSet<E>> readManyAsync(long startSequence, int minCount, int maxCount, IFunction<E,Boolean> filter)
startSequence
- the startSequence of the first item to read.minCount
- the minimum number of items to read.maxCount
- the maximum number of items to read.filter
- the filter. Filter is allowed to be null, indicating there is no filter.IllegalArgumentException
- if startSequence is smaller than 0
or if startSequence larger than tailSequence()
or if minCount smaller than 0
or if minCount larger than maxCount,
or if maxCount larger than the capacity of the ringbuffer
or if maxCount larger than 1000 (to prevent overload)Copyright © 2017 Hazelcast, Inc.. All Rights Reserved.