diff --git a/mongo/src/storages/mongo/cdriver/async_stream.cpp b/mongo/src/storages/mongo/cdriver/async_stream.cpp index 4ab49df6db3c..e9fb6cd7e0c7 100644 --- a/mongo/src/storages/mongo/cdriver/async_stream.cpp +++ b/mongo/src/storages/mongo/cdriver/async_stream.cpp @@ -175,23 +175,12 @@ clients::dns::AddrVector GetaddrInfo(const mongoc_host_list_t& host, bson_error_ return result; } -engine::io::Socket ConnectTcpByName( +engine::io::Socket DoConnectTcpByName( const mongoc_host_list_t& host, int32_t timeout_ms, bson_error_t* error, clients::dns::Resolver* dns_resolver ) { - if (!IsTcpConnectAllowed(host.host_and_port)) { - bson_set_error( - error, - MONGOC_ERROR_STREAM, - MONGOC_ERROR_STREAM_CONNECT, - "Too many connection errors in recent period for %s", - host.host_and_port - ); - return {}; - } - const auto deadline = DeadlineFromTimeoutMs(timeout_ms); try { auto addrs = dns_resolver ? dns_resolver->Resolve(host.host, deadline) : GetaddrInfo(host, error); @@ -205,8 +194,7 @@ engine::io::Socket ConnectTcpByName( ReportTcpConnectSuccess(host.host_and_port); return socket; } catch (const engine::io::IoCancelled& ex) { - UASSERT_MSG(false, "Cancellation is not supported in cdriver implementation"); - + ReportTcpConnectError(host.host_and_port); bson_set_error(error, MONGOC_ERROR_STREAM, MONGOC_ERROR_STREAM_CONNECT, "%s", ex.what()); return {}; } catch (const engine::io::IoException& ex) { @@ -227,14 +215,57 @@ engine::io::Socket ConnectTcpByName( return {}; } -engine::io::Socket -Connect(const mongoc_host_list_t* host, int32_t timeout_ms, bson_error_t* error, clients::dns::Resolver* dns_resolver) { +engine::io::Socket ConnectTcpByName( + const mongoc_host_list_t& host, + int32_t timeout_ms, + bson_error_t* error, + clients::dns::Resolver* dns_resolver, + concurrent::BackgroundTaskStorage& bts +) { + const auto host_state = CheckTcpConnectionState(host.host_and_port); + if (host_state == HostConnectionState::kChecking) { + /* + * Pessimistically check for TCP connection in background. + * It is needed for services with a small number of connections and a periodic task + * that uses the same connection every ~3 seconds. It must not experience synchronous probe + * delays as it obviously affects response timings. + * The background probe does the same check thing, but doesn't slow down the user. + * + * See https://st.yandex-team.ru/TAXICOMMON-9746 and https://st.yandex-team.ru/TAXICOMMON-9644 + */ + bts.AsyncDetach("mongo_probe_tcp_connection", [host, timeout_ms, dns_resolver] { + bson_error_t tmp_error; + [[maybe_unused]] auto socket = DoConnectTcpByName(host, timeout_ms, &tmp_error, dns_resolver); + }); + } + + if (host_state != HostConnectionState::kAlive) { + bson_set_error( + error, + MONGOC_ERROR_STREAM, + MONGOC_ERROR_STREAM_CONNECT, + "Too many connection errors in recent period for %s", + host.host_and_port + ); + return {}; + } + + return DoConnectTcpByName(host, timeout_ms, error, dns_resolver); +} + +engine::io::Socket Connect( + const mongoc_host_list_t* host, + int32_t timeout_ms, + bson_error_t* error, + clients::dns::Resolver* dns_resolver, + concurrent::BackgroundTaskStorage& bts +) { UASSERT(host); switch (host->family) { case AF_UNSPEC: // mongoc thinks this is okay case AF_INET: case AF_INET6: - return ConnectTcpByName(*host, timeout_ms, error, dns_resolver); + return ConnectTcpByName(*host, timeout_ms, error, dns_resolver, bts); case AF_UNIX: return ConnectUnix(*host, timeout_ms, error); @@ -310,7 +341,7 @@ mongoc_stream_t* MakeAsyncStream( auto* init_data = static_cast(user_data); const auto connect_timeout_ms = mongoc_uri_get_option_as_int32(uri, MONGOC_URI_CONNECTTIMEOUTMS, 5000); - auto socket = Connect(host, connect_timeout_ms, error, init_data->dns_resolver); + auto socket = Connect(host, connect_timeout_ms, error, init_data->dns_resolver, init_data->bts); if (!socket) return nullptr; auto stream = AsyncStream::Create(std::move(socket)); diff --git a/mongo/src/storages/mongo/cdriver/async_stream.hpp b/mongo/src/storages/mongo/cdriver/async_stream.hpp index fd9ac0af61d9..32e6ba088ef2 100644 --- a/mongo/src/storages/mongo/cdriver/async_stream.hpp +++ b/mongo/src/storages/mongo/cdriver/async_stream.hpp @@ -3,6 +3,7 @@ #include #include +#include USERVER_NAMESPACE_BEGIN @@ -12,6 +13,8 @@ struct AsyncStreamInitiatorData { // If equals to nullptr, use getaddrinfo(3) clients::dns::Resolver* dns_resolver; + concurrent::BackgroundTaskStorage bts; + mongoc_ssl_opt_t ssl_opt; }; diff --git a/mongo/src/storages/mongo/cdriver/pool_impl.cpp b/mongo/src/storages/mongo/cdriver/pool_impl.cpp index 15e17c99f3fe..30576871c9ca 100644 --- a/mongo/src/storages/mongo/cdriver/pool_impl.cpp +++ b/mongo/src/storages/mongo/cdriver/pool_impl.cpp @@ -301,7 +301,7 @@ CDriverPoolImpl::CDriverPoolImpl( ) : PoolImpl(std::move(id), config, config_source), app_name_(config.app_name), - init_data_{dns_resolver, {}}, + init_data_{dns_resolver, {}, {}}, max_size_(config.pool_settings.max_size), idle_limit_(config.pool_settings.idle_limit), queue_timeout_(config.queue_timeout), diff --git a/mongo/src/storages/mongo/tcp_connect_precheck.cpp b/mongo/src/storages/mongo/tcp_connect_precheck.cpp index 56c281b711ce..eeb197a695d8 100644 --- a/mongo/src/storages/mongo/tcp_connect_precheck.cpp +++ b/mongo/src/storages/mongo/tcp_connect_precheck.cpp @@ -13,19 +13,13 @@ USERVER_NAMESPACE_BEGIN namespace storages::mongo::impl { namespace { -constexpr size_t kRecentErrorThreshold = 2; -constexpr std::chrono::seconds kRecentErrorPeriod{15}; +constexpr size_t kRecentErrorThreshold = 5; constexpr size_t kRecoveryAttemptsLimit = 1; constexpr std::chrono::seconds kRecoveryPeriod{3}; struct InstanceState { - std::atomic is_failed{false}; - - // only used when not failed - utils::TokenBucket failures{ - kRecentErrorThreshold, - {1, utils::TokenBucket::Duration{kRecentErrorPeriod} / kRecentErrorThreshold}}; + std::atomic failed_in_row{0}; // only used when failed utils::TokenBucket recovery_attempts{ @@ -42,26 +36,26 @@ auto& GetInstanceStatesByHostAndPort() { } // namespace -bool IsTcpConnectAllowed(const char* host_and_port) { +HostConnectionState CheckTcpConnectionState(const char* host_and_port) { auto instance_state = GetInstanceStatesByHostAndPort().Get(host_and_port); - if (!instance_state || !instance_state->is_failed) return true; + if (!instance_state || instance_state->failed_in_row < kRecentErrorThreshold) return HostConnectionState::kAlive; // we're in recovery mode - if (instance_state->recovery_attempts.Obtain()) return true; + if (instance_state->recovery_attempts.Obtain()) return HostConnectionState::kChecking; - return false; + return HostConnectionState::kDead; } void ReportTcpConnectSuccess(const char* host_and_port) { - GetInstanceStatesByHostAndPort().Emplace(host_and_port).value->is_failed = false; + GetInstanceStatesByHostAndPort().Emplace(host_and_port).value->failed_in_row = 0; } void ReportTcpConnectError(const char* host_and_port) { auto instance_state = GetInstanceStatesByHostAndPort().Emplace(host_and_port).value; UASSERT(instance_state); - if (!instance_state->failures.Obtain()) instance_state->is_failed = true; + instance_state->failed_in_row++; } } // namespace storages::mongo::impl diff --git a/mongo/src/storages/mongo/tcp_connect_precheck.hpp b/mongo/src/storages/mongo/tcp_connect_precheck.hpp index 55169bfb01cf..1db37af761ad 100644 --- a/mongo/src/storages/mongo/tcp_connect_precheck.hpp +++ b/mongo/src/storages/mongo/tcp_connect_precheck.hpp @@ -4,7 +4,13 @@ USERVER_NAMESPACE_BEGIN namespace storages::mongo::impl { -bool IsTcpConnectAllowed(const char* host_and_port); +enum class HostConnectionState { + kAlive, + kChecking, + kDead, +}; + +HostConnectionState CheckTcpConnectionState(const char* host_and_port); void ReportTcpConnectSuccess(const char* host_and_port); void ReportTcpConnectError(const char* host_and_port);