Skip to content

Commit

Permalink
支持等待 .detach() 创建的协程
Browse files Browse the repository at this point in the history
当调用 .detach(), 然后不等待它,那么这个 detch 的协程就直接后台跑了。就好像 create_thread
创建了个新线程一样。

当调用 .detach(), 然后使用 co_await 等待它,那么当前协程就等待这个 detach 的协程完成。
就好像调用 std.thread.join() 那样。
  • Loading branch information
microcai committed Oct 15, 2024
1 parent ef17010 commit b077b7b
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 54 deletions.
198 changes: 144 additions & 54 deletions include/ucoro/awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,83 +118,173 @@ namespace ucoro
};

//////////////////////////////////////////////////////////////////////////
// 存储协程 promise 的返回值
template<typename T>
struct awaitable_promise_value
{
template<typename V>
void return_value(V&& val) noexcept
{
value_.template emplace<T>(std::forward<V>(val));
}

void unhandled_exception() noexcept
{
value_.template emplace<std::exception_ptr>(std::current_exception());
}

std::variant<std::exception_ptr, T> value_{nullptr};
};

//////////////////////////////////////////////////////////////////////////
// 存储协程 promise 的返回值 void 的特化实现
template<>
struct awaitable_promise_value<void>
{
std::exception_ptr exception_{nullptr};

constexpr void return_void() noexcept
{
}

void unhandled_exception() noexcept
{
exception_ = std::current_exception();
}
};

//////////////////////////////////////////////////////////////////////////
// 使用 .detach() 后创建的独立的协程的入口点
// 由它开始链式使用 awaitable<>
template<typename T = void>
struct awaitable_detached
{
struct promise_type : public debug_coro_promise
awaitable_detached(const awaitable_detached&) = delete;

struct promise_type : public awaitable_promise_value<T>, public debug_coro_promise
{
std::suspend_never initial_suspend() noexcept
awaitable_detached get_return_object()
{
return {};
return awaitable_detached{std::coroutine_handle<promise_type>::from_promise(*this)};
}

std::suspend_never final_suspend() noexcept
struct final_awaiter : std::suspend_always
{
return {};
}
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> final_coro_handle) const noexcept
{
if (final_coro_handle.promise().continuation_)
{
// continuation_ 不为空,则 说明 .detach() 被 co_await
// 因此,awaitable_detached 析构的时候会顺便撤销自己,所以这里不用 destory
// 返回 continuation_,以便让协程框架调用 continuation_.resume()
// 这样就把等它的协程唤醒了.
return final_coro_handle.promise().continuation_;
}
// 如果 continuation_ 为空,则说明 .detach() 没有被 co_await
// 因此,awaitable_detached 对象其实已经析构
// 所以必须主动调用 destroy() 以免内存泄漏.
final_coro_handle.destroy();
return std::noop_coroutine();
}
};

void return_void() noexcept
auto initial_suspend() noexcept
{
return std::suspend_always{};
}

void unhandled_exception()
auto final_suspend() noexcept
{
return final_awaiter{};
}

awaitable_detached get_return_object() noexcept
{
return awaitable_detached();
}
// 对 detached 的 coro 调用 co_await 相当于 thread.join()
// 因此记录这个 continuation 以便在 final awaiter 里唤醒
std::coroutine_handle<> continuation_;
};
};

//////////////////////////////////////////////////////////////////////////
explicit awaitable_detached(std::coroutine_handle<promise_type> promise_handle)
: current_coro_handle_(promise_handle)
{
}

template<typename T>
struct final_awaitable : std::suspend_always
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<awaitable_promise<T>> h) noexcept
awaitable_detached(awaitable_detached&& other)
: current_coro_handle_(other.current_coro_handle_)
{
if (h.promise().continuation_)
other.current_coro_handle_ = nullptr;
}

~awaitable_detached()
{
if (current_coro_handle_)
{
return h.promise().continuation_;
if (current_coro_handle_.done())
{
current_coro_handle_.destroy();
}
else
{
// 由于 initial_supend 为 suspend_always
// 因此 如果不对 .detach() 的返回值调用 co_await
// 此协程将不会运行。
// 因此,在本对象析构时,协程其实完全没运行过。
// 正因为本对象析构的时候,协程都没有运行,就意味着
// 其实用户只是调用了 .detach() 并没有对返回值进行
// co_await 操作。
// 因此为了能把协程运行起来,这里强制调用 resume
current_coro_handle_.resume();
}
}
return std::noop_coroutine();
}
};

//////////////////////////////////////////////////////////////////////////
// 存储协程 promise 的返回值
template<typename T>
struct awaitable_promise_value
{
template<typename V>
void return_value(V&& val) noexcept
bool await_ready() noexcept
{
value_.template emplace<T>(std::forward<V>(val));
return false;
}

void unhandled_exception() noexcept
auto await_suspend(std::coroutine_handle<> continuation) noexcept
{
value_.template emplace<std::exception_ptr>(std::current_exception());
current_coro_handle_.promise().continuation_ = continuation;
return current_coro_handle_;
}

std::variant<std::exception_ptr, T> value_{nullptr};
T await_resume()
{
if constexpr (std::is_void_v<T>)
{
auto exception = current_coro_handle_.promise().exception_;
if (exception)
{
std::rethrow_exception(exception);
}
}
else
{
auto ret = std::move(current_coro_handle_.promise().value_);
if (std::holds_alternative<std::exception_ptr>(ret))
{
std::rethrow_exception(std::get<std::exception_ptr>(ret));
}

return std::get<T>(ret);
}
}

std::coroutine_handle<promise_type> current_coro_handle_;
};

//////////////////////////////////////////////////////////////////////////
// 存储协程 promise 的返回值 void 的特化实现
template<>
struct awaitable_promise_value<void>
{
std::exception_ptr exception_{nullptr};

constexpr void return_void() noexcept
{
}

void unhandled_exception() noexcept
template<typename T>
struct final_awaitable : std::suspend_always
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<awaitable_promise<T>> h) noexcept
{
exception_ = std::current_exception();
if (h.promise().continuation_)
{
return h.promise().continuation_;
}
return std::noop_coroutine();
}
};

Expand Down Expand Up @@ -379,21 +469,21 @@ namespace ucoro
current_coro_handle_.promise().set_local(local);
}

void detach()
auto detach()
{
auto launch_coro = [](awaitable<T> lazy) -> awaitable_detached { co_await lazy; };
[[maybe_unused]] auto detached = launch_coro(std::move(*this));
auto launch_coro = [](awaitable<T> lazy) -> awaitable_detached<T> { co_return co_await lazy; };
return launch_coro(std::move(*this));
}

void detach(std::any local)
auto detach(std::any local)
{
if (local.has_value())
{
set_local(local);
}

auto launch_coro = [](awaitable<T> lazy) -> awaitable_detached { co_await lazy; };
[[maybe_unused]] auto detached = launch_coro(std::move(*this));
auto launch_coro = [](awaitable<T> lazy) -> awaitable_detached<T> { co_return co_await lazy; };
return launch_coro(std::move(*this));
}

std::coroutine_handle<promise_type> current_coro_handle_;
Expand Down Expand Up @@ -516,13 +606,13 @@ ucoro::ExecutorAwaiter<T, callback> executor_awaitable(callback&& cb)
}

template<typename Awaitable, typename Local>
void coro_start(Awaitable&& coro, Local&& local)
auto coro_start(Awaitable&& coro, Local&& local)
{
coro.detach(local);
return coro.detach(local);
}

template<typename Awaitable>
void coro_start(Awaitable&& coro)
auto coro_start(Awaitable&& coro)
{
coro.detach();
return coro.detach();
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set_target_properties(ucoro_tests PROPERTIES FOLDER "ucoro_tests")

add_subdirectory(test1)
add_subdirectory(test2)

find_package(Boost 1.60 COMPONENTS thread system atomic)
if(Boost_FOUND)
Expand Down
6 changes: 6 additions & 0 deletions tests/test2/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

add_executable(test2 test2.cpp)
target_link_libraries(test2 ucoro)

add_test(NAME test2 COMMAND test2)
set_target_properties(test2 PROPERTIES FOLDER "ucoro_tests")
47 changes: 47 additions & 0 deletions tests/test2/test2.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include <iostream>
#include "ucoro/awaitable.hpp"


ucoro::awaitable<int> coro_compute_int(int value)
{
co_return (value * 100);
}

ucoro::awaitable<void> coro_compute_exec(int value)
{
auto x = co_await ucoro::local_storage;
std::cout << "local storage: " << std::any_cast<std::string>(x) << std::endl;

try
{
auto y = co_await ucoro::local_storage_t<std::string>();
std::cout << "local storage: " << y << std::endl;
}
catch (const std::exception& e)
{
std::cout << e.what();
}

auto comput_promise = coro_compute_int(value);

auto ret = co_await std::move(comput_promise);
std::cout << "return: " << ret << std::endl;
}

ucoro::awaitable<void> coro_compute()
{
for (auto i = 0; i < 100; i+=2)
{
co_await coro_compute_exec(i);
co_await coro_compute_exec(i+1).detach(std::string{"hello from detached coro"});
}
}

int main(int argc, char **argv)
{
std::string str = "hello from main coro";

coro_start(coro_compute(), str);

return 0;
}

0 comments on commit b077b7b

Please sign in to comment.