00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
00019 #define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
00020
00021 #include <decaf/util/Config.h>
00022
00023 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
00024 #include <decaf/util/concurrent/BlockingQueue.h>
00025 #include <decaf/util/concurrent/locks/ReentrantLock.h>
00026 #include <decaf/util/AbstractQueue.h>
00027 #include <decaf/util/Iterator.h>
00028 #include <decaf/lang/Integer.h>
00029 #include <decaf/lang/Math.h>
00030 #include <decaf/lang/Pointer.h>
00031 #include <decaf/util/NoSuchElementException.h>
00032 #include <decaf/lang/exceptions/IllegalArgumentException.h>
00033 #include <decaf/lang/exceptions/IllegalStateException.h>
00034
00035 namespace decaf {
00036 namespace util {
00037 namespace concurrent {
00038
00039 using decaf::lang::Pointer;
00040
00051 template<typename E>
00052 class LinkedBlockingQueue : public BlockingQueue<E> {
00053 private:
00054
00055 template< typename U >
00056 class QueueNode {
00057 private:
00058
00059 U value;
00060 bool unlinked;
00061 bool dequeued;
00062
00063 public:
00064
00065 Pointer< QueueNode<E> > next;
00066
00067 private:
00068
00069 QueueNode(const QueueNode&);
00070 QueueNode& operator=(const QueueNode&);
00071
00072 public:
00073
00074 QueueNode() : value(), unlinked(false), dequeued(false), next() {}
00075 QueueNode(const U& value) : value(value), unlinked(false), dequeued(false), next() {}
00076
00077 void set(Pointer< QueueNode<U> > next, const U& value) {
00078 this->next = next;
00079 this->value = value;
00080 this->unlinked = false;
00081 this->dequeued = false;
00082 }
00083
00084 E get() const {
00085 return this->value;
00086 }
00087
00088 E getAndDequeue() {
00089 E result = this->value;
00090 this->value = E();
00091 this->dequeued = true;
00092
00093 return result;
00094 }
00095
00096 void unlink() {
00097 this->value = E();
00098 this->unlinked = true;
00099 }
00100
00101 bool isUnlinked() const {
00102 return this->unlinked;
00103 }
00104
00105 bool isDequeued() const {
00106 return this->dequeued;
00107 }
00108 };
00109
00110 class TotalLock {
00111 private:
00112
00113 TotalLock(const TotalLock& src);
00114 TotalLock& operator=(const TotalLock& src);
00115
00116 private:
00117
00118 const LinkedBlockingQueue<E>* parent;
00119
00120 public:
00121
00122 TotalLock(const LinkedBlockingQueue<E>* parent) : parent(parent) {
00123 parent->putLock.lock();
00124 parent->takeLock.lock();
00125 }
00126
00127 ~TotalLock() {
00128 parent->putLock.unlock();
00129 parent->takeLock.unlock();
00130 }
00131
00132 };
00133
00134 private:
00135
00136 int capacity;
00137 decaf::util::concurrent::atomic::AtomicInteger count;
00138
00140 mutable locks::ReentrantLock takeLock;
00141
00143 Pointer<locks::Condition> notEmpty;
00144
00146 mutable locks::ReentrantLock putLock;
00147
00149 Pointer<locks::Condition> notFull;
00150
00151 Pointer< QueueNode<E> > head;
00152 Pointer< QueueNode<E> > tail;
00153
00154 public:
00155
00159 LinkedBlockingQueue() : BlockingQueue<E>(), capacity(lang::Integer::MAX_VALUE), count(),
00160 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
00161
00162 this->tail = this->head;
00163 this->notEmpty.reset(this->takeLock.newCondition());
00164 this->notFull.reset(this->putLock.newCondition());
00165 }
00166
00175 LinkedBlockingQueue(int capacity) : BlockingQueue<E>(), capacity(capacity), count(),
00176 takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
00177 if(capacity <= 0) {
00178 throw decaf::lang::exceptions::IllegalArgumentException(
00179 __FILE__, __LINE__, "Capacity value must be greater than zero.");
00180 }
00181
00182 this->tail = this->head;
00183 this->notEmpty.reset(this->takeLock.newCondition());
00184 this->notFull.reset(this->putLock.newCondition());
00185 }
00186
00197 LinkedBlockingQueue(const Collection<E>& collection) : BlockingQueue<E>(),
00198 capacity(lang::Integer::MAX_VALUE), count(),
00199 takeLock(), notEmpty(), putLock(), notFull(),
00200 head(new QueueNode<E>()), tail() {
00201
00202 this->tail = this->head;
00203 this->notEmpty.reset(this->takeLock.newCondition());
00204 this->notFull.reset(this->putLock.newCondition());
00205
00206 Pointer< Iterator<E> > iter(collection.iterator());
00207
00208 try {
00209
00210 int count = 0;
00211
00212 while(iter->hasNext()) {
00213 if(count == this->capacity) {
00214 throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
00215 "Number of elements in the Collection exceeds this Queue's Capacity.");
00216 }
00217
00218 this->enqueue(iter->next());
00219 ++count;
00220 }
00221
00222 this->count.set(count);
00223 }
00224 DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
00225 DECAF_CATCH_RETHROW(decaf::lang::Exception)
00226 DECAF_CATCHALL_THROW(decaf::lang::Exception)
00227 }
00228
00239 LinkedBlockingQueue(const LinkedBlockingQueue& queue) : BlockingQueue<E>(),
00240 capacity(lang::Integer::MAX_VALUE), count(),
00241 takeLock(), notEmpty(), putLock(), notFull(),
00242 head(new QueueNode<E>()), tail() {
00243
00244 this->tail = this->head;
00245 this->notEmpty.reset(this->takeLock.newCondition());
00246 this->notFull.reset(this->putLock.newCondition());
00247
00248 Pointer< Iterator<E> > iter(queue.iterator());
00249
00250 try {
00251
00252 int count = 0;
00253
00254 while(iter->hasNext()) {
00255 if(count == this->capacity) {
00256 throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
00257 "Number of elements in the Collection exceeds this Queue's Capacity.");
00258 }
00259
00260 this->enqueue(iter->next());
00261 ++count;
00262 }
00263
00264 this->count.set(count);
00265 }
00266 DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
00267 DECAF_CATCH_RETHROW(decaf::lang::Exception)
00268 DECAF_CATCHALL_THROW(decaf::lang::Exception)
00269 }
00270
00271 virtual ~LinkedBlockingQueue() {
00272 try{
00273 this->purgeList();
00274 } catch(...) {}
00275 }
00276
00277 public:
00278
00279 LinkedBlockingQueue<E>& operator= ( const LinkedBlockingQueue<E>& queue ) {
00280 this->clear();
00281 this->addAll(queue);
00282 return *this;
00283 }
00284
00285 LinkedBlockingQueue<E>& operator= ( const Collection<E>& collection ) {
00286 this->clear();
00287 this->addAll(collection);
00288 return *this;
00289 }
00290
00291 public:
00292
00293 virtual int size() const {
00294 return this->count.get();
00295 }
00296
00297 virtual void clear() {
00298
00299 TotalLock lock(this);
00300
00301 this->purgeList();
00302 this->tail = this->head;
00303 this->count.set(0);
00304
00305 if(this->count.getAndSet(0) == this->capacity) {
00306 this->notFull->signal();
00307 }
00308 }
00309
00310 virtual int remainingCapacity() const {
00311 return this->capacity - this->count.get();
00312 }
00313
00314 virtual void put( const E& value ) {
00315
00316 int c = -1;
00317
00318 this->putLock.lockInterruptibly();
00319 try {
00320
00321
00322
00323
00324
00325
00326 while (this->count.get() == this->capacity) {
00327 this->notFull->await();
00328 }
00329
00330
00331
00332
00333
00334 enqueue(value);
00335 c = this->count.getAndIncrement();
00336
00337 if(c + 1 < this->capacity) {
00338 this->notFull->signal();
00339 }
00340 } catch(decaf::lang::Exception& ex) {
00341 this->putLock.unlock();
00342 throw;
00343 }
00344
00345 this->putLock.unlock();
00346
00347
00348
00349
00350 if (c == 0) {
00351 this->signalNotEmpty();
00352 }
00353 }
00354
00355 virtual bool offer( const E& value, long long timeout, const TimeUnit& unit ) {
00356
00357 int c = -1;
00358 long long nanos = unit.toNanos(timeout);
00359
00360 this->putLock.lockInterruptibly();
00361 try {
00362
00363 while(this->count.get() == this->capacity) {
00364 if (nanos <= 0) {
00365 return false;
00366 }
00367
00368 nanos = this->notFull->awaitNanos(nanos);
00369 }
00370
00371 enqueue(value);
00372 c = this->count.getAndIncrement();
00373
00374 if(c + 1 < this->capacity) {
00375 this->notFull->signal();
00376 }
00377
00378 } catch(decaf::lang::Exception& ex) {
00379 this->putLock.unlock();
00380 throw;
00381 }
00382
00383 this->putLock.unlock();
00384
00385 if(c == 0) {
00386 this->signalNotEmpty();
00387 }
00388
00389 return true;
00390 }
00391
00392 virtual bool offer(const E& value) {
00393
00394 if (this->count.get() == this->capacity) {
00395 return false;
00396 }
00397
00398 int c = -1;
00399 this->putLock.lockInterruptibly();
00400 try {
00401
00402 if (this->count.get() < this->capacity) {
00403
00404 enqueue(value);
00405 c = this->count.getAndIncrement();
00406
00407 if (c + 1 < this->capacity) {
00408 this->notFull->signal();
00409 }
00410 }
00411
00412 } catch (decaf::lang::Exception& ex) {
00413 this->putLock.unlock();
00414 throw;
00415 }
00416
00417 this->putLock.unlock();
00418
00419 if (c == 0) {
00420 this->signalNotEmpty();
00421 }
00422
00423 return c >= 0;
00424 }
00425
00426 virtual E take() {
00427
00428 E value = E();
00429 int c = -1;
00430
00431 this->takeLock.lockInterruptibly();
00432 try {
00433
00434 while (this->count.get() == 0) {
00435 this->notEmpty->await();
00436 }
00437
00438
00439
00440
00441 value = dequeue();
00442 c = this->count.getAndDecrement();
00443
00444 if (c > 1) {
00445 this->notEmpty->signal();
00446 }
00447
00448 } catch (decaf::lang::Exception& ex) {
00449 this->takeLock.unlock();
00450 throw;
00451 }
00452
00453 this->takeLock.unlock();
00454
00455
00456
00457
00458 if (c == this->capacity) {
00459 this->signalNotFull();
00460 }
00461
00462 return value;
00463 }
00464
00465 virtual bool poll(E& result, long long timeout, const TimeUnit& unit) {
00466 int c = -1;
00467 long long nanos = unit.toNanos(timeout);
00468
00469 this->takeLock.lockInterruptibly();
00470 try {
00471
00472 while (this->count.get() == 0) {
00473 if (nanos <= 0) {
00474 return false;
00475 }
00476
00477 nanos = this->notEmpty->awaitNanos(nanos);
00478 }
00479
00480 result = dequeue();
00481 c = this->count.getAndDecrement();
00482
00483 if (c > 1) {
00484 this->notEmpty->signal();
00485 }
00486
00487 } catch (decaf::lang::Exception& ex) {
00488 this->takeLock.unlock();
00489 throw;
00490 }
00491
00492 this->takeLock.unlock();
00493
00494 if(c == this->capacity) {
00495 this->signalNotFull();
00496 }
00497
00498 return true;
00499 }
00500
00501 virtual bool poll(E& result) {
00502
00503 if (this->count.get() == 0) {
00504 return false;
00505 }
00506
00507 int c = -1;
00508 this->takeLock.lock();
00509 try {
00510
00511 if (this->count.get() > 0) {
00512 result = dequeue();
00513 c = this->count.getAndDecrement();
00514
00515 if (c > 1) {
00516 this->notEmpty->signal();
00517 }
00518 }
00519
00520 } catch (decaf::lang::Exception& ex) {
00521 this->takeLock.unlock();
00522 throw;
00523 }
00524
00525 this->takeLock.unlock();
00526
00527 if (c == this->capacity) {
00528 this->signalNotFull();
00529 }
00530
00531 return true;
00532 }
00533
00534 virtual bool peek(E& result) const {
00535
00536 if(this->count.get() == 0) {
00537 return false;
00538 }
00539
00540 this->takeLock.lock();
00541 try {
00542 Pointer< QueueNode<E> > front = this->head->next;
00543 if(front == NULL) {
00544 return false;
00545 } else {
00546 result = front->get();
00547 }
00548 } catch (decaf::lang::Exception& ex) {
00549 this->takeLock.unlock();
00550 throw;
00551 }
00552
00553 this->takeLock.unlock();
00554
00555 return true;
00556 }
00557
00558 using AbstractQueue<E>::remove;
00559
00560 virtual bool remove(const E& value) {
00561
00562 TotalLock lock(this);
00563
00564 for(Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p != NULL;
00565 predicessor = p, p = p->next) {
00566
00567 if(value == p->get()) {
00568 unlink(p, predicessor);
00569 return true;
00570 }
00571 }
00572
00573 return false;
00574 }
00575
00576 virtual std::vector<E> toArray() const {
00577
00578 TotalLock lock(this);
00579
00580 int size = this->count.get();
00581 std::vector<E> array;
00582 array.reserve(size);
00583
00584 for(Pointer< QueueNode<E> > p = this->head->next; p != NULL; p = p->next) {
00585 array.push_back(p->get());
00586 }
00587
00588 return array;
00589 }
00590
00591 virtual std::string toString() const {
00592 return std::string("LinkedBlockingQueue [ current size = ") +
00593 decaf::lang::Integer::toString(this->count.get()) + "]";
00594 }
00595
00596 virtual int drainTo( Collection<E>& c ) {
00597 return this->drainTo(c, decaf::lang::Integer::MAX_VALUE);
00598 }
00599
00600 virtual int drainTo( Collection<E>& sink, int maxElements ) {
00601
00602 if(&sink == this) {
00603 throw decaf::lang::exceptions::IllegalArgumentException(__FILE__, __LINE__,
00604 "Cannot drain this Collection to itself.");
00605 }
00606
00607 bool signalNotFull = false;
00608 bool shouldThrow = false;
00609 decaf::lang::Exception delayed;
00610 int result = 0;
00611
00612 this->takeLock.lock();
00613 try {
00614
00615
00616
00617
00618 result = decaf::lang::Math::min(maxElements, this->count.get());
00619 Pointer< QueueNode<E> > node = this->head;
00620 int i = 0;
00621 try {
00622
00623 while(i < result) {
00624 Pointer< QueueNode<E> > p = node->next;
00625 sink.add( p->getAndDequeue() );
00626 node = p;
00627 ++i;
00628 }
00629
00630 } catch(decaf::lang::Exception& e) {
00631 delayed = e;
00632 shouldThrow = true;
00633 }
00634
00635 if (i > 0) {
00636 this->head = node;
00637 signalNotFull = (this->count.getAndAdd(-i) == this->capacity);
00638 }
00639
00640 } catch(decaf::lang::Exception& ex) {
00641 this->takeLock.unlock();
00642 throw;
00643 }
00644
00645 this->takeLock.unlock();
00646
00647 if (signalNotFull) {
00648 this->signalNotFull();
00649 }
00650
00651 if (shouldThrow) {
00652 throw delayed;
00653 }
00654
00655 return result;
00656 }
00657
00658 private:
00659
00660 class LinkedIterator : public Iterator<E> {
00661 private:
00662
00663 Pointer< QueueNode<E> > current;
00664 Pointer< QueueNode<E> > last;
00665 E currentElement;
00666 LinkedBlockingQueue<E>* parent;
00667
00668 private:
00669
00670 LinkedIterator(const LinkedIterator&);
00671 LinkedIterator& operator= (const LinkedIterator&);
00672
00673 public:
00674
00675 LinkedIterator(LinkedBlockingQueue<E>* parent) : current(), last(),
00676 currentElement(), parent(parent) {
00677 TotalLock lock(parent);
00678
00679 this->current = parent->head->next;
00680 if(this->current != NULL) {
00681 this->currentElement = current->get();
00682 }
00683 }
00684
00685 virtual bool hasNext() const {
00686 return this->current != NULL;
00687 }
00688
00689 virtual E next() {
00690
00691 TotalLock lock(this->parent);
00692
00693 if(this->current == NULL) {
00694 throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
00695 "Iterator next called with no matching next element.");
00696 }
00697
00698 E result = this->currentElement;
00699 this->last = this->current;
00700 this->current = this->nextNode(this->current);
00701 this->currentElement = (this->current == NULL) ? E() : this->current->get();
00702
00703 return result;
00704 }
00705
00706 virtual void remove() {
00707
00708 if(this->last == NULL) {
00709 throw decaf::lang::exceptions::IllegalStateException(__FILE__, __LINE__,
00710 "Iterator remove called without having called next().");
00711 }
00712
00713 TotalLock lock(this->parent);
00714
00715 Pointer< QueueNode<E> > node;
00716 node.swap(this->last);
00717
00718 for(Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p != NULL;
00719 trail = p, p = p->next) {
00720
00721 if(p == node) {
00722 this->parent->unlink(p, trail);
00723 break;
00724 }
00725 }
00726 }
00727
00728 private:
00729
00730 Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
00731
00732
00733
00734 if(p->isDequeued()) {
00735 return this->parent->head->next;
00736 }
00737
00738 Pointer< QueueNode<E> > s = p->next;
00739
00740
00741
00742
00743
00744
00745 while(s != NULL && s->isUnlinked()) {
00746 s = s->next;
00747 }
00748
00749 return s;
00750 }
00751
00752 };
00753
00754 class ConstLinkedIterator : public Iterator<E> {
00755 private:
00756
00757 Pointer< QueueNode<E> > current;
00758 Pointer< QueueNode<E> > last;
00759 E currentElement;
00760 const LinkedBlockingQueue<E>* parent;
00761
00762 private:
00763
00764 ConstLinkedIterator(const ConstLinkedIterator&);
00765 ConstLinkedIterator& operator= (const ConstLinkedIterator&);
00766
00767 public:
00768
00769 ConstLinkedIterator(const LinkedBlockingQueue<E>* parent) : current(), last(),
00770 currentElement(),
00771 parent(parent) {
00772 TotalLock lock(parent);
00773
00774 this->current = parent->head->next;
00775 if(this->current != NULL) {
00776 this->currentElement = current->get();
00777 }
00778 }
00779
00780 virtual bool hasNext() const {
00781 return this->current != NULL;
00782 }
00783
00784 virtual E next() {
00785
00786 TotalLock lock(this->parent);
00787
00788 if(this->current == NULL) {
00789 throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
00790 "Iterator next called with no matching next element.");
00791 }
00792
00793 E result = this->currentElement;
00794 this->last = this->current;
00795 this->current = this->nextNode(this->current);
00796 this->currentElement = (this->current == NULL) ? E() : this->current->get();
00797
00798 return result;
00799 }
00800
00801 virtual void remove() {
00802 throw lang::exceptions::UnsupportedOperationException(
00803 __FILE__, __LINE__, "Cannot write to a const ListIterator." );
00804 }
00805
00806 private:
00807
00808 Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
00809
00810
00811
00812 if(p->isDequeued()) {
00813 return this->parent->head->next;
00814 }
00815
00816 Pointer< QueueNode<E> > s = p->next;
00817
00818
00819
00820
00821
00822
00823 while(s != NULL && s->isUnlinked()) {
00824 s = s->next;
00825 }
00826
00827 return s;
00828 }
00829
00830 };
00831
00832 public:
00833
00834 virtual decaf::util::Iterator<E>* iterator() {
00835 return new LinkedIterator(this);
00836 }
00837
00838 virtual decaf::util::Iterator<E>* iterator() const {
00839 return new ConstLinkedIterator(this);
00840 }
00841
00842 private:
00843
00844 void unlink(Pointer< QueueNode<E> >& p, Pointer< QueueNode<E> >& predicessor) {
00845
00846
00847
00848
00849 p->unlink();
00850
00851 predicessor->next = p->next;
00852
00853 if(this->tail == p) {
00854 this->tail = predicessor;
00855 }
00856
00857 if(this->count.getAndDecrement() == capacity) {
00858 this->signalNotFull();
00859 }
00860 }
00861
00862 void signalNotEmpty() {
00863 this->takeLock.lock();
00864 try {
00865 this->notEmpty->signal();
00866 } catch(decaf::lang::Exception& ex) {
00867 this->takeLock.unlock();
00868 throw;
00869 }
00870 this->takeLock.unlock();
00871 }
00872
00873 void signalNotFull() {
00874 this->putLock.lock();
00875 try {
00876 this->notFull->signal();
00877 } catch(decaf::lang::Exception& ex) {
00878 this->putLock.unlock();
00879 throw;
00880 }
00881 this->putLock.unlock();
00882 }
00883
00884
00885 void enqueue(E value) {
00886 Pointer< QueueNode<E> > newTail( new QueueNode<E>(value) );
00887 this->tail->next = newTail;
00888 this->tail = newTail;
00889 }
00890
00891
00892 E dequeue() {
00893 Pointer< QueueNode<E> > temp = this->head;
00894 Pointer< QueueNode<E> > newHead = temp->next;
00895 this->head = newHead;
00896
00897 return newHead->getAndDequeue();
00898 }
00899
00900 void purgeList() {
00901 Pointer< QueueNode<E> > current = this->head->next;
00902 Pointer< QueueNode<E> > temp;
00903 while(current != NULL) {
00904 temp = current;
00905 current = current->next;
00906 temp->next.reset(NULL);
00907 temp.reset(NULL);
00908 }
00909 }
00910 };
00911
00912 }}}
00913
00914 #endif