Skip to content

Commit

Permalink
Revert "Allow components to depend on other components (#4295)"
Browse files Browse the repository at this point in the history
This reverts commit 7b612d3, reversing
changes made to 49f0c60.
  • Loading branch information
dominiklohmann committed Jun 21, 2024
1 parent 2beb29e commit 43cc27e
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 88 deletions.
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
7 changes: 4 additions & 3 deletions libtenzir/builtins/components/metrics_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ struct metrics_collector_state {
return ok.error();
}
}
if (instances.empty()) {
return {};
}
TENZIR_ASSERT(not instances.empty());
detail::weak_run_delayed_loop(
self, std::chrono::seconds{30},
[this] {
Expand Down Expand Up @@ -141,6 +139,9 @@ class plugin final : public virtual component_plugin {

auto make_component(node_actor::stateful_pointer<node_state> node) const
-> component_plugin_actor override {
if (collect(plugins::get<metrics_plugin>()).empty()) {
return {};
}
auto [importer] = node->state.registry.find<importer_actor>();
return node->spawn<caf::linked>(metrics_collector, std::move(importer));
}
Expand Down
3 changes: 0 additions & 3 deletions libtenzir/include/tenzir/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ struct node_state {
/// The component registry.
component_registry registry = {};

/// The list of component plugin actors in the order that they were spawned.
std::vector<std::string> ordered_components = {};

/// Components that are still alive for lifetime-tracking.
std::set<std::pair<caf::actor_addr, std::string>> alive_components = {};

Expand Down
5 changes: 0 additions & 5 deletions libtenzir/include/tenzir/plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,6 @@ class component_plugin : public virtual plugin {
/// Defaults to the plugin name.
virtual std::string component_name() const;

/// Components that should be created before the current one so initialization
/// can succeed.
/// Defaults to empty list.
virtual auto wanted_components() const -> std::vector<std::string>;

/// Creates an actor as a component in the NODE.
/// @param node A stateful pointer to the NODE actor.
/// @returns The actor handle to the NODE component.
Expand Down
91 changes: 21 additions & 70 deletions libtenzir/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,56 +284,6 @@ auto node_state::get_endpoint_handler(const http_request_description& desc)
return result->second;
}

auto spawn_components(node_actor::stateful_pointer<node_state> self) -> void {
using component_plugin_map = std::unordered_map<std::string, const component_plugin*>;
// 1. Collect all component_plugins into a name -> plugin* map:
component_plugin_map todo = {};
for (const auto* component : plugins::get<component_plugin>()) {
todo.emplace(component->component_name(), component);
}
// 2. Calculate an ordered loading sequnce based on the wanted_components of
// each plugin.
std::vector<const component_plugin*> sequenced_components = {};
std::unordered_set<std::string> done = {};
auto derive_sequence = [&](auto derive_sequence, const std::string& name) {
auto entry = todo.find(name);
if (entry == todo.end()) {
return;
}
const auto* plugin = entry->second;
todo.erase(entry);
if (done.contains(name)) {
return;
}
for (auto& wanted : plugin->wanted_components()) {
derive_sequence(derive_sequence, wanted);
}
done.insert(name);
sequenced_components.push_back(plugin);
};
while (not todo.empty()) {
derive_sequence(derive_sequence, todo.begin()->second->component_name());
}
// 3. Load all components in order.
for (const auto* plugin : sequenced_components) {
auto name = plugin->component_name();
auto handle = plugin->make_component(self);
if (!handle) {
diagnostic::error("{} failed to create the {} component", *self, name)
.throw_();
}
self->system().registry().put(fmt::format("tenzir.{}", name), handle);
if (auto err = register_component(self, caf::actor_cast<caf::actor>(handle),
name)) {
diagnostic::error(err)
.note("{} failed to register component {} in component registry", *self,
name)
.throw_();
}
self->state.ordered_components.push_back(name);
}
}

node_actor::behavior_type
node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
std::filesystem::path dir) {
Expand Down Expand Up @@ -380,19 +330,6 @@ node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
}
}
});
self->set_exception_handler([=](std::exception_ptr& ptr) -> caf::error {
try {
std::rethrow_exception(ptr);
} catch (const diagnostic& diag) {
return diag.to_error();
} catch (const std::exception& err) {
return diagnostic::error("{}", err.what())
.note("unhandled exception in {}", *self)
.to_error();
} catch (...) {
return diagnostic::error("unhandled exception in {}", *self).to_error();
}
});
// Terminate deterministically on shutdown.
self->set_exit_handler([=](const caf::exit_msg& msg) {
TENZIR_DEBUG("{} got EXIT from {}", *self, msg.source);
Expand All @@ -417,12 +354,6 @@ node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
// Core components are terminated in a second stage, we remove them from the
// registry upfront and deal with them later.
std::vector<caf::actor> core_shutdown_handles;
for (const auto& name :
self->state.ordered_components | std::ranges::views::reverse) {
if (auto comp = registry.remove(name)) {
core_shutdown_handles.push_back(comp->actor);
}
}
caf::actor filesystem_handle;
// The components listed here need to be terminated in sequential order.
// The importer needs to shut down first because it might still have
Expand Down Expand Up @@ -526,7 +457,27 @@ node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
return rp;
},
[self](atom::internal, atom::spawn, atom::plugin) -> caf::result<void> {
spawn_components(self);
// Add all plugins to the component registry.
for (const auto& component : plugins::get<component_plugin>()) {
auto handle = component->make_component(self);
if (!handle) {
// The spawn function can provide a better log message so we don't
// print one here.
continue;
}
self->system().registry().put(
fmt::format("tenzir.{}", component->component_name()), handle);
if (auto err
= register_component(self, caf::actor_cast<caf::actor>(handle),
component->component_name())) {
return caf::make_error( //
ec::unspecified, fmt::format("{} failed to register component {} "
"from plugin {} in component "
"registry: {}",
*self, component->component_name(),
component->name(), err));
}
}
return {};
},
[self](atom::spawn, const invocation& inv) {
Expand Down
4 changes: 0 additions & 4 deletions libtenzir/src/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,6 @@ std::string component_plugin::component_name() const {
return this->name();
}

auto component_plugin::wanted_components() const -> std::vector<std::string> {
return {};
}

// -- loader plugin -----------------------------------------------------------

auto loader_parser_plugin::supported_uri_schemes() const
Expand Down
5 changes: 3 additions & 2 deletions nix/tenzir/plugins/source.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "tenzir-plugins",
"url": "[email protected]:tenzir/tenzir-plugins",
"ref": "main",
"rev": "0fa9bf5efa10534e21ca7d2b399871f06dbd0022",
"rev": "891b31784dccb6868afb5c19d8c88c10f3bb1a09",
"submodules": true,
"shallow": true
"shallow": true,
"allRefs": true
}

0 comments on commit 43cc27e

Please sign in to comment.