Skip to content

Commit

Permalink
Correctly handle duplicate serve requests and tune some logs (#4715)
Browse files Browse the repository at this point in the history
These 3 commits are basically independent, but are all caused by the
same incident.
Refer to the actual commit messages for motivation and explanation.
  • Loading branch information
tobim authored Nov 1, 2024
2 parents 9de74be + fbc2c25 commit 1601a45
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 47 deletions.
2 changes: 2 additions & 0 deletions changelog/next/bug-fixes/4715--serve-manager-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
We eliminated a rare crash in the `serve` operator that was introduced in
v4.20.3.
6 changes: 5 additions & 1 deletion libtenzir/builtins/operators/import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ class import_operator final : public crtp_operator<import_operator> {
// can offer a better mechanism here.
auto self = caf::scoped_actor{ctrl.self().system()};
auto components = get_node_components<importer_actor>(self, ctrl.node());
TENZIR_ASSERT(components);
if (!components) {
diagnostic::error(components.error())
.note("failed get a handle to the importer actor")
.throw_();
}
auto [importer] = std::move(*components);
auto metric_handler = ctrl.metrics({
"tenzir.metrics.import",
Expand Down
4 changes: 2 additions & 2 deletions libtenzir/builtins/operators/serve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,10 @@ struct serve_manager_state {
TENZIR_DEBUG("unable to find serve request after timeout expired");
return;
}
if (found->done) {
// In case the client re-sent the request in the meantime we are done.
if (found->done or found->continuation_token != continuation_token) {
return;
}
TENZIR_ASSERT(found->continuation_token == continuation_token);
TENZIR_ASSERT(not found->get_rps.empty());
const auto delivered = found->try_deliver_results(true);
TENZIR_ASSERT(delivered);
Expand Down
14 changes: 7 additions & 7 deletions libtenzir/src/active_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void serialize(
*self->state.synopsis_path, std::move(ps_chunk))
.then(
[=](atom::ok) {
TENZIR_DEBUG("{} persisted partition synopsis", *self);
TENZIR_TRACE("{} persisted partition synopsis", *self);
},
[=](const caf::error& err) {
TENZIR_WARN("{} failed to persist partition synopsis to {} and will "
Expand All @@ -129,7 +129,7 @@ void serialize(
"restore it on the next start",
*self);
}
TENZIR_DEBUG("{} persists partition with a total size of "
TENZIR_TRACE("{} persists partition with a total size of "
"{} bytes",
*self, (*partition)->size());
self->state.data.synopsis.unshared().indexes_file = {
Expand Down Expand Up @@ -312,14 +312,14 @@ active_partition_actor::behavior_type active_partition(
self->state.data.store_header = chunk::make_empty();
self->state.data.store_header = header;
self->state.store_builder = builder;
TENZIR_DEBUG("{} spawned new active store at {}", *self, builder);
TENZIR_TRACE("{} spawned new active store at {}", *self, builder);
self->set_exit_handler([=](const caf::exit_msg& msg) {
TENZIR_DEBUG("{} received EXIT from {} with reason: {}", *self, msg.source,
TENZIR_TRACE("{} received EXIT from {} with reason: {}", *self, msg.source,
msg.reason);
// Delay shutdown if we're currently in the process of persisting.
if (self->state.persistence_promise.pending()) {
std::call_once(self->state.shutdown_once, [=] {
TENZIR_DEBUG("{} delays partition shutdown because it is still "
TENZIR_TRACE("{} delays partition shutdown because it is still "
"writing to disk",
*self);
});
Expand All @@ -333,7 +333,7 @@ active_partition_actor::behavior_type active_partition(
caf::delayed_anon_send(caf::actor_cast<caf::actor>(self), 100ms, msg);
return;
}
TENZIR_DEBUG("{} shuts down after persisting partition state", *self);
TENZIR_TRACE("{} shuts down after persisting partition state", *self);
if (msg.reason) {
self->quit(
diagnostic::error(msg.reason).note("via exit handler").to_error());
Expand All @@ -356,7 +356,7 @@ active_partition_actor::behavior_type active_partition(
[self](atom::persist, const std::filesystem::path& part_path,
const std::filesystem::path& synopsis_path)
-> caf::result<partition_synopsis_ptr> {
TENZIR_DEBUG("{} got persist atom", *self);
TENZIR_TRACE("{} got persist atom", *self);
// Ensure that the response promise has not already been initialized.
TENZIR_ASSERT(!self->state.persistence_promise.source());
self->state.persist_path = part_path;
Expand Down
4 changes: 2 additions & 2 deletions libtenzir/src/evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void evaluator_state::handle_result(const offset& position, const ids& result) {
auto& [missing, accumulated_hits] = *ptr;
accumulated_hits |= result;
if (--missing == 0) {
TENZIR_DEBUG("{} collected all results at position {}", *self, position);
TENZIR_TRACE("{} collected all results at position {}", *self, position);
evaluate();
}
decrement_pending();
Expand All @@ -114,7 +114,7 @@ void evaluator_state::handle_missing_result(
auto ptr = hits_for(position);
TENZIR_ASSERT(ptr != nullptr);
if (--ptr->first == 0) {
TENZIR_DEBUG("{} collected all results at position {}", *self, position);
TENZIR_TRACE("{} collected all results at position {}", *self, position);
evaluate();
}
decrement_pending();
Expand Down
36 changes: 18 additions & 18 deletions libtenzir/src/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ partition_actor partition_factory::operator()(const uuid& id) const {
"to load it regardless",
*state_.self, id);
const auto path = state_.partition_path(id);
TENZIR_DEBUG("{} loads partition {} for path {}", *state_.self, id, path);
TENZIR_TRACE("{} loads partition {} for path {}", *state_.self, id, path);
materializations_++;
return state_.self->spawn(passive_partition, id, filesystem_, path);
}
Expand Down Expand Up @@ -521,7 +521,7 @@ caf::error index_state::load_from_disk() {
auto partition_uuid = partitions[idx];
auto error = [&]() -> caf::error {
auto part_path = partition_path(partition_uuid);
TENZIR_DEBUG("{} unpacks partition {} ({}/{})", *self, partition_uuid,
TENZIR_TRACE("{} unpacks partition {} ({}/{})", *self, partition_uuid,
idx, partitions.size());
// Generate external partition synopsis file if it doesn't exist.
auto synopsis_path = partition_synopsis_path(partition_uuid);
Expand Down Expand Up @@ -680,7 +680,7 @@ void index_state::handle_slice(table_slice x) {
}
active_partition = *part;
} else if (x.rows() > active_partition->second.capacity) {
TENZIR_DEBUG("{} flushes active partition {} with {} rows and {}/{} events",
TENZIR_TRACE("{} flushes active partition {} with {} rows and {}/{} events",
*self, schema, x.rows(),
partition_capacity - active_partition->second.capacity,
partition_capacity);
Expand Down Expand Up @@ -732,7 +732,7 @@ index_state::create_active_partition(const type& schema) {
// If the partition was already rotated then there's nothing to do for us.
return;
}
TENZIR_DEBUG("{} flushes active partition {} with {}/{} {} events "
TENZIR_TRACE("{} flushes active partition {} with {}/{} {} events "
"after {} timeout",
*self, it->second.id, partition_capacity - it->second.capacity,
partition_capacity, schema, data{active_partition_timeout});
Expand All @@ -746,7 +746,7 @@ index_state::create_active_partition(const type& schema) {
});
flush_to_disk();
});
TENZIR_DEBUG("{} created new partition {}", *self, id);
TENZIR_TRACE("{} created new partition {}", *self, id);
return active_partition;
}

Expand Down Expand Up @@ -864,13 +864,13 @@ auto index_state::schedule_lookups() -> size_t {
// 1. Get the partition with the highest accumulated priority.
auto next = pending_queries.next();
if (!next) {
TENZIR_DEBUG("{} did not find a partition to query", *self);
TENZIR_TRACE("{} did not find a partition to query", *self);
break;
}
auto immediate_completion = [&](const query_queue::entry& x) {
for (auto qid : x.queries) {
if (auto client = pending_queries.handle_completion(qid)) {
TENZIR_DEBUG("{} completes query {} immediately", *self, qid);
TENZIR_TRACE("{} completes query {} immediately", *self, qid);
self->send(*client, atom::done_v);
}
}
Expand All @@ -886,7 +886,7 @@ auto index_state::schedule_lookups() -> size_t {
*self, next->partition);
continue;
}
TENZIR_DEBUG("{} schedules partition {} for {}", *self, next->partition,
TENZIR_TRACE("{} schedules partition {} for {}", *self, next->partition,
next->queries);
// 2. Acquire the actor for the selected partition, potentially materializing
// it from its persisted state.
Expand Down Expand Up @@ -957,7 +957,7 @@ auto index_state::schedule_lookups() -> size_t {
--running_partition_lookups;
active_lookups.erase(active_lookup);
const auto num_scheduled = schedule_lookups();
TENZIR_DEBUG("{} scheduled {} partitions after completion of a "
TENZIR_TRACE("{} scheduled {} partitions after completion of a "
"previously scheduled lookup",
*self, num_scheduled);
}
Expand All @@ -977,7 +977,7 @@ auto index_state::schedule_lookups() -> size_t {
context_it->second)
.then(
[this, handle_completion, qid, pid = next->partition](uint64_t n) {
TENZIR_DEBUG("{} received {} results for query {} from partition "
TENZIR_TRACE("{} received {} results for query {} from partition "
"{}",
*self, n, qid, pid);
handle_completion();
Expand Down Expand Up @@ -1140,7 +1140,7 @@ index(index_actor::stateful_pointer<index_state> self,
});
return {
[self](atom::done, uuid partition_id) {
TENZIR_DEBUG("{} queried partition {} successfully", *self, partition_id);
TENZIR_TRACE("{} queried partition {} successfully", *self, partition_id);
},
[self](table_slice& slice) {
self->state.handle_slice(std::move(slice));
Expand Down Expand Up @@ -1246,7 +1246,7 @@ index(index_actor::stateful_pointer<index_state> self,
lookup_result.candidate_infos) {
query_contexts[type] = query_context;
query_contexts[type].expr = lookup_result.exp;
TENZIR_DEBUG(
TENZIR_TRACE(
"{} got initial candidates {} for schema {} and from "
"catalog {}",
*self, candidates, type, lookup_result.partition_infos);
Expand All @@ -1262,7 +1262,7 @@ index(index_actor::stateful_pointer<index_state> self,
},
query_context.cmd);
if (lookup_result.empty()) {
TENZIR_DEBUG("{} returns without result: no partitions qualify",
TENZIR_TRACE("{} returns without result: no partitions qualify",
*self);
rp.deliver(query_cursor{query_id, 0u, 0u});
self->send(client, atom::done_v);
Expand All @@ -1283,7 +1283,7 @@ index(index_actor::stateful_pointer<index_state> self,
rp.deliver(err);
rp.deliver(query_cursor{query_id, num_candidates, scheduled});
const auto num_scheduled = self->state.schedule_lookups();
TENZIR_DEBUG("{} scheduled {} partitions for lookup after a new "
TENZIR_TRACE("{} scheduled {} partitions for lookup after a new "
"query came in",
*self, num_scheduled);
},
Expand All @@ -1305,7 +1305,7 @@ index(index_actor::stateful_pointer<index_state> self,
= self->state.pending_queries.activate(query_id, num_partitions))
TENZIR_WARN("{} can't activate unknown query: {}", *self, err);
const auto num_scheduled = self->state.schedule_lookups();
TENZIR_DEBUG("{} scheduled {} partitions following the request to "
TENZIR_TRACE("{} scheduled {} partitions following the request to "
"activate {} partitions for query {}",
*self, num_scheduled, num_partitions, query_id);
},
Expand Down Expand Up @@ -1348,7 +1348,7 @@ index(index_actor::stateful_pointer<index_state> self,
synopsis_path)
.then(
[self, partition_id](atom::done) {
TENZIR_DEBUG("{} erased partition synopsis {} from "
TENZIR_TRACE("{} erased partition synopsis {} from "
"filesystem",
*self, partition_id);
},
Expand All @@ -1365,7 +1365,7 @@ index(index_actor::stateful_pointer<index_state> self,
self->state.filesystem, caf::infinite, atom::erase_v, path)
.then(
[self, partition_id](atom::done) {
TENZIR_DEBUG("{} erased partition {} from filesystem",
TENZIR_TRACE("{} erased partition {} from filesystem",
*self, partition_id);
},
[self, partition_id, path](const caf::error& err) {
Expand Down Expand Up @@ -1532,7 +1532,7 @@ index(index_actor::stateful_pointer<index_state> self,
// We set the query priority for partition transforms to zero so they
// always get less priority than queries.
query_context.priority = 0;
TENZIR_DEBUG("{} emplaces {} for pipeline {:?}", *self, query_context,
TENZIR_TRACE("{} emplaces {} for pipeline {:?}", *self, query_context,
pipe);
auto query_contexts = query_state::type_query_context_map{};
for (const auto& [type, _] : corrected_partitions.candidate_infos) {
Expand Down
16 changes: 8 additions & 8 deletions libtenzir/src/passive_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ indexer_actor passive_partition_state::indexer_at(size_t position) const {
indexer = self->spawn(passive_indexer, id, std::move(value_index));
return indexer;
}
TENZIR_DEBUG("passive-partition {} has no index or failed to index for field "
TENZIR_TRACE("passive-partition {} has no index or failed to index for field "
"{}",
id, qualified_index->field_name()->string_view());
return {};
Expand Down Expand Up @@ -323,7 +323,7 @@ partition_actor::behavior_type passive_partition(
TENZIR_TRACEPOINT(passive_partition_spawned, id_string.c_str());
self->set_down_handler([=](const caf::down_msg& msg) {
if (msg.source != self->state.store.address()) {
TENZIR_DEBUG("{} ignores DOWN from unexpected sender: {}", *self,
TENZIR_TRACE("{} ignores DOWN from unexpected sender: {}", *self,
msg.reason);
return;
}
Expand All @@ -332,7 +332,7 @@ partition_actor::behavior_type passive_partition(
self->quit(msg.reason);
});
self->set_exit_handler([=](const caf::exit_msg& msg) {
TENZIR_DEBUG("{} received EXIT from {} with reason: {}", *self, msg.source,
TENZIR_TRACE("{} received EXIT from {} with reason: {}", *self, msg.source,
msg.reason);
self->demonitor(self->state.store->address());
// Receiving an EXIT message does not need to coincide with the state
Expand All @@ -354,7 +354,7 @@ partition_actor::behavior_type passive_partition(
terminate<policy::parallel>(self, std::move(indexers))
.then(
[=](atom::done) {
TENZIR_DEBUG("{} shut down all indexers successfully", *self);
TENZIR_TRACE("{} shut down all indexers successfully", *self);
self->quit();
},
[=](const caf::error& err) {
Expand Down Expand Up @@ -471,11 +471,11 @@ partition_actor::behavior_type passive_partition(
// issues downstream because you need to very carefully handle
// this scenario, which is easy to overlook as a developer. We
// should fix this issue.
TENZIR_DEBUG("{} received evaluator results with wrong length: "
TENZIR_TRACE("{} received evaluator results with wrong length: "
"expected {}, got {}",
*self, self->state.events, hits.size());
}
TENZIR_DEBUG("{} received results from the evaluator", *self);
TENZIR_TRACE("{} received results from the evaluator", *self);
// TODO: Use the first path if the expression can be evaluated
// exactly.
query_context.ids = hits;
Expand All @@ -490,10 +490,10 @@ partition_actor::behavior_type passive_partition(
[self](atom::erase) -> caf::result<atom::done> {
auto rp = self->make_response_promise<atom::done>();
if (!self->state.partition_chunk) {
TENZIR_DEBUG("{} skips an erase request", *self);
TENZIR_TRACE("{} skips an erase request", *self);
return self->state.deferred_erasures.emplace_back(std::move(rp));
}
TENZIR_DEBUG("{} received an erase message and deletes {}", *self,
TENZIR_TRACE("{} received an erase message and deletes {}", *self,
self->state.path);
self
->request(self->state.filesystem, caf::infinite, atom::erase_v,
Expand Down
16 changes: 8 additions & 8 deletions libtenzir/src/pipeline_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void pipeline_executor_state::start_nodes_if_all_spawned() {
}

void pipeline_executor_state::spawn_execution_nodes(pipeline pipe) {
TENZIR_DEBUG("{} spawns execution nodes", *self);
TENZIR_TRACE("{} spawns execution nodes", *self);
auto input_type = operator_type::make<void>();
auto previous = exec_node_actor{};
bool spawn_remote = false;
Expand All @@ -86,7 +86,7 @@ void pipeline_executor_state::spawn_execution_nodes(pipeline pipe) {
}
auto description = fmt::format("{:?}", op);
if (spawn_remote) {
TENZIR_DEBUG("{} spawns {} remotely", *self, description);
TENZIR_TRACE("{} spawns {} remotely", *self, description);
if (not node) {
abort_start(caf::make_error(
ec::invalid_argument, "encountered remote operator, but remote node "
Expand Down Expand Up @@ -125,7 +125,7 @@ void pipeline_executor_state::spawn_execution_nodes(pipeline pipe) {
});
input_type = *output_type;
} else {
TENZIR_DEBUG("{} spawns {} locally", *self, description);
TENZIR_TRACE("{} spawns {} locally", *self, description);
auto spawn_result
= spawn_exec_node(self, std::move(op), input_type, node, diagnostics,
metrics, op_index, has_terminal, is_hidden, run_id);
Expand All @@ -135,7 +135,7 @@ void pipeline_executor_state::spawn_execution_nodes(pipeline pipe) {
.to_error());
return;
}
TENZIR_DEBUG("{} spawned {} locally", *self, description);
TENZIR_TRACE("{} spawned {} locally", *self, description);
std::tie(previous, input_type) = std::move(*spawn_result);
self->monitor(previous);
exec_nodes.push_back(previous);
Expand Down Expand Up @@ -170,12 +170,12 @@ void pipeline_executor_state::abort_start(caf::error reason) {
}

void pipeline_executor_state::finish_start() {
TENZIR_DEBUG("{} signals successful start", *self);
TENZIR_TRACE("{} signals successful start", *self);
start_rp.deliver();
}

auto pipeline_executor_state::start() -> caf::result<void> {
TENZIR_DEBUG("{} got start request", *self);
TENZIR_TRACE("{} got start request", *self);
if (not this->pipe) {
return caf::make_error(ec::logic_error,
"pipeline exeuctor can only start once");
Expand Down Expand Up @@ -207,7 +207,7 @@ auto pipeline_executor_state::start() -> caf::result<void> {
if (not node) {
for (const auto& op : pipe.operators()) {
if (op->location() == operator_location::remote) {
TENZIR_DEBUG("{} connects to node because of remote operators", *self);
TENZIR_TRACE("{} connects to node because of remote operators", *self);
connect_to_node(self, [this, pipe = std::move(pipe)](
caf::expected<node_actor> result) mutable {
if (not result) {
Expand Down Expand Up @@ -268,7 +268,7 @@ auto pipeline_executor(
pipeline pipe, receiver_actor<diagnostic> diagnostics,
metrics_receiver_actor metrics, node_actor node, bool has_terminal,
bool is_hidden) -> pipeline_executor_actor::behavior_type {
TENZIR_DEBUG("{} was created", *self);
TENZIR_TRACE("{} was created", *self);
self->state.self = self;
self->state.node = std::move(node);
self->state.pipe = std::move(pipe);
Expand Down
Loading

0 comments on commit 1601a45

Please sign in to comment.