00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
00019 #define _DECAF_UTIL_CONCURRENT_FUTURETASK_H_
00020
00021 #include <decaf/util/Config.h>
00022
00023 #include <decaf/lang/Thread.h>
00024 #include <decaf/lang/Pointer.h>
00025 #include <decaf/lang/exceptions/NullPointerException.h>
00026
00027 #include <decaf/util/concurrent/RunnableFuture.h>
00028 #include <decaf/util/concurrent/Callable.h>
00029 #include <decaf/util/concurrent/CancellationException.h>
00030 #include <decaf/util/concurrent/ExecutionException.h>
00031 #include <decaf/util/concurrent/TimeoutException.h>
00032 #include <decaf/util/concurrent/TimeUnit.h>
00033 #include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
00034
00035 namespace decaf {
00036 namespace util {
00037 namespace concurrent {
00038
00039 using decaf::lang::Pointer;
00040
00057 template<typename T>
00058 class FutureTask : public RunnableFuture<T> {
00059 private:
00060
00065 class FutureTaskAdapter : public decaf::util::concurrent::Callable<T> {
00066 private:
00067
00068 decaf::lang::Runnable* task;
00069 decaf::util::concurrent::Callable<T>* callable;
00070 bool owns;
00071 T result;
00072
00073 private:
00074
00075 FutureTaskAdapter(const FutureTaskAdapter&);
00076 FutureTaskAdapter operator= (const FutureTaskAdapter&);
00077
00078 public:
00079
00080 FutureTaskAdapter(decaf::lang::Runnable* task, const T& result, bool owns = true) :
00081 decaf::util::concurrent::Callable<T>(), task(task), callable(NULL), owns(owns), result(result) {
00082 }
00083
00084 FutureTaskAdapter(decaf::util::concurrent::Callable<T>* task, bool owns = true) :
00085 decaf::util::concurrent::Callable<T>(), task(NULL), callable(task), owns(owns), result(T()) {
00086 }
00087
00088 virtual ~FutureTaskAdapter() {
00089 try{
00090 if (owns) {
00091 delete this->task;
00092 delete this->callable;
00093 }
00094 }
00095 DECAF_CATCHALL_NOTHROW()
00096 }
00097
00098 virtual T call() {
00099 if (this->task != NULL) {
00100 this->task->run();
00101 return result;
00102 } else {
00103 return this->callable->call();
00104 }
00105 }
00106 };
00107
00113 class FutureTaskSync : public locks::AbstractQueuedSynchronizer {
00114 private:
00115
00116 enum SyncState {
00118 READY = 0,
00120 RUNNING = 1,
00122 RAN = 2,
00124 CANCELLED = 4
00125 };
00126
00128 Pointer< Callable<T> > callable;
00129
00131 T result;
00132
00134 Pointer<decaf::lang::Exception> exception;
00135
00136
00137 FutureTask* parent;
00138
00143 decaf::lang::Thread* runner;
00144
00145 private:
00146
00147 FutureTaskSync(const FutureTaskSync&);
00148 FutureTaskSync operator= (const FutureTaskSync&);
00149
00150 public:
00151
00152 FutureTaskSync(FutureTask* parent, Callable<T>* callable) :
00153 AbstractQueuedSynchronizer(), callable(callable), result(), exception(), parent(parent), runner(NULL) {
00154 }
00155
00156 virtual ~FutureTaskSync() {
00157 }
00158
00159 bool innerIsCancelled() const {
00160 return getState() == CANCELLED;
00161 }
00162
00163 bool innerIsDone() const {
00164 return ranOrCancelled(getState()) && this->runner == NULL;
00165 }
00166
00167 T innerGet() {
00168 this->acquireSharedInterruptibly(0);
00169 if (getState() == CANCELLED) {
00170 throw CancellationException();
00171 }
00172 if (exception != NULL) {
00173 throw ExecutionException(exception->clone());
00174 }
00175 return result;
00176 }
00177
00178 T innerGet(long long nanosTimeout) {
00179 if (!tryAcquireSharedNanos(0, nanosTimeout)) {
00180 throw TimeoutException();
00181 }
00182 if (getState() == CANCELLED) {
00183 throw CancellationException();
00184 }
00185 if (exception != NULL) {
00186 throw ExecutionException(exception->clone());
00187 }
00188 return result;
00189 }
00190
00191 void innerSet(const T& result) {
00192 for (;;) {
00193 int s = getState();
00194 if (s == RAN) {
00195 return;
00196 }
00197 if (s == CANCELLED) {
00198
00199
00200
00201 releaseShared(0);
00202 return;
00203 }
00204 if (compareAndSetState(s, RAN)) {
00205 this->result = result;
00206 releaseShared(0);
00207 this->parent->done();
00208 return;
00209 }
00210 }
00211 }
00212
00213 void innerSetException(const decaf::lang::Exception& t) {
00214 for (;;) {
00215 int s = getState();
00216 if (s == RAN) {
00217 return;
00218 }
00219 if (s == CANCELLED) {
00220
00221
00222
00223 releaseShared(0);
00224 return;
00225 }
00226 if (compareAndSetState(s, RAN)) {
00227 exception.reset(t.clone());
00228 releaseShared(0);
00229 this->parent->done();
00230 return;
00231 }
00232 }
00233 }
00234
00235 bool innerCancel(bool mayInterruptIfRunning) {
00236 for (;;) {
00237 int s = getState();
00238 if (ranOrCancelled(s)) {
00239 return false;
00240 }
00241 if (compareAndSetState(s, CANCELLED)) {
00242 break;
00243 }
00244 }
00245
00246 if (mayInterruptIfRunning) {
00247 decaf::lang::Thread* r = runner;
00248 if (r != NULL) {
00249 r->interrupt();
00250 }
00251 }
00252
00253 releaseShared(0);
00254 this->parent->done();
00255 return true;
00256 }
00257
00258 void innerRun() {
00259 if (!compareAndSetState(READY, RUNNING)) {
00260 return;
00261 }
00262
00263 this->runner = decaf::lang::Thread::currentThread();
00264 if (getState() == RUNNING) {
00265 T result;
00266 try {
00267 result = this->callable->call();
00268 } catch(decaf::lang::Exception& ex) {
00269 this->parent->setException(ex);
00270 return;
00271 } catch(std::exception& stdex) {
00272 this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
00273 return;
00274 } catch(...) {
00275 this->parent->setException(decaf::lang::Exception(
00276 __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
00277 return;
00278 }
00279 this->parent->set(result);
00280 } else {
00281 releaseShared(0);
00282 }
00283 }
00284
00285 bool innerRunAndReset() {
00286 if (!compareAndSetState(READY, RUNNING)) {
00287 return false;
00288 }
00289
00290 try {
00291 this->runner = decaf::lang::Thread::currentThread();
00292 if (getState() == RUNNING) {
00293 this->callable->call();
00294 }
00295 this->runner = NULL;
00296 return compareAndSetState(RUNNING, READY);
00297 } catch(decaf::lang::Exception& ex) {
00298 this->parent->setException(ex);
00299 return false;
00300 } catch(std::exception& stdex) {
00301 this->parent->setException(decaf::lang::Exception(new std::exception(stdex)));
00302 return false;
00303 } catch(...) {
00304 this->parent->setException(decaf::lang::Exception(
00305 __FILE__, __LINE__, "FutureTask Caught Unknown exception during task execution."));
00306 return false;
00307 }
00308 }
00309
00310 protected:
00311
00315 virtual int tryAcquireShared(int ignore DECAF_UNUSED) {
00316 return innerIsDone() ? 1 : -1;
00317 }
00318
00323 virtual bool tryReleaseShared(int ignore DECAF_UNUSED) {
00324 runner = NULL;
00325 return true;
00326 }
00327
00328 private:
00329
00330 bool ranOrCancelled(int state) const {
00331 return (state & (RAN | CANCELLED)) != 0;
00332 }
00333 };
00334
00335 private:
00336
00337 Pointer<FutureTaskSync> sync;
00338
00339 public:
00340
00352 FutureTask(Callable<T>* callable, bool takeOwnership = true) : sync(NULL) {
00353 if (callable == NULL ) {
00354 throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
00355 "The Callable pointer passed to the constructor was NULL");
00356 }
00357
00358 this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(callable, takeOwnership)));
00359 }
00360
00375 FutureTask(decaf::lang::Runnable* runnable, const T& result, bool takeOwnership = true) : sync(NULL) {
00376 if (runnable == NULL ) {
00377 throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
00378 "The Runnable pointer passed to the constructor was NULL");
00379 }
00380
00381 this->sync.reset(new FutureTaskSync(this, new FutureTaskAdapter(runnable, result, takeOwnership)));
00382 }
00383
00384 virtual ~FutureTask() {
00385 }
00386
00387 virtual bool isCancelled() const {
00388 return this->sync->innerIsCancelled();
00389 }
00390
00391 virtual bool isDone() const {
00392 return this->sync->innerIsDone();
00393 }
00394
00395 virtual bool cancel(bool mayInterruptIfRunning) {
00396 return this->sync->innerCancel(mayInterruptIfRunning);
00397 }
00398
00399 virtual T get() {
00400 return this->sync->innerGet();
00401 }
00402
00403 virtual T get(long long timeout, const TimeUnit& unit) {
00404 return this->sync->innerGet(unit.toNanos(timeout));
00405 }
00406
00407 FutureTask<T>* clone() {
00408 return new FutureTask<T>(*this);
00409 }
00410
00411 public:
00412
00421 virtual void done() {}
00422
00432 virtual void set(const T& result) {
00433 this->sync->innerSet(result);
00434 }
00435
00445 virtual void setException(const decaf::lang::Exception& error) {
00446 this->sync->innerSetException(error);
00447 }
00448
00449 virtual void run() {
00450 this->sync->innerRun();
00451 }
00452
00461 virtual bool runAndReset() {
00462 return this->sync->innerRunAndReset();
00463 }
00464
00465 public:
00466
00467 FutureTask(const FutureTask<T>& source) : RunnableFuture<T>(), sync(source.sync) {
00468 }
00469
00470 FutureTask<T>& operator= (const FutureTask<T>& source) {
00471 this->sync = source.sync;
00472 return *this;
00473 }
00474
00475 };
00476
00477 }}}
00478
00479 #endif