Skip to content

Commit b9b5995

Browse files
committed
Refactor timer thread
1 parent 10396a7 commit b9b5995

File tree

1 file changed

+50
-56
lines changed

1 file changed

+50
-56
lines changed

examples/raft/main.cpp

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,17 @@
1010
#include <grpcpp/security/server_credentials.h>
1111
#include <grpcpp/server_builder.h>
1212
#include <grpcpp/support/status.h>
13-
#include <iterator>
1413
#include <cstdlib>
1514
#include <chrono>
1615
#include <memory>
17-
#include <mutex>
1816
#include <ranges>
19-
#include <stdexcept>
2017
#include <thread>
2118
#include <utility>
2219
#include <vector>
2320
#include <random>
2421

2522
#include <spdlog/spdlog.h>
2623

27-
#include "thread_safety.h"
28-
2924
#include "Raft.grpc.pb.h"
3025
#include "Raft.pb.h"
3126

@@ -153,11 +148,16 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi
153148
spdlog::info(
154149
"Recevied AppendEntries RPC from leader={} during term={}", pRequest->senderid(), pRequest->term());
155150

151+
absl::MutexLock timerLocker(&m_timerMutex);
156152
absl::MutexLock locker(&m_electionMutex);
153+
157154
pResponse->set_term(m_currentTerm);
158155
pResponse->set_success(true);
159156
pResponse->set_responderid(m_id);
160157

158+
spdlog::info("Node={} is resetting election timeout at term={}", m_id, m_currentTerm);
159+
resetElectionTimer();
160+
161161
m_leaderHeartbeatReceived.store(true);
162162

163163
return grpc::Status::OK;
@@ -231,41 +231,59 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi
231231
return;
232232
}
233233

234-
if (m_state == NodeState::LEADER)
235234
{
236-
spdlog::info("Skipping the election loop as the node is the leader");
237-
return;
235+
absl::MutexLock locker(&m_electionMutex);
236+
if (m_state == NodeState::LEADER)
237+
{
238+
continue;
239+
}
238240
}
239241

242+
auto currentTimeMs = []
243+
{
244+
return std::chrono::duration_cast<std::chrono::milliseconds>(
245+
std::chrono::high_resolution_clock::now().time_since_epoch())
246+
.count();
247+
};
248+
249+
// Wait until heartbeat timeouts or timer CV gets signaled
240250
absl::WriterMutexLock locker(&m_timerMutex);
241-
if (m_timerMutex.AwaitWithTimeout(absl::Condition(
242-
+[](std::atomic<bool> *leaderHeartbeatReceived)
243-
{ return leaderHeartbeatReceived->load(); },
244-
&m_leaderHeartbeatReceived),
245-
absl::Milliseconds(m_electionTimeout.load())))
251+
int64_t timeToWaitMs = generateRandomTimeout();
252+
int64_t timeToWaitDeadlineMs = currentTimeMs() + timeToWaitMs;
253+
254+
while (!m_leaderHeartbeatReceived.load() && timeToWaitMs > 0)
255+
{
256+
spdlog::info("Timer thread at node={} will block for {}ms for the leader to send a heartbeat",
257+
m_id,
258+
timeToWaitMs);
259+
260+
m_timerCV.WaitWithTimeout(&m_timerMutex, absl::Milliseconds(m_electionTimeout.load()));
261+
timeToWaitMs = timeToWaitDeadlineMs - currentTimeMs();
262+
}
263+
264+
// If timer CV gets signaled, then node has received the heartbeat from the leader.
265+
// Otherwise, heartbeat timed out and node needs to start the new leader election
266+
if (m_leaderHeartbeatReceived.load())
246267
{
247268
{
248-
absl::MutexLock locker(&m_electionMutex);
249-
spdlog::info("node={} received heartbeat during term={}", m_id, m_currentTerm);
269+
spdlog::info("Node={} received heartbeat", m_id);
250270
}
251271

252272
m_leaderHeartbeatReceived.store(false);
273+
continue;
253274
}
254-
else
275+
else if (timeToWaitMs <= 0)
255276
{
256277
startElection();
257278
}
258279
}
259280
});
260281

261-
/*m_serverThread = std::jthread(*/
262-
/*[this](std::stop_token token)*/
263282
{
264283
assert(m_server);
265284
spdlog::info("Listening for RPC requests on ");
266285
m_server->Wait();
267286
}
268-
/*);*/
269287
}
270288

271289
void startServer()
@@ -276,7 +294,7 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi
276294
void stop()
277295
{
278296
m_stopElectionTimer = false;
279-
m_timerCV.notify_all();
297+
m_timerCV.SignalAll();
280298
m_electionTimerThread.request_stop();
281299
m_electionTimerThread.join();
282300

@@ -308,18 +326,16 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi
308326
{
309327
// TODO(vbejanyan): Staff to do
310328

311-
resetElectionTimer();
312-
313329
// TODO(vbejanyan): Work continues
314330
}
315331

316332
// Timer handling
317333
// Called every time 'AppendEntries' received.
318334
void resetElectionTimer()
319335
{
320-
/*m_timerMutex.AssertHeld();*/
321336
m_electionTimeout.store(generateRandomTimeout());
322-
m_timerCV.notify_all();
337+
m_electionTimeoutResetTime = std::chrono::high_resolution_clock::now();
338+
m_timerCV.Signal();
323339
}
324340

325341
// The logic behind election
@@ -407,30 +423,6 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi
407423
spdlog::info("Server reverted to FOLLOWER state in term={}", m_currentTerm);
408424
}
409425

410-
// A task to monitor the election timeout and start a new election if needed.
411-
/*void electionTimeoutTask(std::stop_token token)*/
412-
/*{*/
413-
/* while (!m_stopElectionTimer)*/
414-
/* {*/
415-
/* if (token.stop_requested())*/
416-
/* {*/
417-
/* return;*/
418-
/* }*/
419-
/**/
420-
/* std::unique_lock lock(m_timerMutex);*/
421-
/* if (m_timerCV.wait_for(lock,*/
422-
/* std::chrono::milliseconds(m_electionTimeout),*/
423-
/* [this]() { return m_leaderHeartbeatReceived.load(); }))*/
424-
/* {*/
425-
/* m_leaderHeartbeatReceived.store(false);*/
426-
/* }*/
427-
/* else*/
428-
/* {*/
429-
/* startElection();*/
430-
/* }*/
431-
/* }*/
432-
/*}*/
433-
/**/
434426
auto hasMajority(const uint32_t votes) const -> bool
435427
{
436428
return votes > static_cast<double>(m_replicas.size()) / 2.0;
@@ -557,14 +549,16 @@ class ConsensusModule : public RaftService::Service, std::enable_shared_from_thi
557549
std::vector<uint32_t> m_matchIndex;
558550

559551
// Election and election timer related fields.
560-
std::atomic<bool> m_leaderHeartbeatReceived{false};
561-
absl::Mutex m_timerMutex;
562-
std::condition_variable m_timerCV;
563-
std::atomic<bool> m_stopElectionTimer{false};
564-
std::atomic<int> m_electionTimeout{0};
565-
std::jthread m_electionTimerThread;
566-
std::atomic<uint32_t> m_voteCount{0};
567-
std::atomic<bool> m_electionInProgress{false};
552+
std::atomic<bool> m_leaderHeartbeatReceived{false};
553+
absl::Mutex m_timerMutex;
554+
absl::CondVar m_timerCV;
555+
std::atomic<bool> m_stopElectionTimer{false};
556+
std::atomic<int> m_electionTimeout{generateRandomTimeout()};
557+
std::chrono::high_resolution_clock::time_point m_electionTimeoutResetTime{
558+
std::chrono::high_resolution_clock::now()};
559+
std::jthread m_electionTimerThread;
560+
std::atomic<uint32_t> m_voteCount{0};
561+
std::atomic<bool> m_electionInProgress{false};
568562

569563
std::jthread m_serverThread;
570564
};

0 commit comments

Comments
 (0)