18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_    19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_    21 #include "hazelcast/client/proxy/ReliableTopicImpl.h"    22 #include "hazelcast/client/Ringbuffer.h"    23 #include "hazelcast/client/DataArray.h"    24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h"    46             friend class impl::HazelcastClientInstanceImpl;
    59                 serialization::pimpl::Data data = context->getSerializationService().toData<E>(message);
    60                 proxy::ReliableTopicImpl::publish(data);
    81                 int id = ++runnerCounter;
    82                 boost::shared_ptr<MessageRunner < E> >
    83                 runner(
new MessageRunner<E>(
id, &listener, ringbuffer.get(), getName(),
    84                                             &context->getSerializationService(), config));
    85                 runnersMap.put(
id, runner);
    87                 return util::IOUtil::to_string<int>(id);
    99                 int id = util::IOUtil::to_value<int>(registrationId);
   100                 boost::shared_ptr<MessageRunner < E> > runner = runnersMap.get(
id);
   101                 if (NULL == runner) {
   105                 runnersMap.remove(
id);
   109             virtual void onDestroy() {
   111                 std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > > runners = runnersMap.clear();
   112                 for (
typename std::vector<std::pair<
int, boost::shared_ptr<MessageRunner < E> > > >
   113                      ::const_iterator it = runners.begin();it != runners.end();++it) {
   114                     it->second->cancel();
   118                 ringbuffer->destroy();
   122             ReliableTopic(
const std::string &instanceName, spi::ClientContext *context,
   123                           boost::shared_ptr<Ringbuffer<topic::impl::reliable::ReliableTopicMessage> > rb)
   124                     : proxy::ReliableTopicImpl(instanceName, context, rb) {
   129                     : impl::ExecutionCallback<boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > > {
   131                 MessageRunner(
int id, topic::ReliableMessageListener<T> *listener,
   132                               Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *rb,
   133                               const std::string &topicName, serialization::pimpl::SerializationService *service,
   134                               const config::ReliableTopicConfig *reliableTopicConfig)
   135                         : cancelled(false), logger(util::ILogger::getLogger()), name(topicName), executor(*rb),
   136                           serializationService(service), config(reliableTopicConfig) {
   138                     this->listener = listener;
   139                     this->ringbuffer = rb;
   142                     int64_t initialSequence = listener->retrieveInitialSequence();
   143                     if (initialSequence == -1) {
   144                         initialSequence = ringbuffer->tailSequence() + 1;
   146                     this->sequence = initialSequence;
   148                     this->executor.start();
   152                 virtual ~MessageRunner() {}
   159                     topic::impl::reliable::ReliableTopicExecutor::Message m;
   160                     m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
   162                     m.sequence = sequence;
   163                     m.maxCount = config->getReadBatchSize();
   169                         const boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > &allMessages) {
   174                     size_t numMessages = allMessages->size();
   178                     for (
size_t i = 0; i < numMessages; ++i) {
   180                             listener->storeSequence(sequence);
   181                             process(allMessages->get(i));
   182                         } 
catch (exception::IException &e) {
   196                 void onFailure(
const boost::shared_ptr<exception::IException> &throwable) {
   201                     int32_t err = throwable->getErrorCode();
   202                     if (protocol::EXECUTION == err &&
   203                         protocol::STALE_SEQUENCE == throwable->getCauseErrorCode()) {
   205                         int64_t remoteHeadSeq = ringbuffer->headSequence();
   207                         if (listener->isLossTolerant()) {
   208                             if (logger.isFinestEnabled()) {
   209                                 std::ostringstream out;
   210                                 out << 
"MessageListener " << 
id << 
" on topic: " << name
   211                                     << 
" ran into a stale sequence. "   212                                     << 
"Jumping from oldSequence: " << sequence << 
" to sequence: " << remoteHeadSeq;
   213                                 logger.finest(out.str());
   215                             sequence = remoteHeadSeq;
   220                         std::ostringstream out;
   221                         out << 
"Terminating MessageListener:" << 
id << 
" on topic: " << name << 
"Reason: The listener "   222                                                                                                 "was too slow or the retention period of the message has been violated. "   224                             << remoteHeadSeq << 
" sequence:" << sequence;
   225                         logger.warning(out.str());
   226                     } 
else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
   227                         if (logger.isFinestEnabled()) {
   228                             std::ostringstream out;
   229                             out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". "   230                                 << 
" Reason: HazelcastInstance is shutting down";
   231                             logger.finest(out.str());
   233                     } 
else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
   234                         if (logger.isFinestEnabled()) {
   235                             std::ostringstream out;
   236                             out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name
   237                                 << 
" Reason: Topic is destroyed";
   238                             logger.finest(out.str());
   241                         std::ostringstream out;
   242                         out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". "   243                             << 
" Reason: Unhandled exception, details:" << throwable->what();
   244                         logger.warning(out.str());
   256                 void process(
const topic::impl::reliable::ReliableTopicMessage *message) {
   258                     listener->onMessage(toMessage(message));
   261                 std::auto_ptr<topic::Message<T> > toMessage(
const topic::impl::reliable::ReliableTopicMessage *m) {
   262                     boost::shared_ptr<Member> member;
   263                     const Address *addr = m->getPublisherAddress();
   265                         member = boost::shared_ptr<Member>(
new Member(*addr));
   267                     std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
   268                     return std::auto_ptr<topic::Message<T> >(
   269                             new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
   272                 bool terminate(
const exception::IException &failure) {
   278                         bool terminate = listener->isTerminal(failure);
   280                             std::ostringstream out;
   281                             out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". "   282                                 << 
" Reason: Unhandled exception, details: " << failure.what();
   283                             logger.warning(out.str());
   285                             if (logger.isFinestEnabled()) {
   286                                 std::ostringstream out;
   287                                 out << 
"MessageListener " << 
id << 
" on topic: " << name << 
". "   288                                     << 
" ran into an exception, details:" << failure.what();
   289                                 logger.finest(out.str());
   293                     } 
catch (exception::IException &t) {
   294                         std::ostringstream out;
   295                         out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". "   296                             << 
" Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. "   298                         logger.warning(out.str());
   305                 topic::ReliableMessageListener<T> *listener;
   307                 Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *ringbuffer;
   309                 util::AtomicBoolean cancelled;
   310                 util::ILogger &logger;
   311                 const std::string &name;
   312                 topic::impl::reliable::ReliableTopicExecutor executor;
   313                 serialization::pimpl::SerializationService *serializationService;
   314                 const config::ReliableTopicConfig *config;
   317             util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
   318             util::AtomicInt runnerCounter;
   323 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_ bool removeMessageListener(const std::string ®istrationId)
Stops receiving messages for the given message listener. 
Definition: ReliableTopic.h:98
void publish(const E *message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic. 
Definition: ReliableTopic.h:80
Definition: MapEntryView.h:32