public class RingbufferCacheEventJournalImpl extends Object implements CacheEventJournal

The cache event journal implementation based on the Ringbuffer.
It adds all journal events to a RingbufferContainer with the provided namespace
and partition ID, and allows checking whether the cache has a configured event journal.
It adapts the EventJournalConfig to the RingbufferConfig when creating the ringbuffer.

Constructor and Description |
---|
RingbufferCacheEventJournalImpl(NodeEngine engine) |

Modifier and Type | Method and Description |
---|---|
void | cleanup(ObjectNamespace namespace, int partitionId): Cleans up the event journal by removing any expired items. |
void | destroy(ObjectNamespace namespace, int partitionId): Destroys the event journal for the given object and partition ID. |
EventJournalConfig | getEventJournalConfig(ObjectNamespace namespace): Returns the event journal configuration, or null if there is none or the journal is disabled for the given namespace. |
WaitNotifyKey | getWaitNotifyKey(ObjectNamespace namespace, int partitionId): Returns the WaitNotifyKey for objects waiting and notifying on the event journal. |
boolean | hasEventJournal(ObjectNamespace namespace): Returns true if the object has a configured and enabled event journal. |
void | isAvailableOrNextSequence(ObjectNamespace namespace, int partitionId, long sequence): Checks if the sequence is of an item that can be read immediately or is the sequence of the next item to be added into the event journal. |
boolean | isNextAvailableSequence(ObjectNamespace namespace, int partitionId, long sequence): Checks if the sequence is the sequence of the next event to be added to the event journal. |
boolean | isPersistenceEnabled(ObjectNamespace namespace, int partitionId): Returns true if the event journal has persistence enabled and can be queried for events older than EventJournal.oldestSequence(ObjectNamespace, int). |
long | newestSequence(ObjectNamespace namespace, int partitionId): Returns the sequence of the newest event stored in the event journal. |
long | oldestSequence(ObjectNamespace namespace, int partitionId): Returns the sequence of the oldest event stored in the event journal. |
<T> long | readMany(ObjectNamespace namespace, int partitionId, long beginSequence, ReadResultSetImpl<InternalEventJournalCacheEvent,T> resultSet): Reads events from the journal in batches. |
protected Data | toData(Object val) |
RingbufferConfig | toRingbufferConfig(EventJournalConfig config, ObjectNamespace namespace): Creates a new RingbufferConfig for a ringbuffer that will keep event journal events for a single partition. |
void | writeCreatedEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value): Writes a CacheEventType.CREATED event to the event journal. |
void | writeEvictEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value): Writes a CacheEventType.EVICTED event to the event journal. |
void | writeExpiredEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value): Writes a CacheEventType.EXPIRED event to the event journal. |
void | writeRemoveEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value): Writes a CacheEventType.REMOVED event to the event journal. |
void | writeUpdateEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object oldValue, Object newValue): Writes a CacheEventType.UPDATED event to the event journal. |

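
The sequence methods in the table above are easier to follow with a small model. The following is a minimal sketch of how the oldest and newest sequences can relate in a bounded buffer for a single partition; it is not the Hazelcast implementation. The class JournalSequenceSketch, the convention that an empty journal reports a newest sequence of -1, and the use of IllegalArgumentException to reject an unreadable sequence are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified single-partition model of journal sequences; not Hazelcast code. */
class JournalSequenceSketch {
    private final int capacity;
    private final Deque<String> events = new ArrayDeque<>();
    private long newest = -1; // assumption: -1 means nothing has been written yet

    JournalSequenceSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Appends an event, dropping the oldest one when the buffer is full. */
    long add(String event) {
        if (events.size() == capacity) {
            events.removeFirst();
        }
        events.addLast(event);
        return ++newest;
    }

    long newestSequence() {
        return newest;
    }

    /** For an empty journal this is newest + 1 (an assumption of this sketch). */
    long oldestSequence() {
        return newest - events.size() + 1;
    }

    /** True only for the sequence the next written event would receive. */
    boolean isNextAvailableSequence(long sequence) {
        return sequence == newest + 1;
    }

    /** Returns normally for a stored or next sequence; throws otherwise. */
    void isAvailableOrNextSequence(long sequence) {
        if (sequence < oldestSequence() || sequence > newest + 1) {
            throw new IllegalArgumentException("sequence " + sequence
                    + " is outside [" + oldestSequence() + ", " + (newest + 1) + "]");
        }
    }

    public static void main(String[] args) {
        JournalSequenceSketch journal = new JournalSequenceSketch(2);
        journal.add("a");
        journal.add("b");
        journal.add("c"); // evicts "a"
        System.out.println(journal.oldestSequence() + ".." + journal.newestSequence()); // prints "1..2"
        System.out.println(journal.isNextAvailableSequence(3)); // prints "true"
    }
}
```

In this model a reader can validate a sequence with isAvailableOrNextSequence before a read that might otherwise wait for a new event.
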
public RingbufferCacheEventJournalImpl(NodeEngine engine)
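The five write methods documented next share one contract: they do nothing when no event journal is configured for the cache, and a successful write unparks operations waiting for new events. A minimal sketch of that contract follows, assuming a hypothetical in-memory journal. JournalWriteSketch, Event, park and the enabled flag are illustrative names, not part of the Hazelcast API, and only two of the five event types are wired up.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the write contract; not Hazelcast code. */
class JournalWriteSketch {
    enum EventType { CREATED, UPDATED, REMOVED, EVICTED, EXPIRED }

    /** Hypothetical journal entry; keys are plain strings here for brevity. */
    static final class Event {
        final EventType type;
        final String key;
        final Object oldValue;
        final Object newValue;

        Event(EventType type, String key, Object oldValue, Object newValue) {
            this.type = type;
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    private final boolean journalEnabled;
    private final List<Event> journal = new ArrayList<>();
    private final List<Runnable> parked = new ArrayList<>();

    JournalWriteSketch(boolean journalEnabled) {
        this.journalEnabled = journalEnabled;
    }

    /** Registers a callback standing in for a parked read operation. */
    void park(Runnable waiter) {
        parked.add(waiter);
    }

    void writeCreatedEvent(String key, Object value) {
        // Assumption of this sketch: a created entry has no old value.
        write(new Event(EventType.CREATED, key, null, value));
    }

    void writeUpdateEvent(String key, Object oldValue, Object newValue) {
        write(new Event(EventType.UPDATED, key, oldValue, newValue));
    }

    int size() {
        return journal.size();
    }

    private void write(Event event) {
        if (!journalEnabled) {
            return; // no journal configured: do nothing
        }
        journal.add(event);
        // Wake every waiter once, then forget it (assumption: waiters re-park if needed).
        List<Runnable> toWake = new ArrayList<>(parked);
        parked.clear();
        toWake.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        JournalWriteSketch journal = new JournalWriteSketch(true);
        journal.park(() -> System.out.println("reader unparked"));
        journal.writeUpdateEvent("k", 1, 2);
        System.out.println("events stored: " + journal.size());
    }
}
```
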
public void writeUpdateEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object oldValue, Object newValue)
Description copied from interface: CacheEventJournal
Writes a CacheEventType.UPDATED event to the event journal.
If there is no event journal configured for this cache, the method does nothing.
If an event is added to the event journal, all parked operations waiting for
new events on that journal are unparked.
Specified by: writeUpdateEvent in interface CacheEventJournal
Parameters:
journalConfig - the event journal config for the cache in which the event occurred
namespace - the cache namespace, containing the full prefixed cache name
partitionId - the entry key partition
key - the entry key
oldValue - the old value
newValue - the new value

public void writeCreatedEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value)
Description copied from interface: CacheEventJournal
Writes a CacheEventType.CREATED event to the event journal.
If there is no event journal configured for this cache, the method does nothing.
If an event is added to the event journal, all parked operations waiting for
new events on that journal are unparked.
Specified by: writeCreatedEvent in interface CacheEventJournal
Parameters:
journalConfig - the event journal config for the cache in which the event occurred
namespace - the cache namespace, containing the full prefixed cache name
partitionId - the entry key partition
key - the entry key
value - the entry value

public void writeRemoveEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value)
Description copied from interface: CacheEventJournal
Writes a CacheEventType.REMOVED event to the event journal.
If there is no event journal configured for this cache, the method does nothing.
If an event is added to the event journal, all parked operations waiting for
new events on that journal are unparked.
Specified by: writeRemoveEvent in interface CacheEventJournal
Parameters:
journalConfig - the event journal config for the cache in which the event occurred
namespace - the cache namespace, containing the full prefixed cache name
partitionId - the entry key partition
key - the entry key
value - the entry value

public void writeEvictEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value)
Description copied from interface: CacheEventJournal
Writes a CacheEventType.EVICTED event to the event journal.
If there is no event journal configured for this cache, the method does nothing.
If an event is added to the event journal, all parked operations waiting for
new events on that journal are unparked.
Specified by: writeEvictEvent in interface CacheEventJournal
Parameters:
journalConfig - the event journal config for the cache in which the event occurred
namespace - the cache namespace, containing the full prefixed cache name
partitionId - the entry key partition
key - the entry key
value - the entry value

public void writeExpiredEvent(EventJournalConfig journalConfig, ObjectNamespace namespace, int partitionId, Data key, Object value)
Description copied from interface: CacheEventJournal
Writes a CacheEventType.EXPIRED event to the event journal.
If there is no event journal configured for this cache, the method does nothing.
If an event is added to the event journal, all parked operations waiting for
new events on that journal are unparked.
Specified by: writeExpiredEvent in interface CacheEventJournal
Parameters:
journalConfig - the event journal config for the cache in which the event occurred
namespace - the cache namespace, containing the full prefixed cache name
partitionId - the entry key partition
key - the entry key
value - the entry value

public long newestSequence(ObjectNamespace namespace, int partitionId)
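Description copied from interface: EventJournal
Returns the sequence of the newest event stored in the event journal.
Specified by: newestSequence in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the event journal

public long oldestSequence(ObjectNamespace namespace, int partitionId)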
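Description copied from interface: EventJournal
Returns the sequence of the oldest event stored in the event journal.
Specified by: oldestSequence in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the event journal

public boolean isPersistenceEnabled(ObjectNamespace namespace, int partitionId)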
Description copied from interface: EventJournal
Returns true if the event journal has persistence enabled and
can be queried for events older than
EventJournal.oldestSequence(ObjectNamespace, int). If the journal is not
backed by a persistent store, this method returns false.
Specified by: isPersistenceEnabled in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the event journal

public void destroy(ObjectNamespace namespace, int partitionId)
Description copied from interface: EventJournal
Destroys the event journal for the given object and partition ID.
Specified by: destroy in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the entries in the journal

public void isAvailableOrNextSequence(ObjectNamespace namespace, int partitionId, long sequence)
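Description copied from interface: EventJournal
Checks if the sequence is of an item that can be read immediately
or is the sequence of the next item to be added into the event journal.
Because the sequence may be one greater than
EventJournal.newestSequence(ObjectNamespace, int), the caller can use this method
to check the sequence before performing a possibly blocking read.
Specified by: isAvailableOrNextSequence in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the entries in the journal
sequence - the sequence to be read

public boolean isNextAvailableSequence(ObjectNamespace namespace, int partitionId, long sequence)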
Description copied from interface: EventJournal
Checks if the sequence is the sequence of the next event to
be added to the event journal.
Specified by: isNextAvailableSequence in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the entries in the journal
sequence - the sequence to be checked
Returns: true if the sequence is one greater than the sequence of the last event, false otherwise

public WaitNotifyKey getWaitNotifyKey(ObjectNamespace namespace, int partitionId)
Description copied from interface: EventJournal
Returns the WaitNotifyKey for objects waiting and notifying on the event journal.
Specified by: getWaitNotifyKey in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the entries in the journal

public <T> long readMany(ObjectNamespace namespace, int partitionId, long beginSequence, ReadResultSetImpl<InternalEventJournalCacheEvent,T> resultSet)
Description copied from interface: EventJournal
Reads events from the journal in batches. Reading continues until
ReadResultSetImpl.isMaxSizeReached() or until the journal is exhausted.
The resultSet allows filtering and projections on journal items so that the caller
can control which data is returned.
If the set has reached its max size, the returned sequence is one greater than the sequence of the last item in the set. Otherwise, the set did not fill because the end of the event journal was reached, and the returned sequence is one greater than the sequence of the last stored event.
Specified by: readMany in interface EventJournal<InternalEventJournalCacheEvent>
Type Parameters:
T - the return type of the projected events
Parameters:
namespace - the object namespace
partitionId - the partition ID of the entries in the journal
beginSequence - the sequence of the first item to read
resultSet - the container for read, filtered and projected events
See Also: EventJournal.isAvailableOrNextSequence(ObjectNamespace, int, long)
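The two cases for the returned sequence can be traced with a small model. The sketch below is a simplified stand-in, not the Hazelcast implementation: ReadManySketch, its ResultSet class, and the list-backed journal whose first element holds oldestSequence are hypothetical, and the caller is assumed to have already validated beginSequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Simplified model of batched journal reads; not Hazelcast code. */
class ReadManySketch {
    /** Hypothetical result set that filters, projects and caps a batch. */
    static final class ResultSet<E, T> {
        private final int maxSize;
        private final Predicate<? super E> filter;
        private final Function<? super E, ? extends T> projection;
        private final List<T> items = new ArrayList<>();

        ResultSet(int maxSize, Predicate<? super E> filter,
                  Function<? super E, ? extends T> projection) {
            this.maxSize = maxSize;
            this.filter = filter;
            this.projection = projection;
        }

        /** Stores the projected event only if it passes the filter. */
        void offer(E event) {
            if (filter.test(event)) {
                items.add(projection.apply(event));
            }
        }

        boolean isMaxSizeReached() {
            return items.size() >= maxSize;
        }

        List<T> items() {
            return items;
        }
    }

    /**
     * Reads from beginSequence until the set is full or the journal ends.
     * journal.get(i) is the event with sequence oldestSequence + i.
     * Returns the sequence from which the next read should start.
     */
    static <E, T> long readMany(List<E> journal, long oldestSequence,
                                long beginSequence, ResultSet<E, T> resultSet) {
        long newestSequence = oldestSequence + journal.size() - 1;
        long sequence = beginSequence;
        while (sequence <= newestSequence && !resultSet.isMaxSizeReached()) {
            resultSet.offer(journal.get((int) (sequence - oldestSequence)));
            sequence++;
        }
        return sequence;
    }

    public static void main(String[] args) {
        List<String> journal = List.of("a1", "b2", "a3", "b4", "a5");
        ResultSet<String, String> batch =
                new ResultSet<>(2, e -> e.startsWith("a"), String::toUpperCase);
        long next = readMany(journal, 10, 10, batch);
        System.out.println(batch.items() + " next=" + next); // prints "[A1, A3] next=13"
    }
}
```

Feeding the returned value back in as beginSequence resumes reading where the previous batch stopped; once the journal is exhausted, the value equals the sequence the next written event would receive.
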
public void cleanup(ObjectNamespace namespace, int partitionId)
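Description copied from interface: EventJournal
Cleans up the event journal by removing any expired items.
Specified by: cleanup in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace
partitionId - the partition ID of the entries in the journal

public boolean hasEventJournal(ObjectNamespace namespace)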
Returns true if the object has a configured and enabled event journal.
NOTE: The cache config should have already been created in the cache service before
invoking this method.
Specified by: hasEventJournal in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the cache namespace, containing the full prefixed cache name
Returns: true if the object has a configured and enabled event journal, false otherwise
Throws:
CacheNotExistsException - if the cache configuration was not found

public EventJournalConfig getEventJournalConfig(ObjectNamespace namespace)
Returns the event journal configuration, or null if there is none or the journal
is disabled for the given namespace.
Specified by: getEventJournalConfig in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
namespace - the object namespace of the specific distributed object
Returns: the event journal configuration, or null if the journal is not enabled or available
Throws:
CacheNotExistsException - if the cache configuration was not found

public RingbufferConfig toRingbufferConfig(EventJournalConfig config, ObjectNamespace namespace)
Creates a new RingbufferConfig for a ringbuffer that will keep
event journal events for a single partition.
Specified by: toRingbufferConfig in interface EventJournal<InternalEventJournalCacheEvent>
Parameters:
config - the event journal config
namespace - the object namespace
Throws:
CacheNotExistsException - if the cache configuration was not found

Copyright © 2022 Hazelcast, Inc. All Rights Reserved.