Skip to content

Commit f53b501

Browse files
committed
Fix state machine modification during log replication
1 parent 2b7359e commit f53b501

File tree

2 files changed

+106
-65
lines changed

2 files changed

+106
-65
lines changed

examples/raft/raft.cpp

Lines changed: 97 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#include "raft.h"
22

3-
#include "db.h"
4-
53
#include <cstdint>
64
#include <cstdlib>
75
#include <memory>
@@ -25,13 +23,14 @@
2523
#include <fmt/format.h>
2624
#include <spdlog/spdlog.h>
2725

28-
static int gFirstElection = 0;
26+
static bool gFirstElection = true;
2927

3028
NodeClient::NodeClient(ID nodeId, IP nodeIp)
3129
: m_id{nodeId},
3230
m_ip{std::move(nodeIp)},
3331
m_channel(grpc::CreateChannel(m_ip, grpc::InsecureChannelCredentials())),
34-
m_stub(RaftService::NewStub(m_channel))
32+
m_stub(RaftService::NewStub(m_channel)),
33+
m_kvStub(TinyKVPPService::NewStub(m_channel))
3534
{
3635
assert(m_id > 0);
3736
assert(!m_ip.empty());
@@ -45,6 +44,11 @@ NodeClient::NodeClient(ID nodeId, IP nodeIp)
4544
{
4645
throw std::runtime_error(fmt::format("Failed to create a stub for node={} ip={}", m_id, m_id));
4746
}
47+
48+
if (!m_kvStub)
49+
{
50+
throw std::runtime_error(fmt::format("Failed to create a KV stub for node={} ip={}", m_id, m_id));
51+
}
4852
}
4953

5054
auto NodeClient::appendEntries(const AppendEntriesRequest &request, AppendEntriesResponse *response) -> bool
@@ -77,6 +81,22 @@ auto NodeClient::requestVote(const RequestVoteRequest &request, RequestVoteRespo
7781
return true;
7882
}
7983

84+
auto NodeClient::put(const PutRequest &request, PutResponse *pResponse) -> bool
85+
{
86+
grpc::ClientContext context;
87+
context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(generateRandomTimeout()));
88+
grpc::Status status = m_kvStub->Put(&context, request, pResponse);
89+
if (!status.ok())
90+
{
91+
spdlog::error("Put RPC call failed. Error code={} and message={}",
92+
static_cast<int>(status.error_code()),
93+
status.error_message());
94+
return false;
95+
}
96+
97+
return true;
98+
}
99+
80100
auto NodeClient::getId() const -> ID
81101
{
82102
return m_id;
@@ -126,10 +146,9 @@ auto ConsensusModule::AppendEntries(grpc::ServerContext *pContext,
126146
(void)pRequest;
127147
(void)pResponse;
128148

129-
spdlog::info("Recevied AppendEntries RPC from leader={} during term={}", pRequest->senderid(), pRequest->term());
149+
spdlog::debug("Recevied AppendEntries RPC from leader={} during term={}", pRequest->senderid(), pRequest->term());
130150

131151
absl::MutexLock locker(&m_stateMutex);
132-
/*absl::MutexLock timerLocker(&m_timerMutex);*/
133152

134153
// 1. Term check
135154
if (pRequest->term() < m_currentTerm)
@@ -172,19 +191,34 @@ auto ConsensusModule::AppendEntries(grpc::ServerContext *pContext,
172191

173192
m_log.insert(m_log.end(), pRequest->entries().begin(), pRequest->entries().end());
174193

194+
/*const auto &entries = pRequest->entries();*/
195+
/*for (const auto &entry : entries)*/
196+
/*{*/
197+
/* m_wal.add(db::wal::wal_t::record_t{.op = db::wal::wal_t::operation_k::add_k,*/
198+
/* .kv = {structures::memtable::memtable_t::record_t::key_t{entry.key()},*/
199+
/* structures::memtable::memtable_t::record_t::value_t{entry.value()}}});*/
200+
/*}*/
201+
175202
if (pRequest->leadercommit() > m_commitIndex)
176203
{
177204
m_commitIndex = std::min(pRequest->leadercommit(), (uint32_t)m_log.size());
205+
206+
while (m_lastApplied < m_commitIndex)
207+
{
208+
++m_lastApplied;
209+
m_kv[m_log[m_lastApplied - 1].key()] = m_log[m_lastApplied - 1].value();
210+
}
178211
}
179212

180213
pResponse->set_term(m_currentTerm);
181214
pResponse->set_success(true);
182215
pResponse->set_responderid(m_id);
183216
pResponse->set_match_index(m_log.size());
184217

218+
m_votedFor = pRequest->senderid();
185219
m_leaderHeartbeatReceived.store(true);
186220

187-
spdlog::info("Node={} is resetting election timeout at term={}", m_id, m_currentTerm);
221+
spdlog::debug("Node={} is resetting election timeout at term={}", m_id, m_currentTerm);
188222

189223
return grpc::Status::OK;
190224
}
@@ -197,10 +231,10 @@ auto ConsensusModule::RequestVote(grpc::ServerContext *pContext,
197231

198232
absl::WriterMutexLock locker(&m_stateMutex);
199233

200-
spdlog::info("Received RequestVote RPC from candidate={} during term={} peerTerm={}",
201-
pRequest->candidateid(),
202-
m_currentTerm,
203-
pRequest->term());
234+
spdlog::debug("Received RequestVote RPC from candidate={} during term={} peerTerm={}",
235+
pRequest->candidateid(),
236+
m_currentTerm,
237+
pRequest->term());
204238

205239
pResponse->set_term(m_currentTerm);
206240
pResponse->set_votegranted(0);
@@ -245,19 +279,42 @@ auto ConsensusModule::Put(grpc::ServerContext *pContext, const PutRequest *pRequ
245279

246280
uint32_t currentTerm = 0;
247281
uint32_t lastLogIndex = 0;
282+
uint32_t votedFor = 0;
248283
{
249284
absl::MutexLock locker{&m_stateMutex};
250285
if (m_state != NodeState::LEADER)
251286
{
252-
spdlog::error("Non-leader node={} received a put request", m_id);
253-
pResponse->set_status("");
254-
return grpc::Status::OK;
287+
if (m_votedFor != invalidId)
288+
{
289+
votedFor = m_votedFor;
290+
}
291+
else
292+
{
293+
spdlog::error("Non-leader node={} received a put request. Leader at current term is unkown.", m_id);
294+
pResponse->set_status("");
295+
return grpc::Status::OK;
296+
}
255297
}
256298

257299
currentTerm = m_currentTerm;
258300
lastLogIndex = getLastLogIndex() + 1;
259301
}
260302

303+
if (votedFor != invalidId)
304+
{
305+
spdlog::error("Non-leader node={} received a put request. Forwarding to leader={} during currentTerm={}",
306+
m_id,
307+
votedFor,
308+
currentTerm);
309+
310+
if (!m_replicas[votedFor]->put(*pRequest, pResponse))
311+
{
312+
spdlog::error("Non-leader node={} was unable to forward put RPC to leader={}", m_id, votedFor);
313+
}
314+
315+
return grpc::Status::OK;
316+
}
317+
261318
LogEntry logEntry;
262319
logEntry.set_term(currentTerm);
263320
logEntry.set_index(lastLogIndex);
@@ -272,21 +329,14 @@ auto ConsensusModule::Put(grpc::ServerContext *pContext, const PutRequest *pRequ
272329

273330
for (auto &[id, client] : m_replicas)
274331
{
275-
sendAppendEntriesRPC(client, {logEntry});
332+
sendAppendEntriesRPC(client.value(), {logEntry});
276333
}
277334

278335
absl::MutexLock locker{&m_stateMutex};
279336
bool success = waitForMajorityReplication(logEntry.index());
280337
if (success)
281338
{
282339
spdlog::info("Node={} majority agreed on logEntry={}", m_id, logEntry.index());
283-
284-
// Apply successfull replication to the state machine e.g. in-memory hash-table
285-
while (m_lastApplied < m_commitIndex)
286-
{
287-
++m_lastApplied;
288-
m_kv[m_log[m_lastApplied - 1].key()] = m_log[m_lastApplied - 1].value();
289-
}
290340
}
291341
else
292342
{
@@ -354,16 +404,16 @@ void ConsensusModule::start()
354404
absl::MutexLock locker(&m_timerMutex); // Lock the mutex using Abseil's MutexLock
355405

356406
// Determine the timeout duration
357-
int64_t timeToWaitMs = gFirstElection == 0 ? generateRandomTimeout() : 1'000'000'000;
407+
int64_t timeToWaitMs = gFirstElection ? generateRandomTimeout() : 1'000'000'000;
358408
int64_t timeToWaitDeadlineMs = currentTimeMs() + timeToWaitMs;
359409

360410
// Define the condition to wait for leader's heartbeat
361411
auto heartbeatReceivedCondition = [this, &timeToWaitDeadlineMs, currentTimeMs]()
362412
{ return m_leaderHeartbeatReceived.load() || currentTimeMs() >= timeToWaitDeadlineMs; };
363413

364-
spdlog::info("Timer thread at node={} will block for {}ms for the leader to send a heartbeat",
365-
m_id,
366-
timeToWaitMs);
414+
spdlog::debug("Timer thread at node={} will block for {}ms for the leader to send a heartbeat",
415+
m_id,
416+
timeToWaitMs);
367417

368418
// Wait for the condition to be met or timeout
369419
bool heartbeatReceived = m_timerMutex.AwaitWithTimeout(absl::Condition(&heartbeatReceivedCondition),
@@ -373,8 +423,8 @@ void ConsensusModule::start()
373423
// Otherwise, heartbeat timed out and node needs to start the new leader election
374424
if (heartbeatReceived && m_leaderHeartbeatReceived.load())
375425
{
376-
gFirstElection = 1;
377-
spdlog::info("Node={} received heartbeat", m_id);
426+
gFirstElection = false;
427+
spdlog::debug("Node={} received heartbeat", m_id);
378428
m_leaderHeartbeatReceived.store(false);
379429
}
380430
else
@@ -387,7 +437,7 @@ void ConsensusModule::start()
387437

388438
{
389439
assert(m_raftServer);
390-
spdlog::info("Listening for RPC requests on ");
440+
spdlog::debug("Listening for RPC requests on ");
391441
m_raftServer->Wait();
392442
}
393443
}
@@ -436,7 +486,7 @@ void ConsensusModule::startElection()
436486
m_currentTerm++;
437487
m_state = NodeState::CANDIDATE;
438488

439-
spdlog::info("Node={} starts election. New term={}", m_id, m_currentTerm);
489+
spdlog::debug("Node={} starts election. New term={}", m_id, m_currentTerm);
440490

441491
// Node in a canditate state should vote for itself.
442492
m_voteCount++;
@@ -453,7 +503,7 @@ void ConsensusModule::startElection()
453503
for (auto &[id, client] : m_replicas)
454504
{
455505
RequestVoteResponse response;
456-
if (!client.requestVote(request, &response))
506+
if (!client->requestVote(request, &response))
457507
{
458508
spdlog::error("RequestVote RPC failed in requester thread");
459509
return;
@@ -462,10 +512,10 @@ void ConsensusModule::startElection()
462512
auto responseTerm = response.term();
463513
auto voteGranted = response.votegranted();
464514

465-
spdlog::info("Received RequestVoteResponse in requester thread peerTerm={} voteGranted={} responseTerm={}",
466-
responseTerm,
467-
voteGranted,
468-
response.responderid());
515+
spdlog::debug("Received RequestVoteResponse in requester thread peerTerm={} voteGranted={} responseTerm={}",
516+
responseTerm,
517+
voteGranted,
518+
response.responderid());
469519

470520
absl::MutexLock locker(&m_stateMutex);
471521
if (responseTerm > m_currentTerm)
@@ -503,7 +553,7 @@ void ConsensusModule::becomeFollower(uint32_t newTerm)
503553
}
504554
m_heartbeatThreads.clear();
505555

506-
spdlog::info("Server reverted to FOLLOWER state in term={}", m_currentTerm);
556+
spdlog::debug("Server reverted to FOLLOWER state in term={}", m_currentTerm);
507557
}
508558

509559
auto ConsensusModule::hasMajority(uint32_t votes) const -> bool
@@ -525,11 +575,11 @@ void ConsensusModule::becomeLeader()
525575

526576
m_state = NodeState::LEADER;
527577

528-
spdlog::info("Node={} become a leader at term={}", m_id, m_currentTerm);
578+
spdlog::debug("Node={} become a leader at term={}", m_id, m_currentTerm);
529579

530580
for (auto &[id, client] : m_replicas)
531581
{
532-
sendHeartbeat(client);
582+
sendHeartbeat(client.value());
533583
}
534584
}
535585

@@ -541,7 +591,7 @@ void ConsensusModule::sendHeartbeat(NodeClient &client)
541591
m_heartbeatThreads.emplace_back(
542592
[this, maxRetries, &client, heartbeatInterval](std::stop_token token)
543593
{
544-
spdlog::info("Node={} is starting a heartbeat thread for client={}", m_id, client.getId());
594+
spdlog::debug("Node={} is starting a heartbeat thread for client={}", m_id, client.getId());
545595

546596
int consecutiveFailures = 0;
547597
while (!token.stop_requested())
@@ -551,7 +601,7 @@ void ConsensusModule::sendHeartbeat(NodeClient &client)
551601
absl::ReaderMutexLock locker(&m_stateMutex);
552602
if (m_state != NodeState::LEADER)
553603
{
554-
spdlog::info("Node={} is no longer a leader. Stopping the heartbeat thread");
604+
spdlog::debug("Node={} is no longer a leader. Stopping the heartbeat thread");
555605
break;
556606
}
557607

@@ -585,7 +635,7 @@ void ConsensusModule::sendHeartbeat(NodeClient &client)
585635
auto responseTerm = response.term();
586636
auto success = response.success();
587637

588-
spdlog::info(
638+
spdlog::debug(
589639
"Received AppendEntriesResponse in requester thread peerTerm={} success={} responderId={}",
590640
responseTerm,
591641
success,
@@ -599,18 +649,13 @@ void ConsensusModule::sendHeartbeat(NodeClient &client)
599649
becomeFollower(responseTerm);
600650
break;
601651
}
602-
603-
if (!success)
604-
{
605-
/*decrementNextIndex(client.getId());*/
606-
}
607652
}
608653
}
609654

610655
std::this_thread::sleep_for(heartbeatInterval);
611656
}
612657

613-
spdlog::info("Stopping heartbeat thread for on the node={} for the client={}", m_id, client.getId());
658+
spdlog::debug("Stopping heartbeat thread for on the node={} for the client={}", m_id, client.getId());
614659
});
615660
}
616661

@@ -631,16 +676,7 @@ void ConsensusModule::sendAppendEntriesRPC(NodeClient &client, std::vector<LogEn
631676

632677
for (auto logEntry : logEntries)
633678
{
634-
spdlog::info("VAGAG: logEntry.command={} logEntry.command.size={}",
635-
logEntry.command(),
636-
logEntry.command().size());
637679
request.add_entries()->CopyFrom(logEntry);
638-
/*auto *entry = request.add_entries();*/
639-
/*entry->set_term(logEntry.term());*/
640-
/*entry->set_index(logEntry.index());*/
641-
/*entry->set_command(logEntry.command());*/
642-
/*entry->set_key(logEntry.key());*/
643-
/*entry->set_value(logEntry.value());*/
644680
}
645681
}
646682

@@ -677,11 +713,11 @@ void ConsensusModule::sendAppendEntriesRPC(NodeClient &client, std::vector<LogEn
677713
m_commitIndex = majorityIndex;
678714

679715
// Apply successfull replication to the state machine e.g. in-memory hash-table
680-
/*while (m_lastApplied < m_commitIndex)*/
681-
/*{*/
682-
/* ++m_lastApplied;*/
683-
/* m_kv[m_log[m_lastApplied - 1].key()] = m_log[m_lastApplied - 1].value();*/
684-
/*}*/
716+
while (m_lastApplied < m_commitIndex)
717+
{
718+
++m_lastApplied;
719+
m_kv[m_log[m_lastApplied - 1].key()] = m_log[m_lastApplied - 1].value();
720+
}
685721

686722
return;
687723
}

0 commit comments

Comments
 (0)