storage_proxy: update view update backlog on correct shard when writing #18646
base: master
Conversation
I created this as a draft for now, as I still need to run a performance test for it (this affects the common write path). I'm also not entirely content with how verbose the fix became, so if you have an idea on some approach other than changing all the […]
Force-pushed from 15c5afc to 13b3f41 (compare)
I added the missing perf_simple_query results, so I'm changing this to a PR.
🔴 CI State: FAILURE (build failure)
Force-pushed from 13b3f41 to cc83a47 (compare)
To avoid an additional allocation, I moved the view update backlog getter (which updates the backlog) from an extra […]
🔴 CI State: FAILURE (build failure)
Force-pushed from cc83a47 to 4df3039 (compare)
replica/database.cc (outdated)
```diff
@@ -2008,6 +2010,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
     co_await coroutine::return_exception_ptr(std::move(ex));
 }
 lock = lock_f.get();
+backlog = _view_update_generator->get_proxy().get_view_update_backlog();
```
That's a horrible layering violation :( Can view_update_generator::push_view_replica_updates() call _proxy.get_view_update_backlog() on its own and return it back to database (and then up the chain), at least?
I would ask for the almost opposite thing, which I think will remove 99% of this patch:
Instead of having all this code return the "view update backlog" and changing many functions to pass this backlog up the chain, can't we have the code that needs it call _view_update_generator->get_proxy().get_view_update_backlog() to get it?
Doing whatever you want to do with the backlog can be the last thing you do on the right shard, anyway it accesses a global (shard-local) variable. Why do we need to propagate it all through the call stack?
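For illustration, a rough sketch of the suggested shape, with hypothetical helper names (apply_mutation and build_response stand in for the existing write path and response code; this is not code from the tree):

```cpp
// Sketch only: the write path keeps returning future<>, and the code that
// builds the response reads the shard-local backlog itself, as its last
// step on the shard that sends the response.
seastar::future<> apply_and_respond(replica::database& db,
                                    db::view::view_update_generator& gen,
                                    frozen_mutation fm) {
    co_await apply_mutation(db, std::move(fm));                // placeholder for the existing write path
    auto backlog = gen.get_proxy().get_view_update_backlog();  // read last, in place
    build_response(backlog);                                   // hypothetical response hook
    co_return;
}
```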
> can't we have the code that needs it call _view_update_generator->get_proxy().get_view_update_backlog() to get it?

This is what the code does today, and this patch fixes it like this:
```diff
@@ -4070,10 +4075,10 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
     // lambda for applying mutation locally
     auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable {
         return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state())
-        .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] {
+        .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] (std::optional<db::view::update_backlog> b) {
             // make mutation alive until it is processed locally, otherwise it
             // may disappear if write timeouts before this future is ready
-            got_response(response_id, my_address, get_view_update_backlog());
+            got_response(response_id, my_address, b);
         });
     };
```
so your proposal is to leave things as is :)
> ... so your proposal is to leave things as is :)
And I second that suggestion, by the way
> That's a horrible layering violation :( Can view_update_generator::push_view_replica_updates() call _proxy.get_view_update_backlog() on its own and return it back to database (and then up the chain), at least?
push_view_replica_updates() is actually a method of table, so I suppose we don't want to refer to the storage_proxy there either; that's why I moved it to view_update_generator::generate_and_propagate_view_updates.
> I would ask for the almost opposite thing, which I think will remove 99% of this patch:
> Instead of having all this code return the "view update backlog" and changing many functions to pass this backlog up the chain, can't we have the code that needs it call _view_update_generator->get_proxy().get_view_update_backlog() to get it?
Instead of this entire chain, we could have the code that needs it call _db.invoke_on(..., get_view_update_backlog()), but I expect we don't want to add that to the common write path due to performance considerations.
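For illustration, a hedged sketch of that rejected alternative (a hypothetical helper, not code from the tree), using Seastar's sharded<>::invoke_on to hop to the data-owning shard:

```cpp
// Explicitly hop to the shard that owns the data and read its backlog there.
// Correct, but it adds a cross-shard round trip to every write, which is the
// performance concern mentioned above.
seastar::future<db::view::update_backlog>
read_backlog_on(seastar::sharded<replica::database>& db, unsigned data_shard) {
    return db.invoke_on(data_shard, [] (replica::database& local_db) {
        return local_db.get_view_update_backlog();  // reads that shard's semaphore
    });
}
```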
Since storage_proxy::get_view_update_backlog() is using atomics to fetch-add its numbers, it can be safely used from any shard. So instead of propagating the result of this function all over the code, we (probably) can teach the proxy to call its get_view_update_backlog() with an int shard argument; this won't cost us an extra submit-to-shard.
storage_proxy::get_view_update_backlog() is using atomics to store and read the cached backlogs, but the value to be stored in the atomics comes from database::get_view_update_backlog, which reads the semaphore of the shard it's called on, so to use that we would need the invoke_on.
> Since storage_proxy::get_view_update_backlog() is using atomics to fetch-add its numbers, it can be safely used from any shard. So instead of propagating the result of this function all over the code, we (probably) can teach the proxy to call its get_view_update_backlog() with an int shard argument; this won't cost us an extra submit-to-shard.
@xemul Each shard's atomic is currently written only by its shard and occasionally (every 10ms) read by other shards. Consider a non-shard-aware workload - won't this have a significant performance impact if multiple shards are allowed to write to the same atomic on each write operation?
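To make the concern concrete, here is a self-contained model of the pattern under discussion (hypothetical names; the real code lives in node_update_backlog): each shard publishes only to its own cache-line-padded slot, so relaxed stores stay core-local, whereas letting many shards store into one slot on every write would bounce the cache line between cores.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

struct backlog {               // stand-in for db::view::update_backlog
    size_t current = 0;
    size_t max = 0;
};

class node_backlog_cache {
    struct alignas(64) slot {  // one cache line per shard: no false sharing
        std::atomic<backlog> value{};
    };
    std::vector<slot> _slots;
public:
    explicit node_backlog_cache(unsigned shards) : _slots(shards) {}

    // Intended single writer: the shard that owns `shard`.
    void publish(unsigned shard, backlog b) {
        _slots[shard].value.store(b, std::memory_order_relaxed);
    }
    // Occasional cross-shard reader (e.g. every 10ms when computing the max).
    backlog peek(unsigned shard) const {
        return _slots[shard].value.load(std::memory_order_relaxed);
    }
};
```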
```diff
@@ -2077,21 +2081,21 @@ void database::update_write_metrics_for_timed_out_write() {
     ++_stats->total_writes_timedout;
 }
 
-future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
+future<std::optional<db::view::update_backlog>> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
```
nit: 99% of this patch is changing the returned future of database::apply() (and its callers) to return std::optional<db::view::update_backlog>, and only 1% is the fix itself (in storage_proxy::send_to_live_endpoints() below). It would be great if this patch were split into two: the first changing the return type, the second with the essence of the fix itself.
I split the patch in the rebase
🟢 CI State: SUCCESS
I'm still trying to wrap my head around this PR (and the issue it tries to fix), without success. You used the phrase "coordinator shard" multiple times, but a coordinator doesn't have a "view update backlog" - only a replica does. A replica shard has its view update backlog - the number of view updates that it sent and that haven't completed yet. And, as you noted, an entire node as a replica node has a "maximum view update backlog", which is the maximum backlog of all its shards. That's it - a coordinator doesn't maintain a view update backlog for itself. It only collects estimates of view update backlogs for replica nodes (not shards!) so it can calculate its delay. Unless I'm completely confused, there isn't any sense in which "coordinator shards" need to update their backlogs, which you say you are fixing.
service/paxos/paxos_state.cc (outdated)
```cpp
return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout, backlog] {
    return sys_ks.save_paxos_decision(*schema, decision, timeout).then([backlog] {
        return make_ready_future<std::optional<db::view::update_backlog>>(std::move(backlog));
    });
```
It looks like you duplicated code here which already existed below - but why, and how is it related to this patch? If this is an incidental fix of a bug (?), please put it in a separate patch in the same PR and explain it separately in a commit message.
I duplicated the code here because, as a result of this change, the f = f.then(...) now changes the type of f. I used another approach in the rebase.
```diff
@@ -119,7 +123,7 @@
 static future<bool> accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
     clock_type::time_point timeout);
 // Replica RPC endpoint for Paxos "learn".
-static future<> learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
+static future<std::optional<db::view::update_backlog>> learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
```
I hope we can find a way to fix whatever this PR is fixing without adding the concept of "view update backlog" to a gazillion other things like replica::update() and paxos_state. MV is complex enough without intertwining it also with LWT if we don't have to :-(
As long as we can have MVs on tables on which we can perform LWT queries, I'm not sure we can avoid this. We still want to have MV flow control and (future) admission control even if we're only doing LWTs.
service/storage_proxy.cc (outdated)
```cpp
});
_metrics.add_group(storage_proxy_stats::REPLICA_STATS_CATEGORY, {
    sm::make_current_bytes("view_update_backlog", [this] { return _max_view_update_backlog.fetch_shard(this_shard_id()).current; },
        sm::description("View update backlog cached size")),
```
Why did you remove the current_throttled_writes statistics? Did you remove it by mistake?
And what's the difference between the two metrics you added here? Maybe part of the same mistake?
Also, please expand on the description "View update backlog cached size". I have no idea what this means, and neither will users.
I replaced the current_throttled_writes with an extra repetition of the new metric by mistake, sorry. I added this metric mostly to enable testing this change, but I improved the description slightly in the rebase anyway.
```python
await cql.run_async(f"INSERT INTO ks.tab (key, c, v) VALUES ({0}, {i}, '{v*'a'}')")
# The view update backlog should increase on the node generating view updates
view_backlog = get_metric(local_node.address, 'scylla_storage_proxy_replica_view_update_backlog')
assert view_backlog > v
```
Please note that delay_before_remote_view_update only delays by 500ms, not by infinity. I guess it will be enough to notice the backlog 99.9% of the time, but I hope we won't have a crazy-overloaded-super-slow test machine where the 500ms delay passes before we manage to call get_metric() :-(
I wonder if we can somehow check how feasible this scenario is; then again, won't many tests fail on such a machine anyway due to timeouts?
```python
# This test reproduces issue #18542
# In the test, we create a table and perform a write to it a couple of times
# Each time, we check that a view update backlog on some shard increased
# due to the write.
```
I'm afraid I don't fully understand this test. First of all, the name of the test is "*_on_correct_shard", but the test doesn't seem to check anything specific to shards, correct or not - just the node's total. The test's comment makes more sense than the test's name (it mentions "some shard", not the correct shard).
Second, can you say a few words (here in the discussion or in the commit message) on how this test failed before this PR? What actually happened? Was the view update backlog seen by this test zero? Why?
I doubt I can explain this better as a response to a comment than what I wrote in the issue, but hopefully by answering questions I can provide the missing insight.
> Second, can you say a few words (here in the discussion or in the commit message) on how this test failed before this PR? What actually happened? Was the view update backlog seen by this test zero? Why?
Right, the backlog seen by the test was zero. This happened because of the following:

0. all nodes started with empty backlogs
1. the first node got the write request and became the coordinator of the request
2. the coordinator sent a replica write request to the second node
3. the second node received the replica write request on a shard different than the shard that contained the data that needed to be modified (when I was referring to the "coordinator shard" in the description, I meant the shard which received the write request on the right replica)
4. the second node (on the coordinator shard) invoked the correct shard to apply the write
5. the correct shard applied the write, generating view updates and consuming units from the view_update_concurrency_semaphore (even though the view update backlog is based on this semaphore, this does not increase the view update backlog propagated in responses and gossip)
6. the coordinator shard on the second node finished the invocation on the shard with the updated data and then called storage_proxy::get_view_update_backlog, using its result for the write response
7. the coordinator node received the response and updated its saved max backlog for the second node

The problem is that on the shard whose backlog in fact increased, we never updated the backlog used for the responses and gossip.
There can be a few sources of confusion here:

- for each shard on each node there are actually 2 view update backlogs (lines 1847 to 1849 in 2d91422):

  i. the source-of-truth backlog, computed from the semaphore:

  ```cpp
  db::view::update_backlog get_view_update_backlog() const {
      return {max_memory_pending_view_updates() - _view_update_concurrency_sem.current(), max_memory_pending_view_updates()};
  }
  ```

  ii. the cached per-shard backlogs:

  ```cpp
  std::vector<per_shard_backlog> _backlogs;
  ```

  The i. backlog is always up-to-date (it's the source of truth). The ii. backlog needs to be updated, but it's the backlog which we're using to select a max backlog to use in a response or in gossip.

- the ii. backlog is updated using the backlog getter (scylladb/service/storage_proxy.cc, lines 2449 to 2451 in 2d91422):

  ```cpp
  db::view::update_backlog storage_proxy::get_view_update_backlog() const {
      return _max_view_update_backlog.add_fetch(this_shard_id(), get_db().local().get_view_update_backlog());
  }
  ```

  ...which stores the read backlog into the atomic (lines 2612 to 2613 in 2d91422):

  ```cpp
  update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog backlog) {
      _backlogs[shard].backlog.store(backlog, std::memory_order_relaxed);
  ```
As a result, if we never "get" the view update backlog on the shard whose i. backlog increased, the ii. backlog for that shard will never increase, and other nodes will never get the information about a potential overload.
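The add_fetch() snippet above is cut off mid-function. As illustration, here is a self-contained model of the mechanism being described: the names mirror the snippets above, but the function body is my reconstruction, so treat it as an approximation rather than the exact tree.

```cpp
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <vector>

struct update_backlog {
    size_t current = 0;
    size_t max = 0;
    static update_backlog no_backlog() { return {}; }
    // Compare relative fullness (current/max) without dividing.
    bool operator<(const update_backlog& o) const { return current * o.max < o.current * max; }
};

class node_update_backlog {
    using clock = std::chrono::steady_clock;
    struct per_shard_backlog {
        std::atomic<update_backlog> backlog{update_backlog::no_backlog()};
        update_backlog load() const { return backlog.load(std::memory_order_relaxed); }
    };
    std::vector<per_shard_backlog> _backlogs;   // one atomic slot per shard
    std::chrono::milliseconds _interval;        // the 10ms refresh period
    std::atomic<clock::time_point> _last_update;
    std::atomic<update_backlog> _max;           // cached node-wide maximum

public:
    node_update_backlog(size_t shards, std::chrono::milliseconds interval)
        : _backlogs(shards), _interval(interval),
          _last_update(clock::now() - interval), _max(update_backlog::no_backlog()) {}

    // Store the calling shard's fresh backlog; at most once per _interval,
    // recompute the node-wide max from all shards' cached values.
    update_backlog add_fetch(unsigned shard, update_backlog backlog) {
        _backlogs[shard].backlog.store(backlog, std::memory_order_relaxed);
        auto now = clock::now();
        auto last = _last_update.load(std::memory_order_relaxed);
        if (now >= last + _interval &&
            _last_update.compare_exchange_weak(last, now, std::memory_order_relaxed)) {
            auto max = update_backlog::no_backlog();
            for (auto& b : _backlogs) {
                max = std::max(max, b.load());
            }
            _max.store(max, std::memory_order_relaxed);
            return max;
        }
        // Between refreshes: the caller's own value is fresh, the rest is cached.
        return std::max(backlog, _max.load(std::memory_order_relaxed));
    }
};
```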
Force-pushed from 4df3039 to 2e03afa (compare)
I tried to clarify what I meant by "coordinator shard" in #18646 (comment): it's actually not a shard of the coordinator node, but the shard which handles the write on the actual replica (though I heard it's usually the same shard as the one on which the coordinator node received the request).
I mostly repeated the issue description there, but maybe #18646 (comment) helps a bit with the confusion. By the way, I'm also not a big fan of how this was implemented, but I'm not really seeing a better way of achieving the effect of getting the result of a […] The "cleanest" method would be adding […]
Force-pushed from 2e03afa to ad3574f (compare)
🔴 CI State: FAILURE (build failure)
Force-pushed from 48cb974 to a471fc9 (compare)
db/view/view.cc (outdated)
```diff
@@ -1845,6 +1845,10 @@ future<> view_update_generator::mutate_MV(
     });
 }
 
+update_backlog view_update_generator::get_view_update_backlog() {
```
there's db/view/view_update_generator.cc for view_update_generator methods
I saw it, but it looks like we didn't move all of its methods there (view_update_generator::mutate_MV is still here). I assumed we didn't want to include storage_proxy.hh in view_update_generator.cc and that's why mutate_MV was left, but after the last rebase I see that storage_proxy.hh is already included in view_update_generator.cc anyway, so I don't need to put this in db/view/view.cc after all.
🔴 CI State: FAILURE (2/650 tests failed)
Force-pushed from a471fc9 to a9bd6d3 (compare)
It might have been something specific to the CI (the test failed on the last, 100th repetition), but it might also have just been caused by #18646 (comment), so I added a longer delay for the test. I updated the PR description with a lot more context. @xemul @nyh please take a look and see if it makes the issue clearer.
🟢 CI State: SUCCESS
@wmitros the links to the code you added in your updated description will not be rendered as code snippets in the merge commit. Please replace them with actual snippets of code.
To get the view update backlog in a response, we need to update it on the shard which contained the altered data. The write is redirected to the correct shard in storage_proxy::mutate_locally, where database::apply is invoked on the correct shard. However, we don't want to also invoke storage_proxy::get_view_update_backlog on the correct shard there, because that would cost us an additional allocation (a .then() following the db.apply()) on the common write path. More precisely, as discovered using perf-simple-query, for a single operation we go from 59 allocs, 15 tasks and ~52450 instructions to 60 allocs, 16 tasks and ~52950 instructions.

Instead, we can add the backlog update inside the database::apply call. However, the call shouldn't be directly a part of the apply() method, as we don't want to refer to the storage_proxy there. It also shouldn't be a part of the table::push_view_replica_updates() method, for the same reason. As a result, it needs to be returned from view_update_generator::generate_and_propagate_view_updates(), and passed as a return value through the call stack.

This patch prepares all methods in the stack by changing their return value types. For now, the returned values are empty - the actual value to return will be added in a following patch.
Force-pushed from a9bd6d3 to ca72c20 (compare)
When performing a write, we should update the view update backlog on the node and shard where the mutation is actually applied, and potentially also on the coordinator of this write, with the updated value from the replica. Currently, instead of updating the backlog on the shard that applies the mutation, we do it on the coordinator shard of this write, and as a result, the backlog on the correct shard is not updated.

This patch enables updating the backlog on the correct shard. To achieve that, the update is performed just after the views are pushed, in view_update_generator::generate_and_propagate_view_updates, on the correct shard, and later propagated as a return value to be finally used for updating the max view backlog of the node.
This patch adds a test reproducing issue scylladb#18542. The test performs writes on a table with a materialized view and checks that the view backlog increases.

To get the current view update backlog, a new metric "view_update_backlog" is added to the `storage_proxy` metrics. It differs from the `database` metric of the same name: it takes the backlog from max_view_update_backlog, which keeps (possibly slightly outdated) view update backlogs from all shards, instead of directly checking the view_update_semaphore on which the backlog is based.
Force-pushed from ca72c20 to 9ea788c (compare)
I've updated the description and fixed the merge conflicts. After the rebase, perf-simple-query shows the same (minor) difference.
When a replica applies a write on a table which has a materialized view, it generates view updates. These updates take memory, which is tracked by database::_view_update_concurrency_sem, separately on each shard. The fraction of the units taken from the semaphore to the semaphore limit is the shard's view update backlog. Based on these backlogs, we want to estimate how busy a node is with its view update work. We do that by taking the max backlog across all shards.
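The shard-local backlog is derived from that semaphore; quoting the snippet cited earlier in this thread (lines 1847 to 1849 at 2d91422):

```cpp
db::view::update_backlog get_view_update_backlog() const {
    return {max_memory_pending_view_updates() - _view_update_concurrency_sem.current(), max_memory_pending_view_updates()};
}
```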
To avoid excessive cross-shard operations, the node's (max) backlog isn't calculated every time we need it, but at most once per 10ms (the _interval), with an optimization where the backlog of the calculating shard is immediately up-to-date (we don't need cross-shard operations for it). For the same reason, even when we do calculate the new node-wide backlog, we don't read from the _view_update_concurrency_sem. Instead, for each shard we also store an update_backlog atomic which we use for the calculation.

Due to this distinction, the update_backlog atomic needs to be updated separately, whenever the _view_update_concurrency_sem changes. This is done by calling storage_proxy::get_view_update_backlog, which reads the _view_update_concurrency_sem of the shard (in database::get_view_update_backlog) and then calls node_update_backlog::add_fetch, where the read backlog is stored in the atomic.
For this implementation of calculating the node's view update backlog to work,
we need the atomics to be updated correctly when the semaphores of corresponding
shards change.
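A hedged sketch of the shape of this per-node cache (my reconstruction; the field and method names follow the snippets quoted in the review discussion above, the rest is approximate):

```cpp
class node_update_backlog {
    struct per_shard_backlog {
        std::atomic<update_backlog> backlog;   // written by the owning shard only
    };
    std::vector<per_shard_backlog> _backlogs;  // one atomic slot per shard
    std::chrono::milliseconds _interval;       // max-recalculation period (the 10ms)
    std::atomic<update_backlog> _max;          // cached node-wide maximum
public:
    // Stores `backlog` into this shard's slot and returns the node-wide max,
    // recomputing it from all slots at most once per _interval.
    update_backlog add_fetch(unsigned shard, update_backlog backlog);
};
```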
The main event where the view update backlog changes is an incoming write request. That's why, when handling the request and preparing a response, we update the backlog by calling storage_proxy::get_view_update_backlog (also because we want to read the backlog and send it in the response):

- backlog update after local view updates (storage_proxy::send_to_live_endpoints in mutate_begin)
- backlog update after remote view updates (storage_proxy::remote::handle_write)
Now assume that on a certain node we have a write request received on shard A, which updates a row on shard B (A != B). As a result, shard B will generate view updates and consume units from its _view_update_concurrency_sem, but will not update its atomic in _backlogs yet. Because both shards in the example are on the same node, shard A will perform a local write, calling the lmutate shown above. In the lmutate call, apply_locally will initiate the actual write on shard B, and storage_proxy::get_view_update_backlog will be called back on shard A. In no place will the backlog atomic on shard B get updated, even though it increased in size due to the view updates generated there.
Currently, what we calculate there doesn't really matter much - it's only used for the MV flow control delays, so in this scenario we may only overload a replica, causing failed replica writes which will later be retried as hints. However, when we add MV admission control, the calculated backlog will be the difference between an accepted and a rejected request.
To fix this correctness issue, we need to update the view update backlog (call storage_proxy::get_view_update_backlog) on the shard whose semaphore units have been consumed. We could still read the backlog on the shard that received the initial request (also using storage_proxy::get_view_update_backlog); however, if we read the backlog on a different shard than the one which generated the view updates, it will most likely be outdated (by up to 10ms), so we likely won't see the backlog change caused by the write request. If we want to see that, we need to get the updated value from the shard that generated the view updates and return it on the shard handling the request.
This patch ensures that when semaphore units are consumed on some shard, its atomic backlog is updated, and that when a backlog increases as a result of some request, the increased backlog is returned in the response to this request. This is achieved by calling storage_proxy::get_view_update_backlog on the shard where the view update semaphore units are consumed, and then returning the received value through the call stack until the response is sent. The exact place where the get_view_update_backlog call is added was chosen due to performance considerations: we don't want to create a new task on the common write path just for this, so it's been embedded deeply into view_update_generator::generate_and_propagate_view_updates, so that it's only called when we actually generate view updates, and it's performed as part of a larger coroutine, not a new continuation.
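A hedged sketch of that placement (simplified signature and placeholder steps; only the final co_return reflects the change described here):

```cpp
seastar::future<std::optional<db::view::update_backlog>>
view_update_generator::generate_and_propagate_view_updates(/* ... */) {
    // ... existing logic: build the view updates and push them, consuming
    // units from this shard's _view_update_concurrency_sem ...
    co_await push_view_updates();  // placeholder for the existing steps

    // Final step, still on the shard whose semaphore changed: refresh this
    // shard's cached atomic and hand the fresh value back up the call stack,
    // inside an existing coroutine, at no extra task/continuation cost.
    co_return get_proxy().get_view_update_backlog();
}
```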
Perf simple query in write mode gives the following results:
Before:
median 506446.36 tps ( 59.3 allocs/op, 16.0 logallocs/op, 15.0 tasks/op, 52680 insns/op, 0 errors)
median absolute deviation: 5352.38
maximum: 521346.01
minimum: 500055.56
After:
median 523666.95 tps ( 59.3 allocs/op, 16.0 logallocs/op, 15.0 tasks/op, 52457 insns/op, 0 errors)
median absolute deviation: 7856.04
maximum: 547143.16
minimum: 451149.62
Fixes: #18542
Without admission control (#18334), this patch doesn't have much effect, so I'm marking it as backport/none.