Skip to content

Commit

Permalink
Store ids of all running actors in the registry
Browse files Browse the repository at this point in the history
This is useful for debugging shutdown hangs, when an actor that
does not terminate blocks the destructor of the actor system itself.
  • Loading branch information
tobim committed Jun 5, 2024
1 parent 4ff66bf commit 698a505
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 19 deletions.
23 changes: 19 additions & 4 deletions libcaf_core/caf/actor_registry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>

#include "caf/abstract_actor.hpp"
#include "caf/actor.hpp"
Expand Down Expand Up @@ -53,19 +54,32 @@ class CAF_CORE_EXPORT actor_registry {

/// Increases running-actors-count by one.
/// @returns the increased count.
size_t inc_running();
size_t inc_running(actor_id key);

/// Decreases running-actors-count by one.
/// @returns the decreased count.
size_t dec_running();
size_t dec_running(actor_id key);

/// Returns the number of currently running actors.
size_t running() const;

/// Blocks the caller until running-actors-count becomes `expected`
/// (must be either 0 or 1).
/// Returns the the actor ids of all currently running actors.
const std::unordered_set<actor_id>& running_ids() const;

/// Blocks the caller until running-actors-count becomes `expected`..
void await_running_count_equal(size_t expected) const;

/// Blocks the caller until running-actors-count becomes `expected`..
/// Invokes `cb` every time the set of running actors shrinks.
template <class CB>
void await_running_count_equal(size_t expected, CB&& cb) const {
std::unique_lock<std::mutex> guard{running_mtx_};
while (running_.size() != expected) {
running_cv_.wait(guard);
cb();
}
}

/// Returns the actor associated with `key` or `invalid_actor`.
template <class T = strong_actor_ptr>
T get(const std::string& key) const {
Expand Down Expand Up @@ -112,6 +126,7 @@ class CAF_CORE_EXPORT actor_registry {

mutable std::mutex running_mtx_;
mutable std::condition_variable running_cv_;
std::unordered_set<actor_id> running_;

mutable detail::shared_spinlock instances_mtx_;
entries entries_;
Expand Down
4 changes: 2 additions & 2 deletions libcaf_core/src/abstract_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ void abstract_actor::register_at_system() {
if (getf(is_registered_flag))
return;
setf(is_registered_flag);
[[maybe_unused]] auto count = home_system().registry().inc_running();
[[maybe_unused]] auto count = home_system().registry().inc_running(id());
CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
}

void abstract_actor::unregister_from_system() {
if (!getf(is_registered_flag))
return;
unsetf(is_registered_flag);
[[maybe_unused]] auto count = home_system().registry().dec_running();
[[maybe_unused]] auto count = home_system().registry().dec_running(id());
CAF_LOG_DEBUG("actor" << id() << "decreased running count to" << count);
}

Expand Down
31 changes: 20 additions & 11 deletions libcaf_core/src/actor_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,29 +80,38 @@ void actor_registry::erase(actor_id key) {
}
}

size_t actor_registry::inc_running() {
return ++*system_.base_metrics().running_actors;
size_t actor_registry::inc_running(actor_id key) {
std::unique_lock<std::mutex> guard(running_mtx_);
++*system_.base_metrics().running_actors;
running_.emplace(key);
return running_.size();
}

size_t actor_registry::running() const {
return static_cast<size_t>(system_.base_metrics().running_actors->value());
std::unique_lock<std::mutex> guard(running_mtx_);
return running_.size();
}

size_t actor_registry::dec_running() {
size_t new_val = --*system_.base_metrics().running_actors;
if (new_val <= 1) {
std::unique_lock<std::mutex> guard(running_mtx_);
running_cv_.notify_all();
}
const std::unordered_set<actor_id>& actor_registry::running_ids() const {
std::unique_lock<std::mutex> guard(running_mtx_);
return running_;
}

size_t actor_registry::dec_running(actor_id key) {
std::unique_lock<std::mutex> guard(running_mtx_);
--*system_.base_metrics().running_actors;
running_.erase(key);
size_t new_val = running_.size();
running_cv_.notify_all();
return new_val;
}

void actor_registry::await_running_count_equal(size_t expected) const {
CAF_ASSERT(expected == 0 || expected == 1);
CAF_LOG_TRACE(CAF_ARG(expected));
std::unique_lock<std::mutex> guard{running_mtx_};
while (running() != expected) {
CAF_LOG_DEBUG(CAF_ARG(running()));
while (running_.size() != expected) {
CAF_LOG_DEBUG(CAF_ARG(running_.size()));
running_cv_.wait(guard);
}
}
Expand Down
4 changes: 2 additions & 2 deletions libcaf_core/src/blocking_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class blocking_actor_runner : public resumable {
auto& sys = ctx->system();
sys.release_private_thread(thread_);
if (!hidden_) {
[[maybe_unused]] auto count = sys.registry().dec_running();
[[maybe_unused]] auto count = sys.registry().dec_running(self_->id());
CAF_LOG_DEBUG("actor" << self_->id() << "decreased running count to"
<< count);
}
Expand Down Expand Up @@ -166,7 +166,7 @@ void blocking_actor::launch(execution_unit*, bool, bool hide) {
// Note: must *not* call register_at_system() to stop actor cleanup from
// decrementing the count before releasing the thread.
if (!hide) {
[[maybe_unused]] auto count = sys.registry().inc_running();
[[maybe_unused]] auto count = sys.registry().inc_running(id());
CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
}
thread->resume(new blocking_actor_runner(this, thread, hide));
Expand Down
1 change: 1 addition & 0 deletions libcaf_io/test/detail/prometheus_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ CAF_TEST(the prometheus broker responds to HTTP get requests) {
string_view response{reinterpret_cast<char*>(response_buf.data()),
response_buf.size()};
CHECK(starts_with(response, http_ok_header));
MESSAGE(response);
CHECK(contains(response, "\ncaf_system_running_actors 2 "));
if (detail::prometheus_broker::has_process_metrics()) {
CHECK(contains(response, "\nprocess_cpu_seconds_total "));
Expand Down

0 comments on commit 698a505

Please sign in to comment.