diff --git a/Makefile b/Makefile index 9f883765..c95e2cda 100644 --- a/Makefile +++ b/Makefile @@ -39,11 +39,6 @@ build-dev: chmod +x scripts/build-dev.sh docker run -it --rm -v $(realpath .):/eraft eraft/eraftkv:$(IMAGE_VERSION) /eraft/scripts/build-dev.sh -# run all unit test -tests: - chmod +x scripts/run-tests.sh - docker run -it --rm -v $(realpath .):/eraft eraft/eraftkv:$(IMAGE_VERSION) /eraft/scripts/run-tests.sh - create-net: docker network create --subnet=172.18.0.0/16 mytestnetwork diff --git a/raftcore/include/eraft/raft_server.h b/raftcore/include/eraft/raft_server.h index 5a3fd9a5..edd4d0e3 100644 --- a/raftcore/include/eraft/raft_server.h +++ b/raftcore/include/eraft/raft_server.h @@ -36,6 +36,7 @@ #include #include #include +#include #include "eraft/estatus.h" #include "eraft/raft_config.h" @@ -113,6 +114,12 @@ class RaftServer { */ void RunApply(); + /** + * @brief + * + */ + void NotifyToApply(); + /** * @brief * @@ -249,20 +256,6 @@ class RaftServer { */ EStatus ElectionStart(bool is_prevote); - - /** - * @brief Get the Last Applied Entry object - * - * @return Entry* - */ - eraftkv::Entry* GetLastAppliedEntry(); - /** - * @brief Get the First Entry Idx object - * - * @return int64_t - */ - int64_t GetFirstEntryIdx(); - /** * @brief * @@ -550,6 +543,24 @@ class RaftServer { */ bool is_snapshoting_; + /** + * @brief + * + */ + bool ready_to_apply_; + + /** + * @brief + * + */ + std::mutex apply_ready_mtx_; + + /** + * @brief + * + */ + std::condition_variable apply_ready_cv_; + private: /** * @brief diff --git a/raftcore/src/raft_server.cc b/raftcore/src/raft_server.cc index 5deca40e..63d667ef 100644 --- a/raftcore/src/raft_server.cc +++ b/raftcore/src/raft_server.cc @@ -74,7 +74,8 @@ RaftServer::RaftServer(RaftConfig raft_config, , open_auto_apply_(true) , is_snapshoting_(false) , snap_db_path_(raft_config.snap_path) - , election_running_(true) { + , election_running_(true) + , ready_to_apply_(false) { this->log_store_ = log_store; this->store_ = store; this->net_ = net; @@ -106,7 +107,7 @@ RaftServer::~RaftServer() { } /** - * @brief + * @brief generate random election timeout for election * * @return EStatus */ @@ -120,7 +121,7 @@ EStatus RaftServer::ResetRandomElectionTimeout() { } /** - * @brief + * @brief run server mainloop * * @param raft_config * @param log_store @@ -141,14 +142,19 @@ RaftServer* RaftServer::RunMainLoop(RaftConfig raft_config, } /** - * @brief + * @brief run apply * */ void RaftServer::RunApply() { while (true) { + { + std::unique_lock lock(apply_ready_mtx_); + apply_ready_cv_.wait(lock, [this] { return this->ready_to_apply_; }); + } if (open_auto_apply_) { this->ApplyEntries(); } + this->ready_to_apply_ = false; } } @@ -467,6 +473,17 @@ EStatus RaftServer::ApplyEntries() { return EStatus::kOk; } +/** + * @brief + * + */ +void RaftServer::NotifyToApply() { + std::lock_guard lock(this->apply_ready_mtx_); + this->ready_to_apply_ = true; + this->apply_ready_cv_.notify_one(); +} + + /** * @brief * @@ -882,12 +899,12 @@ EStatus RaftServer::AdvanceCommitIndexForLeader() { } sort(match_idxs.begin(), match_idxs.end()); int64_t new_commit_index = match_idxs[match_idxs.size() / 2]; - if (new_commit_index > this->commit_idx_) { - if (this->MatchLog(this->current_term_, new_commit_index)) { - this->commit_idx_ = new_commit_index; - this->log_store_->PersisLogMetaState(this->commit_idx_, - this->last_applied_idx_); - } + if (new_commit_index > this->commit_idx_ && + this->MatchLog(this->current_term_, new_commit_index)) { + this->commit_idx_ = new_commit_index; + this->log_store_->PersisLogMetaState(this->commit_idx_, + this->last_applied_idx_); + this->NotifyToApply(); } return EStatus::kOk; } @@ -899,13 +916,13 @@ EStatus RaftServer::AdvanceCommitIndexForLeader() { * @return EStatus */ EStatus RaftServer::AdvanceCommitIndexForFollower(int64_t leader_commit) { - int64_t new_commit_index = std::min(leader_commit, this->log_store_->GetLastEty()->id()); if (new_commit_index > this->commit_idx_) { this->commit_idx_ = new_commit_index; this->log_store_->PersisLogMetaState(this->commit_idx_, this->last_applied_idx_); + this->NotifyToApply(); } return EStatus::kOk; } @@ -1139,24 +1156,6 @@ EStatus RaftServer::SnapshotingStart(int64_t ety_idx) { return EStatus::kOk; } -/** - * @brief Get the Last Applied Entry object - * - * @return Entry* - */ -eraftkv::Entry* RaftServer::GetLastAppliedEntry() { - return nullptr; -} - -/** - * @brief Get the First Entry Idx object - * - * @return int64_t - */ -int64_t RaftServer::GetFirstEntryIdx() { - return 0; -} - std::vector RaftServer::GetNodes() { return nodes_; } diff --git a/scripts/run-ci-tests.sh b/scripts/run-ci-tests.sh old mode 100644 new mode 100755 diff --git a/scripts/run-tests.sh b/scripts/run-tests.sh index 037f3b71..e45eac6f 100755 --- a/scripts/run-tests.sh +++ b/scripts/run-tests.sh @@ -2,7 +2,7 @@ set -xe # run test exe -/eraft/build/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 add_group 1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 -/eraft/build/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 query_groups -/eraft/build/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 set_slot 1 0-9 -/eraft/build/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 run_bench 600 +/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 add_group 1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 +/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 query_groups +/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 set_slot 1 0-9 +/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 run_bench 100