Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
async-collect.hh
Go to the documentation of this file.
1#pragma once
3
5#include <concepts>
6#include <kj/async.h>
7#include <kj/common.h>
8#include <kj/vector.h>
9#include <list>
10#include <optional>
11#include <type_traits>
12
13namespace nix {
14
15template<typename K, typename V>
16class AsyncCollect
17{
18public:
19 using Item = std::conditional_t<std::is_void_v<V>, K, std::pair<K, V>>;
20
21private:
22 kj::ForkedPromise<void> allPromises;
23 std::list<Item> results;
24 size_t remaining;
25
26 kj::ForkedPromise<void> signal;
27 kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> notify;
28
29 void oneDone(Item item)
30 {
31 results.emplace_back(std::move(item));
32 remaining -= 1;
33 KJ_IF_MAYBE (n, notify) {
34 (*n)->fulfill();
35 notify = nullptr;
36 }
37 }
38
39 kj::Promise<void> collectorFor(K key, kj::Promise<V> promise)
40 {
41 if constexpr (std::is_void_v<V>) {
42 return promise.then([this, key{std::move(key)}] { oneDone(std::move(key)); });
43 } else {
44 return promise.then([this, key{std::move(key)}](V v) {
45 oneDone(Item{std::move(key), std::move(v)});
46 });
47 }
48 }
49
50 kj::ForkedPromise<void> waitForAll(kj::Array<std::pair<K, kj::Promise<V>>> & promises)
51 {
52 kj::Vector<kj::Promise<void>> wrappers;
53 for (auto & [key, promise] : promises) {
54 wrappers.add(collectorFor(std::move(key), std::move(promise)));
55 }
56
57 return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork();
58 }
59
60public:
61 AsyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> && promises)
62 : allPromises(waitForAll(promises))
63 , remaining(promises.size())
64 , signal{nullptr}
65 {
66 }
67
68 // oneDone promises capture `this`
69 KJ_DISALLOW_COPY_AND_MOVE(AsyncCollect);
70
71 kj::Promise<std::optional<Item>> next()
72 {
73 if (remaining == 0 && results.empty()) {
74 return {std::nullopt};
75 }
76
77 if (!results.empty()) {
78 auto result = std::move(results.front());
79 results.pop_front();
80 return {{std::move(result)}};
81 }
82
83 if (notify == nullptr) {
84 auto pair = kj::newPromiseAndFulfiller<void>();
85 notify = std::move(pair.fulfiller);
86 signal = pair.promise.fork();
87 }
88
89 return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([this] {
90 return next();
91 });
92 }
93};
94
100template<typename K, typename V>
101AsyncCollect<K, V> asyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> promises)
102{
103 return AsyncCollect<K, V>(std::move(promises));
104}
105
112template<typename Input, typename Fn>
113kj::Promise<Result<void>> asyncSpread(Input && input, Fn fn)
114 requires requires {
115 {
116 fn(*begin(input))
117 } -> std::same_as<kj::Promise<Result<void>>>;
118 }
119{
120 kj::Vector<std::pair<std::tuple<>, kj::Promise<Result<void>>>> children;
121 if constexpr (requires { input.size(); }) {
122 children.reserve(input.size());
123 }
124
125 for (auto & i : input) {
126 children.add(std::tuple(), fn(i));
127 }
128
129 auto collect = asyncCollect(children.releaseAsArray());
130 while (auto r = co_await collect.next()) {
131 if (!r->second.has_value()) {
132 co_return std::move(r->second);
133 }
134 }
135
136 co_return result::success();
137}
138
139}
kj::Promise< Result< void > > asyncSpread(Input &&input, Fn fn)
Definition async-collect.hh:113
AsyncCollect< K, V > asyncCollect(kj::Array< std::pair< K, kj::Promise< V > > > promises)
Definition async-collect.hh:101
Definition async-collect.hh:17