19 using Item = std::conditional_t<std::is_void_v<V>, K, std::pair<K, V>>;
22 kj::ForkedPromise<void> allPromises;
23 std::list<Item> results;
26 kj::ForkedPromise<void> signal;
27 kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> notify;
29 void oneDone(Item item)
31 results.emplace_back(std::move(item));
33 KJ_IF_MAYBE (n, notify) {
39 kj::Promise<void> collectorFor(K key, kj::Promise<V> promise)
41 if constexpr (std::is_void_v<V>) {
42 return promise.then([
this, key{std::move(key)}] { oneDone(std::move(key)); });
44 return promise.then([
this, key{std::move(key)}](V v) {
45 oneDone(Item{std::move(key), std::move(v)});
50 kj::ForkedPromise<void> waitForAll(kj::Array<std::pair<K, kj::Promise<V>>> & promises)
52 kj::Vector<kj::Promise<void>> wrappers;
53 for (
auto & [key, promise] : promises) {
54 wrappers.add(collectorFor(std::move(key), std::move(promise)));
57 return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork();
61 AsyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> && promises)
62 : allPromises(waitForAll(promises))
63 , remaining(promises.size())
69 KJ_DISALLOW_COPY_AND_MOVE(AsyncCollect);
71 kj::Promise<std::optional<Item>> next()
73 if (remaining == 0 && results.empty()) {
74 return {std::nullopt};
77 if (!results.empty()) {
78 auto result = std::move(results.front());
80 return {{std::move(result)}};
83 if (notify ==
nullptr) {
84 auto pair = kj::newPromiseAndFulfiller<void>();
85 notify = std::move(pair.fulfiller);
86 signal = pair.promise.fork();
89 return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([
this] {
117 } -> std::same_as<kj::Promise<Result<void>>>;
120 kj::Vector<std::pair<std::tuple<>, kj::Promise<Result<void>>>> children;
121 if constexpr (
requires { input.size(); }) {
122 children.reserve(input.size());
125 for (
auto & i : input) {
126 children.add(std::tuple(), fn(i));
130 while (
auto r =
co_await collect.next()) {
131 if (!r->second.has_value()) {
132 co_return std::move(r->second);
136 co_return result::success();
AsyncCollect< K, V > asyncCollect(kj::Array< std::pair< K, kj::Promise< V > > > promises)
Definition async-collect.hh:101