Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added SELECT redis command #482

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added SELECT redis command
tysion committed Jan 24, 2024
commit afa8f8d69369fecd2bd2e013620b98a9627eda87
7 changes: 5 additions & 2 deletions redis/include/userver/storages/redis/impl/base.hpp
Original file line number Diff line number Diff line change
@@ -27,16 +27,19 @@ struct ConnectionInfo {
bool read_only = false;
ConnectionSecurity connection_security = ConnectionSecurity::kNone;
using HostVector = std::vector<std::string>;
std::optional<size_t> database_index{};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New connections always use database index 0 so it is not really an optional:

Suggested change
std::optional<size_t> database_index{};
std::size_t database_index = 0;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


ConnectionInfo() = default;
ConnectionInfo(std::string host, int port, Password password,
bool read_only = false,
ConnectionSecurity security = ConnectionSecurity::kNone)
ConnectionSecurity security = ConnectionSecurity::kNone,
std::optional<size_t> database_index = {})
: host{std::move(host)},
port{port},
password{std::move(password)},
read_only{read_only},
connection_security(security) {}
connection_security(security),
database_index{database_index} {}
};

struct Stat {
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ struct RedisSettings {

std::vector<std::string> shards;
std::vector<HostPort> sentinels;
std::optional<size_t> database_index;
redis::Password password{std::string()};
redis::ConnectionSecurity secure_connection{redis::ConnectionSecurity::kNone};
};
80 changes: 64 additions & 16 deletions redis/src/storages/redis/impl/redis.cpp
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
#include <storages/redis/impl/tcp_socket.hpp>
#include <userver/storages/redis/impl/reply.hpp>
#include <userver/storages/redis/impl/retry_budget.hpp>
#include <userver/utils/scope_guard.hpp>

#include "command_control_impl.hpp"

@@ -140,7 +141,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
~RedisImpl();

void Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password);
const Password& password, std::optional<size_t> database_index);
void Disconnect();

bool AsyncCommand(const CommandPtr& command);
@@ -227,6 +228,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
void ProcessCommand(const CommandPtr& command);

void Authenticate();
void SelectDatabase();
void SendReadOnly();
void FreeCommands();

@@ -245,7 +247,8 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
static bool WatchCommandTimerEnabled(
const CommandsBufferingSettings& commands_buffering_settings);

bool Connect(const std::string& host, int port, const Password& password);
bool Connect(const std::string& host, int port, const Password& password,
std::optional<size_t> database_index);

Redis* redis_obj_;
engine::ev::ThreadControl ev_thread_control_;
@@ -266,6 +269,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
uint16_t port_ = 0;
std::string server_;
Password password_{std::string()};
std::optional<size_t> database_index_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not optional and 0 by default

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

std::atomic<size_t> commands_size_ = 0;
size_t sent_count_ = 0;
size_t cmd_counter_ = 0;
@@ -341,8 +345,9 @@ Redis::~Redis() {
}

void Redis::Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password) {
impl_->Connect(host_addrs, port, password);
const Password& password,
std::optional<size_t> database_index) {
impl_->Connect(host_addrs, port, password, database_index);
}

bool Redis::AsyncCommand(const CommandPtr& command) {
@@ -450,17 +455,19 @@ void Redis::RedisImpl::Detach() {
}

void Redis::RedisImpl::Connect(const ConnectionInfo::HostVector& host_addrs,
int port, const Password& password) {
int port, const Password& password,
std::optional<size_t> database_index) {
for (const auto& host : host_addrs)
if (Connect(host, port, password)) return;
if (Connect(host, port, password, database_index)) return;

LOG_ERROR() << "error async connect to Redis server (host addrs ="
<< host_addrs << ", port=" << port << ")";
SetState(State::kInitError);
}

bool Redis::RedisImpl::Connect(const std::string& host, int port,
const Password& password) {
const Password& password,
std::optional<size_t> database_index) {
UASSERT(context_ == nullptr);
UASSERT(state_ == State::kInit);

@@ -471,6 +478,7 @@ bool Redis::RedisImpl::Connect(const std::string& host, int port,
log_extra_.Extend("redis_server", GetServer());
log_extra_.Extend("server_id", GetServerId().GetId());
password_ = password;
database_index_ = database_index;
LOG_INFO() << log_extra_ << "Async connect to Redis server=" << GetServer();
context_ = redisAsyncConnect(host.c_str(), port);

@@ -1046,19 +1054,13 @@ bool Redis::RedisImpl::InitSecureConnection() {

void Redis::RedisImpl::Authenticate() {
if (password_.GetUnderlying().empty()) {
if (send_readonly_)
SendReadOnly();
else
SetState(State::kConnected);
SendReadOnly();
} else {
ProcessCommand(PrepareCommand(
CmdArgs{"AUTH", password_.GetUnderlying()},
[this](const CommandPtr&, ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
if (send_readonly_)
SendReadOnly();
else
SetState(State::kConnected);
SendReadOnly();
} else {
if (*reply) {
if (reply->IsUnknownCommandError()) {
@@ -1085,12 +1087,17 @@ void Redis::RedisImpl::Authenticate() {
}

void Redis::RedisImpl::SendReadOnly() {
if (!send_readonly_) {
SelectDatabase();
return;
}

LOG_DEBUG() << "Send READONLY command to slave "
<< GetServerId().GetDescription() << " in cluster mode";
ProcessCommand(PrepareCommand(CmdArgs{"READONLY"}, [this](const CommandPtr&,
ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
SetState(State::kConnected);
SelectDatabase();
} else {
if (*reply) {
LOG_LIMITED_ERROR()
@@ -1107,6 +1114,47 @@ void Redis::RedisImpl::SendReadOnly() {
}));
}

void Redis::RedisImpl::SelectDatabase() {
if (!database_index_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better yet, if database index equals 0.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

SetState(RedisState::kConnected);
return;
}

ProcessCommand(PrepareCommand(
CmdArgs{"SELECT", *database_index_},
[this](const CommandPtr&, ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
SetState(RedisState::kConnected);
LOG_INFO() << log_extra_
<< "Selected redis logical database with index "
<< *database_index_;
return;
}

const utils::ScopeGuard auto_disconnect([this]() { Disconnect(); });

if (!*reply) {
LOG_LIMITED_ERROR()
<< "SELECT failed with status " << reply->status << " ("
<< reply->status_string << ") " << log_extra_;
return;
}

if (reply->IsUnknownCommandError()) {
LOG_WARNING() << log_extra_
<< "SELECT failed: unknown command `SELECT` - "
"possible when connecting to Sentinel instead "
"of Redis master or slave instance";
return;
}

LOG_LIMITED_ERROR()
<< log_extra_
<< "SELECT failed: response type=" << reply->data.GetTypeString()
<< " msg=" << reply->data.ToDebugString();
}));
}

void Redis::RedisImpl::OnRedisReply(redisAsyncContext* c, void* r,
void* privdata) noexcept {
auto* impl = static_cast<Redis::RedisImpl*>(c->data);
3 changes: 2 additions & 1 deletion redis/src/storages/redis/impl/redis.hpp
Original file line number Diff line number Diff line change
@@ -36,7 +36,8 @@ class Redis {
Redis(Redis&& o) = delete;

void Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password);
const Password& password,
std::optional<size_t> database_index = {});

bool AsyncCommand(const CommandPtr& command);
size_t GetRunningCommands() const;
1 change: 1 addition & 0 deletions redis/src/storages/redis/impl/redis_stats.cpp
Original file line number Diff line number Diff line change
@@ -76,6 +76,7 @@ const std::string_view kCommandTypes[] = {
"scan",
"scard",
"script",
"select",
"sentinel",
"set",
"setex",
28 changes: 16 additions & 12 deletions redis/src/storages/redis/impl/sentinel.cpp
Original file line number Diff line number Diff line change
@@ -40,15 +40,18 @@ void ThrowIfCancelled() {

} // namespace

Sentinel::Sentinel(
const std::shared_ptr<ThreadPools>& thread_pools,
const std::vector<std::string>& shards,
const std::vector<ConnectionInfo>& conns, std::string shard_group_name,
const std::string& client_name, const Password& password,
ConnectionSecurity connection_security, ReadyChangeCallback ready_callback,
dynamic_config::Source dynamic_config_source,
std::unique_ptr<KeyShard>&& key_shard, CommandControl command_control,
const testsuite::RedisControl& testsuite_redis_control, ConnectionMode mode)
Sentinel::Sentinel(const std::shared_ptr<ThreadPools>& thread_pools,
const std::vector<std::string>& shards,
const std::vector<ConnectionInfo>& conns,
std::string shard_group_name, const std::string& client_name,
const Password& password,
ConnectionSecurity connection_security,
ReadyChangeCallback ready_callback,
dynamic_config::Source dynamic_config_source,
std::unique_ptr<KeyShard>&& key_shard,
CommandControl command_control,
const testsuite::RedisControl& testsuite_redis_control,
ConnectionMode mode, std::optional<size_t> database_index)
: thread_pools_(thread_pools),
secdist_default_command_control_(command_control),
testsuite_redis_control_(testsuite_redis_control) {
@@ -82,7 +85,7 @@ Sentinel::Sentinel(
*sentinel_thread_control_, thread_pools_->GetRedisThreadPool(), *this,
shards, conns, std::move(shard_group_name), client_name, password,
connection_security, std::move(ready_callback), std::move(key_shard),
dynamic_config_source, mode);
dynamic_config_source, mode, database_index);
}
});
}
@@ -152,7 +155,7 @@ std::shared_ptr<Sentinel> Sentinel::CreateSentinel(
// sentinels in cluster mode.
conns.emplace_back(sentinel.host, sentinel.port,
(key_shard ? Password("") : password), false,
settings.secure_connection);
settings.secure_connection, std::nullopt);
}

LOG_DEBUG() << "redis command_control:" << command_control.ToString();
@@ -162,7 +165,8 @@ std::shared_ptr<Sentinel> Sentinel::CreateSentinel(
thread_pools, shards, conns, std::move(shard_group_name), client_name,
password, settings.secure_connection, std::move(ready_callback),
dynamic_config_source, std::move(key_shard), command_control,
testsuite_redis_control);
testsuite_redis_control, ConnectionMode::kCommands,
settings.database_index);
client->Start();
}

3 changes: 2 additions & 1 deletion redis/src/storages/redis/impl/sentinel.hpp
Original file line number Diff line number Diff line change
@@ -59,7 +59,8 @@ class Sentinel {
std::unique_ptr<KeyShard>&& key_shard = nullptr,
CommandControl command_control = {},
const testsuite::RedisControl& testsuite_redis_control = {},
ConnectionMode mode = ConnectionMode::kCommands);
ConnectionMode mode = ConnectionMode::kCommands,
std::optional<size_t> database_index = {});
virtual ~Sentinel();

void Start();
8 changes: 6 additions & 2 deletions redis/src/storages/redis/impl/sentinel_impl.cpp
Original file line number Diff line number Diff line change
@@ -64,7 +64,8 @@ SentinelImpl::SentinelImpl(
const std::string& client_name, const Password& password,
ConnectionSecurity connection_security, ReadyChangeCallback ready_callback,
std::unique_ptr<KeyShard>&& key_shard,
dynamic_config::Source dynamic_config_source, ConnectionMode mode)
dynamic_config::Source dynamic_config_source, ConnectionMode mode,
std::optional<size_t> database_index)
: sentinel_obj_(sentinel),
ev_thread_(sentinel_thread_control),
shard_group_name_(std::move(shard_group_name)),
@@ -79,7 +80,8 @@ SentinelImpl::SentinelImpl(
key_shard_(std::move(key_shard)),
connection_mode_(mode),
slot_info_(IsInClusterMode() ? std::make_unique<SlotInfo>() : nullptr),
dynamic_config_source_(dynamic_config_source) {
dynamic_config_source_(dynamic_config_source),
database_index_(database_index) {
for (size_t i = 0; i < init_shards_->size(); ++i) {
shards_[(*init_shards_)[i]] = i;
connected_statuses_.push_back(std::make_unique<ConnectedStatus>());
@@ -637,6 +639,7 @@ void SentinelImpl::ReadSentinels() {
for (auto shard_conn : info) {
if (shards_.find(shard_conn.Name()) != shards_.end()) {
shard_conn.SetConnectionSecurity(connection_security_);
shard_conn.SetDatabaseIndex(database_index_);
shard_found[shards_[shard_conn.Name()]] = true;
watcher->host_port_to_shard[shard_conn.HostPort()] =
shards_[shard_conn.Name()];
@@ -675,6 +678,7 @@ void SentinelImpl::ReadSentinels() {
shard_conn.SetName(shard);
shard_conn.SetReadOnly(true);
shard_conn.SetConnectionSecurity(connection_security_);
shard_conn.SetDatabaseIndex(database_index_);
if (shards_.find(shard_conn.Name()) != shards_.end())
watcher->host_port_to_shard[shard_conn.HostPort()] =
shards_[shard_conn.Name()];
4 changes: 3 additions & 1 deletion redis/src/storages/redis/impl/sentinel_impl.hpp
Original file line number Diff line number Diff line change
@@ -113,7 +113,8 @@ class SentinelImpl : public SentinelImplBase {
ReadyChangeCallback ready_callback,
std::unique_ptr<KeyShard>&& key_shard,
dynamic_config::Source dynamic_config_source,
ConnectionMode mode = ConnectionMode::kCommands);
ConnectionMode mode = ConnectionMode::kCommands,
std::optional<size_t> database_index = {});
~SentinelImpl() override;

std::unordered_map<ServerId, size_t, ServerIdHasher>
@@ -286,6 +287,7 @@ class SentinelImpl : public SentinelImplBase {
std::optional<CommandsBufferingSettings> commands_buffering_settings_;
dynamic_config::Source dynamic_config_source_;
std::atomic<int> publish_shard_{0};
std::optional<size_t> database_index_;
};

} // namespace redis
Loading