Skip to content

Commit c219b93

Browse files
committed
util: introduce general purpose thread pool
1 parent d30f149 commit c219b93

File tree

3 files changed

+467
-0
lines changed

3 files changed

+467
-0
lines changed

src/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ add_executable(test_bitcoin
106106
system_ram_tests.cpp
107107
system_tests.cpp
108108
testnet4_miner_tests.cpp
109+
threadpool_tests.cpp
109110
timeoffsets_tests.cpp
110111
torcontrol_tests.cpp
111112
transaction_tests.cpp

src/test/threadpool_tests.cpp

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Copyright (c) 2024-present The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <util/string.h>
6+
#include <util/threadpool.h>
7+
8+
#include <boost/test/unit_test.hpp>
9+
10+
BOOST_AUTO_TEST_SUITE(threadpool_tests)
11+
12+
constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
13+
14+
template <typename T>
15+
void WaitFor(const std::vector<std::future<T>>& futures, const std::string& context)
16+
{
17+
for (size_t i = 0; i < futures.size(); ++i) {
18+
if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
19+
throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
20+
}
21+
}
22+
}
23+
24+
// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
25+
// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
26+
std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context) {
27+
// Per-thread ready promises to ensure all workers are actually blocked
28+
std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
29+
std::vector<std::future<void>> ready_futures;
30+
ready_futures.reserve(num_of_threads_to_block);
31+
for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
32+
33+
// Fill all workers with blocking tasks
34+
std::vector<std::future<void>> blocking_tasks;
35+
for (int i = 0; i < num_of_threads_to_block; i++) {
36+
std::promise<void>& ready = ready_promises[i];
37+
blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
38+
ready.set_value();
39+
blocker_future.wait();
40+
}));
41+
}
42+
43+
// Wait until all threads are actually blocked
44+
WaitFor(ready_futures, context);
45+
return blocking_tasks;
46+
}
47+
48+
BOOST_AUTO_TEST_CASE(threadpool_basic)
49+
{
50+
// Test Cases
51+
// 0) Submit task to a non-started pool.
52+
// 1) Submit tasks and verify completion.
53+
// 2) Maintain all threads busy except one.
54+
// 3) Wait for work to finish.
55+
// 4) Wait for result object.
56+
// 5) The task throws an exception, catch must be done in the consumer side.
57+
// 6) Busy workers, help them by processing tasks from outside.
58+
// 7) Recursive submission of tasks.
59+
// 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
60+
61+
const int NUM_WORKERS_DEFAULT = 3;
62+
const std::string POOL_NAME = "test";
63+
64+
// Test case 0, submit task to a non-started pool
65+
{
66+
ThreadPool threadPool(POOL_NAME);
67+
bool err = false;
68+
try {
69+
threadPool.Submit([]() { return false; });
70+
} catch (const std::runtime_error&) { err = true; }
71+
BOOST_CHECK(err);
72+
}
73+
74+
// Test case 1, submit tasks and verify completion.
75+
{
76+
int num_tasks = 50;
77+
78+
ThreadPool threadPool(POOL_NAME);
79+
threadPool.Start(NUM_WORKERS_DEFAULT);
80+
std::atomic<int> counter = 0;
81+
82+
// Store futures to ensure completion before checking counter.
83+
std::vector<std::future<void>> futures;
84+
futures.reserve(num_tasks);
85+
86+
for (int i = 1; i <= num_tasks; i++) {
87+
futures.emplace_back(threadPool.Submit([&counter, i]() {
88+
counter.fetch_add(i);
89+
}));
90+
}
91+
92+
// Wait for all tasks to finish
93+
WaitFor(futures, /*context=*/"test1 task");
94+
int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
95+
BOOST_CHECK_EQUAL(counter.load(), expected_value);
96+
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
97+
}
98+
99+
// Test case 2, maintain all threads busy except one.
100+
{
101+
ThreadPool threadPool(POOL_NAME);
102+
threadPool.Start(NUM_WORKERS_DEFAULT);
103+
// Single blocking future for all threads
104+
std::promise<void> blocker;
105+
std::shared_future<void> blocker_future(blocker.get_future());
106+
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT-1, /*context=*/"test2 blocking tasks enabled");
107+
108+
// Now execute tasks on the single available worker
109+
// and check that all the tasks are executed.
110+
int num_tasks = 15;
111+
std::atomic<int> counter = 0;
112+
113+
// Store futures to wait on
114+
std::vector<std::future<void>> futures;
115+
futures.reserve(num_tasks);
116+
for (int i = 0; i < num_tasks; i++) {
117+
futures.emplace_back(threadPool.Submit([&counter]() {
118+
counter.fetch_add(1);
119+
}));
120+
}
121+
122+
WaitFor(futures, /*context=*/"test2 tasks");
123+
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
124+
125+
blocker.set_value();
126+
WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled");
127+
threadPool.Stop();
128+
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
129+
}
130+
131+
// Test case 3, wait for work to finish.
132+
{
133+
ThreadPool threadPool(POOL_NAME);
134+
threadPool.Start(NUM_WORKERS_DEFAULT);
135+
std::atomic<bool> flag = false;
136+
std::future<void> future = threadPool.Submit([&flag]() {
137+
std::this_thread::sleep_for(std::chrono::milliseconds{200});
138+
flag.store(true);
139+
});
140+
future.wait();
141+
BOOST_CHECK(flag.load());
142+
}
143+
144+
// Test case 4, obtain result object.
145+
{
146+
ThreadPool threadPool(POOL_NAME);
147+
threadPool.Start(NUM_WORKERS_DEFAULT);
148+
std::future<bool> future_bool = threadPool.Submit([]() {
149+
return true;
150+
});
151+
BOOST_CHECK(future_bool.get());
152+
153+
std::future<std::string> future_str = threadPool.Submit([]() {
154+
return std::string("true");
155+
});
156+
std::string result = future_str.get();
157+
BOOST_CHECK_EQUAL(result, "true");
158+
}
159+
160+
// Test case 5, throw exception and catch it on the consumer side.
161+
{
162+
ThreadPool threadPool(POOL_NAME);
163+
threadPool.Start(NUM_WORKERS_DEFAULT);
164+
165+
int ROUNDS = 5;
166+
std::string err_msg{"something wrong happened"};
167+
std::vector<std::future<void>> futures;
168+
futures.reserve(ROUNDS);
169+
for (int i = 0; i < ROUNDS; i++) {
170+
futures.emplace_back(threadPool.Submit([err_msg, i]() {
171+
throw std::runtime_error(err_msg + util::ToString(i));
172+
}));
173+
}
174+
175+
for (int i = 0; i < ROUNDS; i++) {
176+
try {
177+
futures.at(i).get();
178+
BOOST_FAIL("Expected exception not thrown");
179+
} catch (const std::runtime_error& e) {
180+
BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
181+
}
182+
}
183+
}
184+
185+
// Test case 6, all workers are busy, help them by processing tasks from outside.
186+
{
187+
ThreadPool threadPool(POOL_NAME);
188+
threadPool.Start(NUM_WORKERS_DEFAULT);
189+
190+
std::promise<void> blocker;
191+
std::shared_future<void> blocker_future(blocker.get_future());
192+
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test6 blocking tasks enabled");
193+
194+
// Now submit tasks and check that none of them are executed.
195+
int num_tasks = 20;
196+
std::atomic<int> counter = 0;
197+
for (int i = 0; i < num_tasks; i++) {
198+
threadPool.Submit([&counter]() {
199+
counter.fetch_add(1);
200+
});
201+
}
202+
std::this_thread::sleep_for(std::chrono::milliseconds{100});
203+
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
204+
205+
// Now process manually
206+
for (int i = 0; i < num_tasks; i++) {
207+
threadPool.ProcessTask();
208+
}
209+
BOOST_CHECK_EQUAL(counter.load(), num_tasks);
210+
blocker.set_value();
211+
threadPool.Stop();
212+
WaitFor(blocking_tasks, "Failure waiting for test6 blocking task futures");
213+
}
214+
215+
// Test case 7, recursive submission of tasks.
216+
{
217+
ThreadPool threadPool(POOL_NAME);
218+
threadPool.Start(NUM_WORKERS_DEFAULT);
219+
220+
std::promise<void> signal;
221+
threadPool.Submit([&]() {
222+
threadPool.Submit([&]() {
223+
signal.set_value();
224+
});
225+
});
226+
227+
signal.get_future().wait();
228+
threadPool.Stop();
229+
}
230+
231+
// Test case 8, submit a task when all threads are busy and then stop the pool.
232+
{
233+
ThreadPool threadPool(POOL_NAME);
234+
threadPool.Start(NUM_WORKERS_DEFAULT);
235+
236+
std::promise<void> blocker;
237+
std::shared_future<void> blocker_future(blocker.get_future());
238+
const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test8 blocking tasks enabled");
239+
240+
// Submit an extra task that should execute once a worker is free
241+
std::future<bool> future = threadPool.Submit([]() { return true; });
242+
243+
// At this point, all workers are blocked, and the extra task is queued
244+
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
245+
246+
// Wait a short moment before unblocking the threads to mimic a concurrent shutdown
247+
std::thread thread_unblocker([&blocker]() {
248+
std::this_thread::sleep_for(std::chrono::milliseconds{300});
249+
blocker.set_value();
250+
});
251+
252+
// Stop the pool while the workers are still blocked
253+
threadPool.Stop();
254+
255+
// Expect the submitted task to complete
256+
BOOST_CHECK(future.get());
257+
thread_unblocker.join();
258+
259+
// Obviously all the previously blocking tasks should be completed at this point too
260+
WaitFor(blocking_tasks, "Failure waiting for test8 blocking task futures");
261+
262+
// Pool should be stopped and no workers remaining
263+
BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
264+
}
265+
}
266+
267+
BOOST_AUTO_TEST_SUITE_END()

0 commit comments

Comments
 (0)