com.hazelcast.ringbuffer.impl
Class RingbufferProxy<E>

java.lang.Object
  extended by com.hazelcast.spi.AbstractDistributedObject<RingbufferService>
      extended by com.hazelcast.ringbuffer.impl.RingbufferProxy<E>
Type Parameters:
E - the type of the elements in the ringbuffer.
All Implemented Interfaces:
DistributedObject, Ringbuffer<E>

public class RingbufferProxy<E>
extends AbstractDistributedObject<RingbufferService>
implements Ringbuffer<E>

The serverside proxy to access a Ringbuffer.


Field Summary
 
Fields inherited from class com.hazelcast.spi.AbstractDistributedObject
PARTITIONING_STRATEGY
 
Constructor Summary
RingbufferProxy(NodeEngine nodeEngine, RingbufferService service, String name, RingbufferConfig config)
           
 
Method Summary
 long add(E item)
          Adds an item to the tail of the Ringbuffer.
 ICompletableFuture<Long> addAllAsync(Collection<? extends E> collection, OverflowPolicy overflowPolicy)
          Adds all the items of a collection to the tail of the Ringbuffer.
 ICompletableFuture<Long> addAsync(E item, OverflowPolicy overflowPolicy)
          Asynchronously writes an item with a configurable OverflowPolicy.
 long capacity()
          Returns the capacity of this Ringbuffer.
 String getName()
          Returns the unique name for this DistributedObject.
 String getServiceName()
          Returns the service name for this object.
 long headSequence()
          Returns the sequence of the head.
 ICompletableFuture<ReadResultSet<E>> readManyAsync(long startSequence, int minCount, int maxCount, IFunction<E,Boolean> filter)
          Reads a batch of items from the Ringbuffer.
 E readOne(long sequence)
          Reads one item from the Ringbuffer.
 long remainingCapacity()
          Returns the remaining capacity of the ringbuffer.
 long size()
          Returns number of items in the ringbuffer.
 long tailSequence()
          Returns the sequence of the tail.
 String toString()
           
 
Methods inherited from class com.hazelcast.spi.AbstractDistributedObject
destroy, equals, getId, getNameAsPartitionAwareData, getNodeEngine, getOperationService, getPartitionKey, getService, hashCode, invalidate, postDestroy, throwNotActiveException
 
Methods inherited from class java.lang.Object
clone, finalize, getClass, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface com.hazelcast.core.DistributedObject
destroy, getId, getPartitionKey
 

Constructor Detail

RingbufferProxy

public RingbufferProxy(NodeEngine nodeEngine,
                       RingbufferService service,
                       String name,
                       RingbufferConfig config)
Method Detail

getName

public String getName()
Description copied from interface: DistributedObject
Returns the unique name for this DistributedObject. The returned value will never be null.

Specified by:
getName in interface DistributedObject
Returns:
the unique name for this object.

getServiceName

public String getServiceName()
Description copied from interface: DistributedObject
Returns the service name for this object.

Specified by:
getServiceName in interface DistributedObject
Specified by:
getServiceName in class AbstractDistributedObject<RingbufferService>
Returns:
the service name for this object.

capacity

public long capacity()
Description copied from interface: Ringbuffer
Returns the capacity of this Ringbuffer.

Specified by:
capacity in interface Ringbuffer<E>
Returns:
the capacity.

size

public long size()
Description copied from interface: Ringbuffer
Returns number of items in the ringbuffer. If no ttl is set, the size will always be equal to capacity after the head completed the first loop around the ring. This is because no items are getting retired.

Specified by:
size in interface Ringbuffer<E>
Returns:
the size.

tailSequence

public long tailSequence()
Description copied from interface: Ringbuffer
Returns the sequence of the tail. The tail is the side of the ringbuffer where the items are added to. The initial value of the tail is -1.

Specified by:
tailSequence in interface Ringbuffer<E>
Returns:
the sequence of the tail.

headSequence

public long headSequence()
Description copied from interface: Ringbuffer
Returns the sequence of the head. The head is the side of the ringbuffer where the oldest items in the ringbuffer are found. If the RingBuffer is empty, the head will be one more than the tail. The initial value of the head is 0 (1 more than tail).

Specified by:
headSequence in interface Ringbuffer<E>
Returns:
the sequence of the head.

remainingCapacity

public long remainingCapacity()
Description copied from interface: Ringbuffer
Returns the remaining capacity of the ringbuffer. The returned value could be stale as soon as it is returned. If ttl is not set, the remaining capacity will always be the capacity.

Specified by:
remainingCapacity in interface Ringbuffer<E>
Returns:
the remaining capacity.

add

public long add(E item)
Description copied from interface: Ringbuffer
Adds an item to the tail of the Ringbuffer. If there is no space in the Ringbuffer, the add will overwrite the oldest item in the ringbuffer no matter what the ttl is. For more control on this behavior, check the Ringbuffer.addAsync(Object, OverflowPolicy) and the OverflowPolicy. The returned value is the sequence of the added item. Using this sequence you can read the added item.

Using the sequence as id

This sequence will always be unique for this Ringbuffer instance so it can be used as a unique id generator if you are publishing items on this Ringbuffer. However you need to take care of correctly determining an initial id when any node uses the ringbuffer for the first time. The most reliable way to do that is to write a dummy item into the ringbuffer and use the returned sequence as initial id. On the reading side, this dummy item should be discard. Please keep in mind that this id is not the sequence of the item you are about to publish but from a previously published item. So it can't be used to find that item.

Specified by:
add in interface Ringbuffer<E>
Parameters:
item - the item to add.
Returns:
the sequence of the added item.
See Also:
Ringbuffer.addAsync(Object, OverflowPolicy)

addAsync

public ICompletableFuture<Long> addAsync(E item,
                                         OverflowPolicy overflowPolicy)
Description copied from interface: Ringbuffer
Asynchronously writes an item with a configurable OverflowPolicy. If there is space in the ringbuffer, the call will return the sequence of the written item. If there is no space, it depends on the overflow policy what happens:
  1. OverflowPolicy.OVERWRITE: we just overwrite the oldest item in the ringbuffer and we violate the ttl
  2. OverflowPolicy.FAIL: we return -1
The reason that FAIL exist is to give the opportunity to obey the ttl. If blocking behavior is required, this can be implemented using retrying in combination with a exponential backoff. Example: long sleepMs = 100; for (; ; ) { long result = ringbuffer.addAsync(item, FAIL).get(); if (result != -1) { break; } TimeUnit.MILLISECONDS.sleep(sleepMs); sleepMs = min(5000, sleepMs * 2); }

Specified by:
addAsync in interface Ringbuffer<E>
Parameters:
item - the item to add
overflowPolicy - the OverflowPolicy to use.
Returns:
the sequenceId of the added item, or -1 if the add failed.

readOne

public E readOne(long sequence)
          throws InterruptedException
Description copied from interface: Ringbuffer
Reads one item from the Ringbuffer. If the sequence is one beyond the current tail, this call blocks until an item is added. This means that the ringbuffer can be processed using the following idiom: Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb"); long seq = ringbuffer.headSequence(); while(true){ String item = ringbuffer.readOne(seq); seq++; ... process item } This method is not destructive unlike e.g. a queue.take. So the same item can be read by multiple readers or it can be read multiple times by the same reader. Currently it isn't possible to control how long this call is going to block. In the future we could add e.g. tryReadOne(long sequence, long timeout, TimeUnit unit).

Specified by:
readOne in interface Ringbuffer<E>
Parameters:
sequence - the sequence of the item to read.
Returns:
the read item
Throws:
InterruptedException - todo

addAllAsync

public ICompletableFuture<Long> addAllAsync(Collection<? extends E> collection,
                                            OverflowPolicy overflowPolicy)
Description copied from interface: Ringbuffer
Adds all the items of a collection to the tail of the Ringbuffer. A addAll is likely to outperform multiple calls to Ringbuffer.add(Object) due to better io utilization and a reduced number of executed operations. If the batch is empty, the call is ignored. When the collection is not empty, the content is copied into a different data-structure. This means that:
  1. after this call completes, the collection can be re-used.
  2. the collection doesn't need to be serializable
If the collection is larger than the capacity of the ringbuffer, then the items that were written first will be overwritten. Therefor this call will not block. The items are inserted in the order of the Iterator of the collection. If an addAll is executed concurrently with an add or addAll, no guarantee is given that items are contiguous. The result of the future contains the sequenceId of the last written item

Specified by:
addAllAsync in interface Ringbuffer<E>
Parameters:
collection - the batch of items to add.
Returns:
the ICompletableFuture to synchronize on completion.

readManyAsync

public ICompletableFuture<ReadResultSet<E>> readManyAsync(long startSequence,
                                                          int minCount,
                                                          int maxCount,
                                                          IFunction<E,Boolean> filter)
Description copied from interface: Ringbuffer
Reads a batch of items from the Ringbuffer. If the number of available items after the first read item is smaller than the maxCount, these items are returned. So it could be the number of items read is smaller than the maxCount. If there are no items available, this call Reading a batch of items is likely to perform better because less overhead is involved. A filter can be provided to only select items that need to be read. If the filter is null, all items are read. If the filter is not null, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement.

Specified by:
readManyAsync in interface Ringbuffer<E>
Parameters:
startSequence - the startSequence of the first item to read.
minCount - the minimum number of items to read.
maxCount - the maximum number of items to read.
Returns:
a future containing the items read. Filter is allowed to be null, indicating there is no filter.

toString

public String toString()
Overrides:
toString in class AbstractDistributedObject<RingbufferService>


Copyright © 2015 Hazelcast, Inc.. All Rights Reserved.