Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interface improvements #80

Merged
merged 9 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions examples/login/fastapi_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import httpx
import os
import struct
import random
Expand All @@ -17,7 +18,7 @@

class ClientREST:

def __init__(self, uri: str = '127.0.0.1', port: int = 8000, identity: int = PROCESS_ID) -> None:
def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.url = f'http://{uri}:{port}/'

Expand All @@ -26,11 +27,28 @@ def __call__(self, *, a: Optional[int] = None, b: Optional[int] = None) -> int:
b = random.randint(1, 1000) if b is None else b
result = requests.get(
f'{self.url}validate_session?user_id={a}&session_id={b}').text
c = int(result)
c = False if result == 'false' else True
assert ((a ^ b) % 23 == 0) == c, 'Wrong Answer'
return c


class ClientTLSREST:

def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.url = f'https://{uri}:{port}/'

def __call__(self, *, a: Optional[int] = None, b: Optional[int] = None) -> int:
with httpx.Client(verify=False) as client:
a = random.randint(1, 1000) if a is None else a
b = random.randint(1, 1000) if b is None else b
result = client.get(
f'{self.url}validate_session?user_id={a}&session_id={b}').text
c = False if result == 'false' else True
assert ((a ^ b) % 23 == 0) == c, 'Wrong Answer'
return c


class ClientRESTReddit:

def __init__(self, uri: str = '127.0.0.1', port: int = 8000, identity: int = PROCESS_ID) -> None:
Expand Down
52 changes: 52 additions & 0 deletions examples/login/jsonrpc_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import httpx
import os
import json
import errno
Expand Down Expand Up @@ -153,6 +154,57 @@ def recv(self):
return self.response.json


class CaseTLSNoReuse:
"""JSON-RPC Client that operates directly over TCP/IPv4 stack, without HTTP"""

def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.expected = -1
self.client = ClientTLS(uri, port, allow_self_signed=True)
self.response = None

def __call__(self) -> str:
self.send()
res = self.recv()
self.client.sock.close()
self.client.sock = None
return res

def send(self):
user_id = random.randint(1, 1000)
session_id = random.randint(1, 1000)
self.expected = ((user_id ^ session_id) % 23) == 0

self.response = self.client.validate_session(
user_id=user_id, session_id=session_id)

def recv(self):
assert self.response.json == self.expected
return self.response.json


class CaseTLSNoReuseX:

def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.url = f'https://{uri}:{port}/'

def __call__(self, *, a: Optional[int] = None, b: Optional[int] = None) -> int:

with httpx.Client(verify=False) as client:
a = random.randint(1, 1000) if a is None else a
b = random.randint(1, 1000) if b is None else b
jsonrpc = {"jsonrpc": "2.0", "id": self.identity,
"method": "validate_session", "params": {"user_id": a, "session_id": b}}
result = client.post(
f'{self.url}', json=jsonrpc,
headers={"Connection": "close"},
)
result = result.text
c = False if result == 'false' else True
return c


class CaseTCPHTTP:
"""JSON-RPC Client that operates directly over TCP/IPv4 stack, with HTTP"""

Expand Down
22 changes: 22 additions & 0 deletions examples/login/ucall_server_rest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ static bool get_param_int_from_path(ucall_call_t call, char const* name, int64_t
}

static void validate_session(ucall_call_t call, ucall_callback_tag_t) {

int64_t a{}, b{};
char c_str[256]{};

bool got_a = ucall_param_named_i64(call, "user_id", 0, &a);
bool got_b = get_param_int_from_path(call, "session_id", &b);
if (!got_a || !got_b)
Expand All @@ -36,6 +38,24 @@ static void validate_session(ucall_call_t call, ucall_callback_tag_t) {
ucall_call_reply_content(call, res, strlen(res));
}

static void upload_binary(ucall_call_t call, ucall_callback_tag_t) {
char const* user_agent = nullptr;
char const* binary = nullptr;
char const* path = nullptr;
size_t user_agent_len = 0;
size_t binary_len = 0;
size_t path_len = 0;

bool got_path = ucall_param_named_str(call, "path", 0, &path, &path_len);
bool got_bin = ucall_get_request_body(call, &binary, &binary_len);
bool got_head = ucall_get_request_header(call, "User-Agent", 0, &user_agent, &user_agent_len);
if (!got_bin || !got_head)
return ucall_call_reply_error_invalid_params(call);

std::string res = "{\"uploaded_binary_size\": " + std::to_string(binary_len) + "}";
ucall_call_reply_content(call, res.data(), res.size());
}

static void get_books(ucall_call_t call, ucall_callback_tag_t punned_books) noexcept {
auto* books = reinterpret_cast<std::map<std::size_t, std::string>*>(punned_books);
std::string response = "{ \"books\": [";
Expand Down Expand Up @@ -139,6 +159,8 @@ int main(int argc, char** argv) {

ucall_add_procedure(server, "/validate_session/{session_id}", &validate_session, request_type_t::get_k, nullptr);

ucall_add_procedure(server, "/upload_binary/{path}", &upload_binary, request_type_t::put_k, nullptr);

if (config.max_threads > 1) {
std::vector<std::thread> threads;
for (uint16_t i = 0; i != config.max_threads; ++i)
Expand Down
40 changes: 24 additions & 16 deletions include/ucall/ucall.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,43 @@ void ucall_take_call(ucall_server_t server, uint16_t thread_idx);
*/
void ucall_take_calls(ucall_server_t server, uint16_t thread_idx);

bool ucall_param_named_bool( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_bool( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
bool* output);

bool ucall_param_named_i64( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_i64( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
int64_t* output);

bool ucall_param_named_f64( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_f64( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
double* output);

bool ucall_param_named_str( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_str( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
ucall_str_t* output, size_t* output_length);

bool ucall_param_positional_bool(ucall_call_t, size_t, bool*);
bool ucall_param_positional_i64(ucall_call_t, size_t, int64_t*);
bool ucall_param_positional_f64(ucall_call_t, size_t, double*);
bool ucall_param_positional_str(ucall_call_t, size_t, ucall_str_t*, size_t*);

bool ucall_get_request_header(ucall_call_t call, //
ucall_str_t header_name, //
size_t header_name_length, //
ucall_str_t* output, size_t* output_length);

bool ucall_get_request_body(ucall_call_t call, //
ucall_str_t* output, size_t* output_length);

/**
* @param call Encapsulates the context and the arguments of the current request.
* @param json_reply The response to send, which must be a valid JSON string.
Expand Down
10 changes: 5 additions & 5 deletions src/headers/automata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void automata_t::operator()() noexcept {
// If we have reached the end of the stream,
// it is time to analyze the contents
// and send back a response.
connection.decrypt();
connection.decrypt(completed_result);
if (connection.protocol.is_input_complete(connection.pipes.input_span())) {
server.engine.raise_request(connection.pipes, connection.protocol, this);

Expand Down Expand Up @@ -165,10 +165,10 @@ void automata_t::operator()() noexcept {
connection.pipes.mark_submitted_outputs(completed_result);
if (!connection.pipes.has_remaining_outputs()) {
connection.exchanges++;
// if (connection.exchanges >= server.max_lifetime_exchanges) TODO Why?
// return close_gracefully();
// else
return receive_next();
if (connection.must_close())
return close_gracefully();
else
return receive_next();
} else {
connection.pipes.prepare_more_outputs();
return send_next();
Expand Down
19 changes: 19 additions & 0 deletions src/headers/backend_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,23 @@ bool ucall_param_positional_str(ucall_call_t call, size_t position, ucall_str_t*
return false;
}

bool ucall_get_request_header(ucall_call_t call, ucall_str_t header_name, size_t header_name_length,
ucall_str_t* output, size_t* output_length) {
unum::ucall::automata_t& automata = *reinterpret_cast<unum::ucall::automata_t*>(call);
size_t name_len = unum::ucall::string_length(header_name, header_name_length);
std::string_view res = automata.get_protocol().get_header({header_name, name_len});
if (res.size() == 0)
return false;
*output = res.data();
*output_length = res.size();
return true;
}

bool ucall_get_request_body(ucall_call_t call, ucall_str_t* output, size_t* output_length) {
unum::ucall::automata_t& automata = *reinterpret_cast<unum::ucall::automata_t*>(call);
std::string_view body = automata.get_protocol().get_content();
*output = body.data();
*output_length = body.size();
return true;
}
#pragma endregion
27 changes: 22 additions & 5 deletions src/headers/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ struct connection_t {
bool expired() const noexcept {
return std::chrono::high_resolution_clock::now().time_since_epoch().count() - last_active_ns >
max_inactive_duration_ns_k;
};
}

bool is_ready() const noexcept { return tls_ctx == nullptr || ptls_handshake_is_complete(tls_ctx); }

bool must_close() const noexcept {
auto conn = protocol.get_header("Connection");
return conn == "Close" || conn == "close";
}

bool prepare_step() noexcept {
if (is_ready())
return true;
Expand Down Expand Up @@ -109,15 +114,27 @@ struct connection_t {
}
}

void decrypt() noexcept {
void decrypt(size_t received_amount) noexcept {
if (tls_ctx == nullptr || !ptls_handshake_is_complete(tls_ctx))
return;

work_buf.off = 0;
int res = 0;
size_t in_len = pipes.input_span().size();
int res = ptls_receive(tls_ctx, &work_buf, pipes.input_span().data(), &in_len);
if (res != -1) {
pipes.release_inputs();
void const* input = pipes.input_span().data();
if (received_amount != in_len) {
input = static_cast<char const*>(input) + (in_len - received_amount);
in_len = received_amount;
}
while (in_len != 0 && res != -1) {
size_t consumed = in_len;
res = ptls_receive(tls_ctx, &work_buf, input, &consumed);
in_len -= consumed;
input = static_cast<char const*>(input) + consumed;
}
if (res != -1 && work_buf.off > 0) {
printf("WB: %i\n", work_buf.off);
pipes.drop_last_input(received_amount);
std::memcpy(pipes.next_input_address(), work_buf.base, work_buf.off);
pipes.absorb_input(work_buf.off);
}
Expand Down
10 changes: 10 additions & 0 deletions src/headers/containers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ class exchange_pipes_t {
input_.dynamic.reset();
input_.embedded_used = 0;
}

void release_current_input() noexcept { input_.embedded_used = 0; }

void release_outputs() noexcept {
output_.dynamic.reset();
output_.embedded_used = 0;
Expand All @@ -243,6 +246,13 @@ class exchange_pipes_t {
std::memmove(input_.embedded, input_.embedded + cnt, input_.embedded_used);
}

void drop_last_input(size_t cnt) noexcept {
if (input_.dynamic.size() >= cnt)
input_.dynamic.pop_back(cnt);
else if (input_.embedded_used >= cnt)
input_.embedded_used -= cnt;
}

bool shift_input_to_dynamic() noexcept {
if (!input_.dynamic.append_n(input_.embedded, input_.embedded_used))
return false;
Expand Down
Loading