Hazelcast C++ Client
hazelcast::client::Ringbuffer< E > Class Template Reference (abstract)

A Ringbuffer is a data-structure where the content is stored in a ring-like structure.

#include <Ringbuffer.h>


Public Types

enum  OverflowPolicy { OVERWRITE = 0, FAIL = 1 }
 Controls what happens when an item is about to be added to the ringbuffer but there is no remaining capacity.
 

Public Member Functions

virtual int64_t capacity ()=0
 Returns the capacity of this Ringbuffer.

virtual int64_t size ()=0
 Returns number of items in the ringbuffer.

virtual int64_t tailSequence ()=0
 Returns the sequence of the tail.

virtual int64_t headSequence ()=0
 Returns the sequence of the head.

virtual int64_t remainingCapacity ()=0
 Returns the remaining capacity of the ringbuffer.

virtual int64_t add (const E &item)=0
 Adds an item to the tail of the Ringbuffer.

virtual std::auto_ptr< E > readOne (int64_t sequence)=0
 Reads one item from the Ringbuffer.

virtual boost::shared_ptr< ICompletableFuture< int64_t > > addAsync (const E &item, OverflowPolicy overflowPolicy)=0
 Asynchronously writes an item with a configurable OverflowPolicy.

virtual boost::shared_ptr< ICompletableFuture< int64_t > > addAllAsync (const std::vector< E > &items, OverflowPolicy overflowPolicy)=0
 Adds all the items of a collection to the tail of the Ringbuffer.

template<typename IFUNCTION >
boost::shared_ptr< ICompletableFuture< ringbuffer::ReadResultSet< E > > > readManyAsync (int64_t startSequence, int32_t minCount, int32_t maxCount, const IFUNCTION *filter)
 Reads a batch of items from the Ringbuffer.
 
Public Member Functions inherited from hazelcast::client::DistributedObject
virtual const std::string & getServiceName () const =0
 Returns the service name for this object.

virtual const std::string & getName () const =0
 Returns the unique name for this DistributedObject.

virtual void destroy ()=0
 Destroys this object cluster-wide.
 
virtual ~DistributedObject ()
 Destructor.
 

Protected Member Functions

virtual boost::shared_ptr< ICompletableFuture< ringbuffer::ReadResultSet< E > > > readManyAsyncInternal (int64_t startSequence, int32_t minCount, int32_t maxCount, const serialization::pimpl::Data &filterData)=0
 
virtual serialization::pimpl::SerializationService & getSerializationService ()=0
 

Detailed Description

template<typename E>
class hazelcast::client::Ringbuffer< E >

A Ringbuffer is a data-structure where the content is stored in a ring-like structure.

A ringbuffer has a fixed capacity, so it cannot grow beyond that capacity and endanger the stability of the system. If that capacity is exceeded, then the oldest item in the ringbuffer is overwritten.

The ringbuffer has 2 always-incrementing sequences:

  1. tailSequence: this is the side where the youngest item is found. So the tail is the side of the ringbuffer where items are added.
  2. headSequence: this is the side where the oldest items are found. So the head is the side where items get discarded.

The items in the ringbuffer can be found by a sequence that is in between (inclusive) the head and tail sequence.

If data is read from a ringbuffer with a sequence that is smaller than the headSequence, it means that the data is not available anymore and a StaleSequenceException is thrown.
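The relationship between the two sequences can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions (single thread, no time-to-live, no blocking reads); SketchRingbuffer and its members are hypothetical names and are not part of the Hazelcast client. A std::out_of_range exception stands in for StaleSequenceException.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical local model of the head/tail bookkeeping; not the Hazelcast implementation.
class SketchRingbuffer {
public:
    explicit SketchRingbuffer(int64_t capacity)
        : slots_(static_cast<std::size_t>(capacity)), capacity_(capacity), head_(0), tail_(-1) {}

    // Appends at the tail. Once the ring is full, the oldest item is
    // overwritten and the head moves forward by one.
    int64_t add(const std::string &item) {
        ++tail_;
        slots_[static_cast<std::size_t>(tail_ % capacity_)] = item;
        if (tail_ - head_ + 1 > capacity_) {
            ++head_;
        }
        return tail_;
    }

    // Non-destructive read by sequence. Unlike the real readOne, this model
    // throws instead of blocking when the sequence is beyond the tail.
    const std::string &read(int64_t sequence) const {
        if (sequence < head_) {
            throw std::out_of_range("stale sequence");  // stands in for StaleSequenceException
        }
        if (sequence > tail_) {
            throw std::invalid_argument("sequence beyond tail");
        }
        return slots_[static_cast<std::size_t>(sequence % capacity_)];
    }

    int64_t headSequence() const { return head_; }
    int64_t tailSequence() const { return tail_; }
    int64_t size() const { return tail_ - head_ + 1; }

private:
    std::vector<std::string> slots_;
    int64_t capacity_;
    int64_t head_;
    int64_t tail_;
};
```

In this model an empty buffer has a head of 0 and a tail of -1, and every sequence between head and tail (inclusive) maps to exactly one slot.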

A Ringbuffer currently is not a distributed data-structure. So all data is stored in a single partition; comparable to the IQueue implementation. But we'll provide an option to partition the data in the near future.

A Ringbuffer can be used in a similar way as a queue, but one of the key differences is that a queue.take is destructive, meaning that only 1 thread is able to take an item. A ringbuffer.read is not destructive, so you can have multiple threads reading the same item multiple times.

The Ringbuffer is the backing data-structure for the reliable com.hazelcast.core.ITopic implementation. See com.hazelcast.config.ReliableTopicConfig.

Parameters
E    the type of the items stored in the Ringbuffer.

Member Enumeration Documentation

◆ OverflowPolicy

This policy controls what happens when an item is about to be added to the ringbuffer but there is no remaining capacity.

Overflowing happens when a time-to-live is set and the oldest item in the ringbuffer (the head) is not old enough to expire.

See also
Ringbuffer::addAsync(const E &, OverflowPolicy)
Ringbuffer::addAllAsync(const std::vector<E> &, OverflowPolicy)
Enumerator
OVERWRITE 

Using this policy the oldest item is overwritten even if it is not yet old enough to retire.

Using this policy you are sacrificing the time-to-live in favor of being able to write.

Example: if there is a time-to-live of 30 seconds, the buffer is full and the oldest item in the ring has been placed a second ago, then there are 29 seconds remaining for that item. Using this policy you are going to overwrite no matter what.

FAIL 

Using this policy the call will fail immediately and the oldest item will not be overwritten before it is old enough to retire.

So this policy sacrifices the ability to write in favor of the time-to-live.

The advantage of FAIL is that the caller can decide what to do, since the call does not trap the calling thread in a backoff loop.

Example: if there is a time-to-live of 30 seconds, the buffer is full and the oldest item in the ring has been placed a second ago, then there are 29 seconds remaining for that item. Using this policy you are not going to overwrite that item for the next 29 seconds.
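The two 30-second examples can be restated as a small decision function. This is a sketch of the rule as described above, not the library's implementation; writeSucceeds and secondsUntilHeadExpires are hypothetical helper names, and the enum is redeclared locally so the snippet stands alone.

```cpp
#include <cstdint>

// Local copy of the enumerators documented above.
enum OverflowPolicy { OVERWRITE = 0, FAIL = 1 };

// Hypothetical helper: how long the oldest item still has before it may retire.
int64_t secondsUntilHeadExpires(int64_t ttlSeconds, int64_t headAgeSeconds) {
    return headAgeSeconds >= ttlSeconds ? 0 : ttlSeconds - headAgeSeconds;
}

// Hypothetical helper: returns true if a write would go through.
// `full` means there is no remaining capacity; `headAgeSeconds` is how long
// ago the oldest item was placed.
bool writeSucceeds(bool full, int64_t ttlSeconds, int64_t headAgeSeconds, OverflowPolicy policy) {
    if (!full) {
        return true;  // space available, the policy is irrelevant
    }
    if (secondsUntilHeadExpires(ttlSeconds, headAgeSeconds) == 0) {
        return true;  // the oldest item is old enough to retire
    }
    // The oldest item has not expired yet: only OVERWRITE sacrifices the time-to-live.
    return policy == OVERWRITE;
}
```

With a time-to-live of 30 seconds and a head placed one second ago, the sketch reports 29 seconds remaining; OVERWRITE writes anyway, FAIL does not.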

Member Function Documentation

◆ add()

template<typename E>
virtual int64_t hazelcast::client::Ringbuffer< E >::add ( const E &  item)
pure virtual

Adds an item to the tail of the Ringbuffer.

If there is no space in the Ringbuffer, the add will overwrite the oldest item in the ringbuffer no matter what the ttl is. For more control over this behavior, see addAsync(const E &, OverflowPolicy) and OverflowPolicy.

The returned value is the sequence of the added item. Using this sequence you can read the added item.

Using the sequence as id

This sequence will always be unique for this Ringbuffer instance, so it can be used as a unique id generator if you are publishing items on this Ringbuffer. However, you need to take care to determine the initial id correctly when any node uses the ringbuffer for the first time. The most reliable way to do that is to write a dummy item into the ringbuffer and use the returned sequence as the initial id. On the reading side, this dummy item should be discarded. Please keep in mind that this id is not the sequence of the item you are about to publish but that of a previously published item, so it can't be used to find that item.

Parameters
item    the item to add.
Returns
the sequence of the added item.

◆ addAllAsync()

template<typename E>
virtual boost::shared_ptr<ICompletableFuture<int64_t> > hazelcast::client::Ringbuffer< E >::addAllAsync ( const std::vector< E > &  items,
OverflowPolicy  overflowPolicy 
)
pure virtual

Adds all the items of a collection to the tail of the Ringbuffer.

An addAll is likely to outperform multiple calls to add(const E&) due to better I/O utilization and a reduced number of executed operations. If the batch is empty, the call is ignored.

If the collection is larger than the capacity of the Ringbuffer, then the items that were written first will be overwritten. Therefore this call will not block.

The items are inserted in the order in which they appear in the vector. If an addAll is executed concurrently with an add or addAll, no guarantee is given that items are contiguous.

The result of the future contains the sequenceId of the last written item.

Parameters
items    the batch of items to add.
overflowPolicy    the OverflowPolicy to use.
Returns
the ICompletableFuture to synchronize on completion.
Exceptions
IllegalArgumentException    if items is empty
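The effect of a batch that is larger than the capacity can be worked out with sequence arithmetic alone. The following is a sketch that assumes the items are simply overwritten (no time-to-live, no concurrent writers); SequenceRange and afterAddAll are hypothetical names used only for illustration.

```cpp
#include <cstdint>

// Hypothetical pair of sequences describing the readable range of a ringbuffer.
struct SequenceRange {
    int64_t head;
    int64_t tail;
};

// Computes the head and tail after appending `batchSize` items to a ring of
// the given capacity. The new tail is also the sequence of the last written item.
SequenceRange afterAddAll(SequenceRange before, int64_t capacity, int64_t batchSize) {
    SequenceRange after = before;
    after.tail = before.tail + batchSize;
    // At most `capacity` items can be retained, so the head cannot be
    // further than capacity - 1 behind the tail.
    int64_t lowestRetained = after.tail - capacity + 1;
    if (lowestRetained > after.head) {
        after.head = lowestRetained;
    }
    return after;
}
```

For an empty ring (head 0, tail -1) of capacity 3, a batch of 5 items ends with tail 4 and head 2: the first two items of the batch were overwritten by the last two.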

◆ addAsync()

template<typename E>
virtual boost::shared_ptr<ICompletableFuture<int64_t> > hazelcast::client::Ringbuffer< E >::addAsync ( const E &  item,
OverflowPolicy  overflowPolicy 
)
pure virtual

Asynchronously writes an item with a configurable OverflowPolicy.

If there is space in the Ringbuffer, the call will return the sequence of the written item. If there is no space, what happens depends on the overflow policy:

  1. OverflowPolicy::OVERWRITE: we just overwrite the oldest item in the Ringbuffer and we violate the ttl
  2. OverflowPolicy::FAIL: we return -1

The reason that FAIL exists is to give the opportunity to obey the ttl. If blocking behavior is required, this can be implemented using retrying in combination with an exponential backoff. Example:

    int64_t sleepMs = 100;
    for (;;) {
        // With FAIL the call returns -1 instead of overwriting when the buffer is full.
        int64_t result = *(ringbuffer.addAsync(item, Ringbuffer<E>::FAIL)->get());
        if (result != -1) {
            break;
        }
        util::sleepMillis(sleepMs);
        // Double the delay after each failed attempt, capped at 5 seconds.
        sleepMs = std::min<int64_t>(5000, sleepMs * 2);
    }

Parameters
item    the item to add
overflowPolicy    the OverflowPolicy to use.
Returns
the sequenceId of the added item, or -1 if the add failed.

◆ capacity()

template<typename E>
virtual int64_t hazelcast::client::Ringbuffer< E >::capacity ( )
pure virtual

Returns the capacity of this Ringbuffer.

Returns
the capacity.

◆ headSequence()

template<typename E>
virtual int64_t hazelcast::client::Ringbuffer< E >::headSequence ( )
pure virtual

Returns the sequence of the head.

The head is the side of the ringbuffer where the oldest items in the ringbuffer are found.

If the Ringbuffer is empty, the head will be one more than the tail.

The initial value of the head is 0 (1 more than tail).

Returns
the sequence of the head.

◆ readManyAsync()

template<typename E>
template<typename IFUNCTION >
boost::shared_ptr<ICompletableFuture<ringbuffer::ReadResultSet<E> > > hazelcast::client::Ringbuffer< E >::readManyAsync ( int64_t  startSequence,
int32_t  minCount,
int32_t  maxCount,
const IFUNCTION *  filter 
)
inline

Reads a batch of items from the Ringbuffer.

If the number of available items after the first read item is smaller than maxCount, these items are returned. So the number of items read may be smaller than maxCount.

If there are fewer items available than minCount, then this call blocks.

Reading a batch of items is likely to perform better because less overhead is involved.

A filter can be provided to only select items that need to be read. If the filter is null, all items are read. If the filter is not null, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement.

For each item not available in the Ringbuffer, an attempt is made to read it from the underlying com.hazelcast.core.RingbufferStore via multiple invocations of com.hazelcast.core.RingbufferStore::load(long), if a store is configured for the Ringbuffer. These cases may increase the execution time significantly depending on the implementation of the store. Note that exceptions thrown by the store are propagated to the caller.

Parameters
startSequence    the startSequence of the first item to read.
minCount    the minimum number of items to read.
maxCount    the maximum number of items to read.
filter    the filter. Filter is allowed to be null, indicating there is no filter.
Returns
a future containing the items read.
Exceptions
IllegalArgumentException    if startSequence is smaller than 0, if startSequence is larger than tailSequence(), if minCount is smaller than 0, if minCount is larger than maxCount, if maxCount is larger than the capacity of the ringbuffer, or if maxCount is larger than 1000 (to prevent overload)
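The argument conditions listed above can be collected into a single check that a caller might run before issuing the request. This is a sketch that mirrors the documented conditions; readManyArgsValid and MAX_BATCH_SIZE are hypothetical names, and the real validation happens inside the library and cluster.

```cpp
#include <cstdint>

// Upper bound on maxCount stated in the documentation above.
const int32_t MAX_BATCH_SIZE = 1000;

// Hypothetical pre-check: returns true only if none of the documented
// IllegalArgumentException conditions applies.
bool readManyArgsValid(int64_t startSequence, int32_t minCount, int32_t maxCount,
                       int64_t tailSequence, int64_t capacity) {
    if (startSequence < 0) return false;             // startSequence smaller than 0
    if (startSequence > tailSequence) return false;  // startSequence larger than tailSequence()
    if (minCount < 0) return false;                  // minCount smaller than 0
    if (minCount > maxCount) return false;           // minCount larger than maxCount
    if (maxCount > capacity) return false;           // maxCount larger than the capacity
    if (maxCount > MAX_BATCH_SIZE) return false;     // maxCount larger than 1000
    return true;
}
```

The tailSequence and capacity arguments would come from tailSequence() and capacity(); both values can be stale by the time the read is issued, so a check like this only catches obvious mistakes early.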

◆ readOne()

template<typename E>
virtual std::auto_ptr<E> hazelcast::client::Ringbuffer< E >::readOne ( int64_t  sequence)
pure virtual

Reads one item from the Ringbuffer.

If the sequence is one beyond the current tail, this call blocks until an item is added.

This means that the ringbuffer can be processed using the following idiom:

    boost::shared_ptr<Ringbuffer<std::string> > ringbuffer = client.getRingbuffer<std::string>("rb");
    int64_t seq = ringbuffer->headSequence();
    while (true) {
        std::auto_ptr<std::string> item = ringbuffer->readOne(seq);
        seq++;
        // ... process item
    }

This method is not destructive unlike e.g. a queue.take. So the same item can be read by multiple readers or it can be read multiple times by the same reader.

Currently it isn't possible to control how long this call is going to block. In the future we could add e.g. tryReadOne(long sequence, long timeout, TimeUnit unit).

Parameters
sequence    the sequence of the item to read.
Returns
the read item
Exceptions
StaleSequenceException    if the sequence is smaller than headSequence(). Because a Ringbuffer won't store all events indefinitely, it can be that the data for the given sequence doesn't exist anymore and the StaleSequenceException is thrown. It is up to the caller to deal with this particular situation, e.g. throw an Exception or restart from the last known head. That is why the StaleSequenceException contains the last known head.
IllegalArgumentException    if sequence is smaller than 0 or larger than tailSequence()+1.
InterruptedException    if the call is interrupted while blocking.
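The "restart from the last known head" strategy can be sketched against a small in-memory stand-in. Everything here is hypothetical (SketchStaleSequence, SketchSource, drainFrom); in particular, the accessor used to obtain the head from the real StaleSequenceException is not shown in this reference, so the sketch defines its own exception type.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for StaleSequenceException; carries the last known head.
class SketchStaleSequence : public std::runtime_error {
public:
    explicit SketchStaleSequence(int64_t headSeq)
        : std::runtime_error("stale sequence"), headSeq_(headSeq) {}
    int64_t headSeq() const { return headSeq_; }

private:
    int64_t headSeq_;
};

// Hypothetical in-memory source holding the sequences [head, head + items.size() - 1].
struct SketchSource {
    int64_t head;
    std::vector<std::string> items;

    int64_t tail() const { return head + static_cast<int64_t>(items.size()) - 1; }

    const std::string &readOne(int64_t sequence) const {
        if (sequence < head) {
            throw SketchStaleSequence(head);
        }
        return items.at(static_cast<std::size_t>(sequence - head));
    }
};

// Reads everything from `seq` up to the current tail. If `seq` has already
// been overwritten, the reader skips ahead to the head reported by the exception.
std::vector<std::string> drainFrom(const SketchSource &source, int64_t seq) {
    std::vector<std::string> out;
    while (seq <= source.tail()) {
        try {
            out.push_back(source.readOne(seq));
            ++seq;
        } catch (const SketchStaleSequence &e) {
            seq = e.headSeq();  // items before the head are lost; resume at the oldest one
        }
    }
    return out;
}
```

Skipping ahead silently drops the overwritten items; a caller that cannot tolerate gaps would rethrow instead of resuming.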

◆ remainingCapacity()

template<typename E>
virtual int64_t hazelcast::client::Ringbuffer< E >::remainingCapacity ( )
pure virtual

Returns the remaining capacity of the ringbuffer.

The returned value could be stale as soon as it is returned.

If ttl is not set, the remaining capacity will always be the capacity.

Returns
the remaining capacity.

◆ size()

template<typename E>
virtual int64_t hazelcast::client::Ringbuffer< E >::size ( )
pure virtual

Returns number of items in the ringbuffer.

If no ttl is set, the size will always be equal to capacity after the head completed the first loop around the ring. This is because no items are getting retired.

Returns
the size.

◆ tailSequence()

template<typename E>
virtual int64_t hazelcast::client::Ringbuffer< E >::tailSequence ( )
pure virtual

Returns the sequence of the tail.

The tail is the side of the ringbuffer where the items are added to.

The initial value of the tail is -1.

Returns
the sequence of the tail.

The documentation for this class was generated from the following file: