E
- the type of elements in this ringbuffer
public class ClientRingbufferProxy<E> extends ClientProxy implements Ringbuffer<E>
Ringbuffer
.name
Constructor and Description |
---|
ClientRingbufferProxy(String serviceName, String objectName, ClientContext context) |
Modifier and Type | Method and Description |
---|---|
long | add(E item) - Adds an item to the tail of the Ringbuffer. |
ICompletableFuture<Long> | addAllAsync(Collection<? extends E> collection, OverflowPolicy overflowPolicy) - Adds all the items of a collection to the tail of the Ringbuffer. |
ICompletableFuture<Long> | addAsync(E item, OverflowPolicy overflowPolicy) - Asynchronously writes an item with a configurable OverflowPolicy. |
long | capacity() - Returns the capacity of this Ringbuffer. |
long | headSequence() - Returns the sequence of the head. |
protected <T> T | invoke(ClientMessage clientMessage, int partitionId) |
protected void | onInitialize() - Called when proxy is created. |
ICompletableFuture<ReadResultSet<E>> | readManyAsync(long startSequence, int minCount, int maxCount, IFunction<E,Boolean> filter) - Reads a batch of items from the Ringbuffer. |
E | readOne(long sequence) - Reads one item from the Ringbuffer. |
long | remainingCapacity() - Returns the remaining capacity of the ringbuffer. |
long | size() - Returns number of items in the Ringbuffer. |
long | tailSequence() - Returns the sequence of the tail. |
String | toString() |
Methods inherited from class ClientProxy: deregisterListener, destroy, equals, getClient, getConnectedServerVersion, getContext, getDistributedObjectName, getId, getName, getPartitionKey, getSerializationService, getServiceName, hashCode, invoke, invoke, invokeOnAddress, invokeOnPartition, invokeOnPartitionInterruptibly, onDestroy, onShutdown, postDestroy, preDestroy, registerListener, setContext, toData, toObject
Methods inherited from class java.lang.Object: clone, finalize, getClass, notify, notifyAll, wait, wait, wait
Methods inherited from interface DistributedObject: destroy, getName, getPartitionKey, getServiceName
public ClientRingbufferProxy(String serviceName, String objectName, ClientContext context)
protected void onInitialize()
Called when proxy is created.
Overrides: onInitialize in class ClientProxy
public long capacity()
Returns the capacity of this Ringbuffer.
Specified by: capacity in interface Ringbuffer<E>
public long size()
Returns number of items in the Ringbuffer.
If no ttl is set, the size will always be equal to the capacity after the head has completed the first loop around the ring. This is because no items are getting retired.
Specified by: size in interface Ringbuffer<E>
public long tailSequence()
Returns the sequence of the tail.
The initial value of the tail is -1 if the Ringbuffer is not backed by a store; otherwise the tail sequence is set to the sequence of the previously last stored item.
Specified by: tailSequence in interface Ringbuffer<E>
public long headSequence()
Returns the sequence of the head.
If the Ringbuffer is empty, the head will be one more than the tail.
The initial value of the head is 0 if the Ringbuffer is not backed by a store; otherwise the head sequence is set to the sequence of the previously last stored item + 1. In both cases the head sequence is 1 more than the tail sequence.
Specified by: headSequence in interface Ringbuffer<E>
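The relationship between head, tail and size described above can be sketched with a small local model. This is plain Java, not the Hazelcast implementation: the class name `SequenceModel` is hypothetical, and the model assumes no store and no ttl, so a full buffer simply overwrites its oldest item.

```java
/** Hypothetical local model of Ringbuffer sequence bookkeeping (assumes no store, no ttl). */
final class SequenceModel {
    private final long capacity;
    private long head = 0;   // initial head is 0 when there is no store
    private long tail = -1;  // initial tail is -1 when there is no store

    SequenceModel(long capacity) {
        this.capacity = capacity;
    }

    /** Appends one item, overwriting the oldest when full, and returns its sequence. */
    long add() {
        tail++;
        if (tail - head + 1 > capacity) {
            head++; // the oldest item has been overwritten
        }
        return tail;
    }

    long headSequence() {
        return head;
    }

    long tailSequence() {
        return tail;
    }

    /** Number of items currently held: every sequence from head to tail inclusive. */
    long size() {
        return tail - head + 1;
    }
}
```

In this model an empty buffer has a head that is one more than its tail, and once the buffer has wrapped, the size stays equal to the capacity.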
public long remainingCapacity()
Returns the remaining capacity of the ringbuffer.
The returned value could be stale as soon as it is returned.
If ttl is not set, the remaining capacity will always be the capacity.
Specified by: remainingCapacity in interface Ringbuffer<E>
public long add(E item)
Adds an item to the tail of the Ringbuffer. If there is no space in the Ringbuffer, the add overwrites the oldest item, regardless of the ttl. For more control over this behavior, see Ringbuffer.addAsync(Object, OverflowPolicy) and the OverflowPolicy.
The returned value is the sequence of the added item. Using this sequence you can read the added item.
If the Ringbuffer is backed by a RingbufferStore, the item gets persisted by the underlying store via RingbufferStore.store(long, Object). Note that if the store throws an exception, the item is not added to the Ringbuffer, which keeps the store, the primary and the backups consistent.
Specified by: add in interface Ringbuffer<E>
Parameters:
item - the item to add.
See Also: Ringbuffer.addAsync(Object, OverflowPolicy)
public ICompletableFuture<Long> addAsync(E item, OverflowPolicy overflowPolicy)
Asynchronously writes an item with a configurable OverflowPolicy.
If there is space in the Ringbuffer, the call will return the sequence of the written item.
If there is no space, what happens depends on the overflow policy:
- OverflowPolicy.OVERWRITE: the oldest item in the Ringbuffer is overwritten, which violates the ttl.
- OverflowPolicy.FAIL: the call returns -1.
FAIL exists to give the caller the opportunity to obey the ttl. If blocking behavior is required, it can be implemented by retrying in combination with an exponential backoff. Example:
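long sleepMs = 100;
for (;;) {
    // with FAIL, the result is -1 when there is no space
    long result = ringbuffer.addAsync(item, OverflowPolicy.FAIL).get();
    if (result != -1) {
        break;
    }
    TimeUnit.MILLISECONDS.sleep(sleepMs);
    // double the delay on each retry, capped at 5 seconds
    sleepMs = Math.min(5000, sleepMs * 2);
}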
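The retry loop above never gives up. A bounded variant can be sketched in plain Java as follows. The names `BackoffRetry` and `addWithBackoff` are hypothetical, and the `LongSupplier` merely stands in for one add attempt that returns -1 on failure; a real caller would also have to handle the checked exceptions of `Future.get()` inside that supplier.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper: retries an add attempt with capped exponential backoff. */
final class BackoffRetry {
    /**
     * Calls the attempt until it returns something other than -1 or maxAttempts is reached.
     * Returns the sequence on success, or -1 if every attempt failed or the thread was interrupted.
     */
    static long addWithBackoff(LongSupplier attempt, int maxAttempts,
                               long initialSleepMs, long maxSleepMs) {
        long sleepMs = initialSleepMs;
        for (int i = 0; i < maxAttempts; i++) {
            long result = attempt.getAsLong();
            if (result != -1) {
                return result;
            }
            if (i < maxAttempts - 1) {
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return -1;
                }
                // double the delay, but never exceed the cap
                sleepMs = Math.min(maxSleepMs, sleepMs * 2);
            }
        }
        return -1;
    }
}
```

Bounding the number of attempts is a design choice of this sketch; it lets the caller decide what to do when the buffer stays full instead of blocking indefinitely.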
If the Ringbuffer is backed by a RingbufferStore, the item gets persisted by the underlying store via RingbufferStore.store(long, Object). Note that if the store throws an exception, the item is not added to the Ringbuffer, which keeps the store, the primary and the backups consistent.
Specified by: addAsync in interface Ringbuffer<E>
Parameters:
item - the item to add.
overflowPolicy - the OverflowPolicy to use.
public E readOne(long sequence) throws InterruptedException
Reads one item from the Ringbuffer.
If the sequence is one beyond the current tail, this call blocks until an item is added.
This means that the ringbuffer can be processed using the following idiom:
Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb");
long seq = ringbuffer.headSequence();
while (true) {
    String item = ringbuffer.readOne(seq);
    seq++;
    // ... process item
}
This method is not destructive, unlike e.g. a queue.take, so the same item can be read by multiple readers, or multiple times by the same reader.
Currently it isn't possible to control how long this call is going to block. In the future we could add e.g. tryReadOne(long sequence, long timeout, TimeUnit unit).
If the item is not in the Ringbuffer, an attempt is made to read it from the underlying RingbufferStore via RingbufferStore.load(long), if a store is configured for the Ringbuffer. Such cases may increase the execution time significantly, depending on the implementation of the store. Note that exceptions thrown by the store are propagated to the caller.
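The non-destructive, blocking read described above can be illustrated with a small in-memory model. This is a sketch only: the class `BlockingLog` is hypothetical, it is unbounded (nothing is ever overwritten), and it waits for any sequence beyond the tail rather than only the one directly after it. It also wraps interruption in an unchecked exception to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a non-destructive, blocking read (unbounded, no overwrite). */
final class BlockingLog<E> {
    private final List<E> items = new ArrayList<>();

    /** Appends an item, wakes up waiting readers, and returns the item's sequence. */
    synchronized long add(E item) {
        items.add(item);
        notifyAll();
        return items.size() - 1L;
    }

    /** Returns the item at the given sequence, waiting while it has not been added yet. */
    synchronized E readOne(long sequence) {
        try {
            while (sequence >= items.size()) {
                wait(); // sequence is beyond the tail: block until add() notifies
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        // reading does not remove the item, so it can be read again later
        return items.get((int) sequence);
    }
}
```

Because each reader tracks its own sequence, several readers can walk the same items independently, which is the property that distinguishes this read from a queue-style take.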
Specified by: readOne in interface Ringbuffer<E>
Parameters:
sequence - the sequence of the item to read.
Throws:
InterruptedException - if the call is interrupted while blocking.
public ICompletableFuture<Long> addAllAsync(Collection<? extends E> collection, OverflowPolicy overflowPolicy)
Adds all the items of a collection to the tail of the Ringbuffer.
An addAll is likely to outperform multiple calls to Ringbuffer.add(Object) due to better IO utilization and a reduced number of executed operations.
If the batch is empty, the call is ignored.
When the collection is not empty, the content is copied into a different data structure. This means that the collection can be reused once the call completes.
If the collection is larger than the capacity of the Ringbuffer, then the items that were written first will be overwritten. Therefore this call will not block.
The items are inserted in the order of the Iterator of the collection. If an addAll is executed concurrently with an add or addAll, no guarantee is given that items are contiguous.
The result of the future contains the sequenceId of the last written item.
If the Ringbuffer is backed by a RingbufferStore, the items are persisted by the underlying store via RingbufferStore.storeAll(long, Object[]). Note that if the store throws an exception, the Ringbuffer does not add any of the items to the primary and the backups. Keeping the store consistent with the primary and the backups is the responsibility of the store.
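The overwrite behavior for a batch larger than the capacity can be sketched with a local model. The class `BoundedRing` is hypothetical plain Java, not the Hazelcast implementation; it assumes no store and no ttl, and for an empty batch it simply returns the current tail, which is a choice of the model only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

/** Hypothetical bounded buffer modelling addAll with overwrite (assumes no store, no ttl). */
final class BoundedRing<E> {
    private final int capacity;
    private final Deque<E> items = new ArrayDeque<>();
    private long tail = -1;

    BoundedRing(int capacity) {
        this.capacity = capacity;
    }

    /** Appends the batch in iteration order and returns the sequence of the last written item. */
    long addAll(Collection<? extends E> batch) {
        for (E item : batch) {
            if (items.size() == capacity) {
                items.removeFirst(); // full: the oldest item is overwritten
            }
            items.addLast(item);
            tail++;
        }
        return tail;
    }

    /** Snapshot of the items currently held, oldest first. */
    List<E> contents() {
        return new ArrayList<>(items);
    }
}
```

With a capacity of 3 and a batch of five items, only the last three items remain in the model, while the returned sequence still reflects all five writes.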
Specified by: addAllAsync in interface Ringbuffer<E>
Parameters:
collection - the batch of items to add.
public ICompletableFuture<ReadResultSet<E>> readManyAsync(long startSequence, int minCount, int maxCount, IFunction<E,Boolean> filter)
Reads a batch of items from the Ringbuffer.
If there are fewer items available than minCount, then this call blocks.
Reading a batch of items is likely to perform better because less overhead is involved.
A filter can be provided to only select items that need to be read. If the filter is null, all items are read. If the filter is not null, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement.
For each item not available in the Ringbuffer, an attempt is made to read it from the underlying RingbufferStore via multiple invocations of RingbufferStore.load(long), if a store is configured for the Ringbuffer. Such cases may increase the execution time significantly, depending on the implementation of the store. Note that exceptions thrown by the store are propagated to the caller.
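The effect of the filter and of maxCount can be sketched over a plain list. This is a simplified local model, not the Hazelcast implementation: `FilteredBatchRead` is a hypothetical name, `java.util.function.Predicate` stands in for IFunction<E,Boolean>, the minCount blocking is left out, and the model assumes that maxCount limits the number of items that pass the filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of a filtered batch read over an in-memory list. */
final class FilteredBatchRead {
    /**
     * Collects up to maxCount items starting at startSequence.
     * A null filter accepts every item; otherwise only items the filter accepts are returned.
     */
    static <E> List<E> readMany(List<E> items, int startSequence, int maxCount, Predicate<E> filter) {
        List<E> result = new ArrayList<>();
        for (int seq = startSequence; seq < items.size() && result.size() < maxCount; seq++) {
            E item = items.get(seq);
            if (filter == null || filter.test(item)) {
                result.add(item);
            }
        }
        return result;
    }
}
```

Applying the filter where the items live, rather than after transfer, is what saves IO in the real call; the model only shows which items end up in the result.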
Specified by: readManyAsync in interface Ringbuffer<E>
Parameters:
startSequence - the startSequence of the first item to read.
minCount - the minimum number of items to read.
maxCount - the maximum number of items to read.
filter - the filter. Filter is allowed to be null, indicating there is no filter.
protected <T> T invoke(ClientMessage clientMessage, int partitionId)
Copyright © 2018 Hazelcast, Inc.. All Rights Reserved.