Bitcoin Core 28.0.0
P2P Digital Currency
Loading...
Searching...
No Matches
checkqueue.h
Go to the documentation of this file.
1// Copyright (c) 2012-2022 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_CHECKQUEUE_H
6#define BITCOIN_CHECKQUEUE_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/threadnames.h>
11
12#include <algorithm>
13#include <iterator>
14#include <vector>
15
26template <typename T>
28{
29private:
32
// Worker threads block on this when out of work.
34 std::condition_variable m_worker_cv;
35
// Master thread blocks on this when out of work.
37 std::condition_variable m_master_cv;
38
// The queue of elements to be processed.
41 std::vector<T> queue GUARDED_BY(m_mutex);
42
// The number of workers (including the master) that are idle.
44 int nIdle GUARDED_BY(m_mutex){0};
45
// The total number of workers (including the master).
47 int nTotal GUARDED_BY(m_mutex){0};
48
// The temporary evaluation result.
50 bool fAllOk GUARDED_BY(m_mutex){true};
51
// Number of verifications that haven't completed yet.
57 unsigned int nTodo GUARDED_BY(m_mutex){0};
58
// The maximum number of elements to be processed in one batch.
60 const unsigned int nBatchSize;
61
// Threads created by the constructor; each one runs Loop(false).
62 std::vector<std::thread> m_worker_threads;
// Set to true under m_mutex to make threads inside Loop() return false.
63 bool m_request_stop GUARDED_BY(m_mutex){false};
64
// Body of Loop(bool fMaster), the internal function that does the bulk of the
// verification work (its signature line, checkqueue.h:66, is not part of this listing).
//
// Each pass of the do-loop has two phases:
//   1. Inside the inner braces, with m_mutex taken via WAIT_LOCK: fold the result of
//      the previous batch into fAllOk/nTodo, wait for work, then move a batch of
//      checks from the back of `queue` into the local vChecks.
//   2. After the inner braces close (presumably with the lock released, since `lock`
//      is scoped to those braces): run the checks in vChecks.
//
// Returns:
//   - the accumulated fAllOk (which is then reset to true) when fMaster is set, the
//     queue is empty and nTodo == 0;
//   - false as soon as m_request_stop is observed.
67 {
// The master and the workers sleep on different condition variables.
68 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
69 std::vector<T> vChecks;
70 vChecks.reserve(nBatchSize);
// nNow == 0 doubles as the "first iteration" marker below.
71 unsigned int nNow = 0;
72 bool fOk = true;
73 do {
74 {
75 WAIT_LOCK(m_mutex, lock);
76 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
77 if (nNow) {
78 fAllOk &= fOk;
79 nTodo -= nNow;
80 if (nTodo == 0 && !fMaster)
81 // We processed the last element; inform the master it can exit and return the result
82 m_master_cv.notify_one();
83 } else {
84 // first iteration
85 nTotal++;
86 }
87 // logically, the do loop starts here
88 while (queue.empty() && !m_request_stop) {
89 if (fMaster && nTodo == 0) {
// The master leaves the pool: undo the nTotal++ from its first iteration.
90 nTotal--;
91 bool fRet = fAllOk;
92 // reset the status for new work later
93 fAllOk = true;
94 // return the current status
95 return fRet;
96 }
97 nIdle++;
98 cond.wait(lock); // wait
99 nIdle--;
100 }
101 if (m_request_stop) {
102 return false;
103 }
104
105 // Decide how many work units to process now.
106 // * Do not try to do everything at once, but aim for increasingly smaller batches so
107 // all workers finish approximately simultaneously.
108 // * Try to account for idle jobs which will instantly start helping.
109 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
// Move the last nNow elements of the queue into vChecks, then drop them from the queue.
111 auto start_it = queue.end() - nNow;
112 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
113 queue.erase(start_it, queue.end());
114 // Check whether we need to do work at all
115 fOk = fAllOk;
116 }
117 // execute work
// Once fOk is false (an earlier failure, or a check in this batch failing), the
// remaining checks of the batch are skipped rather than executed.
118 for (T& check : vChecks)
119 if (fOk)
120 fOk = check();
121 vChecks.clear();
122 } while (true);
123 }
124
125public:
128
// Create a new check queue.
//
// batch_size         stored in nBatchSize, the maximum number of elements to be
//                    processed in one batch.
// worker_threads_num number of worker threads to start. Each thread is renamed to
//                    "scriptch.<n>" (n counting from 0) and then runs Loop(false).
//
// The thread lambdas capture `this`, so the threads refer to this object for as long
// as they run.
130 explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
131 : nBatchSize(batch_size)
132 {
133 m_worker_threads.reserve(worker_threads_num);
134 for (int n = 0; n < worker_threads_num; ++n) {
135 m_worker_threads.emplace_back([this, n]() {
136 util::ThreadRename(strprintf("scriptch.%i", n));
137 Loop(false /* worker thread */);
138 });
139 }
140 }
141
142 // Since this class manages its own resources, which is a thread
143 // pool `m_worker_threads`, copy and move operations are not appropriate.
144 CCheckQueue(const CCheckQueue&) = delete;
148
// Body of Wait() (its signature line, checkqueue.h:150, is not part of this listing):
// wait until execution finishes, and return whether all evaluations were successful.
// The calling thread takes part in the work itself by running Loop() as the master.
151 {
152 return Loop(true /* master thread */);
153 }
154
// Add a batch of checks to the queue.
//
// vChecks  checks to enqueue; its elements are moved to the end of `queue`. An empty
//          vector is ignored (no locking, no notification).
//
// Must be called without m_mutex held (EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)).
156 void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
157 {
158 if (vChecks.empty()) {
159 return;
160 }
161
162 {
163 LOCK(m_mutex);
164 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
// Only the elements were moved from; the vector itself still reports its original
// size, so it can be used as the count of newly queued checks.
165 nTodo += vChecks.size();
166 }
167
// Notify after the LOCK scope above has ended: one worker is enough for a single
// check, otherwise wake all of them.
168 if (vChecks.size() == 1) {
169 m_worker_cv.notify_one();
170 } else {
171 m_worker_cv.notify_all();
172 }
173 }
174
// NOTE(review): the declaration line for this body (checkqueue.h:175) is missing from
// this listing; presumably this is the destructor ~CCheckQueue() - confirm against the
// original header.
// Sets m_request_stop under m_mutex, wakes every thread waiting on m_worker_cv so it
// can observe the flag and return from Loop(), then joins all worker threads.
176 {
177 WITH_LOCK(m_mutex, m_request_stop = true);
178 m_worker_cv.notify_all();
179 for (std::thread& t : m_worker_threads) {
180 t.join();
181 }
182 }
183
// Returns true when m_worker_threads holds at least one thread.
184 bool HasThreads() const { return !m_worker_threads.empty(); }
185};
186
191template <typename T>
193{
194private:
// True once Wait() has run against a non-null queue; checked at teardown so the
// queue is not waited on a second time.
196 bool fDone;
197
198public:
// Takes control of a queue for this object's lifetime.
//
// pqueueIn  queue to control, or nullptr to make every operation on this object a
//           no-op. When non-null, pqueue->m_control_mutex (the mutex to ensure only
//           one concurrent CCheckQueueControl) is entered here; the matching
//           LEAVE_CRITICAL_SECTION is issued at teardown.
202 explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
203 {
204 // passed queue is supposed to be unused, or nullptr
205 if (pqueue != nullptr) {
206 ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
207 }
208 }
209
// Waits for the controlled queue to finish and returns its result.
//
// Returns true immediately when no queue was supplied (fDone is left untouched in
// that case); otherwise returns the value of pqueue->Wait() and records in fDone
// that the wait has taken place.
210 bool Wait()
211 {
212 if (pqueue == nullptr)
213 return true;
214 bool fRet = pqueue->Wait();
215 fDone = true;
216 return fRet;
217 }
218
// Forwards a batch of checks to the controlled queue via pqueue->Add().
// Does nothing when no queue was supplied (pqueue == nullptr).
219 void Add(std::vector<T>&& vChecks)
220 {
221 if (pqueue != nullptr) {
222 pqueue->Add(std::move(vChecks));
223 }
224 }
225
// NOTE(review): the declaration line for this body (checkqueue.h:226) is missing from
// this listing; presumably this is the destructor ~CCheckQueueControl() - confirm
// against the original header.
// If Wait() has not completed yet (fDone is false), call it now and discard the
// result; then leave the control mutex that the constructor entered for a non-null
// queue.
227 {
228 if (!fDone)
229 Wait();
230 if (pqueue != nullptr) {
231 LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
232 }
233 }
234};
235
236#endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition checkqueue.h:193
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
CCheckQueue< T > *const pqueue
Definition checkqueue.h:195
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition checkqueue.h:202
CCheckQueueControl()=delete
void Add(std::vector< T > &&vChecks)
Definition checkqueue.h:219
CCheckQueueControl(const CCheckQueueControl &)=delete
Queue for verifications that have to be performed.
Definition checkqueue.h:28
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition checkqueue.h:60
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition checkqueue.h:37
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition checkqueue.h:50
bool m_request_stop GUARDED_BY(m_mutex)
Definition checkqueue.h:63
bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition checkqueue.h:66
CCheckQueue(CCheckQueue &&)=delete
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition checkqueue.h:47
std::vector< std::thread > m_worker_threads
Definition checkqueue.h:62
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition checkqueue.h:127
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition checkqueue.h:34
bool HasThreads() const
Definition checkqueue.h:184
Mutex m_mutex
Mutex to protect the inner state.
Definition checkqueue.h:31
bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Wait until execution finishes, and return whether all evaluations were successful.
Definition checkqueue.h:150
CCheckQueue & operator=(CCheckQueue &&)=delete
CCheckQueue(unsigned int batch_size, int worker_threads_num)
Create a new check queue.
Definition checkqueue.h:130
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition checkqueue.h:156
CCheckQueue & operator=(const CCheckQueue &)=delete
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition checkqueue.h:44
CCheckQueue(const CCheckQueue &)=delete
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition checkqueue.h:57
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
#define T(expected, seed, data)
void ThreadRename(const std::string &)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
#define WAIT_LOCK(cs, name)
Definition sync.h:262
#define ENTER_CRITICAL_SECTION(cs)
Definition sync.h:264
#define LEAVE_CRITICAL_SECTION(cs)
Definition sync.h:270
#define LOCK(cs)
Definition sync.h:257
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition sync.h:301
#define EXCLUSIVE_LOCKS_REQUIRED(...)
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).