Hazelcast C++ Client
Pipelining.h
1 /*
2  * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef HAZELCAST_CLIENT_PIPELINING_H_
17 #define HAZELCAST_CLIENT_PIPELINING_H_
18 
19 #include <vector>
20 
21 #include "hazelcast/client/ICompletableFuture.h"
22 #include "hazelcast/util/Preconditions.h"
23 #include "hazelcast/util/ConditionVariable.h"
24 #include "hazelcast/util/concurrent/ConcurrencyUtil.h"
25 
26 namespace hazelcast {
27  namespace client {
75  template<typename E>
76  class Pipelining : public boost::enable_shared_from_this<Pipelining<E> > {
77  public:
89  static boost::shared_ptr<Pipelining> create(int depth) {
90  util::Preconditions::checkPositive(depth, "depth must be positive");
91 
92  return boost::shared_ptr<Pipelining>(new Pipelining(depth));
93  }
94 
105  std::vector<boost::shared_ptr<E> > results() {
106  std::vector<boost::shared_ptr<E> > result(futures.size());
107  size_t index = 0;
108  for (typename FuturesVector::const_iterator it = futures.begin(); it != futures.end(); ++it, ++index) {
109  result[index] = (*it)->get();
110  }
111  return result;
112  }
113 
124  const boost::shared_ptr<ICompletableFuture<E> > &
125  add(const boost::shared_ptr<ICompletableFuture<E> > &future) {
126  util::Preconditions::checkNotNull<ICompletableFuture<E> >(future, "future can't be null");
127 
128  down();
129  futures.push_back(future);
130  future->andThen(boost::shared_ptr<ExecutionCallback<E> >(
131  new PipeliningExecutionCallback(this->shared_from_this())),
132  util::concurrent::ConcurrencyUtil::CALLER_RUNS());
133  return future;
134  }
135 
136  private:
137  class PipeliningExecutionCallback : public ExecutionCallback<E> {
138  public:
139  PipeliningExecutionCallback(const boost::shared_ptr<Pipelining> &pipelining) : pipelining(pipelining) {}
140 
141  virtual void onResponse(const boost::shared_ptr<E> &response) {
142  pipelining->up();
143  }
144 
145  virtual void onFailure(const boost::shared_ptr<exception::IException> &e) {
146  pipelining->up();
147  }
148 
149  private:
150  const boost::shared_ptr<Pipelining> pipelining;
151  };
152 
153  Pipelining(int depth) : permits(util::Preconditions::checkPositive(depth, "depth must be positive")) {
154  }
155 
156  // TODO: Change with lock-free implementation when atomic is integrated into the library
157  void down() {
158  util::LockGuard lockGuard(mutex);
159  while (permits == 0) {
160  conditionVariable.wait(mutex);
161  }
162  --permits;
163  }
164 
165  void up() {
166  util::LockGuard lockGuard(mutex);
167  if (permits == 0) {
168  conditionVariable.notify_all();
169  }
170  ++permits;
171  }
172 
173  typedef std::vector<boost::shared_ptr<ICompletableFuture<E> > > FuturesVector;
174  int permits;
175  std::vector<boost::shared_ptr<ICompletableFuture<E> > > futures;
176  util::ConditionVariable conditionVariable;
177  util::Mutex mutex;
178  };
179  }
180 }
181 
182 #endif //HAZELCAST_CLIENT_PIPELINING_H_
std::vector< boost::shared_ptr< E > > results()
Returns the results.
Definition: Pipelining.h:105
const boost::shared_ptr< ICompletableFuture< E > > & add(const boost::shared_ptr< ICompletableFuture< E > > &future)
Adds a future to this Pipelining or blocks until there is capacity to add the future to the Pipelinin...
Definition: Pipelining.h:125
static boost::shared_ptr< Pipelining > create(int depth)
Creates a Pipelining with the given depth.
Definition: Pipelining.h:89
A Future where one can asynchronously listen on completion.
Definition: ICompletableFuture.h:38
Definition: Pipelining.h:76
PN (Positive-Negative) CRDT counter.
Definition: MapEntryView.h:32