Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
async-semaphore.hh
Go to the documentation of this file.
1#pragma once
4
5#include <cassert>
6#include <kj/async.h>
7#include <kj/common.h>
8#include <kj/exception.h>
9#include <kj/list.h>
10#include <kj/source-location.h>
11#include <memory>
12#include <optional>
13
14namespace nix {
15
16class AsyncSemaphore
17{
18public:
19 class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token
20 {
21 struct Release
22 {
23 void operator()(AsyncSemaphore * sem) const
24 {
25 sem->unsafeRelease();
26 }
27 };
28
29 std::unique_ptr<AsyncSemaphore, Release> parent;
30
31 public:
32 Token() = default;
33 Token(AsyncSemaphore & parent, kj::Badge<AsyncSemaphore>) : parent(&parent) {}
34
35 bool valid() const
36 {
37 return parent != nullptr;
38 }
39 };
40
41private:
42 struct Waiter
43 {
44 kj::PromiseFulfiller<Token> & fulfiller;
45 kj::ListLink<Waiter> link;
46 kj::List<Waiter, &Waiter::link> & list;
47
48 Waiter(kj::PromiseFulfiller<Token> & fulfiller, kj::List<Waiter, &Waiter::link> & list)
49 : fulfiller(fulfiller)
50 , list(list)
51 {
52 list.add(*this);
53 }
54
55 ~Waiter()
56 {
57 if (link.isLinked()) {
58 list.remove(*this);
59 }
60 }
61 };
62
63 const unsigned capacity_;
64 unsigned used_ = 0;
65 kj::List<Waiter, &Waiter::link> waiters;
66
67 void unsafeRelease()
68 {
69 used_ -= 1;
70 while (used_ < capacity_ && !waiters.empty()) {
71 used_ += 1;
72 auto & w = waiters.front();
73 w.fulfiller.fulfill(Token{*this, {}});
74 waiters.remove(w);
75 }
76 }
77
78public:
79 explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {}
80
81 KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore);
82
83 ~AsyncSemaphore()
84 {
85 assert(waiters.empty() && "destroyed a semaphore with active waiters");
86 }
87
88 std::optional<Token> tryAcquire()
89 {
90 if (used_ < capacity_) {
91 used_ += 1;
92 return Token{*this, {}};
93 } else {
94 return {};
95 }
96 }
97
98 kj::Promise<Token> acquire()
99 {
100 if (auto t = tryAcquire()) {
101 return std::move(*t);
102 } else {
103 return kj::newAdaptedPromise<Token, Waiter>(waiters);
104 }
105 }
106
107 unsigned capacity() const
108 {
109 return capacity_;
110 }
111
112 unsigned used() const
113 {
114 return used_;
115 }
116
117 unsigned available() const
118 {
119 return capacity_ - used_;
120 }
121};
122}
Definition async-semaphore.hh:20