Skip to content

Commit

Permalink
feat: refine async
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai132 committed Nov 15, 2024
1 parent 38bfcb8 commit 6bc6093
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 67 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
a tiny C++14 rpc library, supports all platforms (macOS, Linux, Windows, iOS, Android, etc.) and most microchips (
Arduino, STM32, ESP32/ESP8266, etc.)

**Recommend TCP-based implementation: [asio_net](https://github.com/shuai132/asio_net)**

## Introduction

Full-feature rpc frameworks (e.g. `gRPC` and `bRPC`) very complex and not suitable for embedded systems.
Expand All @@ -17,7 +19,7 @@ It supports all platforms and a wide range of microchips, including Arduino, STM

Note:
This project only offers the protocol layer and API, it **does not** include the implementation of the transport layer.
For TCP-based implementations: [asio_net](https://github.com/shuai132/asio_net)
For TCP-based implementation: [asio_net](https://github.com/shuai132/asio_net)

## Features

Expand Down Expand Up @@ -79,7 +81,7 @@ Addition:
1. `msg` and `rsp` support any serializable type, see [Serialization](#Serialization).
2. Detailed usages and unittests can be found here: [rpc_test.cpp](test/test_rpc.cpp)
3. There is an example shows custom async
impl: [rpc_c_coroutine.cpp](https://github.com/shuai132/asio_net/blob/main/test/rpc_c_coroutine.cpp)
impl: [rpc_c_coroutine.hpp](https://github.com/shuai132/asio_net/blob/main/test/rpc_c_coroutine.hpp)
## Serialization
Expand Down
148 changes: 84 additions & 64 deletions include/rpc_core/request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,38 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
return shared_from_this();
}

template <typename F, typename std::enable_if<callable_traits<F>::argc, int>::type = 0>
template <typename F, typename std::enable_if<callable_traits<F>::argc == 2, int>::type = 0>
request_s rsp(F cb) {
using T = detail::remove_cvref_t<typename callable_traits<F>::template argument_type<0>>;

need_rsp_ = true;
auto self = shared_from_this();
this->rsp_handle_ = [this, cb = std::move(cb)](detail::msg_wrapper msg) mutable {
if (canceled_) {
on_finish(finally_t::canceled);
return true;
}

if (msg.type & detail::msg_wrapper::msg_type::no_such_cmd) {
on_finish(finally_t::no_such_cmd);
return true;
}

auto rsp = msg.unpack_as<T>();
if (rsp.first) {
cb(std::move(rsp.second), finally_t::normal);
on_finish(finally_t::normal);
return true;
} else {
cb({}, finally_t::rsp_serialize_error);
on_finish(finally_t::rsp_serialize_error);
return false;
}
};
return self;
}

template <typename F, typename std::enable_if<callable_traits<F>::argc == 1, int>::type = 0>
request_s rsp(F cb) {
using T = detail::remove_cvref_t<typename callable_traits<F>::template argument_type<0>>;

Expand Down Expand Up @@ -117,7 +148,7 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
return self;
}

template <typename F, typename std::enable_if<!callable_traits<F>::argc, int>::type = 0>
template <typename F, typename std::enable_if<callable_traits<F>::argc == 0, int>::type = 0>
request_s rsp(F cb) {
need_rsp_ = true;
auto self = shared_from_this();
Expand Down Expand Up @@ -213,13 +244,23 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
}

/**
* Force to ignore `rsp` callback.
* Force ignoring `rsp` callback.
*/
request_s disable_rsp() {
need_rsp_ = false;
return shared_from_this();
}

request_s enable_rsp() {
need_rsp_ = true;
return shared_from_this();
}

request_s mark_need_rsp() {
rsp([] {});
return shared_from_this();
}

request_s rpc(rpc_w rpc) {
rpc_ = std::move(rpc);
return shared_from_this();
Expand All @@ -239,34 +280,34 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
}

template <typename T>
struct future_ret;
struct result;

#ifdef RPC_CORE_FEATURE_FUTURE
/**
* Future pattern
* It is not recommended to use blocking interfaces unless you are very clear about what you are doing, as it is easy to cause deadlock.
*/
template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int>::type = 0>
std::future<future_ret<R>> future(const rpc_s& rpc = nullptr);
std::future<result<R>> future(const rpc_s& rpc = nullptr);

template <typename R = void, typename std::enable_if<std::is_same<R, void>::value, int>::type = 0>
std::future<future_ret<void>> future(const rpc_s& rpc = nullptr);
std::future<result<void>> future(const rpc_s& rpc = nullptr);
#endif

#ifdef RPC_CORE_FEATURE_ASIO_COROUTINE
template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int>::type = 0>
auto async_call();
asio::awaitable<result<R>> async_call();

template <typename R = void, typename std::enable_if<std::is_same<R, void>::value, int>::type = 0>
auto async_call();
asio::awaitable<result<R>> async_call();
#endif

#ifdef RPC_CORE_FEATURE_ASYNC_CUSTOM
template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int>::type = 0>
auto RPC_CORE_FEATURE_ASYNC_CUSTOM();
RPC_CORE_FEATURE_ASYNC_CUSTOM_R RPC_CORE_FEATURE_ASYNC_CUSTOM();

template <typename R = void, typename std::enable_if<std::is_same<R, void>::value, int>::type = 0>
auto RPC_CORE_FEATURE_ASYNC_CUSTOM();
RPC_CORE_FEATURE_ASYNC_CUSTOM_R RPC_CORE_FEATURE_ASYNC_CUSTOM();
#endif

private:
Expand Down Expand Up @@ -308,42 +349,33 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
};

template <typename T>
struct request::future_ret {
struct request::result {
finally_t type;
T data;
};

template <>
struct request::future_ret<void> {
struct request::result<void> {
finally_t type;
};

#ifdef RPC_CORE_FEATURE_FUTURE
template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int>::type>
std::future<request::future_ret<R>> request::future(const rpc_s& rpc) {
auto promise = std::make_shared<std::promise<future_ret<R>>>();
rsp([promise](R r) {
promise->set_value({finally_t::normal, std::move(r)});
});
finally([promise](finally_t type) {
if (type != finally_t::normal) {
promise->set_value({type, {}});
}
std::future<request::result<R>> request::future(const rpc_s& rpc) {
auto promise = std::make_shared<std::promise<result<R>>>();
rsp([promise](R r, finally_t type) {
promise->set_value({type, std::move(r)});
});
call(rpc);
return promise->get_future();
}

template <typename R, typename std::enable_if<std::is_same<R, void>::value, int>::type>
std::future<request::future_ret<void>> request::future(const rpc_s& rpc) {
auto promise = std::make_shared<std::promise<request::future_ret<void>>>();
rsp([promise] {
promise->set_value({request::finally_t::normal});
});
std::future<request::result<void>> request::future(const rpc_s& rpc) {
auto promise = std::make_shared<std::promise<request::result<void>>>();
mark_need_rsp();
finally([promise](request::finally_t type) {
if (type != request::finally_t::normal) {
promise->set_value({type});
}
promise->set_value({type});
});
call(rpc);
return promise->get_future();
Expand All @@ -352,50 +384,38 @@ std::future<request::future_ret<void>> request::future(const rpc_s& rpc) {

#ifdef RPC_CORE_FEATURE_ASIO_COROUTINE
template <typename R, typename std::enable_if<!std::is_same<R, void>::value, int>::type>
auto request::async_call() {
asio::use_awaitable_t<> use_awaitable = {};
return asio::async_initiate<asio::use_awaitable_t<>, void(rpc_core::request::future_ret<R>)>(
[this]<typename Handler>(Handler&& h) mutable {
auto handler = std::make_shared<Handler>(std::forward<Handler>(h));
rsp([handler](R data) mutable {
rpc_core::request::future_ret<R> ret;
ret.type = finally_t::normal;
ret.data = std::move(data);
(*handler)(std::move(ret));
});
finally([handler = std::move(handler)](finally_t type) {
if (type != finally_t::normal) {
rpc_core::request::future_ret<R> ret;
ret.type = type;
(*handler)(std::move(ret));
}
asio::awaitable<request::result<R>> request::async_call() {
auto executor = co_await asio::this_coro::executor;
co_return co_await asio::async_compose<decltype(asio::use_awaitable), void(rpc_core::request::result<R>)>(
[this, &executor](auto& self) mutable {
using ST = std::remove_reference<decltype(self)>::type;
auto self_sp = std::make_shared<ST>(std::forward<ST>(self));
rsp([&executor, self = std::move(self_sp)](R data, finally_t type) mutable {
asio::dispatch(executor, [self = std::move(self), data = std::move(data), type]() {
self->complete({type, data});
});
});
call();
},
use_awaitable);
asio::use_awaitable);
}

template <typename R, typename std::enable_if<std::is_same<R, void>::value, int>::type>
auto request::async_call() {
asio::use_awaitable_t<> use_awaitable = {};
return asio::async_initiate<asio::use_awaitable_t<>, void(rpc_core::request::future_ret<void>)>(
[this]<typename Handler>(Handler&& h) mutable {
auto handler = std::make_shared<Handler>(std::forward<Handler>(h));
rsp([handler]() mutable {
rpc_core::request::future_ret<R> ret;
ret.type = finally_t::normal;
(*handler)(std::move(ret));
});
finally([handler = std::move(handler)](finally_t type) {
if (type != finally_t::normal) {
rpc_core::request::future_ret<R> ret;
ret.type = type;
(*handler)(std::move(ret));
}
asio::awaitable<request::result<R>> request::async_call() {
auto executor = co_await asio::this_coro::executor;
co_return co_await asio::async_compose<decltype(asio::use_awaitable), void(rpc_core::request::result<R>)>(
[this, &executor](auto&& self) mutable {
using ST = std::remove_reference<decltype(self)>::type;
auto self_sp = std::make_shared<ST>(std::forward<ST>(self));
mark_need_rsp();
finally([&executor, self = std::move(self_sp)](finally_t type) mutable {
asio::dispatch(executor, [self = std::move(self), type] {
self->complete({type});
});
});
call();
},
use_awaitable);
asio::use_awaitable);
}
#endif

Expand Down
24 changes: 23 additions & 1 deletion test/test_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ void test_rpc() {
ASSERT(rsp == "ok");
pass = true;
})
// or: ->rsp([&](const std::string& msg, finally_t type){})
->timeout([] {
RPC_CORE_LOGI("timeout");
})
Expand Down Expand Up @@ -319,7 +320,7 @@ void test_rpc() {
RPC_CORE_LOG("8. 未ready的rpc对象");
{
bool pass = false;
auto rpc_tmp = rpc::create(loopback.first);
auto rpc_tmp = rpc::create();
rpc_tmp->cmd("cmd")->call(); // should not crash
rpc_tmp->cmd("cmd")
->finally([&](finally_t type) {
Expand All @@ -342,6 +343,27 @@ void test_rpc() {
tmp = "";
});
}

RPC_CORE_LOG("10. rsp finally");
{
bool pass_cmd = false;
bool pass_rsp = false;
rpc_s->subscribe("cmd", [&](const std::string& msg) -> std::string {
ASSERT(msg == "test");
pass_cmd = true;
return "test";
});
rpc_c->cmd("cmd")
->msg(std::string("test"))
->rsp([&](const std::string& msg, finally_t type) {
ASSERT(msg == "test");
ASSERT(type == finally_t::normal);
pass_rsp = true;
})
->call();
ASSERT(pass_cmd);
ASSERT(pass_rsp);
}
}

} // namespace rpc_core_test

0 comments on commit 6bc6093

Please sign in to comment.