xrootd
XrdSysLinuxSemaphore.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2013 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef __XRD_SYS_LINUX_SEMAPHORE__
26#define __XRD_SYS_LINUX_SEMAPHORE__
27
28#if defined(__linux__) && defined(HAVE_ATOMICS)
29
30#include <pthread.h>
31#include <linux/futex.h>
32#include <sys/syscall.h>
33#include <unistd.h>
34#include <cerrno>
35#include <exception>
36#include <string>
37#include <cstdlib>
38
39namespace XrdSys
40{
41 //----------------------------------------------------------------------------
43 //----------------------------------------------------------------------------
44 class LinuxSemaphoreError: public std::exception
45 {
46 public:
47 LinuxSemaphoreError( const std::string &error ): pError( error ) {}
48 virtual ~LinuxSemaphoreError() throw() {};
49
50 virtual const char *what() const throw()
51 {
52 return pError.c_str();
53 }
54
55 private:
56 std::string pError;
57 };
58
59 //----------------------------------------------------------------------------
77 //----------------------------------------------------------------------------
78 class LinuxSemaphore
79 {
80 public:
81 //------------------------------------------------------------------------
85 //------------------------------------------------------------------------
86 inline int CondWait()
87 {
88 int value = 0;
89 int val = 0;
90 int waiters = 0;
91 int newVal = 0;
92
93 //----------------------------------------------------------------------
94 // We get the value of the semaphore try to atomically decrement it if
95 // it's larger than 0.
96 //----------------------------------------------------------------------
97 while( 1 )
98 {
99 Unpack( pValue, value, val, waiters );
100 if( val == 0 )
101 return 0;
102 newVal = Pack( --val, waiters );
103 if( __sync_bool_compare_and_swap( pValue, value, newVal ) )
104 return 1;
105 }
106 }
107
108 //------------------------------------------------------------------------
113 //------------------------------------------------------------------------
114 inline void Wait()
115 {
116 //----------------------------------------------------------------------
117 // Examine the state of the semaphore and atomically decrement it if
118 // possible. If CondWait fails, it means that the semaphore value was 0.
119 // In this case we atomically bump the number of waiters and go to sleep
120 //----------------------------------------------------------------------
121 while( !CondWait() )
122 {
123 int value = 0;
124 int val = 0;
125 int waiters = 0;
126 int cancelType = 0;
127
128 Unpack( pValue, value, val, waiters );
129
130 //--------------------------------------------------------------------
131 // We need to make sure again that the value of the semaphore is 0
132 // because we fetched it again (first time was in CondWait()) and
133 // it may have changed in the mean time.
134 //--------------------------------------------------------------------
135 if( val != 0 )
136 continue;
137
138 if( waiters == WaitersMask )
139 throw LinuxSemaphoreError( "Reached maximum number of waiters" );
140
141 int newVal = Pack( val, ++waiters );
142
143 //--------------------------------------------------------------------
144 // We have bumped the number of waiters successfuly if neither the
145 // semaphore value nor the number of waiters changed in the mean time.
146 // We can safely go to sleep.
147 //
148 // Once the number of waiters was bumped we cannot get cancelled
149 // without decrementing it.
150 //--------------------------------------------------------------------
151 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, &cancelType );
152 if( __sync_bool_compare_and_swap( pValue, value, newVal ) )
153 {
154 while( 1 )
155 {
156 int r = 0;
157
158 pthread_cleanup_push( Cleanup, pValue );
159 pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, 0 );
160
161 r = syscall( SYS_futex, pValue, FUTEX_WAIT, newVal, 0, 0, 0 );
162
163 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
164 pthread_cleanup_pop( 0 );
165
166 if( r == 0 ) // we've been woken up
167 break;
168
169 if( errno == EINTR ) // interrupt
170 continue;
171
172 if( errno == EWOULDBLOCK ) // futex value changed
173 break;
174
175 throw LinuxSemaphoreError( "FUTEX_WAIT syscall error" );
176 }
177
178 //------------------------------------------------------------------
179 // We have been woken up, so we need to decrement the number of
180 // waiters
181 //------------------------------------------------------------------
182 do
183 {
184 Unpack( pValue, value, val, waiters );
185 newVal = Pack( val, --waiters );
186 }
187 while( !__sync_bool_compare_and_swap( pValue, value, newVal ) );
188 }
189
190 //--------------------------------------------------------------------
191 // We are here if:
192 // 1) we were unable to increase the number of waiters bacause the
193 // atomic changed in the mean time in another execution thread
194 // 2) *pValue != newVal upon futex call, this indicates the state
195 // change in another thread
196 // 3) we have been woken up by another thread
197 //
198 // In either of the above cases we need to re-examine the atomic and
199 // decide whether we need to sleep or are free to proceed
200 //--------------------------------------------------------------------
201 pthread_setcanceltype( cancelType, 0 );
202 }
203 }
204
205 //------------------------------------------------------------------------
210 //------------------------------------------------------------------------
211 inline void Post()
212 {
213 int value = 0;
214 int val = 0;
215 int waiters = 0;
216 int newVal = 0;
217
218 //----------------------------------------------------------------------
219 // We atomically increment the value of the semaphore and wake one of
220 // the threads that was waiting for the semaphore value to change
221 //----------------------------------------------------------------------
222 while( 1 )
223 {
224 Unpack( pValue, value, val, waiters );
225
226 if( val == ValueMask )
227 throw LinuxSemaphoreError( "Reached maximum value" );
228
229 newVal = Pack( ++val, waiters );
230 if( __sync_bool_compare_and_swap( pValue, value, newVal ) )
231 {
232 if( waiters )
233 syscall( SYS_futex, pValue, FUTEX_WAKE, 1, 0, 0, 0 );
234 return;
235 }
236 }
237 }
238
239 //------------------------------------------------------------------------
241 //------------------------------------------------------------------------
242 int GetValue() const
243 {
244 int value = __sync_fetch_and_add( pValue, 0 );
245 return value & ValueMask;
246 }
247
248 //------------------------------------------------------------------------
252 //------------------------------------------------------------------------
253 LinuxSemaphore( int value )
254 {
255 pValue = (int *)malloc(sizeof(int));
256 *pValue = (value & ValueMask);
257 }
258
259 //------------------------------------------------------------------------
261 //------------------------------------------------------------------------
262 ~LinuxSemaphore()
263 {
264 free( pValue );
265 }
266
267 private:
268 static const int ValueMask = 0x000fffff;
269 static const int WaitersOffset = 20;
270 static const int WaitersMask = 0x00000fff;
271
272 //------------------------------------------------------------------------
273 // Unpack the semaphore value
274 //------------------------------------------------------------------------
275 static inline void Unpack( int *sourcePtr,
276 int &source,
277 int &value,
278 int &nwaiters )
279 {
280 source = __sync_fetch_and_add( sourcePtr, 0 );
281 value = source & ValueMask;
282 nwaiters = (source >> WaitersOffset) & WaitersMask;
283 }
284
285 //------------------------------------------------------------------------
286 // Pack the semaphore value
287 //------------------------------------------------------------------------
288 static inline int Pack( int value, int nwaiters )
289 {
290 return (nwaiters << WaitersOffset) | (value & ValueMask);
291 }
292
293 //------------------------------------------------------------------------
294 // Cancellation cleaner
295 //------------------------------------------------------------------------
296 static void Cleanup( void *param )
297 {
298 int *iParam = (int*)param;
299 int value = 0;
300 int val = 0;
301 int waiters = 0;
302 int newVal = 0;
303
304 do
305 {
306 Unpack( iParam, value, val, waiters );
307 newVal = Pack( val, --waiters );
308 }
309 while( !__sync_bool_compare_and_swap( iParam, value, newVal ) );
310 }
311
312 int *pValue;
313 };
314};
315
316#endif // __linux__ && HAVE_ATOMICS
317
318#endif // __XRD_SYS_LINUX_SEMAPHORE__
Definition: XrdClPollerBuiltIn.hh:28