Skip to content

Commit

Permalink
feat: scheduler for async subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai132 committed Nov 28, 2024
1 parent 897c971 commit 826aea0
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 43 deletions.
51 changes: 43 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ For TCP-based implementation: [asio_net](https://github.com/shuai132/asio_net)
* Support any connection type (`tcp socket`, `serial port`, etc.)
* High Performance Serialization, support most STL containers and user type
* Serialization plugins implementations for `flatbuffers` and `nlohmann::json`
* RAII-based `dispose` for automatic cancel request
* Timeout and Retry API
* Support `std::future` interface
* Support `co_await`, depend on `C++20` and `asio`, or custom implementation
* Support subscribe async callback, async coroutine, and custom scheduler
* RAII-based `dispose` for automatic cancel request
* Support timeout, retry, cancel api

## TCP-based implementations

Expand All @@ -55,6 +55,8 @@ For TCP-based implementation: [asio_net](https://github.com/shuai132/asio_net)

## Usage

* async callback:

```c++
// The Receiver
rpc->subscribe("cmd", [](const std::string& msg) -> std::string {
Expand All @@ -69,17 +71,33 @@ rpc->cmd("cmd")
assert(rsp == "world");
})
->call();
```
* async coroutine:
// Or use C++20 co_await with asio
```c++
// The Receiver
rpc->subscribe("cmd", [&](request_response<std::string, std::string> rr) -> asio::awaitable<void> {
LOG("session on cmd: %s", rr->req.c_str());
asio::steady_timer timer(context);
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait();
rr->rsp("world");
}, scheduler_asio_coroutine);
// The Sender
// use C++20 co_await with asio, or you can use custom async implementation, and co_await it!
auto rsp = co_await rpc->cmd("cmd")->msg(std::string("hello"))->async_call<std::string>();
assert(rsp.data == "world");

// Or you can use custom async implementation, and co_await it!
```

Addition:
Inspect the code for more
details: [rpc_s_coroutine.cpp](https://github.com/shuai132/asio_net/blob/main/test/rpc_s_coroutine.cpp)
and [rpc_c_coroutine.cpp](https://github.com/shuai132/asio_net/blob/main/test/rpc_c_coroutine.cpp)

* Addition:

1. `msg` and `rsp` support any serializable type, see [Serialization](#Serialization).
1. `msg` and `rsp` support any serializable type, refer to [Serialization](#Serialization).
2. Detailed usages and unittests can be found here: [rpc_test.cpp](test/test_rpc.cpp)
3. There is an example shows custom async
impl: [rpc_c_coroutine.hpp](https://github.com/shuai132/asio_net/blob/main/test/rpc_c_coroutine.hpp)
Expand All @@ -88,6 +106,23 @@ Addition:

High-performance and memory-saving binary serialization.

### Why design a new serialization

Fist of all, I want to keep `rpc_core` library standalone, without any dependencies, except for STL.

Moreover, these serialization libraries do not align with my design goals:

* protobuf library is too large for some platforms, and it's Varint, Zigzag, and GZIP will use a lot of cpu.
* msgpack has similarity reason to protobuf.
* flatbuffers serialized data is too large.

Of course, when communicating across languages, it is recommended to use the above serialization libraries!

Finally, it also provides a way to use thirdparty serialization libraries directly, refer
to [Serialization Plugins](#Serialization-Plugins).

### Usage

For example, user data:

```c++
Expand Down
16 changes: 14 additions & 2 deletions include/rpc_core/rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,19 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc> {
subscribe_helper<F, F_ReturnIsEmpty, F_ParamIsEmpty>()(cmd, std::move(handle), dispatcher_.get());
}

template <class F>
using Scheduler = std::function<void(std::function<typename detail::callable_traits<F>::return_type()>)>;

template <typename F, typename std::enable_if<detail::fp_is_request_response<F>::value, int>::type = 0>
void subscribe(const cmd_type& cmd, F handle) {
static_assert(std::is_void<typename detail::callable_traits<F>::return_type>::value, "should return void");
subscribe(cmd, std::move(handle), nullptr);
}

template <typename F, typename std::enable_if<detail::fp_is_request_response<F>::value, int>::type = 0>
void subscribe(const cmd_type& cmd, F handle, Scheduler<F> scheduler) {
static_assert(detail::callable_traits<F>::argc == 1, "should be request_response<>");
dispatcher_->subscribe_cmd(cmd, [handle = std::move(handle)](const detail::msg_wrapper& msg) mutable {
dispatcher_->subscribe_cmd(cmd, [handle = std::move(handle), scheduler = std::move(scheduler)](const detail::msg_wrapper& msg) mutable {
using request_response = detail::remove_cvref_t<typename detail::callable_traits<F>::template argument_type<0>>;
using request_response_impl = typename request_response::element_type;
static_assert(detail::is_request_response<request_response>::value, "should be request_response<>");
Expand Down Expand Up @@ -98,7 +106,11 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc> {
hp->send_async_response(serialize(std::move(rr->rsp_data)));
}
};
handle(std::move(rr));
if (scheduler) {
scheduler(std::bind(handle, std::move(rr)));
} else {
(void)handle(std::move(rr));
}
}
return detail::msg_wrapper::make_rsp_async(msg.seq, std::move(async_helper), r.first);
});
Expand Down
111 changes: 78 additions & 33 deletions test/test_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@ namespace rpc_core_test {
void test_rpc() {
using namespace rpc_core;

// 此示例使用回环连接 实际使用时需自定义连接
// loopback connection for test purposes, a connection should be implemented in practical use
auto loopback = loopback_connection::create();

// 创建rpc server
// create rpc as server
auto rpc_s = rpc::create(loopback.first);

// 定时器实现 应配合当前应用的事件循环 以确保消息收发和超时回调在同一个线程
// 此示例使用回环连接 不做超时测试
// timer implementation:
// should be paired with the current application's event loop
// to ensure message sending, receiving and timeout callbacks occur on the same thread.
// this example uses a loopback connection and does not perform timeout testing
rpc_s->set_timer([](uint32_t ms, const rpc::timeout_cb& cb) {
RPC_CORE_UNUSED(ms);
RPC_CORE_UNUSED(cb);
});

// 已连接时设置ready
// mark as ready, on connected
rpc_s->set_ready(true);

// 创建rpc client
// create rpc as client
auto rpc_c = rpc::create(loopback.second);
rpc_c->set_timer([](uint32_t ms, const rpc::timeout_cb& cb) {
RPC_CORE_UNUSED(ms);
Expand All @@ -33,8 +35,9 @@ void test_rpc() {
rpc_c->set_ready(true);

/**
* 简单示例
* 以收发`std::string`为例,支持结构体和绝大多数STL容器(见序列化章节)。
* simple usage
* as an example, support send and receive std::string type
* also support structures and most STL containers (see serialization section)
*/
{
// The Receiver
Expand All @@ -53,19 +56,16 @@ void test_rpc() {
}

/**
* 详细测试
* 根据使用场景不同 提供以下几种方式
* detail usage
*/
{
RPC_CORE_LOG("1. 收发消息完整测试");
// 注册监听
RPC_CORE_LOG("1. send and receive data");
rpc_s->subscribe("cmd1", [&](const std::string& msg) -> std::string {
RPC_CORE_LOGI("get cmd1: %s", msg.c_str());
ASSERT(msg == "test");
return "ok";
});

// 请求支持很多方法 可根据需求使用所需部分
bool pass = false;
auto request = rpc_c->cmd("cmd1")
->msg(std::string("test"))
Expand All @@ -74,36 +74,36 @@ void test_rpc() {
ASSERT(rsp == "ok");
pass = true;
})
// or: ->rsp([&](const std::string& msg, finally_t type){})
// or: ->rsp([](const std::string& msg, finally_t type){})
->timeout([] {
RPC_CORE_LOGI("timeout");
})
->finally([](finally_t type) {
RPC_CORE_LOGI("finally: type:%s", rpc_core::finally_t_str(type));
});
RPC_CORE_LOGI("执行请求");
RPC_CORE_LOGI("request->call()");
ASSERT(!pass);
request->call();
ASSERT(pass);

/// 其他功能测试
RPC_CORE_LOGI("多次调用");
/// test other request api
RPC_CORE_LOGI("call() can more than once");
pass = false;
request->call();
ASSERT(pass);

RPC_CORE_LOGI("测试取消");
RPC_CORE_LOGI("request can be cancel");
pass = false;
request->cancel();
request->call();
ASSERT(!pass);

RPC_CORE_LOGI("恢复取消");
RPC_CORE_LOGI("request can be resume");
request->reset_cancel();
request->call();
ASSERT(pass);

RPC_CORE_LOGI("添加到dispose");
RPC_CORE_LOGI("add to dispose, use RAII");
pass = false;
{ // RAII dispose
dispose dispose;
Expand All @@ -121,7 +121,7 @@ void test_rpc() {
request->call();
ASSERT(pass);

RPC_CORE_LOGI("先创建request");
RPC_CORE_LOGI("can create request first");
pass = false;
request::create()
->cmd("cmd1")
Expand Down Expand Up @@ -149,7 +149,7 @@ void test_rpc() {
ASSERT(pass);
}

RPC_CORE_LOG("2. 复杂结构体类型测试(包含STL容器)");
RPC_CORE_LOG("2. test complex structures(including STL containers)");
{
bool pass = false;
CustomType customType;
Expand All @@ -173,7 +173,7 @@ void test_rpc() {
ASSERT(pass);
}

RPC_CORE_LOG("3. finally测试");
RPC_CORE_LOG("3. test finally");
{
bool pass = false;
bool pass_finally = false;
Expand Down Expand Up @@ -207,9 +207,9 @@ void test_rpc() {
ASSERT(pass_finally);
}

RPC_CORE_LOG("4. 多种使用场景测试");
RPC_CORE_LOG("4. test various usage");
{
RPC_CORE_LOG("4.1 有参数 有返回");
RPC_CORE_LOG("4.1 has parameter, has return value");
{
bool pass_cmd = false;
bool pass_rsp = false;
Expand All @@ -229,7 +229,7 @@ void test_rpc() {
ASSERT(pass_rsp);
}

RPC_CORE_LOG("4.2 有参数 无返回");
RPC_CORE_LOG("4.2 has parameter, no return value");
{
bool pass_cmd = false;
rpc_s->subscribe("cmd4", [&](const std::string& msg) {
Expand All @@ -240,7 +240,7 @@ void test_rpc() {
ASSERT(pass_cmd);
}

RPC_CORE_LOG("4.3 无参数 有返回");
RPC_CORE_LOG("4.3 no parameter, has return value");
{
bool pass_cmd = false;
bool pass_rsp = false;
Expand All @@ -258,7 +258,7 @@ void test_rpc() {
ASSERT(pass_rsp);
}

RPC_CORE_LOG("4.4 无参数 无返回");
RPC_CORE_LOG("4.4 no parameter, no return value");
{
bool pass_cmd = false;
rpc_s->subscribe("cmd4", [&]() {
Expand All @@ -269,7 +269,7 @@ void test_rpc() {
}
}

RPC_CORE_LOG("5. ping pong测试");
RPC_CORE_LOG("5. test ping pong");
{
bool pass = false;
rpc_c->ping("test")
Expand All @@ -282,7 +282,7 @@ void test_rpc() {
ASSERT(pass);
}

RPC_CORE_LOG("6. dispose测试");
RPC_CORE_LOG("6. test dispose");
{
bool pass = false;
auto request = rpc_c->ping("test")
Expand All @@ -303,7 +303,7 @@ void test_rpc() {
}

#ifdef RPC_CORE_FEATURE_FUTURE
RPC_CORE_LOG("7. future模式测试");
RPC_CORE_LOG("7. test future api");
{
{
auto result = rpc_c->ping("test")->future<std::string>().get();
Expand All @@ -317,7 +317,7 @@ void test_rpc() {
}
#endif

RPC_CORE_LOG("8. 未ready的rpc对象");
RPC_CORE_LOG("8. test rpc which not ready");
{
bool pass = false;
auto rpc_tmp = rpc::create();
Expand All @@ -332,7 +332,7 @@ void test_rpc() {
ASSERT(pass);
}

RPC_CORE_LOG("9. 测试编译lambda capture mutable");
RPC_CORE_LOG("9. test compile: lambda capture mutable");
{
auto rpc = rpc::create();
std::string tmp;
Expand Down Expand Up @@ -391,6 +391,51 @@ void test_rpc() {
ASSERT(pass_cmd);
ASSERT(pass_rsp);
}

RPC_CORE_LOG("12. subscribe async: use coroutine or custom scheduler");
#if 0
{
/// scheduler for dispatch rsp to asio context
auto scheduler_asio_dispatch = [&](auto handle) {
asio::dispatch(context, std::move(handle));
};
/// scheduler for use asio coroutine
auto scheduler_asio_coroutine = [&](auto handle) {
asio::co_spawn(context, [handle = std::move(handle)]() -> asio::awaitable<void> {
co_await handle();
}, asio::detached);
};

/// 1. common usage
rpc->subscribe("cmd", [&](request_response<std::string, std::string> rr) {
// call rsp when data ready
rr->rsp("world");
// or run on context thread
asio::dispatch(context, [rr = std::move(rr)]{ rr->rsp("world"); });
// or run on context thread, use asio coroutine
asio::co_spawn(context, [&, rr = std::move(rr)]() -> asio::awaitable<void> {
asio::steady_timer timer(context);
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait();
rr->rsp("world");
}, asio::detached);
});

/// 2. custom scheduler, automatic dispatch
rpc->subscribe("cmd", [](const request_response<std::string, std::string>& rr) {
rr->rsp("world");
}, scheduler_asio_dispatch);

/// 3. custom scheduler, simple way to use asio coroutine
rpc->subscribe("cmd", [&](request_response<std::string, std::string> rr) -> asio::awaitable<void> {
LOG("session on cmd: %s", rr->req.c_str());
asio::steady_timer timer(context);
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait();
rr->rsp("world");
}, scheduler_asio_coroutine);
}
#endif
}

} // namespace rpc_core_test

0 comments on commit 826aea0

Please sign in to comment.