Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
thread-pool.hh
Go to the documentation of this file.
1#pragma once
3
8#include "lix/libutil/sync.hh"
9
10#include <kj/async.h>
11#include <map>
12#include <queue>
13#include <functional>
14#include <thread>
15#include <atomic>
16
17namespace nix {
18
/* Declares the error type `ThreadPoolShutDown` through the project's
 * `MakeError` macro, passing `Error` as the second argument.
 * NOTE(review): presumably `Error` is the base class and this error is raised
 * when work is handed to a pool that is shutting down; the macro definition
 * and the throw sites are not visible in this header — confirm. */
19MakeError(ThreadPoolShutDown, Error);
20
25class ThreadPool
26{
27public:
28
29 ThreadPool(const char * name, size_t maxThreads = 0);
30
31 ~ThreadPool();
32
38 typedef std::function<void(AsyncIoRoot &)> work_t;
39
43 void enqueueWithAio(const work_t & t);
44
45 void enqueue(std::function<void()> t)
46 {
47 enqueueWithAio([t{std::move(t)}](AsyncIoRoot &) { t(); });
48 }
49
62 void process();
63
65 kj::Promise<Result<void>> processAsync();
66
67private:
68
69 size_t maxThreads;
70
71 const char * name;
72
73 struct State
74 {
75 std::queue<work_t> pending;
76 size_t active = 0;
77 std::exception_ptr exception;
78 std::vector<std::thread> workers;
79 bool draining = false;
80 std::optional<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> anyWorkerExited;
81 };
82
83 std::atomic_bool quit{false};
84
85 Sync<State> state_;
86
87 std::condition_variable work;
88
89 void doWork();
90
91 void shutdown();
92};
93
99template<typename T>
101 const char *poolName,
102 const std::set<T> & nodes,
103 std::function<std::set<T>(AsyncIoRoot &, const T &)> getEdges,
104 std::function<void(AsyncIoRoot &, const T &)> processNode)
105{
106 struct Graph {
107 std::set<T> left;
108 std::map<T, std::set<T>> refs, rrefs;
109 };
110
111 Sync<Graph> graph_(Graph{nodes, {}, {}});
112
113 std::function<void(AsyncIoRoot &, const T &)> worker;
114
115 /* Create pool last to ensure threads are stopped before other destructors
116 * run */
117 ThreadPool pool{poolName};
118
119
120 worker = [&](AsyncIoRoot & aio, const T & node) {
121
122 {
123 auto graph(graph_.lock());
124 auto i = graph->refs.find(node);
125 if (i == graph->refs.end())
126 goto getRefs;
127 goto doWork;
128 }
129
130 getRefs:
131 {
132 auto refs = getEdges(aio, node);
133 refs.erase(node);
134
135 {
136 auto graph(graph_.lock());
137 for (auto & ref : refs)
138 if (graph->left.count(ref)) {
139 graph->refs[node].insert(ref);
140 graph->rrefs[ref].insert(node);
141 }
142 if (graph->refs[node].empty())
143 goto doWork;
144 }
145 }
146
147 return;
148
149 doWork:
150 processNode(aio, node);
151
152 /* Enqueue work for all nodes that were waiting on this one
153 and have no unprocessed dependencies. */
154 {
155 auto graph(graph_.lock());
156 for (auto & rref : graph->rrefs[node]) {
157 auto & refs(graph->refs[rref]);
158 auto i = refs.find(node);
159 assert(i != refs.end());
160 refs.erase(i);
161 if (refs.empty())
162 pool.enqueueWithAio(std::bind(worker, std::placeholders::_1, rref));
163 }
164 graph->left.erase(node);
165 graph->refs.erase(node);
166 graph->rrefs.erase(node);
167 }
168 };
169
170 for (auto & node : nodes)
171 pool.enqueueWithAio(std::bind(worker, std::placeholders::_1, std::ref(node)));
172
173 pool.process();
174
175 if (!graph_.lock()->left.empty())
176 throw Error("graph processing incomplete (cyclic reference?)");
177}
178
179template<typename T>
180void processGraph(
181 const char *poolName,
182 const std::set<T> & nodes,
183 std::function<std::set<T>(const T &)> getEdges,
184 std::function<void(const T &)> processNode)
185{
186 processGraph<T>(
187 poolName,
188 nodes,
189 [&](AsyncIoRoot &, const T & node) { return getEdges(node); },
190 [&](AsyncIoRoot &, const T & node) { processNode(node); }
191 );
192}
193
194template<typename T>
195kj::Promise<Result<void>> processGraphAsync(
196 const std::set<T> & nodes,
197 std::function<kj::Promise<Result<std::set<T>>>(const T &)> getEdges,
198 std::function<kj::Promise<Result<void>>(const T &)> processNode)
199try {
200 struct Graph {
201 std::set<T> left;
202 std::map<T, std::set<T>> refs, rrefs;
203 };
204
205 Graph graph{nodes, {}, {}};
206
207 std::function<kj::Promise<Result<void>>(const T &)> worker;
208
209 // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
210 worker = [&](const T & node) -> kj::Promise<Result<void>> {
211 try {
212 auto i = graph.refs.find(node);
213 if (i == graph.refs.end())
214 goto getRefs;
215 goto doWork;
216
217 getRefs:
218 {
219 auto refs = LIX_TRY_AWAIT(getEdges(node));
220 refs.erase(node);
221
222 {
223 for (auto & ref : refs)
224 if (graph.left.count(ref)) {
225 graph.refs[node].insert(ref);
226 graph.rrefs[ref].insert(node);
227 }
228 if (graph.refs[node].empty())
229 goto doWork;
230 }
231 }
232
233 co_return result::success();
234
235 doWork:
236 LIX_TRY_AWAIT(processNode(node));
237
238 /* Enqueue work for all nodes that were waiting on this one
239 and have no unprocessed dependencies. */
240 std::vector<T> unblocked;
241 for (auto & rref : graph.rrefs[node]) {
242 auto & refs(graph.refs[rref]);
243 auto i = refs.find(node);
244 assert(i != refs.end());
245 refs.erase(i);
246 if (refs.empty())
247 unblocked.push_back(rref);
248 }
249 graph.left.erase(node);
250 graph.refs.erase(node);
251 graph.rrefs.erase(node);
252 LIX_TRY_AWAIT(asyncSpread(unblocked, worker));
253 co_return result::success();
254 } catch (...) {
255 co_return result::current_exception();
256 }
257 };
258
259 LIX_TRY_AWAIT(asyncSpread(nodes, worker));
260
261 if (!graph.left.empty())
262 throw Error("graph processing incomplete (cyclic reference?)");
263 co_return result::success();
264} catch (...) {
265 co_return result::current_exception();
266}
267
268}
kj::Promise< Result< void > > asyncSpread(Input &&input, Fn fn)
Definition async-collect.hh:113
Definition sync.hh:37
Definition thread-pool.hh:26
void process()
Definition thread-pool.cc:55
std::function< void(AsyncIoRoot &)> work_t
Definition thread-pool.hh:38
kj::Promise< Result< void > > processAsync()
Definition thread-pool.cc:84
void enqueueWithAio(const work_t &t)
Definition thread-pool.cc:44
Definition ref.hh:19
This file defines two main structs/classes used in nix error handling.
Definition async.hh:39
void processGraph(const char *poolName, const std::set< T > &nodes, std::function< std::set< T >(AsyncIoRoot &, const T &)> getEdges, std::function< void(AsyncIoRoot &, const T &)> processNode)
Definition thread-pool.hh:100