18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_ 19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_ 21 #include "hazelcast/client/proxy/ReliableTopicImpl.h" 22 #include "hazelcast/client/Ringbuffer.h" 23 #include "hazelcast/client/DataArray.h" 24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h" 46 friend class impl::HazelcastClientInstanceImpl;
59 serialization::pimpl::Data data = getContext().getSerializationService().template toData<E>(message);
60 proxy::ReliableTopicImpl::publish(data);
81 int id = ++runnerCounter;
82 boost::shared_ptr<MessageRunner < E> >
83 runner(
new MessageRunner<E>(
id, &listener, ringbuffer.get(), getName(),
84 &getContext().getSerializationService(), config, logger));
85 runnersMap.put(
id, runner);
87 return util::IOUtil::to_string<int>(id);
99 int id = util::IOUtil::to_value<int>(registrationId);
100 boost::shared_ptr<MessageRunner < E> > runner = runnersMap.get(
id);
101 if (NULL == runner) {
105 runnersMap.remove(
id);
109 virtual void onDestroy() {
111 std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > > runners = runnersMap.clear();
112 for (
typename std::vector<std::pair<
int, boost::shared_ptr<MessageRunner < E> > > >
113 ::const_iterator it = runners.begin();it != runners.end();++it) {
114 it->second->cancel();
118 ringbuffer->destroy();
122 ReliableTopic(
const std::string &instanceName, spi::ClientContext *context,
124 : proxy::ReliableTopicImpl(instanceName, context, rb) {
131 MessageRunner(
int id, topic::ReliableMessageListener<T> *listener,
133 const std::string &topicName, serialization::pimpl::SerializationService *service,
135 : cancelled(false), logger(logger), name(topicName), executor(*rb, logger),
136 serializationService(service), config(reliableTopicConfig) {
138 this->listener = listener;
139 this->ringbuffer = rb;
142 int64_t initialSequence = listener->retrieveInitialSequence();
143 if (initialSequence == -1) {
146 this->sequence = initialSequence;
148 this->executor.start();
152 virtual ~MessageRunner() {}
159 topic::impl::reliable::ReliableTopicExecutor::Message m;
160 m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
162 m.sequence = sequence;
163 m.maxCount = config->getReadBatchSize();
174 size_t numMessages = allMessages->size();
178 for (
size_t i = 0; i < numMessages; ++i) {
180 listener->storeSequence(sequence);
181 process(allMessages->get(i));
196 void onFailure(
const boost::shared_ptr<exception::IException> &throwable) {
201 int32_t err = throwable->getErrorCode();
202 if (protocol::TIMEOUT == err) {
203 if (logger.isFinestEnabled()) {
204 logger.finest() <<
"MessageListener " << listener <<
" on topic: " << name <<
" timed out. " 205 <<
"Continuing from last known sequence: " << sequence;
209 }
else if (protocol::EXECUTION == err &&
210 protocol::STALE_SEQUENCE == throwable->getCauseErrorCode()) {
212 int64_t remoteHeadSeq = ringbuffer->headSequence();
214 if (listener->isLossTolerant()) {
215 if (logger.isFinestEnabled()) {
216 std::ostringstream out;
217 out <<
"MessageListener " <<
id <<
" on topic: " << name
218 <<
" ran into a stale sequence. " 219 <<
"Jumping from oldSequence: " << sequence <<
" to sequence: " << remoteHeadSeq;
220 logger.finest(out.str());
222 sequence = remoteHeadSeq;
227 std::ostringstream out;
228 out <<
"Terminating MessageListener:" <<
id <<
" on topic: " << name <<
"Reason: The listener " 229 "was too slow or the retention period of the message has been violated. " 231 << remoteHeadSeq <<
" sequence:" << sequence;
232 logger.warning(out.str());
233 }
else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
234 if (logger.isFinestEnabled()) {
235 std::ostringstream out;
236 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 237 <<
" Reason: HazelcastInstance is shutting down";
238 logger.finest(out.str());
240 }
else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
241 if (logger.isFinestEnabled()) {
242 std::ostringstream out;
243 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name
244 <<
" Reason: Topic is destroyed";
245 logger.finest(out.str());
248 std::ostringstream out;
249 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 250 <<
" Reason: Unhandled exception, details:" << throwable->what();
251 logger.warning(out.str());
263 void process(
const topic::impl::reliable::ReliableTopicMessage *message) {
265 listener->onMessage(toMessage(message));
268 std::auto_ptr<topic::Message<T> > toMessage(
const topic::impl::reliable::ReliableTopicMessage *m) {
269 boost::shared_ptr<Member> member;
270 const Address *addr = m->getPublisherAddress();
272 member = boost::shared_ptr<Member>(
new Member(*addr));
274 std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
275 return std::auto_ptr<topic::Message<T> >(
276 new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
285 bool terminate = listener->isTerminal(failure);
287 std::ostringstream out;
288 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 289 <<
" Reason: Unhandled exception, details: " << failure.
what();
290 logger.warning(out.str());
292 if (logger.isFinestEnabled()) {
293 std::ostringstream out;
294 out <<
"MessageListener " <<
id <<
" on topic: " << name <<
". " 295 <<
" ran into an exception, details:" << failure.
what();
296 logger.finest(out.str());
301 std::ostringstream out;
302 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 303 <<
" Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. " 305 logger.warning(out.str());
312 topic::ReliableMessageListener<T> *listener;
316 util::AtomicBoolean cancelled;
317 util::ILogger &logger;
318 const std::string &name;
319 topic::impl::reliable::ReliableTopicExecutor executor;
320 serialization::pimpl::SerializationService *serializationService;
324 util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
325 util::AtomicInt runnerCounter;
330 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_ Base class for all exception originated from Hazelcast methods.
Definition: IException.h:53
Definition: DataArray.h:28
virtual int64_t tailSequence()=0
Returns the sequence of the tail.
bool removeMessageListener(const std::string ®istrationId)
Stops receiving messages for the given message listener.
Definition: ReliableTopic.h:98
IP Address.
Definition: Address.h:41
void publish(const E *message)
Publishes the message to all subscribers of this topic Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
virtual char const * what() const
return pointer to the explanation string.
Definition: IException.cpp:66
ExecutionCallback allows to asynchronously get notified when the execution is completed, either successfully or with error.
Definition: ExecutionCallback.h:41
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic.
Definition: ReliableTopic.h:80
Cluster member class.
Definition: Member.h:43
PN (Positive-Negative) CRDT counter.
Definition: MapEntryView.h:32
Definition: ReliableTopicConfig.h:30