19MakeError(ThreadPoolShutDown, Error);
29 ThreadPool(
const char * name,
size_t maxThreads = 0);
45 void enqueue(std::function<
void()> t)
75 std::queue<work_t> pending;
77 std::exception_ptr exception;
78 std::vector<std::thread> workers;
79 bool draining =
false;
80 std::optional<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> anyWorkerExited;
83 std::atomic_bool quit{
false};
87 std::condition_variable work;
101 const char *poolName,
102 const std::set<T> & nodes,
103 std::function<std::set<T>(
AsyncIoRoot &,
const T &)> getEdges,
104 std::function<
void(
AsyncIoRoot &,
const T &)> processNode)
108 std::map<T, std::set<T>> refs, rrefs;
113 std::function<void(
AsyncIoRoot &,
const T &)> worker;
123 auto graph(graph_.lock());
124 auto i = graph->refs.find(node);
125 if (i == graph->refs.end())
132 auto refs = getEdges(aio, node);
136 auto graph(graph_.lock());
137 for (
auto &
ref : refs)
138 if (graph->left.count(
ref)) {
139 graph->refs[node].insert(
ref);
140 graph->rrefs[
ref].insert(node);
142 if (graph->refs[node].empty())
150 processNode(aio, node);
155 auto graph(graph_.lock());
156 for (
auto & rref : graph->rrefs[node]) {
157 auto & refs(graph->refs[rref]);
158 auto i = refs.find(node);
159 assert(i != refs.end());
162 pool.
enqueueWithAio(std::bind(worker, std::placeholders::_1, rref));
164 graph->left.erase(node);
165 graph->refs.erase(node);
166 graph->rrefs.erase(node);
170 for (
auto & node : nodes)
171 pool.
enqueueWithAio(std::bind(worker, std::placeholders::_1, std::ref(node)));
175 if (!graph_.lock()->left.empty())
176 throw Error(
"graph processing incomplete (cyclic reference?)");
181 const char *poolName,
182 const std::set<T> & nodes,
183 std::function<std::set<T>(
const T &)> getEdges,
184 std::function<
void(
const T &)> processNode)
189 [&](AsyncIoRoot &,
const T & node) {
return getEdges(node); },
190 [&](
AsyncIoRoot &,
const T & node) { processNode(node); }
195kj::Promise<Result<void>> processGraphAsync(
196 const std::set<T> & nodes,
197 std::function<kj::Promise<Result<std::set<T>>>(
const T &)> getEdges,
198 std::function<kj::Promise<Result<void>>(
const T &)> processNode)
202 std::map<T, std::set<T>> refs, rrefs;
205 Graph graph{nodes, {}, {}};
207 std::function<kj::Promise<Result<void>>(
const T &)> worker;
210 worker = [&](
const T & node) -> kj::Promise<Result<void>> {
212 auto i = graph.refs.find(node);
213 if (i == graph.refs.end())
219 auto refs = LIX_TRY_AWAIT(getEdges(node));
223 for (
auto &
ref : refs)
224 if (graph.left.count(
ref)) {
225 graph.refs[node].insert(
ref);
226 graph.rrefs[
ref].insert(node);
228 if (graph.refs[node].empty())
233 co_return result::success();
236 LIX_TRY_AWAIT(processNode(node));
240 std::vector<T> unblocked;
241 for (
auto & rref : graph.rrefs[node]) {
242 auto & refs(graph.refs[rref]);
243 auto i = refs.find(node);
244 assert(i != refs.end());
247 unblocked.push_back(rref);
249 graph.left.erase(node);
250 graph.refs.erase(node);
251 graph.rrefs.erase(node);
253 co_return result::success();
255 co_return result::current_exception();
261 if (!graph.left.empty())
262 throw Error(
"graph processing incomplete (cyclic reference?)");
263 co_return result::success();
265 co_return result::current_exception();
kj::Promise< Result< void > > asyncSpread(Input &&input, Fn fn)
Definition async-collect.hh:113
Definition thread-pool.hh:26
void process()
Definition thread-pool.cc:55
std::function< void(AsyncIoRoot &)> work_t
Definition thread-pool.hh:38
kj::Promise< Result< void > > processAsync()
Definition thread-pool.cc:84
void enqueueWithAio(const work_t &t)
Definition thread-pool.cc:44
This file defines two main structs/classes used in nix error handling.
void processGraph(const char *poolName, const std::set< T > &nodes, std::function< std::set< T >(AsyncIoRoot &, const T &)> getEdges, std::function< void(AsyncIoRoot &, const T &)> processNode)
Definition thread-pool.hh:100