From 91dd6bee03457fd48abdc9f8b76d14a95673b506 Mon Sep 17 00:00:00 2001 From: Mark Reed Date: Mon, 13 May 2024 23:00:15 +0000 Subject: [PATCH 1/6] Tag the sqe's to fix the fsm handling a double send cqe --- src/engine_uring.cpp | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/engine_uring.cpp b/src/engine_uring.cpp index bbccba8..aaad59b 100644 --- a/src/engine_uring.cpp +++ b/src/engine_uring.cpp @@ -69,6 +69,7 @@ #include "helpers/reply.hpp" #include "helpers/shared.hpp" + #pragma region Cpp Declaration namespace sj = simdjson; @@ -89,6 +90,11 @@ struct connection_t; struct engine_t; struct automata_t; +static constexpr std::size_t uring_recv_tag_k{1}; +static constexpr std::size_t uring_send_tag_k{2}; +static constexpr std::size_t uring_stat_tag_k{3}; +static constexpr std::size_t uring_acpt_tag_k{4}; + enum class stage_t { waiting_to_accept_k = 0, expecting_reception_k, @@ -102,6 +108,7 @@ struct completed_event_t { connection_t* connection_ptr{}; stage_t stage{}; int result{}; + uint64_t type{}; }; class alignas(align_k) mutex_t { @@ -250,6 +257,7 @@ struct automata_t { connection_t& connection; stage_t completed_stage{}; int completed_result{}; + uint64_t type{}; void operator()() noexcept; bool is_corrupted() const noexcept { return completed_result == -EPIPE || completed_result == -EBADF; } @@ -471,6 +479,7 @@ void ucall_take_call(ucall_server_t server, uint16_t thread_idx) { *completed.connection_ptr, completed.stage, completed.result, + completed.type, }; // If everything is fine, let automata work in its normal regime. @@ -755,7 +764,9 @@ template std::size_t engine_t::pop_completed(complete ++passed; if (!uring_cqe->user_data) continue; - events[completed].connection_ptr = (connection_t*)uring_cqe->user_data; + + events[completed].connection_ptr = (connection_t*)(uring_cqe->user_data & 0x0fffffffffffffff); + events[completed].type = (uring_cqe->user_data >> 60) & 0xF;// & 0xf000000000000000; events[completed].stage = events[completed].connection_ptr->stage; events[completed].result = uring_cqe->res; ++completed; @@ -790,7 +801,7 @@ bool engine_t::consider_accepting_new_connection() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_accept_direct(uring_sqe, socket, &connection.client_address, &connection.client_address_len, 0, IORING_FILE_INDEX_ALLOC); - io_uring_sqe_set_data(uring_sqe, &connection); + io_uring_sqe_set_data(uring_sqe, (void*)((uring_acpt_tag_k<<60) | uint64_t(&connection))); // Accepting new connections can be time-less. // io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); @@ -820,7 +831,8 @@ void engine_t::submit_stats_heartbeat() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_timeout(uring_sqe, &connection.next_wakeup, 0, 0); - io_uring_sqe_set_data(uring_sqe, &connection); + //io_uring_sqe_set_data(uring_sqe, &connection); + io_uring_sqe_set_data(uring_sqe, (void*)((uring_stat_tag_k<<60) | uint64_t(&connection))); uring_result = io_uring_submit(&uring); submission_mutex.unlock(); } @@ -892,7 +904,7 @@ void automata_t::send_next() noexcept { uring_sqe->flags |= IOSQE_FIXED_FILE; uring_sqe->buf_index = engine.connections.offset_of(connection) * 2u + 1u; } - io_uring_sqe_set_data(uring_sqe, &connection); + io_uring_sqe_set_data(uring_sqe, (void*)((uring_send_tag_k<<60) | uint64_t(&connection))); io_uring_sqe_set_flags(uring_sqe, 0); uring_result = io_uring_submit(&engine.uring); engine.submission_mutex.unlock(); @@ -918,7 +930,7 @@ void automata_t::receive_next() noexcept { uring_sqe = io_uring_get_sqe(&engine.uring); io_uring_prep_read_fixed(uring_sqe, int(connection.descriptor), (void*)pipes.next_input_address(), pipes.next_input_length(), 0, engine.connections.offset_of(connection) * 2u); - io_uring_sqe_set_data(uring_sqe, &connection); + io_uring_sqe_set_data(uring_sqe, (void*)((uring_recv_tag_k<<60) | uint64_t(&connection))); io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); // More than other operations this depends on the information coming from the client. @@ -942,6 +954,9 @@ void automata_t::operator()() noexcept { switch (connection.stage) { case stage_t::waiting_to_accept_k: + if ( type != uring_acpt_tag_k ) { + return; + } if (completed_result == -ECANCELED) { engine.release_connection(connection); @@ -959,6 +974,9 @@ void automata_t::operator()() noexcept { case stage_t::expecting_reception_k: + if ( type != uring_recv_tag_k ) { + return; + } // From documentation: // > If used, the timeout specified in the command will cancel the linked command, // > unless the linked command completes before the timeout. The timeout will complete @@ -982,6 +1000,7 @@ void automata_t::operator()() noexcept { if (completed_result == 0) { connection.empty_transmits++; return should_release() ? close_gracefully() : receive_next(); + //return close_gracefully(); } // Absorb the arrived data. From 6120231335cc607316a55b0756d4d99396a2a666 Mon Sep 17 00:00:00 2001 From: Mark Reed Date: Mon, 13 May 2024 23:02:41 +0000 Subject: [PATCH 2/6] Tag the sqe's to fix the fsm handling a double send cqe --- src/engine_uring.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/engine_uring.cpp b/src/engine_uring.cpp index aaad59b..f5a6047 100644 --- a/src/engine_uring.cpp +++ b/src/engine_uring.cpp @@ -69,7 +69,6 @@ #include "helpers/reply.hpp" #include "helpers/shared.hpp" - #pragma region Cpp Declaration namespace sj = simdjson; @@ -831,7 +830,6 @@ void engine_t::submit_stats_heartbeat() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_timeout(uring_sqe, &connection.next_wakeup, 0, 0); - //io_uring_sqe_set_data(uring_sqe, &connection); io_uring_sqe_set_data(uring_sqe, (void*)((uring_stat_tag_k<<60) | uint64_t(&connection))); uring_result = io_uring_submit(&uring); submission_mutex.unlock(); @@ -1000,7 +998,6 @@ void automata_t::operator()() noexcept { if (completed_result == 0) { connection.empty_transmits++; return should_release() ? close_gracefully() : receive_next(); - //return close_gracefully(); } // Absorb the arrived data. From 02548db53384e7e4dabe093b5610c7493e855bfb Mon Sep 17 00:00:00 2001 From: Mark Reed Date: Mon, 13 May 2024 23:49:25 +0000 Subject: [PATCH 3/6] Fix attempting to close an already closed connection --- src/engine_uring.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/engine_uring.cpp b/src/engine_uring.cpp index f5a6047..e9420a1 100644 --- a/src/engine_uring.cpp +++ b/src/engine_uring.cpp @@ -947,7 +947,8 @@ void automata_t::receive_next() noexcept { void automata_t::operator()() noexcept { if (is_corrupted()) - return close_gracefully(); + if ( connection.stage != stage_t::waiting_to_close_k ) + return close_gracefully(); switch (connection.stage) { From de7ef580f48489dba48d1205fd29fc41c20ca71e Mon Sep 17 00:00:00 2001 From: Mark Reed Date: Tue, 21 May 2024 11:12:46 -0700 Subject: [PATCH 4/6] Fix the full request followed by partial --- src/engine_uring.cpp | 2 +- src/helpers/parse.hpp | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/engine_uring.cpp b/src/engine_uring.cpp index e9420a1..a3c1fa4 100644 --- a/src/engine_uring.cpp +++ b/src/engine_uring.cpp @@ -736,7 +736,7 @@ void automata_t::parse_and_raise_request() noexcept { auto parsed_request = std::get(parsed_request_or_error); scratch.is_http = request.size() != parsed_request.body.size(); - scratch.dynamic_packet = parsed_request.body; + scratch.dynamic_packet = {parsed_request.body.data(), parsed_request.json_length}; if (scratch.dynamic_packet.size() > ram_page_size_k) { sjd::parser parser; if (parser.allocate(scratch.dynamic_packet.size(), scratch.dynamic_packet.size() / 2) != sj::SUCCESS) diff --git a/src/helpers/parse.hpp b/src/helpers/parse.hpp index 3990a43..3c58443 100644 --- a/src/helpers/parse.hpp +++ b/src/helpers/parse.hpp @@ -67,6 +67,7 @@ inline std::variant find_callback(named_callb if (!doc.is_object()) return default_error_t{-32600, "The JSON sent is not a valid request object."}; + // We don't support JSON-RPC before version 2.0. sj::simdjson_result version = doc["jsonrpc"]; if (!version.is_string() || version.get_string().value_unsafe() != "2.0") @@ -78,10 +79,12 @@ inline std::variant find_callback(named_callb bool id_invalid = (id.is_double() && !id.is_int64() && !id.is_uint64()) || id.is_object() || id.is_array(); if (id_invalid) return default_error_t{-32600, "The request must have integer or string id."}; + sj::simdjson_result method = doc["method"]; bool method_invalid = !method.is_string(); if (method_invalid) return default_error_t{-32600, "The method must be a string."}; + sj::simdjson_result params = doc["params"]; bool params_present_and_invalid = !params.is_array() && !params.is_object() && params.error() == sj::SUCCESS; if (params_present_and_invalid) @@ -117,6 +120,7 @@ struct parsed_request_t { std::string_view content_type{}; std::string_view content_length{}; std::string_view body{}; + int json_length; }; /** @@ -166,8 +170,11 @@ inline std::variant split_body_headers(std::s if (pos == std::string_view::npos) return default_error_t{-32700, "Invalid JSON was received by the server."}; req.body = body.substr(pos + 4); - } else + auto res = std::from_chars(req.content_length.begin(), req.content_length.end(), req.json_length); + } else { + req.json_length = body.size(); req.body = body; + } return req; } From 77f822f85a7ae406a2e9a527048c3630ef0e5763 Mon Sep 17 00:00:00 2001 From: Mark Reed Date: Wed, 22 May 2024 12:05:57 -0700 Subject: [PATCH 5/6] Tag the 2 lower bits, formatting --- src/engine_uring.cpp | 20 ++++++++++---------- src/helpers/parse.hpp | 1 - 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/engine_uring.cpp b/src/engine_uring.cpp index a3c1fa4..195dfdc 100644 --- a/src/engine_uring.cpp +++ b/src/engine_uring.cpp @@ -89,10 +89,10 @@ struct connection_t; struct engine_t; struct automata_t; +static constexpr std::size_t uring_acpt_tag_k{0}; static constexpr std::size_t uring_recv_tag_k{1}; static constexpr std::size_t uring_send_tag_k{2}; static constexpr std::size_t uring_stat_tag_k{3}; -static constexpr std::size_t uring_acpt_tag_k{4}; enum class stage_t { waiting_to_accept_k = 0, @@ -764,8 +764,8 @@ template std::size_t engine_t::pop_completed(complete if (!uring_cqe->user_data) continue; - events[completed].connection_ptr = (connection_t*)(uring_cqe->user_data & 0x0fffffffffffffff); - events[completed].type = (uring_cqe->user_data >> 60) & 0xF;// & 0xf000000000000000; + events[completed].connection_ptr = (connection_t*)(uring_cqe->user_data & ~0x3); + events[completed].type = uring_cqe->user_data & 0x3; events[completed].stage = events[completed].connection_ptr->stage; events[completed].result = uring_cqe->res; ++completed; @@ -800,7 +800,7 @@ bool engine_t::consider_accepting_new_connection() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_accept_direct(uring_sqe, socket, &connection.client_address, &connection.client_address_len, 0, IORING_FILE_INDEX_ALLOC); - io_uring_sqe_set_data(uring_sqe, (void*)((uring_acpt_tag_k<<60) | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(uring_acpt_tag_k | uint64_t(&connection))); // Accepting new connections can be time-less. // io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); @@ -830,7 +830,7 @@ void engine_t::submit_stats_heartbeat() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_timeout(uring_sqe, &connection.next_wakeup, 0, 0); - io_uring_sqe_set_data(uring_sqe, (void*)((uring_stat_tag_k<<60) | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(uring_stat_tag_k | uint64_t(&connection))); uring_result = io_uring_submit(&uring); submission_mutex.unlock(); } @@ -902,7 +902,7 @@ void automata_t::send_next() noexcept { uring_sqe->flags |= IOSQE_FIXED_FILE; uring_sqe->buf_index = engine.connections.offset_of(connection) * 2u + 1u; } - io_uring_sqe_set_data(uring_sqe, (void*)((uring_send_tag_k<<60) | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(uring_send_tag_k | uint64_t(&connection))); io_uring_sqe_set_flags(uring_sqe, 0); uring_result = io_uring_submit(&engine.uring); engine.submission_mutex.unlock(); @@ -928,7 +928,7 @@ void automata_t::receive_next() noexcept { uring_sqe = io_uring_get_sqe(&engine.uring); io_uring_prep_read_fixed(uring_sqe, int(connection.descriptor), (void*)pipes.next_input_address(), pipes.next_input_length(), 0, engine.connections.offset_of(connection) * 2u); - io_uring_sqe_set_data(uring_sqe, (void*)((uring_recv_tag_k<<60) | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(uring_recv_tag_k | uint64_t(&connection))); io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); // More than other operations this depends on the information coming from the client. @@ -947,13 +947,13 @@ void automata_t::receive_next() noexcept { void automata_t::operator()() noexcept { if (is_corrupted()) - if ( connection.stage != stage_t::waiting_to_close_k ) + if (connection.stage != stage_t::waiting_to_close_k) return close_gracefully(); switch (connection.stage) { case stage_t::waiting_to_accept_k: - if ( type != uring_acpt_tag_k ) { + if (type != uring_acpt_tag_k) { return; } @@ -973,7 +973,7 @@ void automata_t::operator()() noexcept { case stage_t::expecting_reception_k: - if ( type != uring_recv_tag_k ) { + if (type != uring_recv_tag_k) { return; } // From documentation: diff --git a/src/helpers/parse.hpp b/src/helpers/parse.hpp index 3c58443..0e3a899 100644 --- a/src/helpers/parse.hpp +++ b/src/helpers/parse.hpp @@ -67,7 +67,6 @@ inline std::variant find_callback(named_callb if (!doc.is_object()) return default_error_t{-32600, "The JSON sent is not a valid request object."}; - // We don't support JSON-RPC before version 2.0. sj::simdjson_result version = doc["jsonrpc"]; if (!version.is_string() || version.get_string().value_unsafe() != "2.0") From 1dfd24fe397c03ed348adb0059de0bb069d9150a Mon Sep 17 00:00:00 2001 From: Mark Reed Date: Wed, 22 May 2024 15:08:37 -0700 Subject: [PATCH 6/6] Review changes --- src/engine_uring.cpp | 32 +++++++++++++++++++------------- src/helpers/parse.hpp | 4 +++- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/engine_uring.cpp b/src/engine_uring.cpp index 195dfdc..c243525 100644 --- a/src/engine_uring.cpp +++ b/src/engine_uring.cpp @@ -89,10 +89,12 @@ struct connection_t; struct engine_t; struct automata_t; -static constexpr std::size_t uring_acpt_tag_k{0}; -static constexpr std::size_t uring_recv_tag_k{1}; -static constexpr std::size_t uring_send_tag_k{2}; -static constexpr std::size_t uring_stat_tag_k{3}; +enum class uring_op_tag_t : std::uintptr_t { + uring_acpt_tag_k = 0, + uring_recv_tag_k, + uring_send_tag_k, + uring_stat_tag_k, // Max 2 bits +}; enum class stage_t { waiting_to_accept_k = 0, @@ -107,7 +109,7 @@ struct completed_event_t { connection_t* connection_ptr{}; stage_t stage{}; int result{}; - uint64_t type{}; + uring_op_tag_t type{}; }; class alignas(align_k) mutex_t { @@ -256,7 +258,7 @@ struct automata_t { connection_t& connection; stage_t completed_stage{}; int completed_result{}; - uint64_t type{}; + uring_op_tag_t type{}; void operator()() noexcept; bool is_corrupted() const noexcept { return completed_result == -EPIPE || completed_result == -EBADF; } @@ -765,7 +767,7 @@ template std::size_t engine_t::pop_completed(complete continue; events[completed].connection_ptr = (connection_t*)(uring_cqe->user_data & ~0x3); - events[completed].type = uring_cqe->user_data & 0x3; + events[completed].type = static_cast(uring_cqe->user_data & 0x3); events[completed].stage = events[completed].connection_ptr->stage; events[completed].result = uring_cqe->res; ++completed; @@ -800,7 +802,8 @@ bool engine_t::consider_accepting_new_connection() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_accept_direct(uring_sqe, socket, &connection.client_address, &connection.client_address_len, 0, IORING_FILE_INDEX_ALLOC); - io_uring_sqe_set_data(uring_sqe, (void*)(uring_acpt_tag_k | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(static_cast(uring_op_tag_t::uring_acpt_tag_k) | + (std::uintptr_t)(&connection))); // Accepting new connections can be time-less. // io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); @@ -830,7 +833,8 @@ void engine_t::submit_stats_heartbeat() noexcept { uring_sqe = io_uring_get_sqe(&uring); io_uring_prep_timeout(uring_sqe, &connection.next_wakeup, 0, 0); - io_uring_sqe_set_data(uring_sqe, (void*)(uring_stat_tag_k | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(static_cast(uring_op_tag_t::uring_stat_tag_k) | + (std::uintptr_t)(&connection))); uring_result = io_uring_submit(&uring); submission_mutex.unlock(); } @@ -902,7 +906,8 @@ void automata_t::send_next() noexcept { uring_sqe->flags |= IOSQE_FIXED_FILE; uring_sqe->buf_index = engine.connections.offset_of(connection) * 2u + 1u; } - io_uring_sqe_set_data(uring_sqe, (void*)(uring_send_tag_k | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(static_cast(uring_op_tag_t::uring_send_tag_k) | + (std::uintptr_t)(&connection))); io_uring_sqe_set_flags(uring_sqe, 0); uring_result = io_uring_submit(&engine.uring); engine.submission_mutex.unlock(); @@ -928,7 +933,8 @@ void automata_t::receive_next() noexcept { uring_sqe = io_uring_get_sqe(&engine.uring); io_uring_prep_read_fixed(uring_sqe, int(connection.descriptor), (void*)pipes.next_input_address(), pipes.next_input_length(), 0, engine.connections.offset_of(connection) * 2u); - io_uring_sqe_set_data(uring_sqe, (void*)(uring_recv_tag_k | uint64_t(&connection))); + io_uring_sqe_set_data(uring_sqe, (void*)(static_cast(uring_op_tag_t::uring_recv_tag_k) | + (std::uintptr_t)(&connection))); io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); // More than other operations this depends on the information coming from the client. @@ -953,7 +959,7 @@ void automata_t::operator()() noexcept { switch (connection.stage) { case stage_t::waiting_to_accept_k: - if (type != uring_acpt_tag_k) { + if (type != uring_op_tag_t::uring_acpt_tag_k) { return; } @@ -973,7 +979,7 @@ void automata_t::operator()() noexcept { case stage_t::expecting_reception_k: - if (type != uring_recv_tag_k) { + if (type != uring_op_tag_t::uring_recv_tag_k) { return; } // From documentation: diff --git a/src/helpers/parse.hpp b/src/helpers/parse.hpp index 0e3a899..35c4dc5 100644 --- a/src/helpers/parse.hpp +++ b/src/helpers/parse.hpp @@ -119,7 +119,7 @@ struct parsed_request_t { std::string_view content_type{}; std::string_view content_length{}; std::string_view body{}; - int json_length; + std::size_t json_length; }; /** @@ -170,6 +170,8 @@ inline std::variant split_body_headers(std::s return default_error_t{-32700, "Invalid JSON was received by the server."}; req.body = body.substr(pos + 4); auto res = std::from_chars(req.content_length.begin(), req.content_length.end(), req.json_length); + if (res.ec == std::errc::invalid_argument) + return default_error_t{-32700, "Invalid JSON was received by the server."}; } else { req.json_length = body.size(); req.body = body;