// Constructs the wrapper by copy-initializing the `data` member from the given value.
45 Sync(
const T & data) : data(data) { }
// Constructs the wrapper by moving the given value into the `data` member.
// Declared noexcept.
46 Sync(T && data) noexcept : data(std::move(data)) { }
// In-place constructor: the std::in_place_t tag selects this overload, and the
// remaining arguments are perfectly forwarded into the initializer of `data`.
48 template<
typename ...
Args>
49 Sync(std::in_place_t,
Args &&... args) : data(std::forward<Args>(args)...) { }
// The std::unique_lock over mutex type M that this Lock carries; the
// constructors below initialize it and checkLockingInvariants() asserts on it.
58 std::unique_lock<M> lk;
// Stores a pointer to `s` and constructs `lk` from `s.mutex`. A std::unique_lock
// constructed from a mutex alone acquires that mutex, blocking until it is available.
60 Lock(Sync &s) : s(&s), lk(s.mutex) { }
// Stores a pointer to `s` and adopts an already-constructed unique_lock by moving
// it into `lk`; this constructor performs no locking of its own.
61 Lock(Sync &s, std::unique_lock<M> lk) : s(&s), lk(std::move(lk)) { }
// Debug check used by the accessors and wait helpers: asserts that `lk`
// currently owns its mutex.
// NOTE(review): the original lines between the signature and this assert are
// not present in this excerpt, so further checks may exist.
63 inline void checkLockingInvariants()
66 assert(lk.owns_lock());
// Move constructor: copies the `s` pointer from `l` and moves `l.lk` into `lk`.
// NOTE(review): the constructor body is not present in this excerpt.
70 Lock(Lock && l) : s(l.s), lk(std::move(l.lk))
// Move assignment: takes over `other.lk` by move-assigning it into `lk`.
// NOTE(review): the remaining statements of the body are not present in this excerpt.
75 Lock & operator=(Lock && other)
79 lk = std::move(other.lk);
// Copy construction is explicitly deleted; a Lock can only be moved.
85 Lock(
const Lock & l) =
delete;
// Invariant check inside a member whose signature is not visible in this excerpt.
91 checkLockingInvariants();
// Invariant check inside a member whose signature is not visible in this excerpt.
97 checkLockingInvariants();
// Wait helper taking a std::condition_variable. Checks the locking invariants first.
// NOTE(review): the statement that actually waits on `cv` is not present in this excerpt.
106 void wait(std::condition_variable & cv)
108 checkLockingInvariants();
// Timed wait on `cv` using the held lock `lk`.
// Checks the locking invariants, then forwards to cv.wait_for(lk, duration) and
// returns its std::cv_status result unchanged.
117 template<
class Rep,
class Period>
118 std::cv_status
wait_for(std::condition_variable & cv,
119 const std::chrono::duration<Rep, Period> & duration)
121 checkLockingInvariants();
122 return cv.wait_for(lk, duration);
// Timed wait with a predicate.
// Checks the locking invariants, then returns the result of
// cv.wait_for(lk, duration, pred).
// NOTE(review): the line carrying the return type and function name, and the
// `pred` parameter declaration, are not present in this excerpt.
130 template<
class Rep,
class Period,
class Predicate>
132 const std::chrono::duration<Rep, Period> & duration,
135 checkLockingInvariants();
136 return cv.wait_for(lk, duration, pred);
// Deadline-based wait.
// Checks the locking invariants, then returns the result of
// cv.wait_until(lk, duration). The parameter is named `duration` but its type is
// a std::chrono::time_point.
// NOTE(review): the line carrying the return type and function name is not
// present in this excerpt.
142 template<
class Clock,
class Duration>
144 const std::chrono::time_point<Clock, Duration> & duration)
146 checkLockingInvariants();
147 return cv.wait_until(lk, duration);
// Returns a Lock constructed from this Sync. The single-argument Lock
// constructor builds its unique_lock from the Sync's mutex.
154 Lock
lock() {
return Lock(*
this); }
// Non-blocking acquisition attempt.
// Builds a unique_lock on `mutex` with the try-to-lock tag; if it owns the mutex,
// returns a Lock that adopts that unique_lock.
// NOTE(review): the path taken when the attempt fails is not present in this
// excerpt; presumably an empty optional is returned — confirm.
156 std::optional<Lock> tryLock()
158 if (std::unique_lock lk(mutex, std::try_to_lock_t{}); lk.owns_lock()) {
159 return Lock{*
this, std::move(lk)};
// Specialization of Sync for the AsyncMutex tag, implemented by privately
// inheriting the std::mutex-based Sync.
// NOTE(review): the template header for this specialization is not present in
// this excerpt.
167class Sync<T, AsyncMutex> :
private Sync<T, std::mutex>
// Shorthand for the privately inherited std::mutex-based Sync.
170 using base_type = Sync<T, std::mutex>;
// `waiters` holds cross-thread promise fulfillers; the visible code takes
// `waitMutex` before pushing to or iterating over the list.
172 std::mutex waitMutex;
173 std::list<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> waiters;
// `conditionWaiters` holds cross-thread promise fulfillers; the visible code takes
// `conditionMutex` before pushing to, iterating over, or clearing the list.
175 std::mutex conditionMutex;
176 std::list<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> conditionWaiters;
// Constructs the specialization by moving `data` into the base_type constructor.
180 Sync(T && data) : base_type(std::move(data)) {}
// Lock type of the async specialization; privately inherits the base Sync's Lock.
182 class Lock :
private base_type::Lock
// Wraps a base_type::Lock by moving it into the base-class subobject.
186 Lock(base_type::Lock lk) : base_type::Lock(std::move(lk)) {}
// Move construction uses the compiler-generated implementation.
189 Lock(Lock &&) =
default;
// Move assignment uses the compiler-generated implementation.
190 Lock & operator=(Lock &&) =
default;
// NOTE(review): the enclosing signature is not present in this excerpt;
// presumably this is the Lock teardown path — confirm.
// Runs only when this Lock still owns the mutex.
194 if (this->lk.owns_lock()) {
// Recover the async Sync from the base-class `s` pointer.
196 auto * s =
static_cast<Sync *
>(this->s);
// Hold waitMutex while walking the list of waiters.
197 std::lock_guard wlk(s->waitMutex);
// NOTE(review): the loop body is not present in this excerpt.
200 for (
auto & f : s->waiters) {
// Re-exposes operator-> and operator* from the privately inherited base Lock.
207 using base_type::Lock::operator->, base_type::Lock::operator*;
// NOTE(review): the enclosing signature is not present in this excerpt; the
// co_await expressions show this is a coroutine body.
// Recover the async Sync from the base-class `s` pointer.
217 auto * s =
static_cast<Sync *
>(this->s);
// Move this Lock into a local, so the local now carries the held lock.
220 auto unlock = std::move(*
this);
// Create a promise together with a fulfiller usable from another thread.
223 auto pfp = kj::newPromiseAndCrossThreadFulfiller<void>();
// Register the fulfiller as a condition waiter while holding conditionMutex.
225 std::lock_guard clk(s->conditionMutex);
226 s->conditionWaiters.push_back(std::move(pfp.fulfiller));
// Suspend until the registered fulfiller is fulfilled.
228 co_await pfp.promise;
// Acquire the lock again asynchronously and move it back into *this.
230 *
this =
co_await s->lock();
// NOTE(review): the enclosing signature and the loop body are not present in this
// excerpt; presumably each fulfiller is fulfilled to wake its waiter — confirm.
// Under conditionMutex: visit every registered condition waiter, then empty the list.
240 std::lock_guard clk(conditionMutex);
241 for (
auto & f : conditionWaiters) {
244 conditionWaiters.clear();
// Delegates to the base Sync's lock().
// NOTE(review): the enclosing signature is not present in this excerpt.
249 return base_type::lock();
// Asynchronous lock acquisition; a coroutine returning kj::Promise<Lock>.
// NOTE(review): braces and any loop structure are not present in this excerpt,
// so only the visible steps are described.
252 kj::Promise<Lock>
lock()
// First attempt: if the lock can be taken immediately, complete with it.
254 if (
auto lk = tryLock()) {
255 co_return std::move(*lk);
// Otherwise create a promise and cross-thread fulfiller, and register the
// fulfiller in `waiters` while holding waitMutex.
259 auto pfp = kj::newPromiseAndCrossThreadFulfiller<void>();
261 std::lock_guard wlk(waitMutex);
262 waiters.push_back(std::move(pfp.fulfiller));
// Second attempt, made after registering as a waiter.
264 if (
auto lk = tryLock()) {
265 co_return std::move(*lk);
// Suspend until the registered fulfiller is fulfilled.
267 co_await pfp.promise;
// Non-blocking acquisition for the async specialization: calls the base tryLock()
// and, on success, wraps the resulting base lock in this class's Lock.
// NOTE(review): the path taken when the base attempt fails is not present in this
// excerpt.
271 std::optional<Lock> tryLock()
273 if (
auto lk = base_type::tryLock()) {
274 return Lock(std::move(*lk));