Hazelcast C++ Client
ReliableTopic.h
1 /*
2  * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 //
17 // Created by ihsan demir on 31 May 2016.
18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_
19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_
20 
21 #include "hazelcast/client/proxy/ReliableTopicImpl.h"
22 #include "hazelcast/client/Ringbuffer.h"
23 #include "hazelcast/client/DataArray.h"
24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
25 
26 #include <string>
27 #include <memory>
28 
29 namespace hazelcast {
30  namespace client {
31 
44  template<typename E>
45  class ReliableTopic : public proxy::ReliableTopicImpl {
46  friend class impl::HazelcastClientInstanceImpl;
47 
48  public:
49 
58  void publish(const E *message) {
59  serialization::pimpl::Data data = getContext().getSerializationService().template toData<E>(message);
60  proxy::ReliableTopicImpl::publish(data);
61  }
62 
80  std::string addMessageListener(topic::ReliableMessageListener<E> &listener) {
81  int id = ++runnerCounter;
82  boost::shared_ptr<MessageRunner < E> >
83  runner(new MessageRunner<E>(id, &listener, ringbuffer.get(), getName(),
84  &getContext().getSerializationService(), config, logger));
85  runnersMap.put(id, runner);
86  runner->next();
87  return util::IOUtil::to_string<int>(id);
88  }
89 
98  bool removeMessageListener(const std::string &registrationId) {
99  int id = util::IOUtil::to_value<int>(registrationId);
100  boost::shared_ptr<MessageRunner < E> > runner = runnersMap.get(id);
101  if (NULL == runner) {
102  return false;
103  }
104  runner->cancel();
105  runnersMap.remove(id);
106  return true;
107  };
108  protected:
109  virtual void onDestroy() {
110  // cancel all runners
111  std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > > runners = runnersMap.clear();
112  for (typename std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > >
113  ::const_iterator it = runners.begin();it != runners.end();++it) {
114  it->second->cancel();
115  }
116 
117  // destroy the underlying ringbuffer
118  ringbuffer->destroy();
119  }
120 
121  private:
122  ReliableTopic(const std::string &instanceName, spi::ClientContext *context,
124  : proxy::ReliableTopicImpl(instanceName, context, rb) {
125  }
126 
127  template<typename T>
128  class MessageRunner
129  : ExecutionCallback<DataArray<topic::impl::reliable::ReliableTopicMessage> > {
130  public:
131  MessageRunner(int id, topic::ReliableMessageListener<T> *listener,
133  const std::string &topicName, serialization::pimpl::SerializationService *service,
134  const config::ReliableTopicConfig *reliableTopicConfig, util::ILogger &logger)
135  : cancelled(false), logger(logger), name(topicName), executor(*rb, logger),
136  serializationService(service), config(reliableTopicConfig) {
137  this->id = id;
138  this->listener = listener;
139  this->ringbuffer = rb;
140 
141  // we are going to listen to next publication. We don't care about what already has been published.
142  int64_t initialSequence = listener->retrieveInitialSequence();
143  if (initialSequence == -1) {
144  initialSequence = ringbuffer->tailSequence() + 1;
145  }
146  this->sequence = initialSequence;
147 
148  this->executor.start();
149  }
150 
151 
152  virtual ~MessageRunner() {}
153 
154  void next() {
155  if (cancelled) {
156  return;
157  }
158 
159  topic::impl::reliable::ReliableTopicExecutor::Message m;
160  m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
161  m.callback = this;
162  m.sequence = sequence;
163  m.maxCount = config->getReadBatchSize();
164  executor.execute(m);
165  }
166 
167  // This method is called from the provided executor.
168  void onResponse(
169  const boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > &allMessages) {
170  if (cancelled) {
171  return;
172  }
173 
174  size_t numMessages = allMessages->size();
175 
176  // we process all messages in batch. So we don't release the thread and reschedule ourselves;
177  // but we'll process whatever was received in 1 go.
178  for (size_t i = 0; i < numMessages; ++i) {
179  try {
180  listener->storeSequence(sequence);
181  process(allMessages->get(i));
182  } catch (exception::IException &e) {
183  if (terminate(e)) {
184  cancel();
185  return;
186  }
187  }
188 
189  sequence++;
190  }
191 
192  next();
193  }
194 
195  // This method is called from the provided executor.
196  void onFailure(const boost::shared_ptr<exception::IException> &throwable) {
197  if (cancelled) {
198  return;
199  }
200 
201  int32_t err = throwable->getErrorCode();
202  if (protocol::TIMEOUT == err) {
203  if (logger.isFinestEnabled()) {
204  logger.finest() << "MessageListener " << listener << " on topic: " << name << " timed out. "
205  << "Continuing from last known sequence: " << sequence;
206  }
207  next();
208  return;
209  } else if (protocol::EXECUTION == err &&
210  protocol::STALE_SEQUENCE == throwable->getCauseErrorCode()) {
211  // StaleSequenceException.getHeadSeq() is not available on the client-side, see #7317
212  int64_t remoteHeadSeq = ringbuffer->headSequence();
213 
214  if (listener->isLossTolerant()) {
215  if (logger.isFinestEnabled()) {
216  std::ostringstream out;
217  out << "MessageListener " << id << " on topic: " << name
218  << " ran into a stale sequence. "
219  << "Jumping from oldSequence: " << sequence << " to sequence: " << remoteHeadSeq;
220  logger.finest(out.str());
221  }
222  sequence = remoteHeadSeq;
223  next();
224  return;
225  }
226 
227  std::ostringstream out;
228  out << "Terminating MessageListener:" << id << " on topic: " << name << "Reason: The listener "
229  "was too slow or the retention period of the message has been violated. "
230  << "head: "
231  << remoteHeadSeq << " sequence:" << sequence;
232  logger.warning(out.str());
233  } else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
234  if (logger.isFinestEnabled()) {
235  std::ostringstream out;
236  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
237  << " Reason: HazelcastInstance is shutting down";
238  logger.finest(out.str());
239  }
240  } else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
241  if (logger.isFinestEnabled()) {
242  std::ostringstream out;
243  out << "Terminating MessageListener " << id << " on topic: " << name
244  << " Reason: Topic is destroyed";
245  logger.finest(out.str());
246  }
247  } else {
248  std::ostringstream out;
249  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
250  << " Reason: Unhandled exception, details:" << throwable->what();
251  logger.warning(out.str());
252  }
253 
254  cancel();
255  }
256 
257  void cancel() {
258  cancelled = true;
259  executor.stop();
260  }
261 
262  private:
263  void process(const topic::impl::reliable::ReliableTopicMessage *message) {
264  // proxy.localTopicStats.incrementReceives();
265  listener->onMessage(toMessage(message));
266  }
267 
268  std::auto_ptr<topic::Message<T> > toMessage(const topic::impl::reliable::ReliableTopicMessage *m) {
269  boost::shared_ptr<Member> member;
270  const Address *addr = m->getPublisherAddress();
271  if (addr != NULL) {
272  member = boost::shared_ptr<Member>(new Member(*addr));
273  }
274  std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
275  return std::auto_ptr<topic::Message<T> >(
276  new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
277  }
278 
279  bool terminate(const exception::IException &failure) {
280  if (cancelled) {
281  return true;
282  }
283 
284  try {
285  bool terminate = listener->isTerminal(failure);
286  if (terminate) {
287  std::ostringstream out;
288  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
289  << " Reason: Unhandled exception, details: " << failure.what();
290  logger.warning(out.str());
291  } else {
292  if (logger.isFinestEnabled()) {
293  std::ostringstream out;
294  out << "MessageListener " << id << " on topic: " << name << ". "
295  << " ran into an exception, details:" << failure.what();
296  logger.finest(out.str());
297  }
298  }
299  return terminate;
300  } catch (exception::IException &t) {
301  std::ostringstream out;
302  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
303  << " Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. "
304  << t.what();
305  logger.warning(out.str());
306 
307  return true;
308  }
309  }
310 
311  private:
312  topic::ReliableMessageListener<T> *listener;
313  int id;
315  int64_t sequence;
316  util::AtomicBoolean cancelled;
317  util::ILogger &logger;
318  const std::string &name;
319  topic::impl::reliable::ReliableTopicExecutor executor;
320  serialization::pimpl::SerializationService *serializationService;
321  const config::ReliableTopicConfig *config;
322  };
323 
324  util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
325  util::AtomicInt runnerCounter;
326  };
327  }
328 }
329 
330 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_
331 
Base class for all exception originated from Hazelcast methods.
Definition: IException.h:53
Definition: DataArray.h:28
virtual int64_t tailSequence()=0
Returns the sequence of the tail.
bool removeMessageListener(const std::string &registrationId)
Stops receiving messages for the given message listener.
Definition: ReliableTopic.h:98
IP Address.
Definition: Address.h:41
void publish(const E *message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
virtual char const * what() const
return pointer to the explanation string.
Definition: IException.cpp:66
ExecutionCallback allows to asynchronously get notified when the execution is completed, either successfully or with error.
Definition: ExecutionCallback.h:41
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic.
Definition: ReliableTopic.h:80
Cluster member class.
Definition: Member.h:43
PN (Positive-Negative) CRDT counter.
Definition: MapEntryView.h:32
Definition: ReliableTopicConfig.h:30