Skip to content

Commit

Permalink
Merge pull request #80 from unum-cloud/Interface-improvements
Browse files Browse the repository at this point in the history
Interface improvements
  • Loading branch information
ashvardanian authored Sep 7, 2023
2 parents 6dc09be + 07a08cc commit 2841bd3
Show file tree
Hide file tree
Showing 14 changed files with 331 additions and 128 deletions.
22 changes: 20 additions & 2 deletions examples/login/fastapi_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import httpx
import os
import struct
import random
Expand All @@ -17,7 +18,7 @@

class ClientREST:

def __init__(self, uri: str = '127.0.0.1', port: int = 8000, identity: int = PROCESS_ID) -> None:
def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.url = f'http://{uri}:{port}/'

Expand All @@ -26,11 +27,28 @@ def __call__(self, *, a: Optional[int] = None, b: Optional[int] = None) -> int:
b = random.randint(1, 1000) if b is None else b
result = requests.get(
f'{self.url}validate_session?user_id={a}&session_id={b}').text
c = int(result)
c = False if result == 'false' else True
assert ((a ^ b) % 23 == 0) == c, 'Wrong Answer'
return c


class ClientTLSREST:

def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.url = f'https://{uri}:{port}/'

def __call__(self, *, a: Optional[int] = None, b: Optional[int] = None) -> int:
with httpx.Client(verify=False) as client:
a = random.randint(1, 1000) if a is None else a
b = random.randint(1, 1000) if b is None else b
result = client.get(
f'{self.url}validate_session?user_id={a}&session_id={b}').text
c = False if result == 'false' else True
assert ((a ^ b) % 23 == 0) == c, 'Wrong Answer'
return c


class ClientRESTReddit:

def __init__(self, uri: str = '127.0.0.1', port: int = 8000, identity: int = PROCESS_ID) -> None:
Expand Down
52 changes: 52 additions & 0 deletions examples/login/jsonrpc_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import httpx
import os
import json
import errno
Expand Down Expand Up @@ -153,6 +154,57 @@ def recv(self):
return self.response.json


class CaseTLSNoReuse:
"""JSON-RPC Client that operates directly over TCP/IPv4 stack, without HTTP"""

def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.expected = -1
self.client = ClientTLS(uri, port, allow_self_signed=True)
self.response = None

def __call__(self) -> str:
self.send()
res = self.recv()
self.client.sock.close()
self.client.sock = None
return res

def send(self):
user_id = random.randint(1, 1000)
session_id = random.randint(1, 1000)
self.expected = ((user_id ^ session_id) % 23) == 0

self.response = self.client.validate_session(
user_id=user_id, session_id=session_id)

def recv(self):
assert self.response.json == self.expected
return self.response.json


class CaseTLSNoReuseX:

def __init__(self, uri: str = '127.0.0.1', port: int = 8545, identity: int = PROCESS_ID) -> None:
self.identity = identity
self.url = f'https://{uri}:{port}/'

def __call__(self, *, a: Optional[int] = None, b: Optional[int] = None) -> int:

with httpx.Client(verify=False) as client:
a = random.randint(1, 1000) if a is None else a
b = random.randint(1, 1000) if b is None else b
jsonrpc = {"jsonrpc": "2.0", "id": self.identity,
"method": "validate_session", "params": {"user_id": a, "session_id": b}}
result = client.post(
f'{self.url}', json=jsonrpc,
headers={"Connection": "close"},
)
result = result.text
c = False if result == 'false' else True
return c


class CaseTCPHTTP:
"""JSON-RPC Client that operates directly over TCP/IPv4 stack, with HTTP"""

Expand Down
22 changes: 22 additions & 0 deletions examples/login/ucall_server_rest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ static bool get_param_int_from_path(ucall_call_t call, char const* name, int64_t
}

static void validate_session(ucall_call_t call, ucall_callback_tag_t) {

int64_t a{}, b{};
char c_str[256]{};

bool got_a = ucall_param_named_i64(call, "user_id", 0, &a);
bool got_b = get_param_int_from_path(call, "session_id", &b);
if (!got_a || !got_b)
Expand All @@ -36,6 +38,24 @@ static void validate_session(ucall_call_t call, ucall_callback_tag_t) {
ucall_call_reply_content(call, res, strlen(res));
}

static void upload_binary(ucall_call_t call, ucall_callback_tag_t) {
char const* user_agent = nullptr;
char const* binary = nullptr;
char const* path = nullptr;
size_t user_agent_len = 0;
size_t binary_len = 0;
size_t path_len = 0;

bool got_path = ucall_param_named_str(call, "path", 0, &path, &path_len);
bool got_bin = ucall_get_request_body(call, &binary, &binary_len);
bool got_head = ucall_get_request_header(call, "User-Agent", 0, &user_agent, &user_agent_len);
if (!got_bin || !got_head)
return ucall_call_reply_error_invalid_params(call);

std::string res = "{\"uploaded_binary_size\": " + std::to_string(binary_len) + "}";
ucall_call_reply_content(call, res.data(), res.size());
}

static void get_books(ucall_call_t call, ucall_callback_tag_t punned_books) noexcept {
auto* books = reinterpret_cast<std::map<std::size_t, std::string>*>(punned_books);
std::string response = "{ \"books\": [";
Expand Down Expand Up @@ -139,6 +159,8 @@ int main(int argc, char** argv) {

ucall_add_procedure(server, "/validate_session/{session_id}", &validate_session, request_type_t::get_k, nullptr);

ucall_add_procedure(server, "/upload_binary/{path}", &upload_binary, request_type_t::put_k, nullptr);

if (config.max_threads > 1) {
std::vector<std::thread> threads;
for (uint16_t i = 0; i != config.max_threads; ++i)
Expand Down
40 changes: 24 additions & 16 deletions include/ucall/ucall.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,43 @@ void ucall_take_call(ucall_server_t server, uint16_t thread_idx);
*/
void ucall_take_calls(ucall_server_t server, uint16_t thread_idx);

bool ucall_param_named_bool( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_bool( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
bool* output);

bool ucall_param_named_i64( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_i64( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
int64_t* output);

bool ucall_param_named_f64( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_f64( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
double* output);

bool ucall_param_named_str( //
ucall_call_t call, //
ucall_str_t json_pointer, //
size_t json_pointer_length, //
bool ucall_param_named_str( //
ucall_call_t call, //
ucall_str_t param_name, //
size_t param_name_length, //
ucall_str_t* output, size_t* output_length);

bool ucall_param_positional_bool(ucall_call_t, size_t, bool*);
bool ucall_param_positional_i64(ucall_call_t, size_t, int64_t*);
bool ucall_param_positional_f64(ucall_call_t, size_t, double*);
bool ucall_param_positional_str(ucall_call_t, size_t, ucall_str_t*, size_t*);

bool ucall_get_request_header(ucall_call_t call, //
ucall_str_t header_name, //
size_t header_name_length, //
ucall_str_t* output, size_t* output_length);

bool ucall_get_request_body(ucall_call_t call, //
ucall_str_t* output, size_t* output_length);

/**
* @param call Encapsulates the context and the arguments of the current request.
* @param json_reply The response to send, which must be a valid JSON string.
Expand Down
10 changes: 5 additions & 5 deletions src/headers/automata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void automata_t::operator()() noexcept {
// If we have reached the end of the stream,
// it is time to analyze the contents
// and send back a response.
connection.decrypt();
connection.decrypt(completed_result);
if (connection.protocol.is_input_complete(connection.pipes.input_span())) {
server.engine.raise_request(connection.pipes, connection.protocol, this);

Expand Down Expand Up @@ -165,10 +165,10 @@ void automata_t::operator()() noexcept {
connection.pipes.mark_submitted_outputs(completed_result);
if (!connection.pipes.has_remaining_outputs()) {
connection.exchanges++;
// if (connection.exchanges >= server.max_lifetime_exchanges) TODO Why?
// return close_gracefully();
// else
return receive_next();
if (connection.must_close())
return close_gracefully();
else
return receive_next();
} else {
connection.pipes.prepare_more_outputs();
return send_next();
Expand Down
19 changes: 19 additions & 0 deletions src/headers/backend_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,23 @@ bool ucall_param_positional_str(ucall_call_t call, size_t position, ucall_str_t*
return false;
}

bool ucall_get_request_header(ucall_call_t call, ucall_str_t header_name, size_t header_name_length,
ucall_str_t* output, size_t* output_length) {
unum::ucall::automata_t& automata = *reinterpret_cast<unum::ucall::automata_t*>(call);
size_t name_len = unum::ucall::string_length(header_name, header_name_length);
std::string_view res = automata.get_protocol().get_header({header_name, name_len});
if (res.size() == 0)
return false;
*output = res.data();
*output_length = res.size();
return true;
}

bool ucall_get_request_body(ucall_call_t call, ucall_str_t* output, size_t* output_length) {
unum::ucall::automata_t& automata = *reinterpret_cast<unum::ucall::automata_t*>(call);
std::string_view body = automata.get_protocol().get_content();
*output = body.data();
*output_length = body.size();
return true;
}
#pragma endregion
27 changes: 22 additions & 5 deletions src/headers/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ struct connection_t {
bool expired() const noexcept {
return std::chrono::high_resolution_clock::now().time_since_epoch().count() - last_active_ns >
max_inactive_duration_ns_k;
};
}

bool is_ready() const noexcept { return tls_ctx == nullptr || ptls_handshake_is_complete(tls_ctx); }

bool must_close() const noexcept {
auto conn = protocol.get_header("Connection");
return conn == "Close" || conn == "close";
}

bool prepare_step() noexcept {
if (is_ready())
return true;
Expand Down Expand Up @@ -109,15 +114,27 @@ struct connection_t {
}
}

void decrypt() noexcept {
void decrypt(size_t received_amount) noexcept {
if (tls_ctx == nullptr || !ptls_handshake_is_complete(tls_ctx))
return;

work_buf.off = 0;
int res = 0;
size_t in_len = pipes.input_span().size();
int res = ptls_receive(tls_ctx, &work_buf, pipes.input_span().data(), &in_len);
if (res != -1) {
pipes.release_inputs();
void const* input = pipes.input_span().data();
if (received_amount != in_len) {
input = static_cast<char const*>(input) + (in_len - received_amount);
in_len = received_amount;
}
while (in_len != 0 && res != -1) {
size_t consumed = in_len;
res = ptls_receive(tls_ctx, &work_buf, input, &consumed);
in_len -= consumed;
input = static_cast<char const*>(input) + consumed;
}
if (res != -1 && work_buf.off > 0) {
printf("WB: %i\n", work_buf.off);
pipes.drop_last_input(received_amount);
std::memcpy(pipes.next_input_address(), work_buf.base, work_buf.off);
pipes.absorb_input(work_buf.off);
}
Expand Down
10 changes: 10 additions & 0 deletions src/headers/containers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ class exchange_pipes_t {
input_.dynamic.reset();
input_.embedded_used = 0;
}

void release_current_input() noexcept { input_.embedded_used = 0; }

void release_outputs() noexcept {
output_.dynamic.reset();
output_.embedded_used = 0;
Expand All @@ -243,6 +246,13 @@ class exchange_pipes_t {
std::memmove(input_.embedded, input_.embedded + cnt, input_.embedded_used);
}

void drop_last_input(size_t cnt) noexcept {
if (input_.dynamic.size() >= cnt)
input_.dynamic.pop_back(cnt);
else if (input_.embedded_used >= cnt)
input_.embedded_used -= cnt;
}

bool shift_input_to_dynamic() noexcept {
if (!input_.dynamic.append_n(input_.embedded, input_.embedded_used))
return false;
Expand Down
Loading

0 comments on commit 2841bd3

Please sign in to comment.