Skip to content

Commit

Permalink
feat mongo: light connection state algorithm
Browse files Browse the repository at this point in the history
commit_hash:66f012b76eb3304b8726edf27e8f2049530680fc
  • Loading branch information
segoon committed Jan 27, 2025
1 parent 7036a0a commit ff16341
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 34 deletions.
67 changes: 49 additions & 18 deletions mongo/src/storages/mongo/cdriver/async_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,12 @@ clients::dns::AddrVector GetaddrInfo(const mongoc_host_list_t& host, bson_error_
return result;
}

engine::io::Socket ConnectTcpByName(
engine::io::Socket DoConnectTcpByName(
const mongoc_host_list_t& host,
int32_t timeout_ms,
bson_error_t* error,
clients::dns::Resolver* dns_resolver
) {
if (!IsTcpConnectAllowed(host.host_and_port)) {
bson_set_error(
error,
MONGOC_ERROR_STREAM,
MONGOC_ERROR_STREAM_CONNECT,
"Too many connection errors in recent period for %s",
host.host_and_port
);
return {};
}

const auto deadline = DeadlineFromTimeoutMs(timeout_ms);
try {
auto addrs = dns_resolver ? dns_resolver->Resolve(host.host, deadline) : GetaddrInfo(host, error);
Expand All @@ -205,8 +194,7 @@ engine::io::Socket ConnectTcpByName(
ReportTcpConnectSuccess(host.host_and_port);
return socket;
} catch (const engine::io::IoCancelled& ex) {
UASSERT_MSG(false, "Cancellation is not supported in cdriver implementation");

ReportTcpConnectError(host.host_and_port);
bson_set_error(error, MONGOC_ERROR_STREAM, MONGOC_ERROR_STREAM_CONNECT, "%s", ex.what());
return {};
} catch (const engine::io::IoException& ex) {
Expand All @@ -227,14 +215,57 @@ engine::io::Socket ConnectTcpByName(
return {};
}

engine::io::Socket
Connect(const mongoc_host_list_t* host, int32_t timeout_ms, bson_error_t* error, clients::dns::Resolver* dns_resolver) {
engine::io::Socket ConnectTcpByName(
const mongoc_host_list_t& host,
int32_t timeout_ms,
bson_error_t* error,
clients::dns::Resolver* dns_resolver,
concurrent::BackgroundTaskStorage& bts
) {
const auto host_state = CheckTcpConnectionState(host.host_and_port);
if (host_state == HostConnectionState::kChecking) {
/*
* Pessimistically check for TCP connection in background.
* It is needed for services with a small number of connections and a periodic task
* that uses the same connection every ~3 seconds. It must not experience synchronous probe
* delays as it obviously affects response timings.
* The background probe does the same check thing, but doesn't slow down the user.
*
* See https://st.yandex-team.ru/TAXICOMMON-9746 and https://st.yandex-team.ru/TAXICOMMON-9644
*/
bts.AsyncDetach("mongo_probe_tcp_connection", [host, timeout_ms, dns_resolver] {
bson_error_t tmp_error;
[[maybe_unused]] auto socket = DoConnectTcpByName(host, timeout_ms, &tmp_error, dns_resolver);
});
}

if (host_state != HostConnectionState::kAlive) {
bson_set_error(
error,
MONGOC_ERROR_STREAM,
MONGOC_ERROR_STREAM_CONNECT,
"Too many connection errors in recent period for %s",
host.host_and_port
);
return {};
}

return DoConnectTcpByName(host, timeout_ms, error, dns_resolver);
}

engine::io::Socket Connect(
const mongoc_host_list_t* host,
int32_t timeout_ms,
bson_error_t* error,
clients::dns::Resolver* dns_resolver,
concurrent::BackgroundTaskStorage& bts
) {
UASSERT(host);
switch (host->family) {
case AF_UNSPEC: // mongoc thinks this is okay
case AF_INET:
case AF_INET6:
return ConnectTcpByName(*host, timeout_ms, error, dns_resolver);
return ConnectTcpByName(*host, timeout_ms, error, dns_resolver, bts);

case AF_UNIX:
return ConnectUnix(*host, timeout_ms, error);
Expand Down Expand Up @@ -310,7 +341,7 @@ mongoc_stream_t* MakeAsyncStream(
auto* init_data = static_cast<AsyncStreamInitiatorData*>(user_data);

const auto connect_timeout_ms = mongoc_uri_get_option_as_int32(uri, MONGOC_URI_CONNECTTIMEOUTMS, 5000);
auto socket = Connect(host, connect_timeout_ms, error, init_data->dns_resolver);
auto socket = Connect(host, connect_timeout_ms, error, init_data->dns_resolver, init_data->bts);
if (!socket) return nullptr;

auto stream = AsyncStream::Create(std::move(socket));
Expand Down
3 changes: 3 additions & 0 deletions mongo/src/storages/mongo/cdriver/async_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <mongoc/mongoc.h>

#include <userver/clients/dns/resolver_fwd.hpp>
#include <userver/concurrent/background_task_storage.hpp>

USERVER_NAMESPACE_BEGIN

Expand All @@ -12,6 +13,8 @@ struct AsyncStreamInitiatorData {
// If equals to nullptr, use getaddrinfo(3)
clients::dns::Resolver* dns_resolver;

concurrent::BackgroundTaskStorage bts;

mongoc_ssl_opt_t ssl_opt;
};

Expand Down
2 changes: 1 addition & 1 deletion mongo/src/storages/mongo/cdriver/pool_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ CDriverPoolImpl::CDriverPoolImpl(
)
: PoolImpl(std::move(id), config, config_source),
app_name_(config.app_name),
init_data_{dns_resolver, {}},
init_data_{dns_resolver, {}, {}},
max_size_(config.pool_settings.max_size),
idle_limit_(config.pool_settings.idle_limit),
queue_timeout_(config.queue_timeout),
Expand Down
22 changes: 8 additions & 14 deletions mongo/src/storages/mongo/tcp_connect_precheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@ USERVER_NAMESPACE_BEGIN
namespace storages::mongo::impl {
namespace {

constexpr size_t kRecentErrorThreshold = 2;
constexpr std::chrono::seconds kRecentErrorPeriod{15};
constexpr size_t kRecentErrorThreshold = 5;

constexpr size_t kRecoveryAttemptsLimit = 1;
constexpr std::chrono::seconds kRecoveryPeriod{3};

struct InstanceState {
std::atomic<bool> is_failed{false};

// only used when not failed
utils::TokenBucket failures{
kRecentErrorThreshold,
{1, utils::TokenBucket::Duration{kRecentErrorPeriod} / kRecentErrorThreshold}};
std::atomic<uint64_t> failed_in_row{0};

// only used when failed
utils::TokenBucket recovery_attempts{
Expand All @@ -42,26 +36,26 @@ auto& GetInstanceStatesByHostAndPort() {

} // namespace

bool IsTcpConnectAllowed(const char* host_and_port) {
HostConnectionState CheckTcpConnectionState(const char* host_and_port) {
auto instance_state = GetInstanceStatesByHostAndPort().Get(host_and_port);

if (!instance_state || !instance_state->is_failed) return true;
if (!instance_state || instance_state->failed_in_row < kRecentErrorThreshold) return HostConnectionState::kAlive;

// we're in recovery mode
if (instance_state->recovery_attempts.Obtain()) return true;
if (instance_state->recovery_attempts.Obtain()) return HostConnectionState::kChecking;

return false;
return HostConnectionState::kDead;
}

void ReportTcpConnectSuccess(const char* host_and_port) {
GetInstanceStatesByHostAndPort().Emplace(host_and_port).value->is_failed = false;
GetInstanceStatesByHostAndPort().Emplace(host_and_port).value->failed_in_row = 0;
}

void ReportTcpConnectError(const char* host_and_port) {
auto instance_state = GetInstanceStatesByHostAndPort().Emplace(host_and_port).value;
UASSERT(instance_state);

if (!instance_state->failures.Obtain()) instance_state->is_failed = true;
instance_state->failed_in_row++;
}

} // namespace storages::mongo::impl
Expand Down
8 changes: 7 additions & 1 deletion mongo/src/storages/mongo/tcp_connect_precheck.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ USERVER_NAMESPACE_BEGIN

namespace storages::mongo::impl {

bool IsTcpConnectAllowed(const char* host_and_port);
enum class HostConnectionState {
kAlive,
kChecking,
kDead,
};

HostConnectionState CheckTcpConnectionState(const char* host_and_port);

void ReportTcpConnectSuccess(const char* host_and_port);
void ReportTcpConnectError(const char* host_and_port);
Expand Down

0 comments on commit ff16341

Please sign in to comment.