activemq-cpp-3.9.5
FutureTask.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
19#define _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
20
21#include <decaf/util/Config.h>
22
23#include <decaf/lang/Thread.h>
24#include <decaf/lang/Pointer.h>
26
34
35namespace decaf {
36namespace util {
37namespace concurrent {
38
40
57 template<typename T>
58 class FutureTask : public RunnableFuture<T> {
59 private:
60
65 class FutureTaskAdapter : public decaf::util::concurrent::Callable<T> {
66 private:
67
70 bool owns;
71 T result;
72
73 private:
74
75 FutureTaskAdapter(const FutureTaskAdapter&);
76 FutureTaskAdapter operator= (const FutureTaskAdapter&);
77
78 public:
79
80 FutureTaskAdapter(decaf::lang::Runnable* task, const T& result, bool owns = true) :
81 decaf::util::concurrent::Callable<T>(), task(task), callable(NULL), owns(owns), result(result) {
82 }
83
84 FutureTaskAdapter(decaf::util::concurrent::Callable<T>* task, bool owns = true) :
85 decaf::util::concurrent::Callable<T>(), task(NULL), callable(task), owns(owns), result(T()) {
86 }
87
88 virtual ~FutureTaskAdapter() {
89 try{
90 if (owns) {
91 delete this->task;
92 delete this->callable;
93 }
94 }
96 }
97
98 virtual T call() {
99 if (this->task != NULL) {
100 this->task->run();
101 return result;
102 } else {
103 return this->callable->call();
104 }
105 }
106 };
107
113 class FutureTaskSync : public locks::AbstractQueuedSynchronizer {
114 private:
115
116 enum SyncState {
118 READY = 0,
120 RUNNING = 1,
122 RAN = 2,
124 CANCELLED = 4
125 };
126
128 Pointer< Callable<T> > callable;
129
131 T result;
132
135
136 // The FutureTask parent of the Sync object.
137 FutureTask* parent;
138
143 decaf::lang::Thread* runner;
144
145 private:
146
147 FutureTaskSync(const FutureTaskSync&);
148 FutureTaskSync operator= (const FutureTaskSync&);
149
150 public:
151
152 FutureTaskSync(FutureTask* parent, Callable<T>* callable) :
153 AbstractQueuedSynchronizer(), callable(callable), result(), exception(), parent(parent), runner(NULL) {
154 }
155
156 virtual ~FutureTaskSync() {
157 }
158
159 bool innerIsCancelled() const {
160 return getState() == CANCELLED;
161 }
162
163 bool innerIsDone() const {
164 return ranOrCancelled(getState()) && this->runner == NULL;
165 }
166
167 T innerGet() {
168 this->acquireSharedInterruptibly(0);
169 if (getState() == CANCELLED) {
170 throw CancellationException();
171 }
172 if (exception != NULL) {
173 throw ExecutionException(exception->clone());
174 }
175 return result;
176 }
177
178 T innerGet(long long nanosTimeout) {
179 if (!tryAcquireSharedNanos(0, nanosTimeout)) {
180 throw TimeoutException();
181 }
182 if (getState() == CANCELLED) {
183 throw CancellationException();
184 }
185 if (exception != NULL) {
186 throw ExecutionException(exception->clone());
187 }
188 return result;
189 }
190
191 void innerSet(const T& result) {
192 for (;;) {
193 int s = getState();
194 if (s == RAN) {
195 return;
196 }
197 if (s == CANCELLED) {
198 // aggressively release to set runner to null,
199 // in case we are racing with a cancel request
200 // that will try to interrupt runner
201 releaseShared(0);
202 return;
203 }
204 if (compareAndSetState(s, RAN)) {
205 this->result = result;
206 releaseShared(0);
207 this->parent->done();
208 return;
209 }
210 }
211 }
212
213 void innerSetException(const decaf::lang::Exception& t) {
214 for (;;) {
215 int s = getState();
216 if (s == RAN) {
217 return;
218 }
219 if (s == CANCELLED) {
220 // aggressively release to set runner to null,
221 // in case we are racing with a cancel request
222 // that will try to interrupt runner
223 releaseShared(0);
224 return;
225 }
226 if (compareAndSetState(s, RAN)) {
227 exception.reset(t.clone());
228 releaseShared(0);
229 this->parent->done();
230 return;
231 }
232 }
233 }
234
235 bool innerCancel(bool mayInterruptIfRunning) {
236 for (;;) {
237 int s = getState();
238 if (ranOrCancelled(s)) {
239 return false;
240 }
241 if (compareAndSetState(s, CANCELLED)) {
242 break;
243 }
244 }
245
246 if (mayInterruptIfRunning) {
247 decaf::lang::Thread* r = runner;
248 if (r != NULL) {
249 r->interrupt();
250 }
251 }
252
253 releaseShared(0);
254 this->parent->done();
255 return true;
256 }
257
258 void innerRun() {
259 if (!compareAndSetState(READY, RUNNING)) {
260 return;
261 }
262
263 this->runner = decaf::lang::Thread::currentThread();
264 if (getState() == RUNNING) { // recheck after setting thread
265 T result;
266 try {
267 result = this->callable->call();
268 } catch(decaf::lang::Exception& ex) {
269 this->parent->setException(ex);
270 return;
271 } catch(std::exception& stdex) {
272 this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
273 return;
274 } catch(...) {
276 __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
277 return;
278 }
279 this->parent->set(result);
280 } else {
281 releaseShared(0); // cancel
282 }
283 }
284
285 bool innerRunAndReset() {
286 if (!compareAndSetState(READY, RUNNING)) {
287 return false;
288 }
289
290 try {
291 this->runner = decaf::lang::Thread::currentThread();
292 if (getState() == RUNNING) {
293 this->callable->call(); // don't set result
294 }
295 this->runner = NULL;
296 return compareAndSetState(RUNNING, READY);
297 } catch(decaf::lang::Exception& ex) {
298 this->parent->setException(ex);
299 return false;
300 } catch(std::exception& stdex) {
301 this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
302 return false;
303 } catch(...) {
305 __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
306 return false;
307 }
308 }
309
310 protected:
311
315 virtual int tryAcquireShared(int ignore DECAF_UNUSED) {
316 return innerIsDone() ? 1 : -1;
317 }
318
323 virtual bool tryReleaseShared(int ignore DECAF_UNUSED) {
324 runner = NULL;
325 return true;
326 }
327
328 private:
329
330 bool ranOrCancelled(int state) const {
331 return (state & (RAN | CANCELLED)) != 0;
332 }
333 };
334
335 private:
336
338
339 public:
340
352 FutureTask(Callable<T>* callable, bool takeOwnership = true) : sync(NULL) {
353 if (callable == NULL ) {
354 throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
355 "The Callable pointer passed to the constructor was NULL");
356 }
357
358 this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(callable, takeOwnership)));
359 }
360
375 FutureTask(decaf::lang::Runnable* runnable, const T& result, bool takeOwnership = true) : sync(NULL) {
376 if (runnable == NULL ) {
377 throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
378 "The Runnable pointer passed to the constructor was NULL");
379 }
380
381 this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(runnable, result, takeOwnership)));
382 }
383
384 virtual ~FutureTask() {
385 }
386
387 virtual bool isCancelled() const {
388 return this->sync->innerIsCancelled();
389 }
390
391 virtual bool isDone() const {
392 return this->sync->innerIsDone();
393 }
394
395 virtual bool cancel(bool mayInterruptIfRunning) {
396 return this->sync->innerCancel(mayInterruptIfRunning);
397 }
398
399 virtual T get() {
400 return this->sync->innerGet();
401 }
402
403 virtual T get(long long timeout, const TimeUnit& unit) {
404 return this->sync->innerGet(unit.toNanos(timeout));
405 }
406
408 return new FutureTask<T>(*this);
409 }
410
411 public:
412
421 virtual void done() {}
422
432 virtual void set(const T& result) {
433 this->sync->innerSet(result);
434 }
435
445 virtual void setException(const decaf::lang::Exception& error) {
446 this->sync->innerSetException(error);
447 }
448
449 virtual void run() {
450 this->sync->innerRun();
451 }
452
461 virtual bool runAndReset() {
462 return this->sync->innerRunAndReset();
463 }
464
465 public:
466
467 FutureTask(const FutureTask<T>& source) : RunnableFuture<T>(), sync(source.sync) {
468 }
469
471 this->sync = source.sync;
472 return *this;
473 }
474
475 };
476
477}}}
478
479#endif /* _DECAF_UTIL_CONCURRENT_FUTURETASK_H_ */
Definition: Exception.h:38
virtual Exception * clone() const
Clones this exception.
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
void reset(T *value=NULL)
Resets the Pointer to hold the new value.
Definition: Pointer.h:161
Interface for a runnable object - defines a task that can be run by a thread.
Definition: Runnable.h:29
virtual void run()=0
Run method - called by the Thread class in the context of the thread.
A Thread is a concurrent unit of execution.
Definition: Thread.h:64
void interrupt()
Interrupts the Thread if it is blocked and in an interruptible state.
static Thread * currentThread()
Returns a pointer to the currently executing thread object.
Definition: NullPointerException.h:32
A task that returns a result and may throw an exception.
Definition: Callable.h:47
virtual V call()=0
Computes a result, or throws an exception if unable to do so.
Definition: CancellationException.h:30
Definition: ExecutionException.h:31
A cancellable asynchronous computation.
Definition: FutureTask.h:58
virtual void run()
Run method - called by the Thread class in the context of the thread.
Definition: FutureTask.h:449
virtual bool isDone() const
Returns true if this task completed.
Definition: FutureTask.h:391
virtual void setException(const decaf::lang::Exception &error)
Causes this future to report an ExecutionException with the given Exception as its cause,...
Definition: FutureTask.h:445
virtual bool cancel(bool mayInterruptIfRunning)
Attempts to cancel execution of this task.
Definition: FutureTask.h:395
FutureTask< T > & operator=(const FutureTask< T > &source)
Definition: FutureTask.h:470
FutureTask(const FutureTask< T > &source)
Definition: FutureTask.h:467
FutureTask(Callable< T > *callable, bool takeOwnership=true)
Creates a FutureTask instance that will, upon running, execute the given Callable.
Definition: FutureTask.h:352
virtual T get(long long timeout, const TimeUnit &unit)
Waits if necessary for at most the given time for the computation to complete, and then retrieves its...
Definition: FutureTask.h:403
FutureTask< T > * clone()
Definition: FutureTask.h:407
virtual bool runAndReset()
Executes the computation without setting its result, and then resets this Future to initial state,...
Definition: FutureTask.h:461
virtual ~FutureTask()
Definition: FutureTask.h:384
virtual void done()
Protected method invoked when this task transitions to state isDone (whether normally or via cancella...
Definition: FutureTask.h:421
virtual T get()
Waits if necessary for the computation to complete, and then retrieves its result.
Definition: FutureTask.h:399
virtual bool isCancelled() const
Returns true if this task was canceled before it completed normally.
Definition: FutureTask.h:387
virtual void set(const T &result)
Sets the result of this Future to the given value unless this future has already been set or has been...
Definition: FutureTask.h:432
FutureTask(decaf::lang::Runnable *runnable, const T &result, bool takeOwnership=true)
Creates a FutureTask that will, upon running, execute the given Runnable, and arrange that the get me...
Definition: FutureTask.h:375
A Runnable version of the Future type.
Definition: RunnableFuture.h:38
A TimeUnit represents time durations at a given unit of granularity and provides utility methods to c...
Definition: TimeUnit.h:62
long long toNanos(long long duration) const
Equivalent to NANOSECONDS.convert(duration, this).
Definition: TimeUnit.h:126
Definition: TimeoutException.h:32
Definition: AbstractQueuedSynchronizer.h:35
#define DECAF_CATCHALL_NOTHROW()
A catch-all that does not throw an exception, one use would be to catch any exception in a destructor...
Definition: ExceptionDefines.h:62
#define NULL
Definition: Config.h:33
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition: AprPool.h:25