public class ReliableTopicConfig extends Object implements IdentifiedDataSerializable, NamedConfig
Configuration for a reliable ITopic.
The reliable topic makes use of a Ringbuffer to store the actual messages.
To configure the ringbuffer for a reliable topic, define a ringbuffer in the config with exactly the same name. It is very unlikely that you want to run with the default settings.
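As a minimal sketch of the name-matching rule above, the following builds a Config in which a RingbufferConfig and a ReliableTopicConfig share one name. It assumes Hazelcast 5.x on the classpath (where TopicOverloadPolicy lives in com.hazelcast.topic). The topic name "orders" and all numeric values are hypothetical and chosen only for illustration.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.ReliableTopicConfig;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.topic.TopicOverloadPolicy;

final class ReliableTopicConfigSketch {
    public static void main(String[] args) {
        // Hypothetical name; the ringbuffer and the reliable topic must use exactly the same one.
        String name = "orders";

        // Backing ringbuffer. Capacity and TTL are illustrative values, not recommendations.
        RingbufferConfig ringbufferConfig = new RingbufferConfig(name)
                .setCapacity(10_000)
                .setTimeToLiveSeconds(60);

        // Reliable topic settings, using the setters documented on this page.
        ReliableTopicConfig topicConfig = new ReliableTopicConfig(name)
                .setReadBatchSize(10)
                .setStatisticsEnabled(true)
                .setTopicOverloadPolicy(TopicOverloadPolicy.BLOCK);

        Config config = new Config()
                .addRingbufferConfig(ringbufferConfig)
                .addReliableTopicConfig(topicConfig);

        System.out.println(config.getReliableTopicConfig(name));
    }
}
```

The same Config object would then be passed to the member at startup. Which overload policy, capacity, and TTL are appropriate depends on the workload.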
When a ReliableTopic starts, it always starts from the tail+1 item of the Ringbuffer. It does not work its way through all events already available; instead it waits for the next item to be published.
In the reliable topic, global order is always maintained, so all listeners observe exactly the same sequence of messages.
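The tail+1 start position can be illustrated with a small model that uses only the Java standard library. This is not Hazelcast code: TailStartSketch is a hypothetical stand-in for the ringbuffer that ignores capacity and expiry, and it only shows that a reader starting at tail+1 skips items published earlier.

```java
import java.util.ArrayList;
import java.util.List;

final class TailStartSketch {
    // Stands in for the ringbuffer; the list index plays the role of the sequence number.
    private final List<String> items = new ArrayList<>();

    /** Appends an item to the end of the buffer. */
    void publish(String item) {
        items.add(item);
    }

    /** Sequence of the newest item, or -1 if nothing has been published. */
    long tailSequence() {
        return items.size() - 1;
    }

    /** Returns every item whose sequence is at least startSequence. */
    List<String> readFrom(long startSequence) {
        return new ArrayList<>(items.subList((int) startSequence, items.size()));
    }

    /** Publishes two items, registers a reader at tail+1, then publishes two more. */
    static List<String> demo() {
        TailStartSketch buffer = new TailStartSketch();
        buffer.publish("a");
        buffer.publish("b");
        long start = buffer.tailSequence() + 1; // the reader starts here
        buffer.publish("c");
        buffer.publish("d");
        return buffer.readFrom(start);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [c, d]
    }
}
```

Any second reader registered at the same moment would read from the same sequence and therefore see the same items in the same order.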
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_READ_BATCH_SIZE<br>The default read batch size. |
static boolean | DEFAULT_STATISTICS_ENABLED<br>Default value for statistics enabled. |
static TopicOverloadPolicy | DEFAULT_TOPIC_OVERLOAD_POLICY<br>The default slow consumer policy. |
Constructor and Description |
---|
ReliableTopicConfig() |
ReliableTopicConfig(ReliableTopicConfig config)<br>Creates a new ReliableTopicConfig by cloning an existing one. |
ReliableTopicConfig(String name)<br>Creates a new ReliableTopicConfig with default settings. |
Modifier and Type | Method and Description |
---|---|
ReliableTopicConfig | addMessageListenerConfig(ListenerConfig listenerConfig)<br>Adds a message listener (listens for when messages are added or removed) to this reliable topic. |
boolean | equals(Object o) |
int | getClassId()<br>Returns type identifier for this class. |
Executor | getExecutor()<br>Gets the Executor that is going to process the events. |
int | getFactoryId()<br>Returns DataSerializableFactory factory ID for this class. |
List<ListenerConfig> | getMessageListenerConfigs()<br>Gets the list of message listeners (listens for when messages are added or removed) for this reliable topic. |
String | getName()<br>Gets the name of the reliable topic. |
int | getReadBatchSize()<br>Gets the maximum number of items to read in a batch. |
TopicOverloadPolicy | getTopicOverloadPolicy()<br>Gets the TopicOverloadPolicy for this reliable topic. |
int | hashCode() |
boolean | isStatisticsEnabled()<br>Checks if statistics are enabled for this reliable topic. |
void | readData(ObjectDataInput in)<br>Reads fields from the input stream. |
ReliableTopicConfig | setExecutor(Executor executor)<br>Sets the executor that is going to process the events. |
ReliableTopicConfig | setMessageListenerConfigs(List<ListenerConfig> listenerConfigs)<br>Sets the list of message listeners (listens for when messages are added or removed) for this topic. |
ReliableTopicConfig | setName(String name)<br>Sets the name of the reliable topic. |
ReliableTopicConfig | setReadBatchSize(int readBatchSize)<br>Sets the read batch size. |
ReliableTopicConfig | setStatisticsEnabled(boolean statisticsEnabled)<br>Enables or disables statistics for this reliable topic. |
ReliableTopicConfig | setTopicOverloadPolicy(TopicOverloadPolicy topicOverloadPolicy)<br>Sets the TopicOverloadPolicy for this reliable topic. |
String | toString() |
void | writeData(ObjectDataOutput out)<br>Writes object fields to output stream. |
public static final int DEFAULT_READ_BATCH_SIZE
public static final TopicOverloadPolicy DEFAULT_TOPIC_OVERLOAD_POLICY
public static final boolean DEFAULT_STATISTICS_ENABLED
public ReliableTopicConfig()
public ReliableTopicConfig(String name)
public ReliableTopicConfig(ReliableTopicConfig config)
Parameters:
config - the ReliableTopicConfig to clone
public ReliableTopicConfig setName(String name)
Specified by:
setName in interface NamedConfig
Parameters:
name - the name of the reliable topic
Throws:
IllegalArgumentException - if name is null or an empty string
public String getName()
Specified by:
getName in interface NamedConfig
public TopicOverloadPolicy getTopicOverloadPolicy()
public ReliableTopicConfig setTopicOverloadPolicy(TopicOverloadPolicy topicOverloadPolicy)
See TopicOverloadPolicy for more details about this setting.
Parameters:
topicOverloadPolicy - the new TopicOverloadPolicy
Throws:
IllegalArgumentException - if topicOverloadPolicy is null
public Executor getExecutor()
If no Executor is selected, then the ExecutionService.ASYNC_EXECUTOR is used.
See Also:
setExecutor(java.util.concurrent.Executor)
public ReliableTopicConfig setExecutor(Executor executor)
In some cases it is desirable to set a specific executor. For example, you may want to isolate a certain topic from other topics because it contains long-running messages or very high-priority messages.
A single executor can be shared between multiple reliable topics, although messages may then take longer to be processed. If an executor is not shared with other reliable topics, it only needs a single thread.
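A minimal sketch of isolating one topic on its own executor might look as follows. It assumes Hazelcast on the classpath. The topic name "slow-reports" is hypothetical, and the single-thread executor follows the note above that an unshared executor needs only one thread.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.ReliableTopicConfig;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DedicatedExecutorSketch {
    public static void main(String[] args) {
        // One thread is enough because this executor serves a single reliable topic.
        ExecutorService dedicated = Executors.newSingleThreadExecutor();

        // Hypothetical topic whose listeners handle long-running messages.
        ReliableTopicConfig slowTopic = new ReliableTopicConfig("slow-reports")
                .setExecutor(dedicated);

        Config config = new Config().addReliableTopicConfig(slowTopic);
        System.out.println(config.getReliableTopicConfig("slow-reports").getName());

        // The application owns the executor's lifecycle in this sketch.
        dedicated.shutdown();
    }
}
```

Topics configured without an executor keep using ExecutionService.ASYNC_EXECUTOR, so a slow listener on the isolated topic does not hold up their threads.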
Parameters:
executor - the executor. If the executor is null, the ExecutionService.ASYNC_EXECUTOR will be used to process the events.
public int getReadBatchSize()
public ReliableTopicConfig setReadBatchSize(int readBatchSize)
The ReliableTopic tries to read a batch of messages from the ringbuffer. It will get at least one message, but if more are available, it will try to read more to increase throughput. The read batch size sets the maximum number of messages read in one batch.
Apart from influencing the number of messages to retrieve, the readBatchSize also determines how many messages will be processed by the thread running the MessageListener before it returns to the pool to look for other MessageListeners that need to be processed. The problem with returning to the pool and looking for new work is that interacting with an executor is quite expensive due to contention on the work queue. The more work that can be done without returning to the pool, the smaller the overhead.
If the readBatchSize is 10 and there are 50 messages available, 10 items are retrieved and processed consecutively before the thread goes back to the pool and helps out with the processing of other messages.
If the readBatchSize is 10 and there are 2 items available, 2 items are retrieved and processed consecutively.
If the readBatchSize is an issue because a thread will be busy too long with processing a single MessageListener and it can't help out other MessageListeners, increase the size of the thread pool so the other MessageListeners don't need to wait for a thread, but can be processed in parallel.
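The batch arithmetic described above can be checked with a small standard-library model. This is a sketch, not Hazelcast's implementation: ReadBatchSketch and its methods are hypothetical, and a plain queue stands in for the ringbuffer. Each call to readBatch represents one trip of a thread from the pool to a listener.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ReadBatchSketch {
    /** Removes and returns up to readBatchSize items from the head of the queue. */
    static <T> List<T> readBatch(Queue<T> available, int readBatchSize) {
        if (readBatchSize < 1) {
            throw new IllegalArgumentException("readBatchSize must be at least 1");
        }
        List<T> batch = new ArrayList<>();
        while (batch.size() < readBatchSize && !available.isEmpty()) {
            batch.add(available.poll());
        }
        return batch;
    }

    /** Counts how many batches (round trips to the pool) it takes to drain messageCount items. */
    static int batchesToDrain(int messageCount, int readBatchSize) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        int trips = 0;
        while (!queue.isEmpty()) {
            readBatch(queue, readBatchSize);
            trips++;
        }
        return trips;
    }

    public static void main(String[] args) {
        System.out.println(batchesToDrain(50, 10)); // prints 5
        System.out.println(batchesToDrain(2, 10));  // prints 1
        System.out.println(batchesToDrain(50, 1));  // prints 50
    }
}
```

With a batch size of 1, draining 50 messages takes 50 trips to the pool instead of 5, which is the executor overhead the text describes.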
Parameters:
readBatchSize - the maximum number of items to read in a batch
Throws:
IllegalArgumentException - if the readBatchSize is smaller than 1
public boolean isStatisticsEnabled()
Returns:
true if enabled, false otherwise
public ReliableTopicConfig setStatisticsEnabled(boolean statisticsEnabled)
Parameters:
statisticsEnabled - true to enable statistics, false to disable
public ReliableTopicConfig setMessageListenerConfigs(List<ListenerConfig> listenerConfigs)
Parameters:
listenerConfigs - the list of message listeners for this topic
public List<ListenerConfig> getMessageListenerConfigs()
public ReliableTopicConfig addMessageListenerConfig(ListenerConfig listenerConfig)
Parameters:
listenerConfig - the ListenerConfig to add
Throws:
NullPointerException - if listenerConfig is null
public int getFactoryId()
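Description copied from interface: IdentifiedDataSerializable
Returns DataSerializableFactory factory ID for this class.
Specified by:
getFactoryId in interface IdentifiedDataSerializable
public int getClassId()
Description copied from interface: IdentifiedDataSerializable
Returns type identifier for this class.
Specified by:
getClassId in interface IdentifiedDataSerializable
public void writeData(ObjectDataOutput out) throws IOException
Description copied from interface: DataSerializable
Writes object fields to output stream.
Specified by:
writeData in interface DataSerializable
Parameters:
out - output
Throws:
IOException - if an I/O error occurs. In particular, an IOException may be thrown if the output stream has been closed.
public void readData(ObjectDataInput in) throws IOException
Description copied from interface: DataSerializable
Reads fields from the input stream.
Specified by:
readData in interface DataSerializable
Parameters:
in - input
Throws:
IOException - if an I/O error occurs. In particular, an IOException may be thrown if the input stream has been closed.
Copyright © 2023 Hazelcast, Inc. All rights reserved.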