Hazelcast C++ Client
ReliableTopic.h
1 /*
2  * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 //
17 // Created by ihsan demir on 31 May 2016.
18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_
19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_
20 
21 #include "hazelcast/client/proxy/ReliableTopicImpl.h"
22 #include "hazelcast/client/Ringbuffer.h"
23 #include "hazelcast/client/DataArray.h"
24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
25 
26 #include <string>
27 #include <memory>
28 
29 namespace hazelcast {
30  namespace client {
31 
44  template<typename E>
45  class ReliableTopic : public proxy::ReliableTopicImpl {
46  friend class impl::HazelcastClientInstanceImpl;
47 
48  public:
49 
58  void publish(const E *message) {
59  serialization::pimpl::Data data = context->getSerializationService().toData<E>(message);
60  proxy::ReliableTopicImpl::publish(data);
61  }
62 
80  std::string addMessageListener(topic::ReliableMessageListener<E> &listener) {
81  int id = ++runnerCounter;
82  boost::shared_ptr<MessageRunner < E> >
83  runner(new MessageRunner<E>(id, &listener, ringbuffer.get(), getName(),
84  &context->getSerializationService(), config));
85  runnersMap.put(id, runner);
86  runner->next();
87  return util::IOUtil::to_string<int>(id);
88  }
89 
98  bool removeMessageListener(const std::string &registrationId) {
99  int id = util::IOUtil::to_value<int>(registrationId);
100  boost::shared_ptr<MessageRunner < E> > runner = runnersMap.get(id);
101  if (NULL == runner) {
102  return false;
103  }
104  runner->cancel();
105  runnersMap.remove(id);
106  return true;
107  };
108  protected:
109  virtual void onDestroy() {
110  // cancel all runners
111  std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > > runners = runnersMap.clear();
112  for (typename std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > >
113  ::const_iterator it = runners.begin();it != runners.end();++it) {
114  it->second->cancel();
115  }
116 
117  // destroy the underlying ringbuffer
118  ringbuffer->destroy();
119  }
120 
121  private:
122  ReliableTopic(const std::string &instanceName, spi::ClientContext *context,
123  boost::shared_ptr<Ringbuffer<topic::impl::reliable::ReliableTopicMessage> > rb)
124  : proxy::ReliableTopicImpl(instanceName, context, rb) {
125  }
126 
127  template<typename T>
128  class MessageRunner
129  : impl::ExecutionCallback<boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > > {
130  public:
131  MessageRunner(int id, topic::ReliableMessageListener<T> *listener,
132  Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *rb,
133  const std::string &topicName, serialization::pimpl::SerializationService *service,
134  const config::ReliableTopicConfig *reliableTopicConfig)
135  : cancelled(false), logger(util::ILogger::getLogger()), name(topicName), executor(*rb),
136  serializationService(service), config(reliableTopicConfig) {
137  this->id = id;
138  this->listener = listener;
139  this->ringbuffer = rb;
140 
141  // we are going to listen to next publication. We don't care about what already has been published.
142  int64_t initialSequence = listener->retrieveInitialSequence();
143  if (initialSequence == -1) {
144  initialSequence = ringbuffer->tailSequence() + 1;
145  }
146  this->sequence = initialSequence;
147 
148  this->executor.start();
149  }
150 
151 
152  virtual ~MessageRunner() {}
153 
154  void next() {
155  if (cancelled) {
156  return;
157  }
158 
159  topic::impl::reliable::ReliableTopicExecutor::Message m;
160  m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
161  m.callback = this;
162  m.sequence = sequence;
163  m.maxCount = config->getReadBatchSize();
164  executor.execute(m);
165  }
166 
167  // This method is called from the provided executor.
168  void onResponse(
169  const boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > &allMessages) {
170  if (cancelled) {
171  return;
172  }
173 
174  size_t numMessages = allMessages->size();
175 
176  // we process all messages in batch. So we don't release the thread and reschedule ourselves;
177  // but we'll process whatever was received in 1 go.
178  for (size_t i = 0; i < numMessages; ++i) {
179  try {
180  listener->storeSequence(sequence);
181  process(allMessages->get(i));
182  } catch (exception::IException &e) {
183  if (terminate(e)) {
184  cancel();
185  return;
186  }
187  }
188 
189  sequence++;
190  }
191 
192  next();
193  }
194 
195  // This method is called from the provided executor.
196  void onFailure(const boost::shared_ptr<exception::IException> &throwable) {
197  if (cancelled) {
198  return;
199  }
200 
201  int32_t err = throwable->getErrorCode();
202  if (protocol::EXECUTION == err &&
203  protocol::STALE_SEQUENCE == throwable->getCauseErrorCode()) {
204  // StaleSequenceException.getHeadSeq() is not available on the client-side, see #7317
205  int64_t remoteHeadSeq = ringbuffer->headSequence();
206 
207  if (listener->isLossTolerant()) {
208  if (logger.isFinestEnabled()) {
209  std::ostringstream out;
210  out << "MessageListener " << id << " on topic: " << name
211  << " ran into a stale sequence. "
212  << "Jumping from oldSequence: " << sequence << " to sequence: " << remoteHeadSeq;
213  logger.finest(out.str());
214  }
215  sequence = remoteHeadSeq;
216  next();
217  return;
218  }
219 
220  std::ostringstream out;
221  out << "Terminating MessageListener:" << id << " on topic: " << name << "Reason: The listener "
222  "was too slow or the retention period of the message has been violated. "
223  << "head: "
224  << remoteHeadSeq << " sequence:" << sequence;
225  logger.warning(out.str());
226  } else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
227  if (logger.isFinestEnabled()) {
228  std::ostringstream out;
229  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
230  << " Reason: HazelcastInstance is shutting down";
231  logger.finest(out.str());
232  }
233  } else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
234  if (logger.isFinestEnabled()) {
235  std::ostringstream out;
236  out << "Terminating MessageListener " << id << " on topic: " << name
237  << " Reason: Topic is destroyed";
238  logger.finest(out.str());
239  }
240  } else {
241  std::ostringstream out;
242  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
243  << " Reason: Unhandled exception, details:" << throwable->what();
244  logger.warning(out.str());
245  }
246 
247  cancel();
248  }
249 
250  void cancel() {
251  cancelled = true;
252  executor.stop();
253  }
254 
255  private:
256  void process(const topic::impl::reliable::ReliableTopicMessage *message) {
257  // proxy.localTopicStats.incrementReceives();
258  listener->onMessage(toMessage(message));
259  }
260 
261  std::auto_ptr<topic::Message<T> > toMessage(const topic::impl::reliable::ReliableTopicMessage *m) {
262  boost::shared_ptr<Member> member;
263  const Address *addr = m->getPublisherAddress();
264  if (addr != NULL) {
265  member = boost::shared_ptr<Member>(new Member(*addr));
266  }
267  std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
268  return std::auto_ptr<topic::Message<T> >(
269  new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
270  }
271 
272  bool terminate(const exception::IException &failure) {
273  if (cancelled) {
274  return true;
275  }
276 
277  try {
278  bool terminate = listener->isTerminal(failure);
279  if (terminate) {
280  std::ostringstream out;
281  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
282  << " Reason: Unhandled exception, details: " << failure.what();
283  logger.warning(out.str());
284  } else {
285  if (logger.isFinestEnabled()) {
286  std::ostringstream out;
287  out << "MessageListener " << id << " on topic: " << name << ". "
288  << " ran into an exception, details:" << failure.what();
289  logger.finest(out.str());
290  }
291  }
292  return terminate;
293  } catch (exception::IException &t) {
294  std::ostringstream out;
295  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
296  << " Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. "
297  << t.what();
298  logger.warning(out.str());
299 
300  return true;
301  }
302  }
303 
304  private:
305  topic::ReliableMessageListener<T> *listener;
306  int id;
307  Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *ringbuffer;
308  int64_t sequence;
309  util::AtomicBoolean cancelled;
310  util::ILogger &logger;
311  const std::string &name;
312  topic::impl::reliable::ReliableTopicExecutor executor;
313  serialization::pimpl::SerializationService *serializationService;
314  const config::ReliableTopicConfig *config;
315  };
316 
317  util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
318  util::AtomicInt runnerCounter;
319  };
320  }
321 }
322 
323 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_
324 
bool removeMessageListener(const std::string &registrationId)
Stops receiving messages for the given message listener.
Definition: ReliableTopic.h:98
void publish(const E *message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic.
Definition: ReliableTopic.h:80
PN (Positive-Negative) CRDT counter.
Definition: MapEntryView.h:32