Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
sync.hh
Go to the documentation of this file.
1#pragma once
3
5#include <cstdlib>
6#include <kj/async.h>
7#include <kj/common.h>
8#include <list>
9#include <mutex>
10#include <condition_variable>
11#include <cassert>
12#include <optional>
13#include <utility>
14
15namespace nix {
16
17struct AsyncMutex;
18
35template<class T, class M = std::mutex>
36class Sync
37{
38private:
39 M mutex;
40 T data;
41
42public:
43
44 Sync() { }
45 Sync(const T & data) : data(data) { }
46 Sync(T && data) noexcept : data(std::move(data)) { }
47
48 template<typename ... Args>
49 Sync(std::in_place_t, Args &&... args) : data(std::forward<Args>(args)...) { }
50
51 class Lock
52 {
53 protected:
54 // Non-owning pointer. This would be an
55 // optional<reference_wrapper<Sync>> if it didn't break gdb accessing
56 // Lock values (as of 2024-06-15, gdb 14.2)
57 Sync * s;
58 std::unique_lock<M> lk;
59 friend Sync;
60 Lock(Sync &s) : s(&s), lk(s.mutex) { }
61 Lock(Sync &s, std::unique_lock<M> lk) : s(&s), lk(std::move(lk)) { }
62
63 inline void checkLockingInvariants()
64 {
65 assert(s);
66 assert(lk.owns_lock());
67 }
68
69 public:
70 Lock(Lock && l) : s(l.s), lk(std::move(l.lk))
71 {
72 l.s = nullptr;
73 }
74
75 Lock & operator=(Lock && other)
76 {
77 if (this != &other) {
78 s = other.s;
79 lk = std::move(other.lk);
80 other.s = nullptr;
81 }
82 return *this;
83 }
84
85 Lock(const Lock & l) = delete;
86
87 ~Lock() = default;
88
89 T * operator -> ()
90 {
91 checkLockingInvariants();
92 return &s->data;
93 }
94
95 T & operator * ()
96 {
97 checkLockingInvariants();
98 return s->data;
99 }
100
106 void wait(std::condition_variable & cv)
107 {
108 checkLockingInvariants();
109 cv.wait(lk);
110 }
111
117 template<class Rep, class Period>
118 std::cv_status wait_for(std::condition_variable & cv,
119 const std::chrono::duration<Rep, Period> & duration)
120 {
121 checkLockingInvariants();
122 return cv.wait_for(lk, duration);
123 }
124
130 template<class Rep, class Period, class Predicate>
131 bool wait_for(std::condition_variable & cv,
132 const std::chrono::duration<Rep, Period> & duration,
133 Predicate pred)
134 {
135 checkLockingInvariants();
136 return cv.wait_for(lk, duration, pred);
137 }
138
142 template<class Clock, class Duration>
143 std::cv_status wait_until(std::condition_variable & cv,
144 const std::chrono::time_point<Clock, Duration> & duration)
145 {
146 checkLockingInvariants();
147 return cv.wait_until(lk, duration);
148 }
149 };
150
154 Lock lock() { return Lock(*this); }
155
156 std::optional<Lock> tryLock()
157 {
158 if (std::unique_lock lk(mutex, std::try_to_lock_t{}); lk.owns_lock()) {
159 return Lock{*this, std::move(lk)};
160 } else {
161 return std::nullopt;
162 }
163 }
164};
165
166template<class T>
167class Sync<T, AsyncMutex> : private Sync<T, std::mutex>
168{
169private:
170 using base_type = Sync<T, std::mutex>;
171
172 std::mutex waitMutex;
173 std::list<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> waiters;
174
175 std::mutex conditionMutex;
176 std::list<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> conditionWaiters;
177
178public:
179 Sync() = default;
180 Sync(T && data) : base_type(std::move(data)) {}
181
182 class Lock : private base_type::Lock
183 {
184 friend Sync;
185
186 Lock(base_type::Lock lk) : base_type::Lock(std::move(lk)) {}
187
188 public:
189 Lock(Lock &&) = default;
190 Lock & operator=(Lock &&) = default;
191
192 ~Lock()
193 {
194 if (this->lk.owns_lock()) {
195 this->lk.unlock();
196 auto * s = static_cast<Sync *>(this->s);
197 std::lock_guard wlk(s->waitMutex);
198 // wake them all. it's too hard to ensure liveness with promises
199 // that can be cancelled, and contention isn't usually that big.
200 for (auto & f : s->waiters) {
201 f->fulfill();
202 }
203 s->waiters.clear();
204 }
205 }
206
207 using base_type::Lock::operator->, base_type::Lock::operator*;
208
215 kj::Promise<void> wait()
216 {
217 auto * s = static_cast<Sync *>(this->s);
218
219 {
220 auto unlock = std::move(*this);
221 }
222
223 auto pfp = kj::newPromiseAndCrossThreadFulfiller<void>();
224 {
225 std::lock_guard clk(s->conditionMutex);
226 s->conditionWaiters.push_back(std::move(pfp.fulfiller));
227 }
228 co_await pfp.promise;
229
230 *this = co_await s->lock();
231 }
232 };
233
238 void notify()
239 {
240 std::lock_guard clk(conditionMutex);
241 for (auto & f : conditionWaiters) {
242 f->fulfill();
243 }
244 conditionWaiters.clear();
245 }
246
247 auto lockSync(NeverAsync = {})
248 {
249 return base_type::lock();
250 }
251
252 kj::Promise<Lock> lock()
253 {
254 if (auto lk = tryLock()) {
255 co_return std::move(*lk);
256 }
257
258 while (true) {
259 auto pfp = kj::newPromiseAndCrossThreadFulfiller<void>();
260 {
261 std::lock_guard wlk(waitMutex);
262 waiters.push_back(std::move(pfp.fulfiller));
263 }
264 if (auto lk = tryLock()) {
265 co_return std::move(*lk);
266 }
267 co_await pfp.promise;
268 }
269 }
270
271 std::optional<Lock> tryLock()
272 {
273 if (auto lk = base_type::tryLock()) {
274 return Lock(std::move(*lk));
275 } else {
276 return std::nullopt;
277 }
278 }
279};
280
281}
Definition args.hh:31
Definition sync.hh:52
std::cv_status wait_for(std::condition_variable &cv, const std::chrono::duration< Rep, Period > &duration)
Definition sync.hh:118
void wait(std::condition_variable &cv)
Definition sync.hh:106
bool wait_for(std::condition_variable &cv, const std::chrono::duration< Rep, Period > &duration, Predicate pred)
Definition sync.hh:131
std::cv_status wait_until(std::condition_variable &cv, const std::chrono::time_point< Clock, Duration > &duration)
Definition sync.hh:143
kj::Promise< void > wait()
Definition sync.hh:215
void notify()
Definition sync.hh:238
Lock lock()
Definition sync.hh:154
Definition types.hh:172