Nix 2.93.3
Lix: A modern, delicious implementation of the Nix package manager; unstable internal interfaces
Loading...
Searching...
No Matches
pool.hh
Go to the documentation of this file.
1#pragma once
3
4#include <functional>
5#include <kj/async.h>
6#include <limits>
7#include <list>
8#include <memory>
9#include <cassert>
10#include <optional>
11#include <vector>
12
13#include "lix/libutil/async.hh"
14#include "lix/libutil/result.hh"
15#include "lix/libutil/sync.hh"
16#include "lix/libutil/ref.hh"
17#include "lix/libutil/types.hh"
18
19namespace nix {
20
38template <class R>
39class Pool
40{
41public:
42
46 typedef std::function<ref<R>()> Factory;
47
52 typedef std::function<bool(const ref<R> &)> Validator;
53
54private:
55
56 Factory factory;
57 Validator validator;
58
59 struct State
60 {
61 size_t inUse = 0;
62 size_t max;
63 std::vector<ref<R>> idle;
64 std::list<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> waiters;
65
66 void notify()
67 {
68 for (auto & waiter : waiters) {
69 waiter->fulfill();
70 }
71 waiters.clear();
72 }
73 };
74
75 Sync<State> state;
76
77public:
78
79 Pool(size_t max = std::numeric_limits<size_t>::max(),
80 const Factory & factory = []() { return make_ref<R>(); },
81 const Validator & validator = [](ref<R> r) { return true; })
82 : factory(factory)
83 , validator(validator)
84 {
85 auto state_(state.lock());
86 state_->max = max;
87 }
88
89 void incCapacity()
90 {
91 auto state_(state.lock());
92 state_->max++;
93 /* we could wakeup here, but this is only used when we're
94 * about to nest Pool usages, and we want to save the slot for
95 * the nested use if we can
96 */
97 }
98
99 void decCapacity()
100 {
101 auto state_(state.lock());
102 state_->max--;
103 }
104
105 ~Pool()
106 {
107 auto state_(state.lock());
108 assert(!state_->inUse);
109 state_->max = 0;
110 state_->idle.clear();
111 }
112
113 class Handle
114 {
115 private:
116 Pool & pool;
117 std::shared_ptr<R> r;
118 bool bad = false;
119
120 friend Pool;
121
122 Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
123
124 public:
125 Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
126
127 Handle(const Handle & l) = delete;
128
129 ~Handle()
130 {
131 if (!r) return;
132 {
133 auto state_(pool.state.lock());
134 if (!bad)
135 state_->idle.push_back(ref<R>::unsafeFromPtr(r));
136 assert(state_->inUse);
137 state_->inUse--;
138 state_->notify();
139 }
140 }
141
142 R * operator -> () { return &*r; }
143 R & operator * () { return *r; }
144
145 void markBad() { bad = true; }
146 };
147
148private:
149 void getFailed()
150 {
151 auto state_(state.lock());
152 state_->inUse--;
153 state_->notify();
154 }
155
156 // lock lifetimes must always be short, and *NEVER* cross a yield point.
157 // we ensure this by using explicit continuations instead of coroutines.
158 kj::Promise<Result<std::optional<Handle>>> tryGet()
159 try {
160 auto state_(state.lock());
161
162 /* If we're over the maximum number of instance, we need
163 to wait until a slot becomes available. */
164 if (state_->idle.empty() && state_->inUse >= state_->max) {
165 auto pfp = kj::newPromiseAndCrossThreadFulfiller<void>();
166 state_->waiters.push_back(std::move(pfp.fulfiller));
167 return pfp.promise.then([this] { return tryGet(); });
168 }
169
170 while (!state_->idle.empty()) {
171 auto p = state_->idle.back();
172 state_->idle.pop_back();
173 if (validator(p)) {
174 state_->inUse++;
175 return {Handle(*this, p)};
176 }
177 }
178
179 state_->inUse++;
180 return {std::nullopt};
181 } catch (...) {
182 return {result::current_exception()};
183 }
184
185public:
186 kj::Promise<Result<Handle>> get()
187 try {
188 if (auto existing = LIX_TRY_AWAIT(tryGet())) {
189 co_return std::move(*existing);
190 }
191
192 /* We need to create a new instance. Because that might take a
193 while, we don't hold the lock in the meantime. */
194 try {
195 Handle h(*this, factory());
196 co_return h;
197 } catch (...) {
198 getFailed();
199 throw;
200 }
201 } catch (...) {
202 co_return result::current_exception();
203 }
204
205 size_t count()
206 {
207 auto state_(state.lock());
208 return state_->idle.size() + state_->inUse;
209 }
210
211 size_t capacity()
212 {
213 return state.lock()->max;
214 }
215};
216
217}
Definition pool.hh:114
Definition pool.hh:40
std::function< ref< R >()> Factory
Definition pool.hh:46
std::function< bool(const ref< R > &)> Validator
Definition pool.hh:52
Definition sync.hh:37
Lock lock()
Definition sync.hh:154
Definition ref.hh:19