18 #ifndef HAZELCAST_CLIENT_RELIABLETOPIC_H_ 19 #define HAZELCAST_CLIENT_RELIABLETOPIC_H_ 21 #include "hazelcast/client/proxy/ReliableTopicImpl.h" 22 #include "hazelcast/client/Ringbuffer.h" 23 #include "hazelcast/client/DataArray.h" 24 #include "hazelcast/client/topic/impl/TopicEventHandlerImpl.h" 46 friend class impl::HazelcastClientInstanceImpl;
// Body of publish(const E *message); the signature line is not visible in this excerpt.
// The message is converted to serialized Data through the client context's
// serialization service, and that Data is passed to proxy::ReliableTopicImpl::publish.
59 serialization::pimpl::Data data = getContext().getSerializationService().template toData<E>(message);
60 proxy::ReliableTopicImpl::publish(data);
// Body of addMessageListener(topic::ReliableMessageListener<E> &listener); the
// signature line is not visible in this excerpt.
// Take the next runner id from the counter.
81 int id = ++runnerCounter;
// Create the runner for this listener. Note that the listener is passed by
// address (&listener) and the ringbuffer as a raw pointer (ringbuffer.get()),
// so both must stay alive for as long as the runner uses them.
82 boost::shared_ptr<MessageRunner < E> >
83 runner(
new MessageRunner<E>(
id, &listener, ringbuffer.get(), getName(),
84 &getContext().getSerializationService(), config, logger));
// Remember the runner under its id so it can be found again by registration id.
85 runnersMap.put(
id, runner);
// The registration id returned to the caller is the runner id converted to a string.
87 return util::IOUtil::to_string<int>(id);
// Body of removeMessageListener; the signature line and several interior
// lines (including the return statements) are not visible in this excerpt.
// Convert the registration id string back into the integer runner id.
99 int id = util::IOUtil::to_value<int>(registrationId);
// Look up the runner stored under that id.
100 boost::shared_ptr<MessageRunner < E> > runner = runnersMap.get(
id);
// Case where no runner was found for the id (handling lines not visible here).
101 if (NULL == runner) {
// Remove the entry for this id from the runner map.
105 runnersMap.remove(
id);
// Destroy hook: cancels every runner returned by runnersMap.clear() and then
// destroys the ringbuffer. (Closing braces are not visible in this excerpt.)
109 virtual void onDestroy() {
// clear() returns a vector of (id, runner) pairs, which is iterated below.
111 std::vector<std::pair<int, boost::shared_ptr<MessageRunner < E> > > > runners = runnersMap.clear();
// Cancel each returned runner.
112 for (
typename std::vector<std::pair<
int, boost::shared_ptr<MessageRunner < E> > > >
113 ::const_iterator it = runners.begin();it != runners.end();++it) {
114 it->second->cancel();
// Destroy the ringbuffer this topic holds.
118 ringbuffer->destroy();
// Constructor: forwards the instance name, the client context and the
// ringbuffer of ReliableTopicMessage to the proxy::ReliableTopicImpl base.
// (The constructor body is not visible in this excerpt.)
122 ReliableTopic(
const std::string &instanceName, spi::ClientContext *context,
123 boost::shared_ptr<Ringbuffer<topic::impl::reliable::ReliableTopicMessage> > rb)
124 : proxy::ReliableTopicImpl(instanceName, context, rb) {
129 : impl::ExecutionCallback<boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > > {
// Constructor: wires the runner to its listener, ringbuffer, topic name,
// serialization service, configuration and logger, picks the sequence to
// start reading from, and starts the executor.
// NOTE(review): the `name` member is declared as `const std::string &` and is
// bound to topicName here, so the string passed in must outlive the runner.
131 MessageRunner(
int id, topic::ReliableMessageListener<T> *listener,
132 Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *rb,
133 const std::string &topicName, serialization::pimpl::SerializationService *service,
134 const config::ReliableTopicConfig *reliableTopicConfig, util::ILogger &logger)
135 : cancelled(false), logger(logger), name(topicName), executor(*rb, logger),
136 serializationService(service), config(reliableTopicConfig) {
// Listener and ringbuffer are kept as raw pointers.
138 this->listener = listener;
139 this->ringbuffer = rb;
// Ask the listener where to start. A value of -1 is replaced by the
// position right after the ringbuffer's current tail sequence.
142 int64_t initialSequence = listener->retrieveInitialSequence();
143 if (initialSequence == -1) {
144 initialSequence = ringbuffer->tailSequence() + 1;
146 this->sequence = initialSequence;
// Start the executor that was constructed over the ringbuffer.
148 this->executor.start();
// Virtual destructor with an empty body.
152 virtual ~MessageRunner() {}
// Fragment of a method whose signature is not visible in this excerpt.
// Fills in a ReliableTopicExecutor::Message of type GET_ONE_MESSAGE with the
// runner's current sequence and the configured read batch size as maxCount.
// (The line that hands `m` to the executor is not visible here.)
159 topic::impl::reliable::ReliableTopicExecutor::Message m;
160 m.type = topic::impl::reliable::ReliableTopicExecutor::GET_ONE_MESSAGE;
162 m.sequence = sequence;
163 m.maxCount = config->getReadBatchSize();
// Fragment of the callback that receives a batch of messages; the start of
// its signature is not visible in this excerpt (presumably onResponse - TODO confirm).
169 const boost::shared_ptr<DataArray<topic::impl::reliable::ReliableTopicMessage> > &allMessages) {
// Number of messages contained in the batch.
174 size_t numMessages = allMessages->size();
// For each message, the current sequence is stored on the listener before
// the message itself is processed.
178 for (
size_t i = 0; i < numMessages; ++i) {
180 listener->storeSequence(sequence);
181 process(allMessages->get(i));
182 }
// Handler for IException; its body is not visible in this excerpt.
catch (exception::IException &e) {
// Failure callback: inspects the error code of the exception and logs a
// message that depends on the kind of failure. Several lines of this method
// (including branch endings and any follow-up actions) are not visible in
// this excerpt.
196 void onFailure(
const boost::shared_ptr<exception::IException> &throwable) {
201 int32_t err = throwable->getErrorCode();
// Stale-sequence case: an EXECUTION error whose cause code is STALE_SEQUENCE.
202 if (protocol::EXECUTION == err &&
203 protocol::STALE_SEQUENCE == throwable->getCauseErrorCode()) {
// Read the ringbuffer's current head sequence.
205 int64_t remoteHeadSeq = ringbuffer->headSequence();
// A loss-tolerant listener jumps forward to the head sequence.
207 if (listener->isLossTolerant()) {
208 if (logger.isFinestEnabled()) {
209 std::ostringstream out;
210 out <<
"MessageListener " <<
id <<
" on topic: " << name
211 <<
" ran into a stale sequence. " 212 <<
"Jumping from oldSequence: " << sequence <<
" to sequence: " << remoteHeadSeq;
213 logger.finest(out.str());
// Continue from the head sequence.
215 sequence = remoteHeadSeq;
// Listener is not loss tolerant: the termination is logged as a warning.
// NOTE(review): the message below has no separator between the topic name
// and "Reason:", so the two run together in the log output.
220 std::ostringstream out;
221 out <<
"Terminating MessageListener:" <<
id <<
" on topic: " << name <<
"Reason: The listener " 222 "was too slow or the retention period of the message has been violated. " 224 << remoteHeadSeq <<
" sequence:" << sequence;
225 logger.warning(out.str());
226 }
else if (protocol::HAZELCAST_INSTANCE_NOT_ACTIVE == err) {
// Instance no longer active: logged at finest level only.
227 if (logger.isFinestEnabled()) {
228 std::ostringstream out;
229 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 230 <<
" Reason: HazelcastInstance is shutting down";
231 logger.finest(out.str());
233 }
else if (protocol::DISTRIBUTED_OBJECT_DESTROYED == err) {
// Topic was destroyed: logged at finest level only.
234 if (logger.isFinestEnabled()) {
235 std::ostringstream out;
236 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name
237 <<
" Reason: Topic is destroyed";
238 logger.finest(out.str());
// Remaining case (its branch header is not visible in this excerpt): the
// exception text is logged as a warning.
241 std::ostringstream out;
242 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 243 <<
" Reason: Unhandled exception, details:" << throwable->what();
244 logger.warning(out.str());
// Converts the given ReliableTopicMessage with toMessage() and delivers the
// result to the listener's onMessage().
256 void process(
const topic::impl::reliable::ReliableTopicMessage *message) {
258 listener->onMessage(toMessage(message));
// Builds a topic::Message<T> from a ReliableTopicMessage: the payload is
// deserialized to T and combined with the topic name, publish time and
// publisher member in a topic::impl::MessageImpl<T>.
261 std::auto_ptr<topic::Message<T> > toMessage(
const topic::impl::reliable::ReliableTopicMessage *m) {
// Starts out empty; assigned below from the publisher address.
262 boost::shared_ptr<Member> member;
263 const Address *addr = m->getPublisherAddress();
// Wrap the publisher address in a Member. (Any guard around this assignment
// is on a line not visible in this excerpt.)
265 member = boost::shared_ptr<Member>(
new Member(*addr));
// Deserialize the payload into an object of type T.
267 std::auto_ptr<T> msg = serializationService->toObject<T>(m->getPayload());
268 return std::auto_ptr<topic::Message<T> >(
269 new topic::impl::MessageImpl<T>(name, msg, m->getPublishTime(), member));
// Asks the listener whether the given failure is terminal and logs the
// outcome. The try header, the branch on the result and the return
// statements are not visible in this excerpt.
272 bool terminate(
const exception::IException &failure) {
// The listener decides whether the failure ends the subscription.
278 bool terminate = listener->isTerminal(failure);
// Warning-level message announcing termination of the listener.
280 std::ostringstream out;
281 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 282 <<
" Reason: Unhandled exception, details: " << failure.what();
283 logger.warning(out.str());
// Finest-level message reporting that the listener ran into an exception.
285 if (logger.isFinestEnabled()) {
286 std::ostringstream out;
287 out <<
"MessageListener " <<
id <<
" on topic: " << name <<
". " 288 <<
" ran into an exception, details:" << failure.what();
289 logger.finest(out.str());
293 }
// Reached when an IException is thrown in the guarded code above; the
// message attributes it to the isTerminal() call.
catch (exception::IException &t) {
294 std::ostringstream out;
295 out <<
"Terminating MessageListener " <<
id <<
" on topic: " << name <<
". " 296 <<
" Reason: Unhandled exception while calling ReliableMessageListener::isTerminal() method. " 298 logger.warning(out.str());
305 topic::ReliableMessageListener<T> *listener;
307 Ringbuffer<topic::impl::reliable::ReliableTopicMessage> *ringbuffer;
309 util::AtomicBoolean cancelled;
310 util::ILogger &logger;
311 const std::string &name;
312 topic::impl::reliable::ReliableTopicExecutor executor;
313 serialization::pimpl::SerializationService *serializationService;
314 const config::ReliableTopicConfig *config;
317 util::SynchronizedMap<int, MessageRunner<E> > runnersMap;
318 util::AtomicInt runnerCounter;
323 #endif //HAZELCAST_CLIENT_RELIABLETOPIC_H_ bool removeMessageListener(const std::string &registrationId)
Stops receiving messages for the given message listener.
Definition: ReliableTopic.h:98
void publish(const E *message)
Publishes the message to all subscribers of this topic. Current implementation only supports DISCARD_O...
Definition: ReliableTopic.h:58
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subs...
Definition: ReliableTopic.h:45
std::string addMessageListener(topic::ReliableMessageListener< E > &listener)
Subscribes to this topic.
Definition: ReliableTopic.h:80
PN (Positive-Negative) CRDT counter.
Definition: MapEntryView.h:32