-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tag the sqe's to fix the fsm handling a double send cqe #104
base: main-dev
Are you sure you want to change the base?
Changes from 5 commits
91dd6be
6120231
02548db
de7ef58
77f822f
1dfd24f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,11 @@ struct connection_t; | |
struct engine_t; | ||
struct automata_t; | ||
|
||
static constexpr std::size_t uring_acpt_tag_k{0}; | ||
static constexpr std::size_t uring_recv_tag_k{1}; | ||
static constexpr std::size_t uring_send_tag_k{2}; | ||
static constexpr std::size_t uring_stat_tag_k{3}; | ||
|
||
enum class stage_t { | ||
waiting_to_accept_k = 0, | ||
expecting_reception_k, | ||
|
@@ -102,6 +107,7 @@ struct completed_event_t { | |
connection_t* connection_ptr{}; | ||
stage_t stage{}; | ||
int result{}; | ||
uint64_t type{}; | ||
}; | ||
|
||
class alignas(align_k) mutex_t { | ||
|
@@ -250,6 +256,7 @@ struct automata_t { | |
connection_t& connection; | ||
stage_t completed_stage{}; | ||
int completed_result{}; | ||
uint64_t type{}; | ||
|
||
void operator()() noexcept; | ||
bool is_corrupted() const noexcept { return completed_result == -EPIPE || completed_result == -EBADF; } | ||
|
@@ -471,6 +478,7 @@ void ucall_take_call(ucall_server_t server, uint16_t thread_idx) { | |
*completed.connection_ptr, | ||
completed.stage, | ||
completed.result, | ||
completed.type, | ||
}; | ||
|
||
// If everything is fine, let automata work in its normal regime. | ||
|
@@ -728,7 +736,7 @@ void automata_t::parse_and_raise_request() noexcept { | |
|
||
auto parsed_request = std::get<parsed_request_t>(parsed_request_or_error); | ||
scratch.is_http = request.size() != parsed_request.body.size(); | ||
scratch.dynamic_packet = parsed_request.body; | ||
scratch.dynamic_packet = {parsed_request.body.data(), parsed_request.json_length}; | ||
if (scratch.dynamic_packet.size() > ram_page_size_k) { | ||
sjd::parser parser; | ||
if (parser.allocate(scratch.dynamic_packet.size(), scratch.dynamic_packet.size() / 2) != sj::SUCCESS) | ||
|
@@ -755,7 +763,9 @@ template <std::size_t max_count_ak> std::size_t engine_t::pop_completed(complete | |
++passed; | ||
if (!uring_cqe->user_data) | ||
continue; | ||
events[completed].connection_ptr = (connection_t*)uring_cqe->user_data; | ||
|
||
events[completed].connection_ptr = (connection_t*)(uring_cqe->user_data & ~0x3); | ||
events[completed].type = uring_cqe->user_data & 0x3; | ||
events[completed].stage = events[completed].connection_ptr->stage; | ||
events[completed].result = uring_cqe->res; | ||
++completed; | ||
|
@@ -790,7 +800,7 @@ bool engine_t::consider_accepting_new_connection() noexcept { | |
uring_sqe = io_uring_get_sqe(&uring); | ||
io_uring_prep_accept_direct(uring_sqe, socket, &connection.client_address, &connection.client_address_len, 0, | ||
IORING_FILE_INDEX_ALLOC); | ||
io_uring_sqe_set_data(uring_sqe, &connection); | ||
io_uring_sqe_set_data(uring_sqe, (void*)(uring_acpt_tag_k | uint64_t(&connection))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
||
// Accepting new connections can be time-less. | ||
// io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); | ||
|
@@ -820,7 +830,7 @@ void engine_t::submit_stats_heartbeat() noexcept { | |
|
||
uring_sqe = io_uring_get_sqe(&uring); | ||
io_uring_prep_timeout(uring_sqe, &connection.next_wakeup, 0, 0); | ||
io_uring_sqe_set_data(uring_sqe, &connection); | ||
io_uring_sqe_set_data(uring_sqe, (void*)(uring_stat_tag_k | uint64_t(&connection))); | ||
uring_result = io_uring_submit(&uring); | ||
submission_mutex.unlock(); | ||
} | ||
|
@@ -892,7 +902,7 @@ void automata_t::send_next() noexcept { | |
uring_sqe->flags |= IOSQE_FIXED_FILE; | ||
uring_sqe->buf_index = engine.connections.offset_of(connection) * 2u + 1u; | ||
} | ||
io_uring_sqe_set_data(uring_sqe, &connection); | ||
io_uring_sqe_set_data(uring_sqe, (void*)(uring_send_tag_k | uint64_t(&connection))); | ||
io_uring_sqe_set_flags(uring_sqe, 0); | ||
uring_result = io_uring_submit(&engine.uring); | ||
engine.submission_mutex.unlock(); | ||
|
@@ -918,7 +928,7 @@ void automata_t::receive_next() noexcept { | |
uring_sqe = io_uring_get_sqe(&engine.uring); | ||
io_uring_prep_read_fixed(uring_sqe, int(connection.descriptor), (void*)pipes.next_input_address(), | ||
pipes.next_input_length(), 0, engine.connections.offset_of(connection) * 2u); | ||
io_uring_sqe_set_data(uring_sqe, &connection); | ||
io_uring_sqe_set_data(uring_sqe, (void*)(uring_recv_tag_k | uint64_t(&connection))); | ||
io_uring_sqe_set_flags(uring_sqe, IOSQE_IO_LINK); | ||
|
||
// More than other operations this depends on the information coming from the client. | ||
|
@@ -937,11 +947,15 @@ void automata_t::receive_next() noexcept { | |
void automata_t::operator()() noexcept { | ||
|
||
if (is_corrupted()) | ||
return close_gracefully(); | ||
if (connection.stage != stage_t::waiting_to_close_k) | ||
return close_gracefully(); | ||
|
||
switch (connection.stage) { | ||
|
||
case stage_t::waiting_to_accept_k: | ||
if (type != uring_acpt_tag_k) { | ||
return; | ||
} | ||
|
||
if (completed_result == -ECANCELED) { | ||
engine.release_connection(connection); | ||
|
@@ -959,6 +973,9 @@ void automata_t::operator()() noexcept { | |
|
||
case stage_t::expecting_reception_k: | ||
|
||
if (type != uring_recv_tag_k) { | ||
return; | ||
} | ||
// From documentation: | ||
// > If used, the timeout specified in the command will cancel the linked command, | ||
// > unless the linked command completes before the timeout. The timeout will complete | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,10 +78,12 @@ inline std::variant<named_callback_t, default_error_t> find_callback(named_callb | |
bool id_invalid = (id.is_double() && !id.is_int64() && !id.is_uint64()) || id.is_object() || id.is_array(); | ||
if (id_invalid) | ||
return default_error_t{-32600, "The request must have integer or string id."}; | ||
|
||
sj::simdjson_result<sjd::element> method = doc["method"]; | ||
bool method_invalid = !method.is_string(); | ||
if (method_invalid) | ||
return default_error_t{-32600, "The method must be a string."}; | ||
|
||
sj::simdjson_result<sjd::element> params = doc["params"]; | ||
bool params_present_and_invalid = !params.is_array() && !params.is_object() && params.error() == sj::SUCCESS; | ||
if (params_present_and_invalid) | ||
|
@@ -117,6 +119,7 @@ struct parsed_request_t { | |
std::string_view content_type{}; | ||
std::string_view content_length{}; | ||
std::string_view body{}; | ||
int json_length; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
}; | ||
|
||
/** | ||
|
@@ -166,8 +169,11 @@ inline std::variant<parsed_request_t, default_error_t> split_body_headers(std::s | |
if (pos == std::string_view::npos) | ||
return default_error_t{-32700, "Invalid JSON was received by the server."}; | ||
req.body = body.substr(pos + 4); | ||
} else | ||
auto res = std::from_chars(req.content_length.begin(), req.content_length.end(), req.json_length); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The result of parsing error should be handled. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We now return the same Invalid JSON error. We could add a new code "Invalid HTTP request" for the 2 errors here. |
||
} else { | ||
req.json_length = body.size(); | ||
req.body = body; | ||
} | ||
|
||
return req; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe better to use an
enum class using_verb_t : std::uintptr_t {}
, and later reuse that enum-type instead ofuint64_t type
instage_t
andmutex_t
. Would make things more readable.