Skip to content

Commit

Permalink
fix log conflict bug
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Aug 15, 2024
1 parent d3e71cb commit dcb6096
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 95 deletions.
21 changes: 0 additions & 21 deletions example/include/rocksdb_storage_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,6 @@
class RocksDBStorageImpl : public Storage {

public:
/**
* @brief Get the Node Address object
*
* @param raft
* @param id
* @return std::string
*/
std::string GetNodeAddress(RaftServer* raft, std::string id);

/**
* @brief
*
* @param raft
* @param id
* @param address
* @return EStatus
*/
EStatus SaveNodeAddress(RaftServer* raft,
std::string id,
std::string address);

/**
* @brief
*
Expand Down
10 changes: 4 additions & 6 deletions example/src/eraftkv_ctl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,13 @@ int main(int argc, char* argv[]) {
case PutKV: {
auto partition_key = std::string(std::string(argv[3]));
auto value = std::string(std::string(argv[4]));
eraftkv_ctl
.UpdateKvServerLeaderStubByPartitionKey(partition_key);
eraftkv_ctl.PutKV(partition_key, value);
eraftkv_ctl.UpdateKvServerLeaderStubByPartitionKey(partition_key);
eraftkv_ctl.PutKV(partition_key, value);
break;
}
case GetKV: {
eraftkv_ctl
.UpdateKvServerLeaderStubByPartitionKey(std::string(argv[3]));
eraftkv_ctl.GetKV(std::string(argv[3]));
eraftkv_ctl.UpdateKvServerLeaderStubByPartitionKey(std::string(argv[3]));
eraftkv_ctl.GetKV(std::string(argv[3]));
break;
}
default:
Expand Down
5 changes: 5 additions & 0 deletions example/src/rocksdb_log_storage_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ EStatus RocksDBSingleLogStorageImpl::Append(eraftkv::Entry* ety) {
auto st = log_db_->Put(rocksdb::WriteOptions(), key, val);
assert(st.ok());
this->last_idx = ety->id();
auto status = log_db_->Put(
rocksdb::WriteOptions(), "M:LAST_IDX", std::to_string(this->last_idx));
if (!status.ok()) {
return EStatus::kError;
}
return EStatus::kOk;
}

Expand Down
26 changes: 0 additions & 26 deletions example/src/rocksdb_storage_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,6 @@
#include "eraftkv_server.h"
#include "protocol/eraftkv.pb.h"

/**
* @brief Get the Node Address object
*
* @param raft
* @param id
* @return std::string
*/
std::string RocksDBStorageImpl::GetNodeAddress(RaftServer* raft,
std::string id) {
return std::string("");
}

/**
* @brief
*
* @param raft
* @param id
* @param address
* @return EStatus
*/
EStatus RocksDBStorageImpl::SaveNodeAddress(RaftServer* raft,
std::string id,
std::string address) {
return EStatus::kOk;
}

/**
* @brief
*
Expand Down
22 changes: 0 additions & 22 deletions raftcore/include/eraft/kv_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,6 @@ class Storage {
*/
virtual ~Storage() {}


/**
* @brief Get the Node Address object
*
* @param raft
* @param id
* @return std::string
*/
virtual std::string GetNodeAddress(RaftServer* raft, std::string id) = 0;

/**
* @brief
*
* @param raft
* @param id
* @param address
* @return EStatus
*/
virtual EStatus SaveNodeAddress(RaftServer* raft,
std::string id,
std::string address) = 0;

/**
* @brief
*
Expand Down
40 changes: 20 additions & 20 deletions raftcore/src/raft_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ RaftServer::RaftServer(RaftConfig raft_config,
, max_entries_per_append_req_(100)
, tick_interval_(100)
, granted_votes_(0)
, snap_threshold_log_count_(500)
, snap_threshold_log_count_(10000)
, open_auto_apply_(true)
, is_snapshoting_(false)
, snap_db_path_(raft_config.snap_path)
Expand Down Expand Up @@ -141,10 +141,10 @@ RaftServer* RaftServer::RunMainLoop(
svr->response_ready_singals_ = response_ready_singals;
svr->response_ready_mutex_ = response_ready_mutex;
svr->is_ok_to_response_ = is_ok_to_response;
std::thread th(&RaftServer::RunCycle, svr);
th.detach();
std::thread th1(&RaftServer::RunApply, svr);
th1.detach();
std::thread cycleThread(&RaftServer::RunCycle, svr);
cycleThread.detach();
std::thread applyThread(&RaftServer::RunApply, svr);
applyThread.detach();
return svr;
}

Expand Down Expand Up @@ -731,35 +731,35 @@ EStatus RaftServer::HandleAppendEntriesReq(RaftNode* from_node,
// after snapshoting GetLastEty()->term() is 0
if (!(this->MatchLog(req->prev_log_term(), req->prev_log_index()) ||
this->log_store_->GetLastEty()->term() == 0)) {
resp->set_success(true);
resp->set_success(false);
if (this->log_store_->LastIndex() < req->prev_log_index()) {
SPDLOG_INFO("log conflict with index {} term {}",
this->log_store_->LastIndex(),
this->log_store_->GetLastEty()->term());
resp->set_conflict_index(this->log_store_->LastIndex());
resp->set_conflict_term(this->log_store_->GetLastEty()->term());
this->log_store_->LastIndex() + 1,
-1);
resp->set_conflict_index(this->log_store_->LastIndex() + 1);
resp->set_conflict_term(-1);
} else {
// set term
resp->set_conflict_term(
this->log_store_->Get(req->prev_log_index())->term());
// find conflict index
auto index = req->prev_log_index();
while (index >= this->commit_idx_ &&
auto index = req->prev_log_index() - 1;
while (index >= this->log_store_->FirstIndex() &&
this->log_store_->Get(index)->term() == resp->conflict_term()) {
index -= 1;
}
resp->set_conflict_index(index);
}
} else {
for (auto ety : req->entries()) {
this->log_store_->Append(&ety);
this->log_store_->PersisLogMetaState(this->commit_idx_,
this->last_applied_idx_);
}
this->AdvanceCommitIndexForFollower(req->leader_commit());
resp->set_success(true);
return EStatus::kOk;
}

for (auto ety : req->entries()) {
this->log_store_->Append(&ety);
this->log_store_->PersisLogMetaState(this->commit_idx_,
this->last_applied_idx_);
}
this->AdvanceCommitIndexForFollower(req->leader_commit());
resp->set_success(true);
return EStatus::kOk;
}

Expand Down

0 comments on commit dcb6096

Please sign in to comment.