blocxx
ThreadBarrier.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
38 #include "blocxx/BLOCXX_config.h"
39 #include "blocxx/ThreadBarrier.hpp"
40 #include "blocxx/Assertion.hpp"
41 #include "blocxx/Format.hpp"
42 #include "blocxx/ExceptionIds.hpp"
44 
45 #if defined(BLOCXX_USE_PTHREAD) && defined(BLOCXX_HAVE_PTHREAD_BARRIER) && !defined(BLOCXX_VALGRIND_SUPPORT)
46  #include <pthread.h>
47  #include <cstring>
48 #else
49  // This is for the generic less-efficient version
50  #include "blocxx/Condition.hpp"
53 #endif
54 
56 namespace BLOCXX_NAMESPACE
57 {
58 
59 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ThreadBarrier);
60 
61 #if defined(BLOCXX_USE_PTHREAD) && defined(BLOCXX_HAVE_PTHREAD_BARRIER) && !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't support pthread_barrier_*()
62 class ThreadBarrierImpl : public IntrusiveCountableBase
63 {
64 public:
65  ThreadBarrierImpl(UInt32 threshold)
66  {
67  BLOCXX_ASSERT(threshold != 0);
68  memset(&barrier, 0, sizeof(barrier)); // AIX pukes if the barrier isn't zeroed out.
69  int res = pthread_barrier_init(&barrier, NULL, threshold);
70  if (res != 0)
71  {
72  BLOCXX_THROW(ThreadBarrierException, Format("pthread_barrier_init failed: %1(%2)", res, strerror(res)).c_str());
73  }
74  }
76  {
77  int res = pthread_barrier_destroy(&barrier);
78  if (res != 0)
79  {
80  // can't throw... just log it or something...
81  }
82  }
83 
84  void wait()
85  {
86  int res = pthread_barrier_wait(&barrier);
87  if (res != 0 && res != PTHREAD_BARRIER_SERIAL_THREAD)
88  {
89  BLOCXX_THROW(ThreadBarrierException, Format("pthread_barrier_wait failed: %1(%2)", res, strerror(res)).c_str());
90  }
91  }
92 private:
93  pthread_barrier_t barrier;
94 };
95 
96 #else
97 
98 // This is the generic less-efficient version
99 
100 class ThreadBarrierImpl : public IntrusiveCountableBase
101 {
102 public:
113  struct SubBarrier
114  {
115  SubBarrier() : m_waitingCount(0) {}
117  UInt32 m_waitingCount;
119  Condition m_cond;
120  };
121  ThreadBarrierImpl(UInt32 threshold)
122  : m_threshold(threshold)
123  , m_curSubBarrier(0)
124  {
125  }
126  void wait()
127  {
128  NonRecursiveMutexLock l(m_mutex);
129  // Select the current SubBarrier
131  ++curBarrier.m_waitingCount;
132  if (curBarrier.m_waitingCount == m_threshold)
133  {
134  // reset the sub barrier so it can be reused
135  curBarrier.m_waitingCount = 0;
136  // swap current barriers
138  // now wake up all the threads that were stopped
139  curBarrier.m_cond.notifyAll();
140  }
141  else
142  {
143  // because of spurious wake-ups we need to put this in a loop.
144  // we need to wait until the count is 0, which will only happen
145  // once m_threshold threads have called wait()
146  while (curBarrier.m_waitingCount != 0)
147  {
148  curBarrier.m_cond.wait(l);
149  }
150  }
151  }
152 private:
154  UInt32 m_threshold;
157  int m_curSubBarrier;
159  SubBarrier m_subBarrier0;
160  SubBarrier m_subBarrier1;
161 };
162 
163 #endif
164 
166 ThreadBarrier::ThreadBarrier(UInt32 threshold)
167  : m_impl(new ThreadBarrierImpl(threshold))
168 {
169  BLOCXX_ASSERT(threshold != 0);
170 }
172 void ThreadBarrier::wait()
173 {
174  m_impl->wait();
175 }
178 {
179 }
181 ThreadBarrier::ThreadBarrier(const ThreadBarrier& x)
182  : m_impl(x.m_impl)
183 {
184 }
187 {
189  return *this;
190 }
191 
192 } // end namespace BLOCXX_NAMESPACE
193 
BLOCXX_NAMESPACE::ThreadBarrier
The ThreadBarrier class is used to synchronize threads.
Definition: ThreadBarrier.hpp:90
BLOCXX_NAMESPACE::ThreadBarrierException
Definition: ThreadBarrier.hpp:79
ExceptionIds.hpp
BLOCXX_THROW
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
Definition: Exception.hpp:293
BLOCXX_NAMESPACE::ThreadBarrier::ThreadBarrier
ThreadBarrier(UInt32 threshold)
Constructor.
Definition: ThreadBarrier.cpp:196
IntrusiveCountableBase.hpp
BLOCXX_NAMESPACE
Taken from RFC 1321.
Definition: AppenderLogger.cpp:48
Format.hpp
Assertion.hpp
BLOCXX_NAMESPACE::Format
Definition: Format.hpp:80
BLOCXX_NAMESPACE::ThreadBarrierImpl::SubBarrier::m_cond
Condition m_cond
Condition for threads to wait on.
Definition: ThreadBarrier.cpp:149
Condition.hpp
ThreadBarrier.hpp
BLOCXX_NAMESPACE::ThreadBarrier::m_impl
IntrusiveReference< ThreadBarrierImpl > m_impl
Definition: ThreadBarrier.hpp:117
BLOCXX_NAMESPACE::ThreadBarrierImpl::ThreadBarrierImpl
ThreadBarrierImpl(UInt32 threshold)
Definition: ThreadBarrier.cpp:151
BLOCXX_NAMESPACE::ThreadBarrierImpl::m_mutex
NonRecursiveMutex m_mutex
Definition: ThreadBarrier.cpp:188
BLOCXX_NAMESPACE::ThreadBarrierImpl::SubBarrier
This code is inspired by ACE, by Douglas C.
Definition: ThreadBarrier.cpp:144
NonRecursiveMutex.hpp
BLOCXX_NAMESPACE::ThreadBarrierImpl::SubBarrier::m_waitingCount
UInt32 m_waitingCount
The number of waiting threads.
Definition: ThreadBarrier.cpp:147
BLOCXX_NAMESPACE::NonRecursiveMutex
Note that descriptions of what exceptions may be thrown assumes that object is used correctly,...
Definition: NonRecursiveMutex.hpp:83
BLOCXX_DEFINE_EXCEPTION_WITH_ID
#define BLOCXX_DEFINE_EXCEPTION_WITH_ID(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
Definition: Exception.hpp:479
BLOCXX_NAMESPACE::ThreadBarrier::operator=
ThreadBarrier & operator=(const ThreadBarrier &x)
Definition: ThreadBarrier.cpp:216
BLOCXX_NAMESPACE::ThreadBarrierImpl::m_subBarrier0
SubBarrier m_subBarrier0
Definition: ThreadBarrier.cpp:189
BLOCXX_NAMESPACE::ThreadBarrier::~ThreadBarrier
~ThreadBarrier()
Definition: ThreadBarrier.cpp:207
BLOCXX_NAMESPACE::ThreadBarrierImpl::m_subBarrier1
SubBarrier m_subBarrier1
Definition: ThreadBarrier.cpp:190
BLOCXX_NAMESPACE::ThreadBarrierImpl::SubBarrier::SubBarrier
SubBarrier()
Definition: ThreadBarrier.cpp:145
BLOCXX_NAMESPACE::ThreadBarrierImpl::m_threshold
UInt32 m_threshold
The number of threads to synchronize.
Definition: ThreadBarrier.cpp:184
NonRecursiveMutexLock.hpp
BLOCXX_NAMESPACE::ThreadBarrierImpl::m_curSubBarrier
int m_curSubBarrier
Either 0 or 1, depending on whether we are the first generation of waiters or the next generation of ...
Definition: ThreadBarrier.cpp:187
m_impl
ProcessImplRef m_impl
Definition: Process.cpp:348
BLOCXX_NAMESPACE::ThreadBarrierImpl::wait
void wait()
Definition: ThreadBarrier.cpp:156
BLOCXX_ASSERT
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(),...
Definition: Assertion.hpp:87
BLOCXX_NAMESPACE::ThreadBarrier::wait
void wait()
Synchronize participating threads at the barrier.
Definition: ThreadBarrier.cpp:202