Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor client tracking, fix atomicity, squashing and multi/exec #2970

Merged
merged 20 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 4 additions & 11 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,6 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
phase_ = SHUTTING_DOWN;

VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
VLOG(2) << "After dispatch_fb.join()";
Expand Down Expand Up @@ -1119,6 +1118,10 @@ void Connection::HandleMigrateRequest() {
}
}

// This triggers when a pub/sub connection both publish and subscribe to the
// same channel. See #3035 on github for details.
// DCHECK(dispatch_q_.empty());
kostasrim marked this conversation as resolved.
Show resolved Hide resolved

// In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
LaunchDispatchFiberIfNeeded();
}
Expand Down Expand Up @@ -1648,16 +1651,6 @@ void Connection::RequestAsyncMigration(util::fb2::ProactorBase* dest) {
migration_request_ = dest;
}

void Connection::SetClientTrackingSwitch(bool is_on) {
tracking_enabled_ = is_on;
if (tracking_enabled_)
cc_->subscriptions++;
}

bool Connection::IsTrackingOn() const {
return tracking_enabled_;
}

void Connection::StartTrafficLogging(string_view path) {
OpenTrafficLogger(path);
}
Expand Down
6 changes: 0 additions & 6 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,6 @@ class Connection : public util::Connection {
// Connections will migrate at most once, and only when the flag --migrate_connections is true.
void RequestAsyncMigration(util::fb2::ProactorBase* dest);

void SetClientTrackingSwitch(bool is_on);

bool IsTrackingOn() const;

// Starts traffic logging in the calling thread. Must be a proactor thread.
// Each thread creates its own log file combining requests from all the connections in
// that thread. A noop if the thread is already logging.
Expand Down Expand Up @@ -450,8 +446,6 @@ class Connection : public util::Connection {
// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;

// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Connection migration vars, see RequestAsyncMigration() above.
Expand Down
3 changes: 1 addition & 2 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc
)
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc)

SET(SEARCH_FILES search/search_family.cc search/doc_index.cc search/doc_accessors.cc
search/aggregator.cc)
Expand Down
8 changes: 8 additions & 0 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,12 @@ void ConnectionState::ExecInfo::ClearWatched() {
watched_existed = 0;
}

bool ConnectionState::ClientTracking::ShouldTrackKeys() const {
if (!IsTrackingOn()) {
return false;
}

return !optin_ || (seq_num_ == (1 + caching_seq_num_));
}
romange marked this conversation as resolved.
Show resolved Hide resolved

} // namespace dfly
83 changes: 83 additions & 0 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,87 @@ struct ConnectionState {

size_t UsedMemory() const;

// Client tracking is a per-connection state machine that adheres to the requirements
// of the CLIENT TRACKING command. Note that the semantics described below are enforced
// by the tests in server_family_test. The rules are:
// 1. If CLIENT TRACKING is ON then each READ command must be tracked. Invalidation
// messages are sent `only once`. Subsequent changes of the same key require the
// client to re-read the key in order to receive the next invalidation message.
// 2. CLIENT TRACKING ON OPTIN turns on optional tracking. Read commands are not
// tracked unless the client issues a CLIENT CACHING YES command which conditionally
// allows the tracking of the command that follows CACHING YES). For example:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> GET foo <--------------------- From now foo is being tracked
// However:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> SET foo bar
// >> GET foo <--------------------- is *NOT* tracked since GET does not succeed CACHING
// Also, in the context of multi transactions, CLIENT CACHING YES is *STICKY*:
// >> CLIENT TRACKING ON
// >> CLIENT CACHING YES
// >> MULTI
// >> GET foo
// >> SET foo bar
// >> GET brother_foo
// >> EXEC
// From this point onwards `foo` and `get` keys are tracked. Same aplies if CACHING YES
// is used within the MULTI/EXEC block.
//
// The state machine implements the above rules. We need to track:
// 1. If TRACKING is ON and OPTIN
// 2. Stickiness of CACHING as described above
//
// We introduce a monotonic counter called sequence number which we increment only:
// * On InvokeCmd when we are not Collecting (multi)
// We introduce another counter called caching_seq_num which is set to seq_num
// when the users sends a CLIENT CACHING YES command
// If seq_num == caching_seq_num + 1 then we know that we should Track().
class ClientTracking {
public:
// Sets to true when CLIENT TRACKING is ON
void SetClientTracking(bool is_on) {
tracking_enabled_ = is_on;
}

// Increment current sequence number
void IncrementSequenceNumber() {
++seq_num_;
}

// Set if OPTIN subcommand is used in CLIENT TRACKING
romange marked this conversation as resolved.
Show resolved Hide resolved
void SetOptin(bool optin) {
optin_ = optin;
}

// Check if the keys should be tracked. Result adheres to the state machine described above.
bool ShouldTrackKeys() const;

// Check only if CLIENT TRACKING is ON
bool IsTrackingOn() const {
return tracking_enabled_;
}

// Called by CLIENT CACHING YES and caches the current seq_num_
void SetCachingSequenceNumber(bool is_multi) {
// We need -1 when we are in multi
caching_seq_num_ = is_multi && seq_num_ != 0 ? seq_num_ - 1 : seq_num_;
}

void ResetCachingSequenceNumber() {
caching_seq_num_ = 0;
}

private:
// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool optin_ = false;
// sequence number
size_t seq_num_ = 0;
size_t caching_seq_num_ = 0;
};

public:
DbIndex db_index = 0;

Expand All @@ -161,6 +242,7 @@ struct ConnectionState {
std::optional<SquashingInfo> squashing_info;
std::unique_ptr<ScriptInfo> script_info;
std::unique_ptr<SubscribeInfo> subscribe_info;
ClientTracking tracking_info_;
};

class ConnectionContext : public facade::ConnectionContext {
Expand All @@ -183,6 +265,7 @@ class ConnectionContext : public facade::ConnectionContext {
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;

ConnectionState conn_state;

DbIndex db_index() const {
Expand Down
39 changes: 22 additions & 17 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1423,24 +1423,29 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
return;

auto it = client_tracking_map_.find(key);
if (it != client_tracking_map_.end()) {
// notify all the clients.
auto& client_set = it->second;
auto cb = [key, client_set = std::move(client_set)](unsigned idx, util::ProactorBase*) {
for (auto it = client_set.begin(); it != client_set.end(); ++it) {
if ((unsigned int)it->Thread() != idx)
continue;
facade::Connection* conn = it->Get();
if ((conn != nullptr) && conn->IsTrackingOn()) {
std::string key_str = {key.begin(), key.end()};
conn->SendInvalidationMessageAsync({key_str});
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
if (it == client_tracking_map_.end()) {
return;
}
auto& client_set = it->second;
romange marked this conversation as resolved.
Show resolved Hide resolved
// Notify all the clients. We copy key because we dispatch briefly below and
// we need to preserve its lifetime
// TODO this key is further copied within DispatchFiber. Fix this.
auto cb = [key = std::string(key), client_set = std::move(client_set)](unsigned idx,
romange marked this conversation as resolved.
Show resolved Hide resolved
util::ProactorBase*) {
for (auto& client : client_set) {
if (client.IsExpired() || (client.Thread() != idx)) {
continue;
}
auto* conn = client.Get();
auto* cntx = static_cast<ConnectionContext*>(conn->cntx());
if (cntx && cntx->conn_state.tracking_info_.IsTrackingOn()) {
conn->SendInvalidationMessageAsync({key});
}
}
};
shard_set->pool()->DispatchBrief(std::move(cb));
// remove this key from the tracking table as the key no longer exists
client_tracking_map_.erase(key);
}

void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
Expand Down