18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_
19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_
21 #include "hazelcast/client/proxy/ReliableTopicImpl.h"
22 #include "hazelcast/client/Ringbuffer.h"
23 #include "hazelcast/client/DataArray.h"
24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
// Body of publish(const E *message): the message is converted to its
// serialized Data form through the client context's serialization service,
// and that Data is handed to the ReliableTopicImpl base-class publish().
59 serialization::pimpl::Data data = context->getSerializationService().toData<E>(message);
60 proxy::ReliableTopicImpl::publish(data);
// Body of addMessageListener(listener): registers a new listener runner.
// The next value of runnerCounter becomes the runner's id.
81 int id = ++runnerCounter;
// The runner receives the address of the caller's listener, the raw
// ringbuffer pointer, the topic name, the serialization service and the
// topic config. Only pointers are passed, so the listener object must stay
// alive for as long as the runner uses it.
82 boost::shared_ptr<MessageRunner<E> > runner(
new MessageRunner<E>(
id, &listener, ringbuffer.get(), getName(),
83 &context->getSerializationService(), config));
// Keep the runner reachable by id so it can be looked up again later.
84 runnersMap.put(
id, runner);
// The id rendered as a string is what the caller gets back as the
// registration id.
86 return util::IOUtil::to_string<int>(id);
// Body of removeMessageListener(registrationId): the registration id string
// is parsed back into the integer runner id and the runner is looked up.
98 int id = util::IOUtil::to_value<int>(registrationId);
99 boost::shared_ptr<MessageRunner<E> > runner = runnersMap.get(
id);
// No runner is registered under this id. The statements of this branch are
// not visible in this listing.
100 if (NULL == runner) {
// Drop the runner's entry from the registry.
104 runnersMap.remove(
id);
// Destroy hook for the topic proxy: cancels every registered message runner
// and then destroys the ringbuffer.
108 virtual void onDestroy() {
// clear() empties the registry and returns the entries it held, so the
// runners can still be cancelled after they are no longer registered.
110 std::vector<std::pair<int, boost::shared_ptr<MessageRunner<E> > > > runners = runnersMap.clear();
111 for (
typename std::vector<std::pair<
int, boost::shared_ptr<MessageRunner<E> > > >::const_iterator it = runners.begin();
112 it != runners.end(); ++it) {
113 it->second->cancel();
// The ringbuffer is destroyed only after all runners were asked to cancel.
117 ringbuffer->destroy();
// Constructs the topic proxy by forwarding all arguments to ReliableTopicImpl.
//   instanceName - name passed on to the base implementation
//   context      - client context pointer passed on to the base implementation
//   rb           - shared ringbuffer of ReliableTopicMessage items, passed on
//                  to the base implementation
120 ReliableTopic(
const std::string &instanceName, spi::ClientContext *context, boost::shared_ptr<Ringbuffer<topic::impl::reliable::ReliableTopicMessage> > rb)
121 : proxy::ReliableTopicImpl(instanceName, context, rb) {
124 template <
typename T>
125 class MessageRunner : impl::ExecutionCallback<DataArray<topic::impl::reliable::ReliableTopicMessage> > {
// Builds a runner for one listener and starts its executor.
//   id                  - identifier of this runner (appears in its log messages)
//   listener            - listener that receives the messages; stored as a raw pointer
//   rb                  - ringbuffer to read from; stored as a raw pointer and
//                         also used to construct the executor
//   topicName           - topic name; the `name` member is a reference bound
//                         to this argument, so the referenced string must
//                         outlive the runner
//   service             - serialization service used to deserialize payloads
//   reliableTopicConfig - topic configuration (read batch size is taken from it)
127 MessageRunner(
int id, topic::ReliableMessageListener<T> *listener,
128 Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *rb,
129 const std::string &topicName, serialization::pimpl::SerializationService *service,
130 const config::ReliableTopicConfig *reliableTopicConfig)
131 : cancelled(false), logger(util::ILogger::getLogger()), name(topicName), executor(rb),
132 serializationService(service), config(reliableTopicConfig) {
134 this->listener = listener;
135 this->ringbuffer = rb;
// Ask the listener where to start. A value of -1 is treated as "no stored
// position", in which case reading starts right after the current tail of
// the ringbuffer.
138 int64_t initialSequence = listener->retrieveInitialSequence();
139 if (initialSequence == -1) {
140 initialSequence = ringbuffer->tailSequence() + 1;
142 this->sequence = initialSequence;
// The executor is started last, after the starting sequence is in place.
144 this->executor.start();
148 virtual ~MessageRunner() { }
155 topic::impl::reliable::ReliableTopicExecutor::Message m;
156 m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
158 m.sequence = sequence;
159 m.maxCount = config->getReadBatchSize();
// Success callback for a read: walks the returned batch of messages in order.
//   allMessages - batch of messages delivered by the read
164 void onResponse(DataArray<topic::impl::reliable::ReliableTopicMessage> *allMessages) {
169 size_t numMessages = allMessages->size();
173 for (
size_t i = 0; i < numMessages; ++i) {
// The current sequence is reported to the listener before the message
// itself is delivered.
175 listener->storeSequence(sequence);
176 process(allMessages->get(i));
// An IException raised while handling a message is caught here; the handler
// body is not visible in this listing.
177 }
catch (exception::IException &e) {
// Failure callback for a read: inspects the protocol error code of the
// failure and logs a message that depends on the kind of error.
//   t - the protocol exception describing the failure
191 void onFailure(
const exception::ProtocolException *t) {
196 int32_t err = t->getErrorCode();
// Case 1: an EXECUTION error whose cause is STALE_SEQUENCE.
197 if (protocol::EXECUTION == err &&
198 protocol::STALE_SEQUENCE == t->getCauseErrorCode()) {
// Fetch the ringbuffer's current head sequence.
200 int64_t remoteHeadSeq = ringbuffer->headSequence();
// A loss-tolerant listener skips ahead: its sequence is moved to the head
// sequence, and the jump is logged at finest level when that is enabled.
202 if (listener->isLossTolerant()) {
203 if (logger.isFinestEnabled()) {
204 std::ostringstream out;
205 out <<
"MessageListener " <<
id <<
" on topic: " << name <<
" ran into a stale sequence. "
206 <<
"Jumping from oldSequence: " << sequence <<
" to sequence: " << remoteHeadSeq;
207 logger.finest(out.str());
209 sequence = remoteHeadSeq;
// Warning reporting that the listener fell behind. The lines between the
// branch above and this statement are not visible in this listing.
// NOTE(review): unlike the sibling messages, nothing separates the topic name
// from "Reason:" here, so the two run together in the log output.
214 std::ostringstream out;
215 out <<
"Terminating MessageListener:" <<
id <<
" on topic: " << name <<
"Reason: The listener "
216 "was too slow or the retention period of the message has been violated. " <<
"head: "
217 << remoteHeadSeq <<
" sequence:" << sequence;
218 logger.warning(out.str());
// Case 2: the error code is HAZELCAST_INSTANCE_NOT_ACTIVE; logged at finest level.
219 }
else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
220 if (logger.isFinestEnabled()) {
221 std::ostringstream out;
222 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". "
223 <<
" Reason: HazelcastInstance is shutting down";
224 logger.finest(out.str());
// Case 3: the error code is DISTRIBUTED_OBJECT_DESTROYED; logged at finest level.
// NOTE(review): this message also omits the ". " separator after the topic name.
226 }
else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
227 if (logger.isFinestEnabled()) {
228 std::ostringstream out;
229 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
" Reason: Topic is destroyed";
230 logger.finest(out.str());
// Warning that includes the exception text from t->what(). The branch line
// that leads here is not visible in this listing; presumably this is the
// fallback for every other error code.
233 std::ostringstream out;
234 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". "
235 <<
" Reason: Unhandled exception, details:" << t->what();
236 logger.warning(out.str());
// Delivers one ringbuffer message to the listener: the raw message is first
// converted with toMessage() and the result is passed to onMessage().
//   message - the reliable-topic message read from the ringbuffer
247 void process(
const topic::impl::reliable::ReliableTopicMessage *message) {
249 listener->onMessage(toMessage(message));
// Converts a raw reliable-topic message into the topic::Message<T> handed to
// listeners.
//   m - the raw message read from the ringbuffer
// Returns a newly allocated MessageImpl<T>, owned by the returned auto_ptr.
252 std::auto_ptr<topic::Message<T> > toMessage(
const topic::impl::reliable::ReliableTopicMessage *m) {
// `member` starts out empty; a Member is built from the publisher address
// below. Any condition guarding that assignment is not visible in this
// listing.
253 std::auto_ptr<Member> member;
254 const Address *addr = m->getPublisherAddress();
256 member = std::auto_ptr<Member>(
new Member(*addr));
// Deserialize the payload into the listener's message type T.
258 std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
// Bundle topic name, payload, publish time and publishing member together.
259 return std::auto_ptr<topic::Message<T> >(
new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
// Asks the listener whether the given failure is terminal and logs the
// outcome.
//   failure - the exception that was raised
// The return statements are not visible in this listing.
262 bool terminate(
const exception::IException &failure) {
// The listener itself decides whether this failure is terminal.
268 bool terminate = listener->isTerminal(failure);
// Warning carrying the failure text. The condition line selecting between
// this message and the finest-level one below is not visible in this listing.
270 std::ostringstream out;
271 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". "
272 <<
" Reason: Unhandled exception, details: " << failure.what();
273 logger.warning(out.str());
// Finest-level message recording the exception, emitted only when finest
// logging is enabled.
275 if (logger.isFinestEnabled()) {
276 std::ostringstream out;
277 out <<
"MessageListener " <<
id <<
" on topic: " << name <<
". "
278 <<
" ran into an exception, details:" << failure.what();
279 logger.finest(out.str());
// An IException is caught here; per the log text below it is one raised by
// the listener's own isTerminal() call, and it is reported as a warning.
283 }
catch (exception::IException &t) {
284 std::ostringstream out;
285 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". "
286 <<
" Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. "
288 logger.warning(out.str());
295 topic::ReliableMessageListener<T> *listener;
297 Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *ringbuffer;
299 util::AtomicBoolean cancelled;
300 util::ILogger &logger;
301 const std::string &name;
302 topic::impl::reliable::ReliableTopicExecutor executor;
303 serialization::pimpl::SerializationService *serializationService;
304 const config::ReliableTopicConfig *config;
307 util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
308 util::AtomicInt runnerCounter;
313 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_
bool removeMessageListener(const std::string &registrationId)
Stops receiving messages for the given message listener.
Definition: ReliableTopic.h:97
void publish(const E *message)
Publishes the message to all subscribers of this topic. Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic.
Definition: ReliableTopic.h:80
Definition: MapEntryView.h:32
Hazelcast Client enables you to do all Hazelcast operations without being a member of the cluster...
Definition: HazelcastClient.h:459