Skip to content

Commit 2de0ce5

Browse files
committed
http: replace WorkQueue and threads handling for ThreadPool
Replace the HTTP server's WorkQueue implementation and single threads handling code with ThreadPool for processing HTTP requests. The ThreadPool class encapsulates all this functionality on a reusable class, properly unit and fuzz tested (the previous code was not unit nor fuzz tested at all). This cleanly separates responsibilities: The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles concurrency, queuing, and execution. It simplifies init, shutdown and requests tracking. This also allows us to experiment with further performance improvements at the task queuing and execution level, such as a lock-free structure, task prioritization or any other performance improvement in the future, without having to deal with HTTP code that lives on a different layer.
1 parent 49f4e91 commit 2de0ce5

File tree

2 files changed

+18
-119
lines changed

2 files changed

+18
-119
lines changed

src/httpserver.cpp

Lines changed: 18 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <util/signalinterrupt.h>
1818
#include <util/strencodings.h>
1919
#include <util/threadnames.h>
20+
#include <util/threadpool.h>
2021
#include <util/translation.h>
2122

2223
#include <condition_variable>
@@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
4950
/** Maximum size of http request (request line + headers) */
5051
static const size_t MAX_HEADERS_SIZE = 8192;
5152

52-
/** HTTP request work item */
53-
class HTTPWorkItem final : public HTTPClosure
54-
{
55-
public:
56-
HTTPWorkItem(std::unique_ptr<HTTPRequest> _req, const std::string &_path, const HTTPRequestHandler& _func):
57-
req(std::move(_req)), path(_path), func(_func)
58-
{
59-
}
60-
void operator()() override
61-
{
62-
func(req.get(), path);
63-
}
64-
65-
std::unique_ptr<HTTPRequest> req;
66-
67-
private:
68-
std::string path;
69-
HTTPRequestHandler func;
70-
};
71-
72-
/** Simple work queue for distributing work over multiple threads.
73-
* Work items are simply callable objects.
74-
*/
75-
template <typename WorkItem>
76-
class WorkQueue
77-
{
78-
private:
79-
Mutex cs;
80-
std::condition_variable cond GUARDED_BY(cs);
81-
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
82-
bool running GUARDED_BY(cs){true};
83-
const size_t maxDepth;
84-
85-
public:
86-
explicit WorkQueue(size_t _maxDepth) : maxDepth(_maxDepth)
87-
{
88-
}
89-
/** Precondition: worker threads have all stopped (they have been joined).
90-
*/
91-
~WorkQueue() = default;
92-
/** Enqueue a work item */
93-
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
94-
{
95-
LOCK(cs);
96-
if (!running || queue.size() >= maxDepth) {
97-
return false;
98-
}
99-
queue.emplace_back(std::unique_ptr<WorkItem>(item));
100-
cond.notify_one();
101-
return true;
102-
}
103-
/** Thread function */
104-
void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
105-
{
106-
while (true) {
107-
std::unique_ptr<WorkItem> i;
108-
{
109-
WAIT_LOCK(cs, lock);
110-
while (running && queue.empty())
111-
cond.wait(lock);
112-
if (!running && queue.empty())
113-
break;
114-
i = std::move(queue.front());
115-
queue.pop_front();
116-
}
117-
(*i)();
118-
}
119-
}
120-
/** Interrupt and exit loops */
121-
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs)
122-
{
123-
LOCK(cs);
124-
running = false;
125-
cond.notify_all();
126-
}
127-
};
128-
12953
struct HTTPPathHandler
13054
{
13155
HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler):
@@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr;
14569
static struct evhttp* eventHTTP = nullptr;
14670
//! List of subnets to allow RPC connections from
14771
static std::vector<CSubNet> rpc_allow_subnets;
148-
//! Work queue for handling longer requests off the event loop thread
149-
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
15072
//! Handlers for (sub)paths
15173
static GlobalMutex g_httppathhandlers_mutex;
15274
static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY(g_httppathhandlers_mutex);
15375
//! Bound listening sockets
15476
static std::vector<evhttp_bound_socket *> boundSockets;
77+
//! Http thread pool - future: encapsulate in HttpContext
78+
static ThreadPool g_threadpool_http("http");
79+
static int g_max_queue_depth{100};
15580

15681
/**
15782
* @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests`
@@ -327,13 +252,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
327252

328253
// Dispatch to worker thread
329254
if (i != iend) {
330-
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
331-
assert(g_work_queue);
332-
if (g_work_queue->Enqueue(item.get())) {
333-
item.release(); /* if true, queue took ownership */
255+
if ((int)g_threadpool_http.WorkQueueSize() < g_max_queue_depth) {
256+
g_threadpool_http.Submit([req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
257+
fn(req.get(), in_path);
258+
});
334259
} else {
335260
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
336-
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
261+
hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
337262
}
338263
} else {
339264
hreq->WriteReply(HTTP_NOT_FOUND);
@@ -412,13 +337,6 @@ static bool HTTPBindAddresses(struct evhttp* http)
412337
return !boundSockets.empty();
413338
}
414339

415-
/** Simple wrapper to set thread name and run work queue */
416-
static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue, int worker_num)
417-
{
418-
util::ThreadRename(strprintf("httpworker.%i", worker_num));
419-
queue->Run();
420-
}
421-
422340
/** libevent event log callback */
423341
static void libevent_log_cb(int severity, const char *msg)
424342
{
@@ -477,10 +395,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt)
477395
}
478396

479397
LogDebug(BCLog::HTTP, "Initialized HTTP server\n");
480-
int workQueueDepth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
481-
LogDebug(BCLog::HTTP, "creating work queue of depth %d\n", workQueueDepth);
398+
g_max_queue_depth = std::max((long)gArgs.GetIntArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
399+
LogDebug(BCLog::HTTP, "set work queue of depth %d\n", g_max_queue_depth);
482400

483-
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
484401
// transfer ownership to eventBase/HTTP via .release()
485402
eventBase = base_ctr.release();
486403
eventHTTP = http_ctr.release();
@@ -496,17 +413,13 @@ void UpdateHTTPServerLogging(bool enable) {
496413
}
497414

498415
static std::thread g_thread_http;
499-
static std::vector<std::thread> g_thread_http_workers;
500416

501417
void StartHTTPServer()
502418
{
503419
int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
504420
LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads);
421+
g_threadpool_http.Start(rpcThreads);
505422
g_thread_http = std::thread(ThreadHTTP, eventBase);
506-
507-
for (int i = 0; i < rpcThreads; i++) {
508-
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
509-
}
510423
}
511424

512425
void InterruptHTTPServer()
@@ -516,21 +429,17 @@ void InterruptHTTPServer()
516429
// Reject requests on current connections
517430
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
518431
}
519-
if (g_work_queue) {
520-
g_work_queue->Interrupt();
521-
}
432+
// Interrupt pool after disabling requests
433+
g_threadpool_http.Interrupt();
522434
}
523435

524436
void StopHTTPServer()
525437
{
526438
LogDebug(BCLog::HTTP, "Stopping HTTP server\n");
527-
if (g_work_queue) {
528-
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
529-
for (auto& thread : g_thread_http_workers) {
530-
thread.join();
531-
}
532-
g_thread_http_workers.clear();
533-
}
439+
440+
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
441+
g_threadpool_http.Stop();
442+
534443
// Unlisten sockets, these are what make the event loop running, which means
535444
// that after this and all connections are closed the event loop will quit.
536445
for (evhttp_bound_socket *socket : boundSockets) {
@@ -558,7 +467,6 @@ void StopHTTPServer()
558467
event_base_free(eventBase);
559468
eventBase = nullptr;
560469
}
561-
g_work_queue.reset();
562470
LogDebug(BCLog::HTTP, "Stopped HTTP server\n");
563471
}
564472

src/httpserver.h

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,6 @@ class HTTPRequest
159159
*/
160160
std::optional<std::string> GetQueryParameterFromUri(const char* uri, const std::string& key);
161161

162-
/** Event handler closure.
163-
*/
164-
class HTTPClosure
165-
{
166-
public:
167-
virtual void operator()() = 0;
168-
virtual ~HTTPClosure() = default;
169-
};
170-
171162
/** Event class. This can be used either as a cross-thread trigger or as a timer.
172163
*/
173164
class HTTPEvent

0 commit comments

Comments
 (0)