24#include <condition_variable>
// Declaration only: accepts a std::function<void()> by rvalue reference.
// NOTE(review): presumably queues `cb` for execution on a pool thread — the
// definition is not visible in this excerpt; confirm against the .cpp.
43 void run(std::function<
void()>&& cb);
// Submits `cb` through run() and returns a std::future<T> tied to a promise
// that the submitted lambda co-owns.
// NOTE(review): the lines between the visible statements (including the
// try/catch around the callback) are missing from this excerpt.
46 std::future<T> get(std::function<T()>&& cb) {
// The promise lives in a shared_ptr so the closure below can hold a copy of it.
47 auto ret = std::make_shared<std::promise<T>>();
// `cb` is moved into the closure; `ret` is captured by copy (shared ownership).
48 run([cb = std::move(cb), ret]()
mutable {
// Stores the currently handled exception in the promise so it reaches the
// holder of the future.
53 ret->set_exception(std::current_exception());
// The future is retrieved from the promise after the lambda was passed to run().
57 return ret->get_future();
// Same submission path as get(), but the result is exposed as a
// std::shared_future<T>: the std::future<T> returned by get() is converted
// by the return statement.
60 std::shared_future<T> getShared(std::function<T()>&& cb) {
61 return get(std::move(cb));
// Condition variable. NOTE(review): presumably signalled when tasks_ receives
// work — the wait/notify sites are not visible here; confirm.
69 std::condition_variable cv_ {};
// FIFO queue of type-erased void() callables.
70 std::queue<std::function<void()>> tasks_ {};
// Thread objects, each held through a unique_ptr.
71 std::vector<std::unique_ptr<std::thread>> threads_;
// Counter initialised to 0. NOTE(review): the name suggests the number of
// idle workers — verify against the implementation.
72 unsigned readyThreads_ {0};
// Fixed at construction (const member).
75 const unsigned maxThreads_;
// Executor: derives from std::enable_shared_from_this<Executor> and holds a
// reference to a ThreadPool together with its own task queue and counters.
// NOTE(review): presumably caps the number of its tasks running concurrently
// on the pool at maxConcurrent_ — the scheduling logic is not visible in this
// excerpt; confirm.
78class OPENDHT_PUBLIC
Executor :
public std::enable_shared_from_this<Executor> {
// Constructor initialiser list: binds the pool reference and the concurrency limit.
81 : threadPool_(pool), maxConcurrent_(maxConcurrent)
// Declaration only; takes the task by rvalue reference.
84 void run(std::function<
void()>&& task);
// Rebindable reference to the ThreadPool passed to the constructor.
87 std::reference_wrapper<ThreadPool> threadPool_;
// Default member initialiser is 1; the constructor above sets it explicitly.
88 const unsigned maxConcurrent_ {1};
// Counter initialised to 0. NOTE(review): presumably the number of tasks
// currently dispatched — confirm.
90 unsigned current_ {0};
// FIFO queue of void() callables.
91 std::queue<std::function<void()>> tasks_ {};
// Declaration only. NOTE(review): looks like the internal dispatch helper
// behind run() — confirm against the definition.
93 void run_(std::function<
void()>&& task);
// Constructor initialiser list: stores the pool reference and allocates a
// fresh SharedState owned through a shared_ptr.
100 : threadPool_(pool), state_(std::make_shared<SharedState>())
// Calls SharedState::destroy with wait = false (its default is true).
// NOTE(review): the enclosing function (likely the destructor) is not visible
// in this excerpt.
109 state_->destroy(
false);
// Submits `task` to the referenced ThreadPool unless the shared state has
// been marked as shut down, in which case the task is dropped.
112 void run(std::function<
void()>&& task) {
// Holds state_->mtx for the rest of the function, including the pool submission.
113 std::lock_guard<std::mutex> lock(state_->mtx);
// After shutdown the task is not submitted.
114 if (state_->shutdown_)
return;
// Counted as pending before being handed to the pool.
115 state_->pendingTasks++;
// The closure takes ownership of `task` and a shared_ptr copy of the state,
// so the SharedState object outlives this context while the closure exists.
// NOTE(review): the lambda body is not visible in this excerpt.
116 threadPool_.get().run([task = std::move(task), state = state_] {
// Condition variable used by destroy() to wait on the task counters.
124 std::condition_variable cv {};
// Incremented by the owning context's run() before a task is submitted to the pool.
125 unsigned pendingTasks {0};
// Counter initialised to 0. NOTE(review): presumably tasks currently
// executing — the increment/decrement sites are not visible here.
126 unsigned ongoingTasks {0};
// Checked by the owning context's run() to reject new tasks; starts false.
128 bool shutdown_ {
false};
// Atomic flag, starts false; checked by destroy() and by run() in this struct.
130 std::atomic_bool destroyed {
false};
// Tears down the shared state; `wait` defaults to true.
// NOTE(review): the branch selecting between the two waits below is missing
// from this excerpt — presumably wait == true uses the first predicate
// (pending and ongoing both zero) and wait == false the second (ongoing
// only); confirm against the full source.
132 void destroy(
bool wait =
true) {
// unique_lock (not lock_guard) because cv.wait needs to release and reacquire it.
133 std::unique_lock<std::mutex> lock(mtx);
// Nothing to do when already destroyed.
134 if (destroyed)
return;
// Blocks until there are neither pending nor ongoing tasks.
136 cv.wait(lock, [
this] {
return pendingTasks == 0 && ongoingTasks == 0; });
// Blocks until no task is ongoing; pending tasks are not waited for.
140 cv.wait(lock, [
this] {
return ongoingTasks == 0; });
// Takes `task` by const reference.
// NOTE(review): most of the body is missing from this excerpt; it presumably
// executes `task` while maintaining the pending/ongoing counters — confirm.
145 void run(
const std::function<
void()>& task) {
// First acquisition of mtx. NOTE(review): likely a scoped block whose braces
// are not visible here.
147 std::lock_guard<std::mutex> lock(mtx);
// Bails out when the state has been destroyed.
151 if (destroyed)
return;
// Second, separate acquisition of mtx later in the function.
154 std::lock_guard<std::mutex> lock(mtx);
// Reference to the ThreadPool that run() submits tasks to.
160 std::reference_wrapper<ThreadPool> threadPool_;
// State shared with submitted closures; each closure captures its own
// shared_ptr copy in run().
161 std::shared_ptr<SharedState> state_;