00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one or more 00003 * contributor license agreements. See the NOTICE file distributed with 00004 * this work for additional information regarding copyright ownership. 00005 * The ASF licenses this file to You under the Apache License, Version 2.0 00006 * (the "License"); you may not use this file except in compliance with 00007 * the License. You may obtain a copy of the License at 00008 * 00009 * http://www.apache.org/licenses/LICENSE-2.0 00010 * 00011 * Unless required by applicable law or agreed to in writing, software 00012 * distributed under the License is distributed on an "AS IS" BASIS, 00013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00014 * See the License for the specific language governing permissions and 00015 * limitations under the License. 00016 */ 00017 00018 #ifndef _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_ 00019 #define _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_ 00020 00021 #include <decaf/internal/util/concurrent/Transferer.h> 00022 00023 #include <decaf/util/concurrent/locks/LockSupport.h> 00024 #include <decaf/util/concurrent/atomic/AtomicReference.h> 00025 #include <decaf/util/concurrent/TimeoutException.h> 00026 #include <decaf/lang/exceptions/InterruptedException.h> 00027 00028 #include <decaf/lang/Thread.h> 00029 00030 namespace decaf { 00031 namespace internal { 00032 namespace util { 00033 namespace concurrent { 00034 00035 using decaf::util::concurrent::atomic::AtomicReference; 00036 00045 template< typename E > 00046 class TransferQueue : public Transferer<E> { 00047 private: 00048 00049 // /** Node class for TransferQueue. */ 00050 // class QNode { 00051 // private: 00052 // 00053 // AtomicReference<QNode> next; // next node in queue 00054 // volatile decaf::lang::Thread* waiter; // to control park/unpark 00055 // AtomicReference<E*> item; // CAS'ed to or from null 00056 // 00057 // bool data; 00058 // bool cancelled; 00059 // 00060 // public: 00061 // 00062 // QNode() : cancelled( false ), data( false ) { 00063 // } 00064 // 00065 // QNode( E* item ) : cancelled( false ), data( true ) { 00066 // this->item.set( item ); 00067 // } 00068 // 00069 // bool casNext( QNode* cmp, QNode* val ) { 00070 // return ( this->next == cmp && this->next.compareAndSet( cmp, val ) ); 00071 // } 00072 // 00073 // bool casItem( E* cmp, E* val ) { 00074 // return ( this->item == cmp && this->item.compareAndSet( cmp, val ) ); 00075 // } 00076 // 00077 // /** 00078 // * Tries to cancel by CAS'ing ref to NULL if that succeeds then we mark as cancelled. 00079 // */ 00080 // void tryCancel( E* cmp ) { 00081 // if( item.compareAndSet( cmp, NULL ) ) { 00082 // this->cancelled = true; 00083 // } 00084 // } 00085 // 00086 // bool isCancelled() { 00087 // return this->item == this; 00088 // } 00089 // 00090 // /** 00091 // * Returns true if this node is known to be off the queue 00092 // * because its next pointer has been forgotten due to 00093 // * an advanceHead operation. 00094 // */ 00095 // bool isOffList() { 00096 // return this->next == NULL; 00097 // } 00098 // }; 00099 00100 private: 00101 00102 // decaf::util::concurrent::atomic::AtomicReference<QNode> head; 00103 // decaf::util::concurrent::atomic::AtomicReference<QNode> tail; 00104 // 00105 // decaf::util::concurrent::atomic::AtomicReference<QNode> cleanMe; 00106 00107 public: 00108 00109 TransferQueue() { 00110 // // Initialize with a dummy Node. 00111 // this->head.set( new QNode() ); 00112 // this->tail.set( this->head.get() ); 00113 } 00114 00115 virtual ~TransferQueue() {} 00116 00117 virtual void transfer( E* e, bool timed, long long nanos ) { 00118 00119 } 00120 00121 virtual E* transfer( bool timed, long long nanos ) { 00122 return NULL; 00123 } 00124 00128 // E* transfer( E* e, bool timed, long long nanos ) { 00129 00130 /* Basic algorithm is to loop trying to take either of 00131 * two actions: 00132 * 00133 * 1. If queue apparently empty or holding same-mode nodes, 00134 * try to add node to queue of waiters, wait to be 00135 * fulfilled (or cancelled) and return matching item. 00136 * 00137 * 2. If queue apparently contains waiting items, and this 00138 * call is of complementary mode, try to fulfill by CAS'ing 00139 * item field of waiting node and dequeuing it, and then 00140 * returning matching item. 00141 * 00142 * In each case, along the way, check for and try to help 00143 * advance head and tail on behalf of other stalled/slow 00144 * threads. 00145 * 00146 * The loop starts off with a null check guarding against 00147 * seeing uninitialized head or tail values. This never 00148 * happens in current SynchronousQueue, but could if 00149 * callers held non-volatile/final ref to the 00150 * transferer. The check is here anyway because it places 00151 * null checks at top of loop, which is usually faster 00152 * than having them implicitly interspersed. 00153 */ 00154 00155 // QNode s = null; // constructed/reused as needed 00156 // boolean isData = (e != null); 00157 // 00158 // for (;;) { 00159 // QNode t = tail; 00160 // QNode h = head; 00161 // if (t == null || h == null) // saw uninitialized value 00162 // continue; // spin 00163 // 00164 // if (h == t || t.isData == isData) { // empty or same-mode 00165 // QNode tn = t.next; 00166 // if (t != tail) // inconsistent read 00167 // continue; 00168 // if (tn != null) { // lagging tail 00169 // advanceTail(t, tn); 00170 // continue; 00171 // } 00172 // if (timed && nanos <= 0) // can't wait 00173 // return null; 00174 // if (s == null) 00175 // s = new QNode(e, isData); 00176 // if (!t.casNext(null, s)) // failed to link in 00177 // continue; 00178 // 00179 // advanceTail(t, s); // swing tail and wait 00180 // Object x = awaitFulfill(s, e, timed, nanos); 00181 // if (x == s) { // wait was cancelled 00182 // clean(t, s); 00183 // return null; 00184 // } 00185 // 00186 // if (!s.isOffList()) { // not already unlinked 00187 // advanceHead(t, s); // unlink if head 00188 // if (x != null) // and forget fields 00189 // s.item = s; 00190 // s.waiter = null; 00191 // } 00192 // return (x != null)? x : e; 00193 // 00194 // } else { // complementary-mode 00195 // QNode m = h.next; // node to fulfill 00196 // if (t != tail || m == null || h != head) 00197 // continue; // inconsistent read 00198 // 00199 // Object x = m.item; 00200 // if (isData == (x != null) || // m already fulfilled 00201 // x == m || // m cancelled 00202 // !m.casItem(x, e)) { // lost CAS 00203 // advanceHead(h, m); // dequeue and retry 00204 // continue; 00205 // } 00206 // 00207 // advanceHead(h, m); // successfully fulfilled 00208 // LockSupport.unpark(m.waiter); 00209 // return (x != null)? x : e; 00210 // } 00211 // } 00212 // } 00213 00214 private: 00215 00225 // Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { 00226 // /* Same idea as TransferStack.awaitFulfill */ 00227 // long lastTime = (timed)? System.nanoTime() : 0; 00228 // Thread w = Thread.currentThread(); 00229 // int spins = ((head.next == s) ? 00230 // (timed? maxTimedSpins : maxUntimedSpins) : 0); 00231 // for (;;) { 00232 // if (w.isInterrupted()) 00233 // s.tryCancel(e); 00234 // Object x = s.item; 00235 // if (x != e) 00236 // return x; 00237 // if (timed) { 00238 // long now = System.nanoTime(); 00239 // nanos -= now - lastTime; 00240 // lastTime = now; 00241 // if (nanos <= 0) { 00242 // s.tryCancel(e); 00243 // continue; 00244 // } 00245 // } 00246 // if (spins > 0) 00247 // --spins; 00248 // else if (s.waiter == null) 00249 // s.waiter = w; 00250 // else if (!timed) 00251 // LockSupport.park(); 00252 // else if (nanos > spinForTimeoutThreshold) 00253 // LockSupport.parkNanos(nanos); 00254 // } 00255 // } 00256 00260 // void clean(QNode pred, QNode s) { 00261 // s.waiter = null; // forget thread 00262 // /* 00263 // * At any given time, exactly one node on list cannot be 00264 // * deleted -- the last inserted node. To accommodate this, 00265 // * if we cannot delete s, we save its predecessor as 00266 // * "cleanMe", deleting the previously saved version 00267 // * first. At least one of node s or the node previously 00268 // * saved can always be deleted, so this always terminates. 00269 // */ 00270 // while (pred.next == s) { // Return early if already unlinked 00271 // QNode h = head; 00272 // QNode hn = h.next; // Absorb cancelled first node as head 00273 // if (hn != null && hn.isCancelled()) { 00274 // advanceHead(h, hn); 00275 // continue; 00276 // } 00277 // QNode t = tail; // Ensure consistent read for tail 00278 // if (t == h) 00279 // return; 00280 // QNode tn = t.next; 00281 // if (t != tail) 00282 // continue; 00283 // if (tn != null) { 00284 // advanceTail(t, tn); 00285 // continue; 00286 // } 00287 // if (s != t) { // If not tail, try to unsplice 00288 // QNode sn = s.next; 00289 // if (sn == s || pred.casNext(s, sn)) 00290 // return; 00291 // } 00292 // QNode dp = cleanMe; 00293 // if (dp != null) { // Try unlinking previous cancelled node 00294 // QNode d = dp.next; 00295 // QNode dn; 00296 // if (d == null || // d is gone or 00297 // d == dp || // d is off list or 00298 // !d.isCancelled() || // d not cancelled or 00299 // (d != t && // d not tail and 00300 // (dn = d.next) != null && // has successor 00301 // dn != d && // that is on list 00302 // dp.casNext(d, dn))) // d unspliced 00303 // casCleanMe(dp, null); 00304 // if (dp == pred) 00305 // return; // s is already saved node 00306 // } else if (casCleanMe(null, pred)) 00307 // return; // Postpone cleaning s 00308 // } 00309 // } 00310 00311 // /** 00312 // * Tries to cas nh as new head; if successful, unlink 00313 // * old head's next node to avoid garbage retention. 00314 // */ 00315 // void advanceHead( QNode* h, QNode* nh ) { 00316 // if( h == head.get() && this->head.compareAndSet( h, nh ) ) { 00317 // h->next = h; // forget old next 00318 // } 00319 // } 00320 // 00321 // /** 00322 // * Tries to cas nt as new tail. 00323 // */ 00324 // void advanceTail( QNode* t, QNode* nt ) { 00325 // if( this->tail.get() == t ) { 00326 // this->tail.compareAndSet( t, nt ); 00327 // } 00328 // } 00329 // 00330 // /** 00331 // * Tries to CAS cleanMe slot. 00332 // */ 00333 // bool casCleanMe( QNode* cmp, QNode* val ) { 00334 // return ( this->cleanMe.get() == cmp && 00335 // this->cleanMe.compareAndSet( cmp, val ) ); 00336 // } 00337 00338 }; 00339 00340 }}}} 00341 00342 #endif /* _DECAF_INTERNAL_UTIL_CONCURRENT_TRANSFERQUEUE_H_ */
1.6.1