Hazelcast C++ Client
 All Classes Functions Variables Enumerations Enumerator Pages
ReliableTopic.h
1 /*
2  * Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 //
17 // Created by ihsan demir on 31 May 2016.
18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_
19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_
20 
21 #include "hazelcast/client/proxy/ReliableTopicImpl.h"
22 #include "hazelcast/client/Ringbuffer.h"
23 #include "hazelcast/client/DataArray.h"
24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"
25 
26 #include <string>
27 #include <memory>
28 
29 namespace hazelcast {
30  namespace client {
31 
44  template<typename E>
45  class ReliableTopic : public proxy::ReliableTopicImpl {
46  friend class HazelcastClient;
47 
48  public:
49 
58  void publish(const E *message) {
59  serialization::pimpl::Data data = context->getSerializationService().toData<E>(message);
60  proxy::ReliableTopicImpl::publish(data);
61  }
62 
80  std::string addMessageListener(topic::ReliableMessageListener<E> &listener) {
81  int id = ++runnerCounter;
82  boost::shared_ptr<MessageRunner<E> > runner(new MessageRunner<E>(id, &listener, ringbuffer.get(), getName(),
83  &context->getSerializationService(), config));
84  runnersMap.put(id, runner);
85  runner->next();
86  return util::IOUtil::to_string<int>(id);
87  }
88 
97  bool removeMessageListener(const std::string &registrationId) {
98  int id = util::IOUtil::to_value<int>(registrationId);
99  boost::shared_ptr<MessageRunner<E> > runner = runnersMap.get(id);
100  if (NULL == runner) {
101  return false;
102  }
103  runner->cancel();
104  runnersMap.remove(id);
105  return true;
106  };
107  protected:
108  virtual void onDestroy() {
109  // cancel all runners
110  std::vector<std::pair<int, boost::shared_ptr<MessageRunner<E> > > > runners = runnersMap.clear();
111  for (typename std::vector<std::pair<int, boost::shared_ptr<MessageRunner<E> > > >::const_iterator it = runners.begin();
112  it != runners.end(); ++it) {
113  it->second->cancel();
114  }
115 
116  // destroy the underlying ringbuffer
117  ringbuffer->destroy();
118  }
119  private:
120  ReliableTopic(const std::string &instanceName, spi::ClientContext *context, boost::shared_ptr<Ringbuffer<topic::impl::reliable::ReliableTopicMessage> > rb)
121  : proxy::ReliableTopicImpl(instanceName, context, rb) {
122  }
123 
124  template <typename T>
125  class MessageRunner : impl::ExecutionCallback<DataArray<topic::impl::reliable::ReliableTopicMessage> > {
126  public:
127  MessageRunner(int id, topic::ReliableMessageListener<T> *listener,
128  Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *rb,
129  const std::string &topicName, serialization::pimpl::SerializationService *service,
130  const config::ReliableTopicConfig *reliableTopicConfig)
131  : cancelled(false), logger(util::ILogger::getLogger()), name(topicName), executor(rb),
132  serializationService(service), config(reliableTopicConfig) {
133  this->id = id;
134  this->listener = listener;
135  this->ringbuffer = rb;
136 
137  // we are going to listen to next publication. We don't care about what already has been published.
138  int64_t initialSequence = listener->retrieveInitialSequence();
139  if (initialSequence == -1) {
140  initialSequence = ringbuffer->tailSequence() + 1;
141  }
142  this->sequence = initialSequence;
143 
144  this->executor.start();
145  }
146 
147 
148  virtual ~MessageRunner() { }
149 
150  void next() {
151  if (cancelled) {
152  return;
153  }
154 
155  topic::impl::reliable::ReliableTopicExecutor::Message m;
156  m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
157  m.callback = this;
158  m.sequence = sequence;
159  m.maxCount = config->getReadBatchSize();
160  executor.execute(m);
161  }
162 
163  // This method is called from the provided executor.
164  void onResponse(DataArray<topic::impl::reliable::ReliableTopicMessage> *allMessages) {
165  if (cancelled) {
166  return;
167  }
168 
169  size_t numMessages = allMessages->size();
170 
171  // we process all messages in batch. So we don't release the thread and reschedule ourselves;
172  // but we'll process whatever was received in 1 go.
173  for (size_t i = 0; i < numMessages; ++i) {
174  try {
175  listener->storeSequence(sequence);
176  process(allMessages->get(i));
177  } catch (exception::IException &e) {
178  if (terminate(e)) {
179  cancel();
180  return;
181  }
182  }
183 
184  sequence++;
185  }
186 
187  next();
188  }
189 
190  // This method is called from the provided executor.
191  void onFailure(const exception::ProtocolException *t) {
192  if (cancelled) {
193  return;
194  }
195 
196  int32_t err = t->getErrorCode();
197  if (protocol::EXECUTION == err &&
198  protocol::STALE_SEQUENCE == t->getCauseErrorCode()) {
199  // StaleSequenceException.getHeadSeq() is not available on the client-side, see #7317
200  int64_t remoteHeadSeq = ringbuffer->headSequence();
201 
202  if (listener->isLossTolerant()) {
203  if (logger.isFinestEnabled()) {
204  std::ostringstream out;
205  out << "MessageListener " << id << " on topic: " << name << " ran into a stale sequence. "
206  << "Jumping from oldSequence: " << sequence << " to sequence: " << remoteHeadSeq;
207  logger.finest(out.str());
208  }
209  sequence = remoteHeadSeq;
210  next();
211  return;
212  }
213 
214  std::ostringstream out;
215  out << "Terminating MessageListener:" << id << " on topic: " << name << "Reason: The listener "
216  "was too slow or the retention period of the message has been violated. " << "head: "
217  << remoteHeadSeq << " sequence:" << sequence;
218  logger.warning(out.str());
219  } else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
220  if (logger.isFinestEnabled()) {
221  std::ostringstream out;
222  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
223  << " Reason: HazelcastInstance is shutting down";
224  logger.finest(out.str());
225  }
226  } else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
227  if (logger.isFinestEnabled()) {
228  std::ostringstream out;
229  out << "Terminating MessageListener " << id << " on topic: " << name << " Reason: Topic is destroyed";
230  logger.finest(out.str());
231  }
232  } else {
233  std::ostringstream out;
234  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
235  << " Reason: Unhandled exception, details:" << t->what();
236  logger.warning(out.str());
237  }
238 
239  cancel();
240  }
241 
242  void cancel() {
243  cancelled = true;
244  executor.stop();
245  }
246  private:
247  void process(const topic::impl::reliable::ReliableTopicMessage *message) {
248  // proxy.localTopicStats.incrementReceives();
249  listener->onMessage(toMessage(message));
250  }
251 
252  std::auto_ptr<topic::Message<T> > toMessage(const topic::impl::reliable::ReliableTopicMessage *m) {
253  std::auto_ptr<Member> member;
254  const Address *addr = m->getPublisherAddress();
255  if (addr != NULL) {
256  member = std::auto_ptr<Member>(new Member(*addr));
257  }
258  std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
259  return std::auto_ptr<topic::Message<T> >(new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
260  }
261 
262  bool terminate(const exception::IException &failure) {
263  if (cancelled) {
264  return true;
265  }
266 
267  try {
268  bool terminate = listener->isTerminal(failure);
269  if (terminate) {
270  std::ostringstream out;
271  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
272  << " Reason: Unhandled exception, details: " << failure.what();
273  logger.warning(out.str());
274  } else {
275  if (logger.isFinestEnabled()) {
276  std::ostringstream out;
277  out << "MessageListener " << id << " on topic: " << name << ". "
278  << " ran into an exception, details:" << failure.what();
279  logger.finest(out.str());
280  }
281  }
282  return terminate;
283  } catch (exception::IException &t) {
284  std::ostringstream out;
285  out << "Terminating MessageListener " << id << " on topic: " << name << ". "
286  << " Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. "
287  << t.what();
288  logger.warning(out.str());
289 
290  return true;
291  }
292  }
293 
294  private:
295  topic::ReliableMessageListener<T> *listener;
296  int id;
297  Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *ringbuffer;
298  int64_t sequence;
299  util::AtomicBoolean cancelled;
300  util::ILogger &logger;
301  const std::string &name;
302  topic::impl::reliable::ReliableTopicExecutor executor;
303  serialization::pimpl::SerializationService *serializationService;
304  const config::ReliableTopicConfig *config;
305  };
306 
307  util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
308  util::AtomicInt runnerCounter;
309  };
310  }
311 }
312 
313 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_
314 
bool removeMessageListener(const std::string &registrationId)
Stops receiving messages for the given message listener.
Definition: ReliableTopic.h:97
void publish(const E *message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic.
Definition: ReliableTopic.h:80
Hazelcast Client enables you to do all Hazelcast operations without being a member of the cluster...
Definition: HazelcastClient.h:459