fix: client kill preempts in atomic section on shutdown #5283
Signed-off-by: kostas <[email protected]>
src/server/server_family.cc
Outdated
facade::Connection::WeakRef ref = std::move(kill_list.front());
kill_list.pop_front();
facade::Connection* conn = ref.Get();
// TODO think how to handle migration for eval. See RequestAsyncMigration
Need to think about what we can do here.
src/server/server_family.cc
Outdated
@@ -535,14 +535,39 @@ void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkRe

const bool is_admin_request = cntx->conn()->IsPrivileged();

std::vector<util::fb2::Fiber> fibers(pp->size() * listeners.size());
IMO we could also have a single fiber per shard which sleeps and wakes up when we push work to its queue. I have no preference either way.
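A minimal sketch of that alternative, assuming helio's `util::fb2::EventCount` for the sleep/wake-up and a hypothetical `ShutdownSelf()` as the shutdown entry point; this is not the code in the PR:

```cpp
#include <deque>

// Sketch only: one long-lived worker fiber per shard that sleeps until work
// is pushed to its queue. Fibers on the same proactor run cooperatively, so
// no mutex is needed around the queue.
struct ShardKillWorker {
  std::deque<facade::Connection::WeakRef> queue;
  util::fb2::EventCount ec;
  bool stop = false;

  void Push(facade::Connection::WeakRef ref) {
    queue.push_back(std::move(ref));
    ec.notify();  // wake the sleeping worker fiber
  }

  void Run() {  // body of the per-shard fiber
    while (true) {
      ec.await([this] { return !queue.empty() || stop; });
      if (queue.empty() && stop)
        break;
      facade::Connection::WeakRef ref = std::move(queue.front());
      queue.pop_front();
      if (facade::Connection* conn = ref.Get())
        conn->ShutdownSelf();  // hypothetical shutdown entry point; may preempt
    }
  }
};
```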
I do not understand the reason for all the complexity here and below.
I would imagine having an array per thread: you just add all the matching connections to the array in their owning thread.
The next step is then to run pool->AwaitFiberOnAll, which accesses the array in its own thread and sequentially shuts down the connections.
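Roughly, that shape could look like the sketch below. The `Matches` filter, `ShutdownSelf()`, and the exact traversal callback signature are assumptions for illustration, not the actual patch:

```cpp
// Step 1: collect matching connections into per-thread arrays. The traversal
// callback is assumed to run in each connection's owning thread, so only
// Borrow() happens there.
std::vector<std::vector<facade::Connection::WeakRef>> per_thread(pp->size());

for (auto* listener : listeners) {
  listener->TraverseConnections([&](unsigned thread_index, util::Connection* conn) {
    auto* dfly_conn = static_cast<facade::Connection*>(conn);
    if (Matches(dfly_conn))  // hypothetical CLIENT KILL filter (id/addr/laddr/...)
      per_thread[thread_index].push_back(dfly_conn->Borrow());
  });
}

// Step 2: each thread sequentially shuts down only the connections it owns.
pp->AwaitFiberOnAll([&](unsigned index, util::ProactorBase*) {
  for (auto& ref : per_thread[index]) {
    if (facade::Connection* conn = ref.Get())
      conn->ShutdownSelf();  // assumed shutdown entry point
  }
});
```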
Yeah, I agree it was overkill.
There is a small issue -- please see my other comment.
src/server/server_family.cc
Outdated
@@ -499,7 +499,7 @@ void ClientId(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cnt
 }

 void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkReplyBuilder* builder,
-                ConnectionContext* cntx) {
+                ConnectionContext* cntx, util::ProactorPool* pp) {
you do not need to pass it - it's accessible via shard_set->pool()
facade::Connection* conn = tcon.Get();
if (conn && conn->socket()->proactor()->GetPoolIndex() == p->GetPoolIndex()) {
The problem here is that we might migrate a connection between the preemptions of Shutdown. Calling tcon.Get() on a connection that was migrated can become a data race.
Maybe the solution is to simply disable migrations for each connection we are about to kill? WDYT?
Now that I think about it, we should disable migrations when we call Borrow(). If a connection migrates after Borrow(), the socket_->proactor()->GetPoolIndex() we cached within ConnectionWeakRef is no longer valid. The DCHECK() within Get() won't protect us either, because it uses the cached, no-longer-valid proactor index.
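To make that concrete, here is a simplified model (not the real ConnectionWeakRef) of why the index cached at Borrow() time stops protecting us once the connection migrates:

```cpp
// Simplified model of a weak reference that caches the owning proactor index
// when it is created (i.e. at Borrow() time).
class WeakRefSketch {
 public:
  explicit WeakRefSketch(facade::Connection* conn)
      : conn_(conn), cached_idx_(conn->socket()->proactor()->GetPoolIndex()) {}

  facade::Connection* Get() const {
    // Intended invariant: only dereference from the owning thread. But if the
    // connection migrated after Borrow(), cached_idx_ still names the *old*
    // thread, so a caller running there passes the check even though the
    // connection is now owned (and mutated) by a different proactor.
    DCHECK_EQ(util::ProactorBase::me()->GetPoolIndex(), cached_idx_);
    return conn_;
  }

 private:
  facade::Connection* conn_;
  unsigned cached_idx_;
};
```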
We run with migrate_connections=false in production, so it's not an issue.
I think your checks are good enough for this.
sounds good
listener->TraverseConnections(cb);
}
auto cb = [&](unsigned idx, ProactorBase* p) mutable {
  // Step 1 aggregate the per thread connections from all listeners
If you do both steps locally in-thread, there is no reason to declare thread_connections as a global object outside. Instead just define a vector<facade::Connection::WeakRef> here.
Yes, I thought about that. My issue was that we need to pass this thread-local vector to traverse_cb. This implies either defining traverse_cb within this lambda, which makes it difficult to read, or binding it via [&local] { traverse_cb(local, ...); }.
I am fine with either of these -- I will do whatever you find more readable :)
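For reference, the second option could look roughly like the sketch below. Both traverse_cb taking the destination vector as its first parameter and a per-thread traversal entry point (written here as TraverseConnectionsOnThread) are assumptions for illustration:

```cpp
auto cb = [&](unsigned idx, ProactorBase* p) mutable {
  // Per-thread storage; no global thread_connections needed.
  std::vector<facade::Connection::WeakRef> local;

  // Bind the local vector into the shared traversal helper.
  auto bound = [&local](unsigned thread_index, util::Connection* conn) {
    traverse_cb(local, thread_index, conn);
  };
  for (auto* listener : listeners)
    listener->TraverseConnectionsOnThread(bound);  // assumed per-thread traversal

  // Step 2: shut down everything collected in `local` on this thread.
};
```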
I am fine with either option (slight preference for the first).
Sounds good, I will follow up.
The problem is that TlsSocket::Shutdown is preemptive because it flushes its buffer to the socket. However, this violates the FiberAtomicGuard held while traversing the connection list on each shard. To fix this, we move the shutdown call to another fiber.
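A condensed sketch of that fix, reusing the kill_list / WeakRef names from the diff above; the exact fiber wiring and the ShutdownSelf() call are assumptions:

```cpp
// Inside the connection-list traversal (the FiberAtomicGuard section): only
// record weak references; nothing here should preempt.
std::deque<facade::Connection::WeakRef> kill_list;
auto traverse_cb = [&](facade::Connection* conn) {
  kill_list.push_back(conn->Borrow());
};

// Outside the atomic section: a dedicated fiber drains the list and performs
// the shutdowns, which may preempt (e.g. TlsSocket::Shutdown flushing its
// buffer to the socket).
util::fb2::Fiber killer("client_kill_shutdown", [&] {
  while (!kill_list.empty()) {
    facade::Connection::WeakRef ref = std::move(kill_list.front());
    kill_list.pop_front();
    if (facade::Connection* conn = ref.Get())
      conn->ShutdownSelf();  // assumed shutdown entry point
  }
});
killer.Join();
```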