18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_ 
   19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_ 
   21 #include "hazelcast/client/proxy/ReliableTopicImpl.h" 
   22 #include "hazelcast/client/Ringbuffer.h" 
   23 #include "hazelcast/client/DataArray.h" 
   24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h" 
   59                 serialization::pimpl::Data data = context->getSerializationService().toData<E>(message);
 
   60                 proxy::ReliableTopicImpl::publish(data);
 
   81                 int id = ++runnerCounter;
 
   82                 boost::shared_ptr<MessageRunner<E> > runner(
new MessageRunner<E>(
id, &listener, ringbuffer.get(), getName(),
 
   83                                                                           &context->getSerializationService(), config));
 
   84                 runnersMap.put(
id, runner);
 
   86                 return util::IOUtil::to_string<int>(id);
 
   98                 int id = util::IOUtil::to_value<int>(registrationId);
 
   99                 boost::shared_ptr<MessageRunner<E> > runner = runnersMap.get(
id);
 
  100                 if (NULL == runner) {
 
  104                 runnersMap.remove(
id);
 
  108             virtual void onDestroy() {
 
  110                 std::vector<std::pair<int, boost::shared_ptr<MessageRunner<E> > > > runners = runnersMap.clear();
 
  111                 for (
typename std::vector<std::pair<
int, boost::shared_ptr<MessageRunner<E> > > >::const_iterator it = runners.begin();
 
  112                         it != runners.end(); ++it) {
 
  113                     it->second->cancel();
 
  117                 ringbuffer->destroy();
 
  120             ReliableTopic(
const std::string &instanceName, spi::ClientContext *context, boost::shared_ptr<Ringbuffer<topic::impl::reliable::ReliableTopicMessage> > rb)
 
  121                     : proxy::ReliableTopicImpl(instanceName, context, rb) {
 
  124             template <
typename T>
 
  125             class MessageRunner : impl::ExecutionCallback<DataArray<topic::impl::reliable::ReliableTopicMessage> > {
 
  127                 MessageRunner(
int id, topic::ReliableMessageListener<T> *listener,
 
  128                               Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *rb,
 
  129                               const std::string &topicName, serialization::pimpl::SerializationService *service,
 
  130                               const config::ReliableTopicConfig *reliableTopicConfig)
 
  131                         : cancelled(false), logger(util::ILogger::getLogger()), name(topicName), executor(rb),
 
  132                           serializationService(service), config(reliableTopicConfig) {
 
  134                     this->listener = listener;
 
  135                     this->ringbuffer = rb;
 
  138                     int64_t initialSequence = listener->retrieveInitialSequence();
 
  139                     if (initialSequence == -1) {
 
  140                         initialSequence = ringbuffer->tailSequence() + 1;
 
  142                     this->sequence = initialSequence;
 
  144                     this->executor.start();
 
  148                 virtual ~MessageRunner() { }
 
  155                     topic::impl::reliable::ReliableTopicExecutor::Message m;
 
  156                     m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
 
  158                     m.sequence = sequence;
 
  159                     m.maxCount = config->getReadBatchSize();
 
  164                 void onResponse(DataArray<topic::impl::reliable::ReliableTopicMessage> *allMessages) {
 
  169                     size_t numMessages = allMessages->size();
 
  173                     for (
size_t i = 0; i < numMessages; ++i) {
 
  175                             listener->storeSequence(sequence);
 
  176                             process(allMessages->get(i));
 
  177                         } 
catch (exception::IException &e) {
 
  191                 void onFailure(
const exception::ProtocolException *t) {
 
  196                     int32_t err = t->getErrorCode();
 
  197                     if (protocol::EXECUTION == err &&
 
  198                         protocol::STALE_SEQUENCE == t->getCauseErrorCode()) {
 
  200                         int64_t remoteHeadSeq = ringbuffer->headSequence();
 
  202                         if (listener->isLossTolerant()) {
 
  203                             if (logger.isFinestEnabled()) {
 
  204                                 std::ostringstream out;
 
  205                                 out << 
"MessageListener " << 
id << 
" on topic: " << name << 
" ran into a stale sequence. " 
  206                                 << 
"Jumping from oldSequence: " << sequence << 
" to sequence: " << remoteHeadSeq;
 
  207                                 logger.finest(out.str());
 
  209                             sequence = remoteHeadSeq;
 
  214                         std::ostringstream out;
 
  215                         out << 
"Terminating MessageListener:" << 
id << 
" on topic: " << name << 
"Reason: The listener " 
  216                                 "was too slow or the retention period of the message has been violated. " << 
"head: " 
  217                                 << remoteHeadSeq << 
" sequence:" << sequence;
 
  218                         logger.warning(out.str());
 
  219                     } 
else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
 
  220                         if (logger.isFinestEnabled()) {
 
  221                             std::ostringstream out;
 
  222                             out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". " 
  223                                 << 
" Reason: HazelcastInstance is shutting down";
 
  224                             logger.finest(out.str());
 
  226                     } 
else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
 
  227                         if (logger.isFinestEnabled()) {
 
  228                             std::ostringstream out;
 
  229                             out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
" Reason: Topic is destroyed";
 
  230                             logger.finest(out.str());
 
  233                         std::ostringstream out;
 
  234                         out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". " 
  235                             << 
" Reason: Unhandled exception, details:" << t->what();
 
  236                         logger.warning(out.str());
 
  247                 void process(
const topic::impl::reliable::ReliableTopicMessage *message) {
 
  249                     listener->onMessage(toMessage(message));
 
  252                 std::auto_ptr<topic::Message<T> > toMessage(
const topic::impl::reliable::ReliableTopicMessage *m) {
 
  253                     std::auto_ptr<Member> member;
 
  254                     const Address *addr = m->getPublisherAddress();
 
  256                         member = std::auto_ptr<Member>(
new Member(*addr));
 
  258                     std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
 
  259                     return std::auto_ptr<topic::Message<T> >(
new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
 
  262                 bool terminate(
const exception::IException &failure) {
 
  268                         bool terminate = listener->isTerminal(failure);
 
  270                             std::ostringstream out;
 
  271                             out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". " 
  272                                 << 
" Reason: Unhandled exception, details: " << failure.what();
 
  273                             logger.warning(out.str());
 
  275                             if (logger.isFinestEnabled()) {
 
  276                                 std::ostringstream out;
 
  277                                 out << 
"MessageListener " << 
id << 
" on topic: " << name << 
". " 
  278                                     << 
" ran into an exception, details:" << failure.what();
 
  279                                 logger.finest(out.str());
 
  283                     } 
catch (exception::IException &t) {
 
  284                         std::ostringstream out;
 
  285                         out << 
"Terminating MessageListener " << 
id << 
" on topic: " << name << 
". " 
  286                             << 
" Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. " 
  288                         logger.warning(out.str());
 
  295                 topic::ReliableMessageListener<T> *listener;
 
  297                 Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *ringbuffer;
 
  299                 util::AtomicBoolean cancelled;
 
  300                 util::ILogger &logger;
 
  301                 const std::string &name;
 
  302                 topic::impl::reliable::ReliableTopicExecutor executor;
 
  303                 serialization::pimpl::SerializationService *serializationService;
 
  304                 const config::ReliableTopicConfig *config;
 
  307             util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
 
  308             util::AtomicInt runnerCounter;
 
  313 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_ 
bool removeMessageListener(const std::string &registrationId)
Stops receiving messages for the given message listener. 
Definition: ReliableTopic.h:97
 
void publish(const E *message)
Publishes the message to all subscribers of this topic. Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
 
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
 
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic. 
Definition: ReliableTopic.h:80
 
Hazelcast Client enables you to do all Hazelcast operations without being a member of the cluster...
Definition: HazelcastClient.h:459