Hazelcast C++ Client
ClientDelegatingFuture.h
1 /*
2  * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef HAZELCAST_CLIENT_INTERNAL_CLIENTDELEGATINGFUTURE_H
17 #define HAZELCAST_CLIENT_INTERNAL_CLIENTDELEGATINGFUTURE_H
18 
19 #include <assert.h>
20 #include <boost/enable_shared_from_this.hpp>
21 
22 #include <hazelcast/client/spi/InternalCompletableFuture.h>
23 #include <hazelcast/client/spi/impl/ClientInvocationFuture.h>
24 #include <hazelcast/client/serialization/pimpl/SerializationService.h>
25 #include <hazelcast/client/impl/ClientMessageDecoder.h>
26 #include <hazelcast/client/spi/impl/ClientInvocation.h>
27 
28 namespace hazelcast {
29  namespace client {
30  namespace internal {
39  template<typename V>
41  : public spi::InternalCompletableFuture<V>,
42  public boost::enable_shared_from_this<ClientDelegatingFuture<V> > {
43  public:
45  const boost::shared_ptr<spi::impl::ClientInvocationFuture> &clientInvocationFuture,
46  serialization::pimpl::SerializationService &serializationService,
47  const boost::shared_ptr<impl::ClientMessageDecoder<V> > &clientMessageDecoder,
48  const boost::shared_ptr<V> &defaultValue) : future(
49  clientInvocationFuture), serializationService(serializationService), clientMessageDecoder(
50  clientMessageDecoder), defaultValue(defaultValue), decodedResponse(
51  boost::static_pointer_cast<V>(VOIDOBJ)) {
52  const boost::shared_ptr<spi::impl::ClientInvocation> invocation = clientInvocationFuture->getInvocation();
53  if (invocation.get()) {
54  userExecutor = invocation->getUserExecutor();
55  }
56  }
57 
59  const boost::shared_ptr<spi::impl::ClientInvocationFuture> &clientInvocationFuture,
60  serialization::pimpl::SerializationService &serializationService,
61  const boost::shared_ptr<impl::ClientMessageDecoder<V> > &clientMessageDecoder) : future(
62  clientInvocationFuture), serializationService(serializationService), clientMessageDecoder(
63  clientMessageDecoder), decodedResponse(boost::static_pointer_cast<V>(VOIDOBJ)) {
64  const boost::shared_ptr<spi::impl::ClientInvocation> invocation = clientInvocationFuture->getInvocation();
65  if (invocation.get()) {
66  userExecutor = invocation->getUserExecutor();
67  }
68  }
69 
70  virtual void andThen(const boost::shared_ptr<ExecutionCallback<V> > &callback) {
71  future->andThen(boost::shared_ptr<ExecutionCallback<protocol::ClientMessage> >(
72  new DelegatingExecutionCallback(
73  boost::enable_shared_from_this<ClientDelegatingFuture<V> >::shared_from_this(),
74  callback)), userExecutor);
75  }
76 
77  virtual void andThen(const boost::shared_ptr<ExecutionCallback<V> > &callback,
78  const boost::shared_ptr<Executor> &executor) {
79  future->andThen(boost::shared_ptr<ExecutionCallback<protocol::ClientMessage> >(
80  new DelegatingExecutionCallback(
81  boost::enable_shared_from_this<ClientDelegatingFuture<V> >::shared_from_this(),
82  callback)), executor);
83  }
84 
85  virtual bool cancel(bool mayInterruptIfRunning) {
86  return future->cancel(mayInterruptIfRunning);
87  }
88 
89  virtual bool isCancelled() {
90  return future->isCancelled();
91  }
92 
93  virtual bool isDone() {
94  return future->isDone();
95  }
96 
97  virtual boost::shared_ptr<V> get() {
98  return get(INT64_MAX, hazelcast::util::concurrent::TimeUnit::MILLISECONDS());
99  }
100 
101  virtual boost::shared_ptr<V> get(int64_t timeout, const TimeUnit &unit) {
102  boost::shared_ptr<protocol::ClientMessage> response = future->get(timeout, unit);
103  return resolveResponse(response);
104  }
105 
106  virtual boost::shared_ptr<V> join() {
107  try {
108  return get();
109  } catch (exception::IException &e) {
110  util::ExceptionUtil::rethrow(e);
111  }
112  return boost::shared_ptr<V>();
113  }
114 
115  virtual bool complete(const boost::shared_ptr<V> &value) {
116  assert(0);
117  return false;
118  }
119 
120  virtual bool complete(const boost::shared_ptr<exception::IException> &value) {
121  return future->complete(value);
122  }
123 
124  protected:
125  const boost::shared_ptr<spi::impl::ClientInvocationFuture> &getFuture() const {
126  return future;
127  }
128 
129  private:
130  class DelegatingExecutionCallback : public ExecutionCallback<protocol::ClientMessage> {
131  public:
132  DelegatingExecutionCallback(
133  const boost::shared_ptr<ClientDelegatingFuture<V> > &delegatingFuture,
134  const boost::shared_ptr<ExecutionCallback<V> > &callback) : delegatingFuture(
135  delegatingFuture), callback(callback) {}
136 
137  virtual void onResponse(const boost::shared_ptr<protocol::ClientMessage> &message) {
138  boost::shared_ptr<V> response = delegatingFuture->resolveResponse(message);
139  callback->onResponse(response);
140  }
141 
142  virtual void onFailure(const boost::shared_ptr<exception::IException> &e) {
143  callback->onFailure(e);
144  }
145 
146  private:
147  boost::shared_ptr<ClientDelegatingFuture> delegatingFuture;
148  const boost::shared_ptr<ExecutionCallback<V> > callback;
149  };
150 
151  boost::shared_ptr<V>
152  decodeResponse(const boost::shared_ptr<protocol::ClientMessage> &clientMessage) {
153  if (decodedResponse != boost::static_pointer_cast<V>(VOIDOBJ)) {
154  return decodedResponse;
155  }
156  // TODO: Java uses a message wrapper here --> ClientMessage message = ClientMessage.createForDecode(clientMessage.buffer(), 0);
157  boost::shared_ptr<V> newDecodedResponse = clientMessageDecoder->decodeClientMessage(clientMessage,
158  serializationService);
159 
160  decodedResponse.compareAndSet(boost::static_pointer_cast<V>(VOIDOBJ), newDecodedResponse);
161  return newDecodedResponse;
162  }
163 
164  /* TODO: Java client does deserialization inside this method, do we need it ? */
165  boost::shared_ptr<V>
166  resolveResponse(const boost::shared_ptr<protocol::ClientMessage> &clientMessage) {
167  if (defaultValue.get() != NULL) {
168  return defaultValue;
169  }
170 
171  return decodeResponse(clientMessage);
172  }
173 
174  static const boost::shared_ptr<void> VOIDOBJ;
175  const boost::shared_ptr<spi::impl::ClientInvocationFuture> future;
176  serialization::pimpl::SerializationService &serializationService;
177  const boost::shared_ptr<impl::ClientMessageDecoder<V> > clientMessageDecoder;
178  const boost::shared_ptr<V> defaultValue;
179  boost::shared_ptr<hazelcast::util::Executor> userExecutor;
180  hazelcast::util::Atomic<boost::shared_ptr<V> > decodedResponse;
181  };
182 
183  template<typename V>
184  const boost::shared_ptr<void> ClientDelegatingFuture<V>::VOIDOBJ(new char);
185  }
186  }
187 };
188 
189 #endif /* HAZELCAST_CLIENT_INTERNAL_CLIENTDELEGATINGFUTURE_H */
The Client Delegating Future is used to delegate ClientInvocationFuture to a user type to be used with andThen or get.
Definition: ClientDelegatingFuture.h:40
Base class for all exceptions originating from Hazelcast methods.
Definition: IException.h:53
ExecutionCallback allows you to be notified asynchronously when the execution completes, either successfully or with an error.
Definition: ExecutionCallback.h:41
PN (Positive-Negative) CRDT counter.
Definition: MapEntryView.h:32