Skip to content

Commit

Permalink
fix match and next idx
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Aug 20, 2023
1 parent 0b616d3 commit 0ed9b55
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
33 changes: 22 additions & 11 deletions src/raft_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ RaftServer::RaftServer(RaftConfig raft_config,
for (auto n : raft_config.peer_address_map) {
RaftNode* node = new RaftNode(n.first,
NodeStateEnum::Running,
0,
this->log_store_->LastIndex() + 1,
0,
n.second);
this->nodes_.push_back(node);
}
Expand Down Expand Up @@ -191,7 +191,7 @@ EStatus RaftServer::SendAppendEntries() {
return EStatus::kNotSupport;
}

for (auto& node : this->nodes_) {
for (auto node : this->nodes_) {
if (node->id == this->id_ || node->node_state == NodeStateEnum::Down ||
this->is_snapshoting_) {
continue;
Expand Down Expand Up @@ -261,7 +261,7 @@ EStatus RaftServer::SendAppendEntries() {
* @return EStatus
*/
EStatus RaftServer::ApplyEntries() {
std::lock_guard<std::mutex> guard(raft_op_mutex_);
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
this->store_->ApplyLog(this, 0, 0);
return EStatus::kOk;
}
Expand All @@ -282,7 +282,7 @@ bool RaftServer::IsUpToDate(int64_t last_idx, int64_t term) {
EStatus RaftServer::HandleRequestVoteReq(RaftNode* from_node,
const eraftkv::RequestVoteReq* req,
eraftkv::RequestVoteResp* resp) {
std::lock_guard<std::mutex> guard(raft_op_mutex_);
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
resp->set_term(current_term_);
resp->set_prevote(req->prevote());
SPDLOG_INFO("handle vote req " + req->DebugString());
Expand Down Expand Up @@ -326,11 +326,18 @@ EStatus RaftServer::SendHeartBeat() {
continue;
}

auto prev_log_index = node->next_log_index - 1;

SPDLOG_INFO("node prev_log_index {} node id {}", prev_log_index, node->id);
SPDLOG_INFO("current node first log index {}",
this->log_store_->FirstIndex());

eraftkv::AppendEntriesReq* append_req = new eraftkv::AppendEntriesReq();
append_req->set_is_heartbeat(true);
append_req->set_leader_id(this->id_);
append_req->set_term(this->current_term_);
append_req->set_leader_commit(this->commit_idx_);
append_req->set_prev_log_index(prev_log_index);

this->net_->SendAppendEntries(this, node, append_req);
}
Expand Down Expand Up @@ -406,7 +413,7 @@ EStatus RaftServer::Propose(std::string payload,
int64_t* new_log_index,
int64_t* new_log_term,
bool* is_success) {
std::lock_guard<std::mutex> guard(raft_op_mutex_);
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
if (this->role_ != NodeRaftRoleEnum::Leader) {
*new_log_index = -1;
*new_log_term = -1;
Expand Down Expand Up @@ -447,7 +454,7 @@ EStatus RaftServer::Propose(std::string payload,
EStatus RaftServer::HandleAppendEntriesReq(RaftNode* from_node,
const eraftkv::AppendEntriesReq* req,
eraftkv::AppendEntriesResp* resp) {
std::lock_guard<std::mutex> guard(raft_op_mutex_);
// std::lock_guard<std::mutex> guard(raft_op_mutex_);

ResetRandomElectionTimeout();
election_tick_count_ = 0;
Expand Down Expand Up @@ -528,14 +535,18 @@ EStatus RaftServer::HandleAppendEntriesResp(RaftNode* from_node,
eraftkv::AppendEntriesResp* resp) {
if (role_ == NodeRaftRoleEnum::Leader) {
if (resp != nullptr) {
SPDLOG_INFO("send append entry resp {}", resp->DebugString());
if (resp->success()) {
for (auto node : this->nodes_) {
if (node->node_state == NodeStateEnum::Down) {
continue;
}
// if (node->node_state == NodeStateEnum::Down) {
// continue;
// }
if (from_node->id == node->id) {
node->match_log_index =
req->prev_log_index() + req->entries().size();
SPDLOG_INFO("update node {} match_log_index = {}",
from_node->id,
node->match_log_index);
node->next_log_index = node->match_log_index + 1;
this->AdvanceCommitIndexForLeader();
}
Expand Down Expand Up @@ -740,7 +751,7 @@ EStatus RaftServer::ProposeConfChange(std::string payload,
int64_t* new_log_index,
int64_t* new_log_term,
bool* is_success) {
std::lock_guard<std::mutex> guard(raft_op_mutex_);
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
if (this->role_ != NodeRaftRoleEnum::Leader) {
*new_log_index = -1;
*new_log_term = -1;
Expand Down Expand Up @@ -880,7 +891,7 @@ EStatus RaftServer::ElectionStart(bool is_prevote) {
*/
EStatus RaftServer::SnapshotingStart(int64_t ety_idx, std::string snapdir) {

std::lock_guard<std::mutex> guard(raft_op_mutex_);
// std::lock_guard<std::mutex> guard(raft_op_mutex_);

this->is_snapshoting_ = true;
auto snap_index = this->log_store_->FirstIndex();
Expand Down
4 changes: 1 addition & 3 deletions src/rocksdb_storage_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft,
delete op_pair;
if (raft->log_store_->LogCount() > raft->snap_threshold_log_count_) {
// to snapshot
if (raft->IsLeader()) {
// raft->SnapshotingStart(ety->id(), raft->snap_db_path_);
}
raft->SnapshotingStart(ety->id(), raft->snap_db_path_);
}
break;
}
Expand Down
13 changes: 9 additions & 4 deletions utils/test_commands.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
set a h
set d d
set f f
set b f
set u u
set b e
set c l
set d l
set e o
set a h
set b e
set c l
set d l
set e o

0 comments on commit 0ed9b55

Please sign in to comment.