From b9b5995efdb3adf7c48df81a8c5f016f6ccf48f2 Mon Sep 17 00:00:00 2001 From: lnikon Date: Sun, 29 Dec 2024 00:42:01 +0400 Subject: [PATCH] Refactor timer thread --- examples/raft/main.cpp | 106 +++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/examples/raft/main.cpp b/examples/raft/main.cpp index 3aeb43a..c68d3f5 100644 --- a/examples/raft/main.cpp +++ b/examples/raft/main.cpp @@ -10,13 +10,10 @@ #include #include #include -#include #include #include #include -#include #include -#include #include #include #include @@ -24,8 +21,6 @@ #include -#include "thread_safety.h" - #include "Raft.grpc.pb.h" #include "Raft.pb.h" @@ -153,11 +148,16 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi spdlog::info( "Recevied AppendEntries RPC from leader={} during term={}", pRequest->senderid(), pRequest->term()); + absl::MutexLock timerLocker(&m_timerMutex); absl::MutexLock locker(&m_electionMutex); + pResponse->set_term(m_currentTerm); pResponse->set_success(true); pResponse->set_responderid(m_id); + spdlog::info("Node={} is resetting election timeout at term={}", m_id, m_currentTerm); + resetElectionTimer(); + m_leaderHeartbeatReceived.store(true); return grpc::Status::OK; @@ -231,41 +231,59 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi return; } - if (m_state == NodeState::LEADER) { - spdlog::info("Skipping the election loop as the node is the leader"); - return; + absl::MutexLock locker(&m_electionMutex); + if (m_state == NodeState::LEADER) + { + continue; + } } + auto currentTimeMs = [] + { + return std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count(); + }; + + // Wait until heartbeat timeouts or timer CV gets signaled absl::WriterMutexLock locker(&m_timerMutex); - if (m_timerMutex.AwaitWithTimeout(absl::Condition( - +[](std::atomic *leaderHeartbeatReceived) - { return leaderHeartbeatReceived->load(); }, - &m_leaderHeartbeatReceived), - absl::Milliseconds(m_electionTimeout.load()))) + int64_t timeToWaitMs = generateRandomTimeout(); + int64_t timeToWaitDeadlineMs = currentTimeMs() + timeToWaitMs; + + while (!m_leaderHeartbeatReceived.load() && timeToWaitMs > 0) + { + spdlog::info("Timer thread at node={} will block for {}ms for the leader to send a heartbeat", + m_id, + timeToWaitMs); + + m_timerCV.WaitWithTimeout(&m_timerMutex, absl::Milliseconds(m_electionTimeout.load())); + timeToWaitMs = timeToWaitDeadlineMs - currentTimeMs(); + } + + // If timer CV gets signaled, then node has received the heartbeat from the leader. + // Otherwise, heartbeat timed out and node needs to start the new leader election + if (m_leaderHeartbeatReceived.load()) { { - absl::MutexLock locker(&m_electionMutex); - spdlog::info("node={} received heartbeat during term={}", m_id, m_currentTerm); + spdlog::info("Node={} received heartbeat", m_id); } m_leaderHeartbeatReceived.store(false); + continue; } - else + else if (timeToWaitMs <= 0) { startElection(); } } }); - /*m_serverThread = std::jthread(*/ - /*[this](std::stop_token token)*/ { assert(m_server); spdlog::info("Listening for RPC requests on "); m_server->Wait(); } - /*);*/ } void startServer() @@ -276,7 +294,7 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi void stop() { m_stopElectionTimer = false; - m_timerCV.notify_all(); + m_timerCV.SignalAll(); m_electionTimerThread.request_stop(); m_electionTimerThread.join(); @@ -308,8 +326,6 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi { // TODO(vbejanyan): Staff to do - resetElectionTimer(); - // TODO(vbejanyan): Work continues } @@ -317,9 +333,9 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi // Called every time 'AppendEntries' received. void resetElectionTimer() { - /*m_timerMutex.AssertHeld();*/ m_electionTimeout.store(generateRandomTimeout()); - m_timerCV.notify_all(); + m_electionTimeoutResetTime = std::chrono::high_resolution_clock::now(); + m_timerCV.Signal(); } // The logic behind election @@ -407,30 +423,6 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi spdlog::info("Server reverted to FOLLOWER state in term={}", m_currentTerm); } - // A task to monitor the election timeout and start a new election if needed. - /*void electionTimeoutTask(std::stop_token token)*/ - /*{*/ - /* while (!m_stopElectionTimer)*/ - /* {*/ - /* if (token.stop_requested())*/ - /* {*/ - /* return;*/ - /* }*/ - /**/ - /* std::unique_lock lock(m_timerMutex);*/ - /* if (m_timerCV.wait_for(lock,*/ - /* std::chrono::milliseconds(m_electionTimeout),*/ - /* [this]() { return m_leaderHeartbeatReceived.load(); }))*/ - /* {*/ - /* m_leaderHeartbeatReceived.store(false);*/ - /* }*/ - /* else*/ - /* {*/ - /* startElection();*/ - /* }*/ - /* }*/ - /*}*/ - /**/ auto hasMajority(const uint32_t votes) const -> bool { return votes > static_cast(m_replicas.size()) / 2.0; @@ -557,14 +549,16 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi std::vector m_matchIndex; // Election and election timer related fields. - std::atomic m_leaderHeartbeatReceived{false}; - absl::Mutex m_timerMutex; - std::condition_variable m_timerCV; - std::atomic m_stopElectionTimer{false}; - std::atomic m_electionTimeout{0}; - std::jthread m_electionTimerThread; - std::atomic m_voteCount{0}; - std::atomic m_electionInProgress{false}; + std::atomic m_leaderHeartbeatReceived{false}; + absl::Mutex m_timerMutex; + absl::CondVar m_timerCV; + std::atomic m_stopElectionTimer{false}; + std::atomic m_electionTimeout{generateRandomTimeout()}; + std::chrono::high_resolution_clock::time_point m_electionTimeoutResetTime{ + std::chrono::high_resolution_clock::now()}; + std::jthread m_electionTimerThread; + std::atomic m_voteCount{0}; + std::atomic m_electionInProgress{false}; std::jthread m_serverThread; };