1717#include < util/signalinterrupt.h>
1818#include < util/strencodings.h>
1919#include < util/threadnames.h>
20+ #include < util/threadpool.h>
2021#include < util/translation.h>
2122
2223#include < condition_variable>
@@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
4950/* * Maximum size of http request (request line + headers) */
5051static const size_t MAX_HEADERS_SIZE = 8192 ;
5152
52- /* * HTTP request work item */
53- class HTTPWorkItem final : public HTTPClosure
54- {
55- public:
56- HTTPWorkItem (std::unique_ptr<HTTPRequest> _req, const std::string &_path, const HTTPRequestHandler& _func):
57- req (std::move(_req)), path(_path), func(_func)
58- {
59- }
60- void operator ()() override
61- {
62- func (req.get (), path);
63- }
64-
65- std::unique_ptr<HTTPRequest> req;
66-
67- private:
68- std::string path;
69- HTTPRequestHandler func;
70- };
71-
72- /* * Simple work queue for distributing work over multiple threads.
73- * Work items are simply callable objects.
74- */
75- template <typename WorkItem>
76- class WorkQueue
77- {
78- private:
79- Mutex cs;
80- std::condition_variable cond GUARDED_BY (cs);
81- std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY (cs);
82- bool running GUARDED_BY (cs){true };
83- const size_t maxDepth;
84-
85- public:
86- explicit WorkQueue (size_t _maxDepth) : maxDepth(_maxDepth)
87- {
88- }
89- /* * Precondition: worker threads have all stopped (they have been joined).
90- */
91- ~WorkQueue () = default ;
92- /* * Enqueue a work item */
93- bool Enqueue (WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
94- {
95- LOCK (cs);
96- if (!running || queue.size () >= maxDepth) {
97- return false ;
98- }
99- queue.emplace_back (std::unique_ptr<WorkItem>(item));
100- cond.notify_one ();
101- return true ;
102- }
103- /* * Thread function */
104- void Run () EXCLUSIVE_LOCKS_REQUIRED(!cs)
105- {
106- while (true ) {
107- std::unique_ptr<WorkItem> i;
108- {
109- WAIT_LOCK (cs, lock);
110- while (running && queue.empty ())
111- cond.wait (lock);
112- if (!running && queue.empty ())
113- break ;
114- i = std::move (queue.front ());
115- queue.pop_front ();
116- }
117- (*i)();
118- }
119- }
120- /* * Interrupt and exit loops */
121- void Interrupt () EXCLUSIVE_LOCKS_REQUIRED(!cs)
122- {
123- LOCK (cs);
124- running = false ;
125- cond.notify_all ();
126- }
127- };
128-
12953struct HTTPPathHandler
13054{
13155 HTTPPathHandler (std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler):
@@ -145,13 +69,14 @@ static struct event_base* eventBase = nullptr;
14569static struct evhttp * eventHTTP = nullptr ;
14670// ! List of subnets to allow RPC connections from
14771static std::vector<CSubNet> rpc_allow_subnets;
148- // ! Work queue for handling longer requests off the event loop thread
149- static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr };
15072// ! Handlers for (sub)paths
15173static GlobalMutex g_httppathhandlers_mutex;
15274static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY (g_httppathhandlers_mutex);
15375// ! Bound listening sockets
15476static std::vector<evhttp_bound_socket *> boundSockets;
77+ // ! Http thread pool - future: encapsulate in HttpContext
78+ static ThreadPool g_threadpool_http (" http" );
79+ static int g_max_queue_depth{100 };
15580
15681/* *
15782 * @brief Helps keep track of open `evhttp_connection`s with active `evhttp_requests`
@@ -327,13 +252,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
327252
328253 // Dispatch to worker thread
329254 if (i != iend) {
330- std::unique_ptr<HTTPWorkItem> item ( new HTTPWorkItem ( std::move (hreq), path, i-> handler ));
331- assert (g_work_queue);
332- if (g_work_queue-> Enqueue (item .get ())) {
333- item. release (); /* if true, queue took ownership */
255+ if (( int ) g_threadpool_http. WorkQueueSize () < g_max_queue_depth) {
256+ g_threadpool_http. Submit ([req = std::move (hreq), in_path = std::move (path), fn = i-> handler ](){
257+ fn (req .get (), in_path);
258+ });
334259 } else {
335260 LogPrintf (" WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n " );
336- item-> req ->WriteReply (HTTP_SERVICE_UNAVAILABLE, " Work queue depth exceeded" );
261+ hreq ->WriteReply (HTTP_SERVICE_UNAVAILABLE, " Work queue depth exceeded" );
337262 }
338263 } else {
339264 hreq->WriteReply (HTTP_NOT_FOUND);
@@ -412,13 +337,6 @@ static bool HTTPBindAddresses(struct evhttp* http)
412337 return !boundSockets.empty ();
413338}
414339
415- /* * Simple wrapper to set thread name and run work queue */
416- static void HTTPWorkQueueRun (WorkQueue<HTTPClosure>* queue, int worker_num)
417- {
418- util::ThreadRename (strprintf (" httpworker.%i" , worker_num));
419- queue->Run ();
420- }
421-
422340/* * libevent event log callback */
423341static void libevent_log_cb (int severity, const char *msg)
424342{
@@ -477,10 +395,9 @@ bool InitHTTPServer(const util::SignalInterrupt& interrupt)
477395 }
478396
479397 LogDebug (BCLog::HTTP, " Initialized HTTP server\n " );
480- int workQueueDepth = std::max ((long )gArgs .GetIntArg (" -rpcworkqueue" , DEFAULT_HTTP_WORKQUEUE), 1L );
481- LogDebug (BCLog::HTTP, " creating work queue of depth %d\n " , workQueueDepth );
398+ g_max_queue_depth = std::max ((long )gArgs .GetIntArg (" -rpcworkqueue" , DEFAULT_HTTP_WORKQUEUE), 1L );
399+ LogDebug (BCLog::HTTP, " set work queue of depth %d\n " , g_max_queue_depth );
482400
483- g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
484401 // transfer ownership to eventBase/HTTP via .release()
485402 eventBase = base_ctr.release ();
486403 eventHTTP = http_ctr.release ();
@@ -496,17 +413,13 @@ void UpdateHTTPServerLogging(bool enable) {
496413}
497414
498415static std::thread g_thread_http;
499- static std::vector<std::thread> g_thread_http_workers;
500416
501417void StartHTTPServer ()
502418{
503419 int rpcThreads = std::max ((long )gArgs .GetIntArg (" -rpcthreads" , DEFAULT_HTTP_THREADS), 1L );
504420 LogInfo (" Starting HTTP server with %d worker threads\n " , rpcThreads);
421+ g_threadpool_http.Start (rpcThreads);
505422 g_thread_http = std::thread (ThreadHTTP, eventBase);
506-
507- for (int i = 0 ; i < rpcThreads; i++) {
508- g_thread_http_workers.emplace_back (HTTPWorkQueueRun, g_work_queue.get (), i);
509- }
510423}
511424
512425void InterruptHTTPServer ()
@@ -516,21 +429,17 @@ void InterruptHTTPServer()
516429 // Reject requests on current connections
517430 evhttp_set_gencb (eventHTTP, http_reject_request_cb, nullptr );
518431 }
519- if (g_work_queue) {
520- g_work_queue->Interrupt ();
521- }
432+ // Interrupt pool after disabling requests
433+ g_threadpool_http.Interrupt ();
522434}
523435
524436void StopHTTPServer ()
525437{
526438 LogDebug (BCLog::HTTP, " Stopping HTTP server\n " );
527- if (g_work_queue) {
528- LogDebug (BCLog::HTTP, " Waiting for HTTP worker threads to exit\n " );
529- for (auto & thread : g_thread_http_workers) {
530- thread.join ();
531- }
532- g_thread_http_workers.clear ();
533- }
439+
440+ LogDebug (BCLog::HTTP, " Waiting for HTTP worker threads to exit\n " );
441+ g_threadpool_http.Stop ();
442+
534443 // Unlisten sockets, these are what make the event loop running, which means
535444 // that after this and all connections are closed the event loop will quit.
536445 for (evhttp_bound_socket *socket : boundSockets) {
@@ -558,7 +467,6 @@ void StopHTTPServer()
558467 event_base_free (eventBase);
559468 eventBase = nullptr ;
560469 }
561- g_work_queue.reset ();
562470 LogDebug (BCLog::HTTP, " Stopped HTTP server\n " );
563471}
564472
0 commit comments