From ff1634111d3d7913bf9bd0e6c01d8e1fcce1b20b Mon Sep 17 00:00:00 2001
From: segoon <segoon@userver.tech>
Date: Mon, 27 Jan 2025 12:41:50 +0300
Subject: [PATCH] feat mongo: light connection state algorithm
 commit_hash:66f012b76eb3304b8726edf27e8f2049530680fc

---
 .../storages/mongo/cdriver/async_stream.cpp   | 67 ++++++++++++++-----
 .../storages/mongo/cdriver/async_stream.hpp   |  3 +
 .../src/storages/mongo/cdriver/pool_impl.cpp  |  2 +-
 .../storages/mongo/tcp_connect_precheck.cpp   | 22 +++---
 .../storages/mongo/tcp_connect_precheck.hpp   |  8 ++-
 5 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/mongo/src/storages/mongo/cdriver/async_stream.cpp b/mongo/src/storages/mongo/cdriver/async_stream.cpp
index 4ab49df6db3c..e9fb6cd7e0c7 100644
--- a/mongo/src/storages/mongo/cdriver/async_stream.cpp
+++ b/mongo/src/storages/mongo/cdriver/async_stream.cpp
@@ -175,23 +175,12 @@ clients::dns::AddrVector GetaddrInfo(const mongoc_host_list_t& host, bson_error_
     return result;
 }
 
-engine::io::Socket ConnectTcpByName(
+engine::io::Socket DoConnectTcpByName(
     const mongoc_host_list_t& host,
     int32_t timeout_ms,
     bson_error_t* error,
     clients::dns::Resolver* dns_resolver
 ) {
-    if (!IsTcpConnectAllowed(host.host_and_port)) {
-        bson_set_error(
-            error,
-            MONGOC_ERROR_STREAM,
-            MONGOC_ERROR_STREAM_CONNECT,
-            "Too many connection errors in recent period for %s",
-            host.host_and_port
-        );
-        return {};
-    }
-
     const auto deadline = DeadlineFromTimeoutMs(timeout_ms);
     try {
         auto addrs = dns_resolver ? dns_resolver->Resolve(host.host, deadline) : GetaddrInfo(host, error);
@@ -205,8 +194,7 @@ engine::io::Socket ConnectTcpByName(
                 ReportTcpConnectSuccess(host.host_and_port);
                 return socket;
             } catch (const engine::io::IoCancelled& ex) {
-                UASSERT_MSG(false, "Cancellation is not supported in cdriver implementation");
-
+                ReportTcpConnectError(host.host_and_port);
                 bson_set_error(error, MONGOC_ERROR_STREAM, MONGOC_ERROR_STREAM_CONNECT, "%s", ex.what());
                 return {};
             } catch (const engine::io::IoException& ex) {
@@ -227,14 +215,57 @@ engine::io::Socket ConnectTcpByName(
     return {};
 }
 
-engine::io::Socket
-Connect(const mongoc_host_list_t* host, int32_t timeout_ms, bson_error_t* error, clients::dns::Resolver* dns_resolver) {
+engine::io::Socket ConnectTcpByName(
+    const mongoc_host_list_t& host,
+    int32_t timeout_ms,
+    bson_error_t* error,
+    clients::dns::Resolver* dns_resolver,
+    concurrent::BackgroundTaskStorage& bts
+) {
+    const auto host_state = CheckTcpConnectionState(host.host_and_port);
+    if (host_state == HostConnectionState::kChecking) {
+        /*
+         * Pessimistically check for TCP connection in background.
+         * It is needed for services with a small number of connections and a periodic task
+         * that uses the same connection every ~3 seconds. It must not experience synchronous probe
+         * delays as it obviously affects response timings.
+         * The background probe does the same check thing, but doesn't slow down the user.
+         *
+         * See https://st.yandex-team.ru/TAXICOMMON-9746 and https://st.yandex-team.ru/TAXICOMMON-9644
+         */
+        bts.AsyncDetach("mongo_probe_tcp_connection", [host, timeout_ms, dns_resolver] {
+            bson_error_t tmp_error;
+            [[maybe_unused]] auto socket = DoConnectTcpByName(host, timeout_ms, &tmp_error, dns_resolver);
+        });
+    }
+
+    if (host_state != HostConnectionState::kAlive) {
+        bson_set_error(
+            error,
+            MONGOC_ERROR_STREAM,
+            MONGOC_ERROR_STREAM_CONNECT,
+            "Too many connection errors in recent period for %s",
+            host.host_and_port
+        );
+        return {};
+    }
+
+    return DoConnectTcpByName(host, timeout_ms, error, dns_resolver);
+}
+
+engine::io::Socket Connect(
+    const mongoc_host_list_t* host,
+    int32_t timeout_ms,
+    bson_error_t* error,
+    clients::dns::Resolver* dns_resolver,
+    concurrent::BackgroundTaskStorage& bts
+) {
     UASSERT(host);
     switch (host->family) {
         case AF_UNSPEC:  // mongoc thinks this is okay
         case AF_INET:
         case AF_INET6:
-            return ConnectTcpByName(*host, timeout_ms, error, dns_resolver);
+            return ConnectTcpByName(*host, timeout_ms, error, dns_resolver, bts);
 
         case AF_UNIX:
             return ConnectUnix(*host, timeout_ms, error);
@@ -310,7 +341,7 @@ mongoc_stream_t* MakeAsyncStream(
     auto* init_data = static_cast<AsyncStreamInitiatorData*>(user_data);
 
     const auto connect_timeout_ms = mongoc_uri_get_option_as_int32(uri, MONGOC_URI_CONNECTTIMEOUTMS, 5000);
-    auto socket = Connect(host, connect_timeout_ms, error, init_data->dns_resolver);
+    auto socket = Connect(host, connect_timeout_ms, error, init_data->dns_resolver, init_data->bts);
     if (!socket) return nullptr;
 
     auto stream = AsyncStream::Create(std::move(socket));
diff --git a/mongo/src/storages/mongo/cdriver/async_stream.hpp b/mongo/src/storages/mongo/cdriver/async_stream.hpp
index fd9ac0af61d9..32e6ba088ef2 100644
--- a/mongo/src/storages/mongo/cdriver/async_stream.hpp
+++ b/mongo/src/storages/mongo/cdriver/async_stream.hpp
@@ -3,6 +3,7 @@
 #include <mongoc/mongoc.h>
 
 #include <userver/clients/dns/resolver_fwd.hpp>
+#include <userver/concurrent/background_task_storage.hpp>
 
 USERVER_NAMESPACE_BEGIN
 
@@ -12,6 +13,8 @@ struct AsyncStreamInitiatorData {
     // If equals to nullptr, use getaddrinfo(3)
     clients::dns::Resolver* dns_resolver;
 
+    concurrent::BackgroundTaskStorage bts;
+
     mongoc_ssl_opt_t ssl_opt;
 };
 
diff --git a/mongo/src/storages/mongo/cdriver/pool_impl.cpp b/mongo/src/storages/mongo/cdriver/pool_impl.cpp
index 15e17c99f3fe..30576871c9ca 100644
--- a/mongo/src/storages/mongo/cdriver/pool_impl.cpp
+++ b/mongo/src/storages/mongo/cdriver/pool_impl.cpp
@@ -301,7 +301,7 @@ CDriverPoolImpl::CDriverPoolImpl(
 )
     : PoolImpl(std::move(id), config, config_source),
       app_name_(config.app_name),
-      init_data_{dns_resolver, {}},
+      init_data_{dns_resolver, {}, {}},
       max_size_(config.pool_settings.max_size),
       idle_limit_(config.pool_settings.idle_limit),
       queue_timeout_(config.queue_timeout),
diff --git a/mongo/src/storages/mongo/tcp_connect_precheck.cpp b/mongo/src/storages/mongo/tcp_connect_precheck.cpp
index 56c281b711ce..eeb197a695d8 100644
--- a/mongo/src/storages/mongo/tcp_connect_precheck.cpp
+++ b/mongo/src/storages/mongo/tcp_connect_precheck.cpp
@@ -13,19 +13,13 @@ USERVER_NAMESPACE_BEGIN
 namespace storages::mongo::impl {
 namespace {
 
-constexpr size_t kRecentErrorThreshold = 2;
-constexpr std::chrono::seconds kRecentErrorPeriod{15};
+constexpr size_t kRecentErrorThreshold = 5;
 
 constexpr size_t kRecoveryAttemptsLimit = 1;
 constexpr std::chrono::seconds kRecoveryPeriod{3};
 
 struct InstanceState {
-    std::atomic<bool> is_failed{false};
-
-    // only used when not failed
-    utils::TokenBucket failures{
-        kRecentErrorThreshold,
-        {1, utils::TokenBucket::Duration{kRecentErrorPeriod} / kRecentErrorThreshold}};
+    std::atomic<uint64_t> failed_in_row{0};
 
     // only used when failed
     utils::TokenBucket recovery_attempts{
@@ -42,26 +36,26 @@ auto& GetInstanceStatesByHostAndPort() {
 
 }  // namespace
 
-bool IsTcpConnectAllowed(const char* host_and_port) {
+HostConnectionState CheckTcpConnectionState(const char* host_and_port) {
     auto instance_state = GetInstanceStatesByHostAndPort().Get(host_and_port);
 
-    if (!instance_state || !instance_state->is_failed) return true;
+    if (!instance_state || instance_state->failed_in_row < kRecentErrorThreshold) return HostConnectionState::kAlive;
 
     // we're in recovery mode
-    if (instance_state->recovery_attempts.Obtain()) return true;
+    if (instance_state->recovery_attempts.Obtain()) return HostConnectionState::kChecking;
 
-    return false;
+    return HostConnectionState::kDead;
 }
 
 void ReportTcpConnectSuccess(const char* host_and_port) {
-    GetInstanceStatesByHostAndPort().Emplace(host_and_port).value->is_failed = false;
+    GetInstanceStatesByHostAndPort().Emplace(host_and_port).value->failed_in_row = 0;
 }
 
 void ReportTcpConnectError(const char* host_and_port) {
     auto instance_state = GetInstanceStatesByHostAndPort().Emplace(host_and_port).value;
     UASSERT(instance_state);
 
-    if (!instance_state->failures.Obtain()) instance_state->is_failed = true;
+    instance_state->failed_in_row++;
 }
 
 }  // namespace storages::mongo::impl
diff --git a/mongo/src/storages/mongo/tcp_connect_precheck.hpp b/mongo/src/storages/mongo/tcp_connect_precheck.hpp
index 55169bfb01cf..1db37af761ad 100644
--- a/mongo/src/storages/mongo/tcp_connect_precheck.hpp
+++ b/mongo/src/storages/mongo/tcp_connect_precheck.hpp
@@ -4,7 +4,13 @@ USERVER_NAMESPACE_BEGIN
 
 namespace storages::mongo::impl {
 
-bool IsTcpConnectAllowed(const char* host_and_port);
+enum class HostConnectionState {
+    kAlive,
+    kChecking,
+    kDead,
+};
+
+HostConnectionState CheckTcpConnectionState(const char* host_and_port);
 
 void ReportTcpConnectSuccess(const char* host_and_port);
 void ReportTcpConnectError(const char* host_and_port);