00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
00018 #define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
00019
00020 #include <decaf/lang/Runnable.h>
00021 #include <decaf/lang/Throwable.h>
00022 #include <decaf/util/concurrent/ThreadFactory.h>
00023 #include <decaf/util/concurrent/BlockingQueue.h>
00024 #include <decaf/util/concurrent/TimeUnit.h>
00025 #include <decaf/util/concurrent/AbstractExecutorService.h>
00026 #include <decaf/util/concurrent/RejectedExecutionHandler.h>
00027 #include <decaf/util/concurrent/RejectedExecutionException.h>
00028 #include <decaf/util/LinkedList.h>
00029 #include <decaf/util/ArrayList.h>
00030 #include <decaf/util/Config.h>
00031
00032 #include <vector>
00033
00034 namespace decaf{
00035 namespace util{
00036 namespace concurrent{
00037
// Makes decaf::lang::Pointer usable without qualification inside
// decaf::util::concurrent for everything that includes this header.
// NOTE(review): no declaration visible in this header uses Pointer, and no
// Pointer header is included directly here - presumably it arrives through one
// of the includes above; confirm before relying on it.
00038 using decaf::lang::Pointer;
00039
// Forward declaration only. ThreadPoolExecutor below names this type as a
// friend and stores a pointer to it, so the complete type is not required in
// this header.
00040 class ExecutorKernel;
00041
00058 class DECAF_API ThreadPoolExecutor : public AbstractExecutorService {
00059 private:
00060
00061 ThreadPoolExecutor( const ThreadPoolExecutor& );
00062 ThreadPoolExecutor& operator= ( const ThreadPoolExecutor& );
00063
00064 private:
00065
00066 friend class ExecutorKernel;
00067 ExecutorKernel* kernel;
00068
00069 public:
00070
00097 ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
00098 long long keepAliveTime, const TimeUnit& unit,
00099 BlockingQueue<decaf::lang::Runnable*>* workQueue);
00100
00131 ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
00132 long long keepAliveTime, const TimeUnit& unit,
00133 BlockingQueue<decaf::lang::Runnable*>* workQueue,
00134 RejectedExecutionHandler* handler);
00135
00166 ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
00167 long long keepAliveTime, const TimeUnit& unit,
00168 BlockingQueue<decaf::lang::Runnable*>* workQueue,
00169 ThreadFactory* threadFactory);
00170
00205 ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
00206 long long keepAliveTime, const TimeUnit& unit,
00207 BlockingQueue<decaf::lang::Runnable*>* workQueue,
00208 ThreadFactory* threadFactory,
00209 RejectedExecutionHandler* handler);
00210
00211 virtual ~ThreadPoolExecutor();
00212
00213 virtual void execute(decaf::lang::Runnable* task);
00214
00215 virtual void execute(decaf::lang::Runnable* task, bool takeOwnership);
00216
00217 virtual void shutdown();
00218
00219 virtual ArrayList<decaf::lang::Runnable*> shutdownNow();
00220
00221 virtual bool awaitTermination(long long timeout, const decaf::util::concurrent::TimeUnit& unit);
00222
00223 virtual bool isShutdown() const;
00224
00225 virtual bool isTerminated() const;
00226
00232 virtual int getPoolSize() const;
00233
00239 virtual int getCorePoolSize() const;
00240
00254 virtual void setCorePoolSize(int poolSize);
00255
00261 virtual int getMaximumPoolSize() const;
00262
00274 virtual void setMaximumPoolSize(int maxSize);
00275
00283 virtual long long getTaskCount() const;
00284
00291 virtual int getActiveCount() const;
00292
00299 virtual long long getCompletedTaskCount() const;
00300
00307 virtual int getLargestPoolSize() const;
00308
00316 virtual BlockingQueue<decaf::lang::Runnable*>* getQueue();
00317
00327 virtual bool isTerminating() const;
00328
00341 virtual void allowCoreThreadTimeout(bool value);
00342
00351 virtual bool allowsCoreThreadTimeout() const;
00352
00362 virtual long long getKeepAliveTime(const TimeUnit& unit) const;
00363
00377 virtual void setKeepAliveTime(long long timeout, const TimeUnit& unit);
00378
00390 virtual void setThreadFactory(ThreadFactory* factory);
00391
00398 virtual ThreadFactory* getThreadFactory() const;
00399
00405 virtual RejectedExecutionHandler* getRejectedExecutionHandler() const;
00406
00417 virtual void setRejectedExecutionHandler(RejectedExecutionHandler* handler);
00418
00427 virtual bool prestartCoreThread();
00428
00436 virtual int prestartAllCoreThreads();
00437
00447 bool remove(decaf::lang::Runnable* task);
00448
00456 virtual void purge();
00457
00458 protected:
00459
00473 virtual void beforeExecute(decaf::lang::Thread* thread, decaf::lang::Runnable* task);
00474
00489 virtual void afterExecute(decaf::lang::Runnable* task, decaf::lang::Throwable* error);
00490
00496 virtual void terminated();
00497
00498 protected:
00499
00503 virtual void onShutdown();
00504
00505 public:
00506
00513 class AbortPolicy : public RejectedExecutionHandler {
00514 public:
00515
00516 AbortPolicy() : RejectedExecutionHandler() {
00517 }
00518
00519 virtual ~AbortPolicy() {
00520 }
00521
00522 virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) {
00523 delete task;
00524 throw RejectedExecutionException(__FILE__, __LINE__, "Unable to execute task.");
00525 }
00526
00527 };
00528
00536 class CallerRunsPolicy : public RejectedExecutionHandler {
00537 public:
00538
00539 CallerRunsPolicy() : RejectedExecutionHandler() {
00540 }
00541
00542 virtual ~CallerRunsPolicy() {
00543 }
00544
00545 virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) {
00546
00547 if (executer->isShutdown()) {
00548 delete task;
00549 return;
00550 }
00551
00552 try{
00553 task->run();
00554 delete task;
00555 } catch(decaf::lang::Exception& ex) {
00556 delete task;
00557 throw ex;
00558 }
00559 }
00560 };
00561
00568 class DiscardPolicy : public RejectedExecutionHandler {
00569 public:
00570
00571 DiscardPolicy() : RejectedExecutionHandler() {
00572 }
00573
00574 virtual ~DiscardPolicy() {
00575 }
00576
00577 virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) {
00578 delete task;
00579 }
00580
00581 };
00582
00590 class DiscardOldestPolicy : public RejectedExecutionHandler {
00591 public:
00592
00593 DiscardOldestPolicy() : RejectedExecutionHandler() {
00594 }
00595
00596 virtual ~DiscardOldestPolicy() {
00597 }
00598
00599 virtual void rejectedExecution( decaf::lang::Runnable* task, ThreadPoolExecutor* executer ) {
00600
00601 if (executer->isShutdown()) {
00602 delete task;
00603 return;
00604 }
00605
00606 try{
00607
00608 decaf::lang::Runnable* oldest = NULL;
00609 executer->getQueue()->poll(oldest);
00610 delete oldest;
00611
00612 executer->execute(task);
00613 } catch(decaf::lang::Exception& ex) {
00614 delete task;
00615 throw ex;
00616 }
00617 }
00618
00619 };
00620
00621 };
00622
00623 }}}
00624
00625 #endif