From 682a61a5cfa843c95a013c49bf351ba82a9263fe Mon Sep 17 00:00:00 2001 From: LLiuJJ Date: Mon, 14 Aug 2023 23:24:33 +0800 Subject: [PATCH] run string command synchronous --- src/client.cc | 2 +- src/client.h | 2 + src/del_command_handler.cc | 1 + src/eraftkv_server.cc | 54 ++++++++++++++----------- src/eraftkv_server.h | 3 ++ src/get_command_handler.cc | 1 + src/raft_server.cc | 79 +++++++++---------------------------- src/raft_server.h | 49 ++--------------------- src/rocksdb_storage_impl.cc | 28 ++++++------- src/set_command_handler.cc | 5 ++- utils/run-kdb-tests.sh | 20 +--------- utils/test_commands.txt | 12 ++++++ 12 files changed, 90 insertions(+), 166 deletions(-) create mode 100644 utils/test_commands.txt diff --git a/src/client.cc b/src/client.cc index 243b98b1..0cc7ea6e 100644 --- a/src/client.cc +++ b/src/client.cc @@ -46,7 +46,7 @@ PacketLength Client::_HandlePacket(const char *start, std::size_t bytes) { } Client::Client(std::string meta_addrs) - : leader_addr_(""), meta_addrs_(meta_addrs) { + : leader_addr_(""), meta_addrs_(meta_addrs), op_count_(0) { // init stub to meta server node auto meta_node_addrs = StringUtil::Split(meta_addrs, ','); for (auto meta_node_addr : meta_node_addrs) { diff --git a/src/client.h b/src/client.h index 62308546..c636742e 100644 --- a/src/client.h +++ b/src/client.h @@ -73,6 +73,8 @@ class Client : public StreamSocket { EStatus SyncClusterConfig(); + std::atomic op_count_; + void _Reset(); void OnConnect() override; diff --git a/src/del_command_handler.cc b/src/del_command_handler.cc index 3ff6234f..4a9d23c6 100644 --- a/src/del_command_handler.cc +++ b/src/del_command_handler.cc @@ -25,6 +25,7 @@ EStatus DelCommandHandler::Execute(const std::vector& params, std::string encode_key = EncodeStringKey(slot, params[1]); kv_pair_->set_key(encode_key); kv_pair_->set_op_type(eraftkv::ClientOpType::Del); + kv_pair_->set_op_count(RandomNumber::Between(1,10000)); std::string reply_buf; if (cli->kv_stubs_[leader_addr] != nullptr) { auto status_ = cli->kv_stubs_[leader_addr]->ProcessRWOperation( diff --git a/src/eraftkv_server.cc b/src/eraftkv_server.cc index a0dbc7b8..1c6bc543 100644 --- a/src/eraftkv_server.cc +++ b/src/eraftkv_server.cc @@ -42,6 +42,8 @@ std::map ERaftKvServer::ready_cond_vars_; std::mutex ERaftKvServer::ready_mutex_; +bool ERaftKvServer::is_ok_ = false; + /** * @brief * @@ -112,7 +114,12 @@ grpc::Status ERaftKvServer::ProcessRWOperation( return grpc::Status::OK; } for (auto kv_op : req->kvs()) { - TraceLog("DEBUG: ", " recv rw op type ", kv_op.op_type()); + int rand_seq = static_cast(RandomNumber::Between(1, 100000)); + TraceLog("DEBUG: ", + " recv rw op type ", + kv_op.op_type(), + " op count ", + rand_seq); switch (kv_op.op_type()) { case eraftkv::ClientOpType::Get: { auto val = raft_context_->store_->GetKV(kv_op.key()); @@ -123,34 +130,35 @@ grpc::Status ERaftKvServer::ProcessRWOperation( res->set_value(val.first); res->set_success(val.second); res->set_op_type(eraftkv::ClientOpType::Get); - res->set_op_count(op_count_); + res->set_op_count(kv_op.op_count()); break; } case eraftkv::ClientOpType::Put: case eraftkv::ClientOpType::Del: { std::mutex map_mutex_; { - op_count_ += 1; std::condition_variable* new_var = new std::condition_variable(); std::lock_guard lg(map_mutex_); - ERaftKvServer::ready_cond_vars_[op_count_] = new_var; - kv_op.set_op_count(op_count_); + ERaftKvServer::ready_cond_vars_[rand_seq] = new_var; + kv_op.set_op_count(rand_seq); } raft_context_->Propose( kv_op.SerializeAsString(), &log_index, &log_term, &success); { std::unique_lock ul(ERaftKvServer::ready_mutex_); - ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul, - [] { return true; }); - ERaftKvServer::ready_cond_vars_.erase(op_count_); + ERaftKvServer::ready_cond_vars_[rand_seq]->wait( + ul, []() { return ERaftKvServer::is_ok_; }); + // ERaftKvServer::ready_cond_vars_.erase(rand_seq); + ERaftKvServer::is_ok_ = false; + TraceLog("DEBUG: ", " send resp "); + auto res = resp->add_ops(); + res->set_key(kv_op.key()); + res->set_value(kv_op.value()); + res->set_success(true); + res->set_op_type(kv_op.op_type()); + res->set_op_count(rand_seq); + break; } - auto res = resp->add_ops(); - res->set_key(kv_op.key()); - res->set_value(kv_op.value()); - res->set_success(true); - res->set_op_type(kv_op.op_type()); - res->set_op_count(op_count_); - break; } default: break; @@ -186,7 +194,7 @@ grpc::Status ERaftKvServer::ClusterConfigChange( new_sg->CopyFrom(*sg); delete sg; } - break; + return grpc::Status::OK; } case eraftkv::ChangeType::MembersQuery: { resp->set_success(true); @@ -201,7 +209,7 @@ grpc::Status ERaftKvServer::ClusterConfigChange( : g_server->set_server_status(eraftkv::ServerStatus::Down); } new_sg->set_leader_id(raft_context_->GetLeaderId()); - break; + return grpc::Status::OK; } default: { // no leader reject @@ -210,12 +218,12 @@ grpc::Status ERaftKvServer::ClusterConfigChange( resp->set_leader_addr(raft_context_->GetLeaderId()); return grpc::Status::OK; } + int rand_seq = static_cast(RandomNumber::Between(1, 10000)); std::mutex map_mutex_; { - op_count_ += 1; - std::condition_variable* new_var = new std::condition_variable(); std::lock_guard lg(map_mutex_); - conf_change_req->set_op_count(op_count_); + std::condition_variable* new_var = new std::condition_variable(); + conf_change_req->set_op_count(rand_seq); } bool success; raft_context_->ProposeConfChange(conf_change_req->SerializeAsString(), @@ -225,9 +233,9 @@ grpc::Status ERaftKvServer::ClusterConfigChange( { std::unique_lock ul(ERaftKvServer::ready_mutex_); - ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul, - [] { return true; }); - ERaftKvServer::ready_cond_vars_.erase(op_count_); + ERaftKvServer::ready_cond_vars_[rand_seq]->wait(ul, + [] { return true; }); + ERaftKvServer::ready_cond_vars_.erase(rand_seq); } break; } diff --git a/src/eraftkv_server.h b/src/eraftkv_server.h index ff5aa6a5..aecfff9b 100644 --- a/src/eraftkv_server.h +++ b/src/eraftkv_server.h @@ -182,6 +182,8 @@ class ERaftKvServer : public eraftkv::ERaftKv::Service { static std::mutex ready_mutex_; + static bool is_ok_; + private: /** * @brief @@ -189,5 +191,6 @@ class ERaftKvServer : public eraftkv::ERaftKv::Service { */ static RaftServer* raft_context_; + int op_count_; }; diff --git a/src/get_command_handler.cc b/src/get_command_handler.cc index 10b9b8f3..a9d7b993 100644 --- a/src/get_command_handler.cc +++ b/src/get_command_handler.cc @@ -17,6 +17,7 @@ EStatus GetCommandHandler::Execute(const std::vector& params, std::string leader_addr; uint16_t slot; leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot); + TraceLog("DEBUG: send get request to leader ", leader_addr); ClientContext op_context; eraftkv::ClientOperationReq op_req; auto kv_pair_ = op_req.add_kvs(); diff --git a/src/raft_server.cc b/src/raft_server.cc index b7476643..0fdd9e86 100644 --- a/src/raft_server.cc +++ b/src/raft_server.cc @@ -89,6 +89,12 @@ RaftServer::RaftServer(RaftConfig raft_config, } } +RaftServer::~RaftServer() { + delete this->log_store_; + delete this->net_; + delete this->store_; +} + EStatus RaftServer::ResetRandomElectionTimeout() { // make rand election timeout in (election_timeout, 2 * election_timout) auto rand_tick = @@ -105,9 +111,22 @@ RaftServer* RaftServer::RunMainLoop(RaftConfig raft_config, RaftServer* svr = new RaftServer(raft_config, log_store, store, net); std::thread th(&RaftServer::RunCycle, svr); th.detach(); + std::thread th1(&RaftServer::RunApply, svr); + th1.detach(); return svr; } + +EStatus RaftServer::RunApply() { + while (true) { + if (open_auto_apply_) { + this->ApplyEntries(); + } + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + return EStatus::kOk; +} + /** * @brief raft core cycle * @@ -154,9 +173,6 @@ EStatus RaftServer::RunCycle() { } ResetRandomElectionTimeout(); } - if (open_auto_apply_) { - this->ApplyEntries(); - } std::this_thread::sleep_for(std::chrono::milliseconds(tick_interval_)); } return EStatus::kOk; @@ -847,63 +863,6 @@ int64_t RaftServer::GetFirstEntryIdx() { return 0; } -/** - * @brief - * - * @return EStatus - */ -EStatus RaftServer::RestoreSnapshotAfterRestart() { - return EStatus::kOk; -} - -/** - * @brief - * - * @param last_included_term - * @param last_included_index - * @return EStatus - */ -EStatus RaftServer::BeginLoadSnapshot(int64_t last_included_term, - int64_t last_included_index) { - return EStatus::kOk; -} - -/** - * @brief - * - * @return EStatus - */ -EStatus RaftServer::EndLoadSnapshot() { - return EStatus::kOk; -} - -/** - * @brief - * - * @return EStatus - */ -EStatus RaftServer::ProposeReadReq() { - return EStatus::kOk; -} - -/** - * @brief Get the Logs Count Can Snapshot object - * - * @return int64_t - */ -int64_t RaftServer::GetLogsCountCanSnapshot() { - return 0; -} - -/** - * @brief - * - * @return EStatus - */ -EStatus RaftServer::RestoreLog() { - return EStatus::kOk; -} - std::vector RaftServer::GetNodes() { return nodes_; } diff --git a/src/raft_server.h b/src/raft_server.h index c39e007b..a31912ac 100644 --- a/src/raft_server.h +++ b/src/raft_server.h @@ -85,6 +85,8 @@ class RaftServer { Storage* store, Network* net); + ~RaftServer(); + /** * @brief raft core cycle * @@ -92,6 +94,8 @@ class RaftServer { */ EStatus RunCycle(); + EStatus RunApply(); + /** * @brief * @@ -260,51 +264,6 @@ class RaftServer { */ int64_t GetFirstEntryIdx(); - /** - * @brief - * - * @return EStatus - */ - EStatus RestoreSnapshotAfterRestart(); - - /** - * @brief - * - * @param last_included_term - * @param last_included_index - * @return EStatus - */ - EStatus BeginLoadSnapshot(int64_t last_included_term, - int64_t last_included_index); - - /** - * @brief - * - * @return EStatus - */ - EStatus EndLoadSnapshot(); - - /** - * @brief - * - * @return EStatus - */ - EStatus ProposeReadReq(); - - /** - * @brief Get the Logs Count Can Snapshot object - * - * @return int64_t - */ - int64_t GetLogsCountCanSnapshot(); - - /** - * @brief - * - * @return EStatus - */ - EStatus RestoreLog(); - /** * @brief * diff --git a/src/rocksdb_storage_impl.cc b/src/rocksdb_storage_impl.cc index f3de5da0..b7e563b3 100644 --- a/src/rocksdb_storage_impl.cc +++ b/src/rocksdb_storage_impl.cc @@ -75,9 +75,9 @@ EStatus RocksDBStorageImpl::SaveNodeAddress(RaftServer* raft, EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft, int64_t snapshot_index, int64_t snapshot_term) { - if (raft->commit_idx_ == raft->last_applied_idx_) { - return EStatus::kOk; - } + // if (raft->commit_idx_ == raft->last_applied_idx_) { + // return EStatus::kOk; + // } auto etys = raft->log_store_->Gets(raft->last_applied_idx_, raft->commit_idx_); for (auto ety : etys) { @@ -92,14 +92,11 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft, ety->id()); raft->last_applied_idx_ = ety->id(); if (raft->role_ == NodeRaftRoleEnum::Leader) { - std::mutex map_mutex; { - std::lock_guard lg(map_mutex); - if (ERaftKvServer::ready_cond_vars_[op_pair->op_count()] != - nullptr) { - ERaftKvServer::ready_cond_vars_[op_pair->op_count()] - ->notify_one(); - } + std::lock_guard lg(ERaftKvServer::ready_mutex_); + ERaftKvServer::is_ok_ = true; + ERaftKvServer::ready_cond_vars_[op_pair->op_count()] + ->notify_one(); } } } @@ -111,14 +108,11 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft, ety->id()); raft->last_applied_idx_ = ety->id(); if (raft->role_ == NodeRaftRoleEnum::Leader) { - std::mutex map_mutex; { - std::lock_guard lg(map_mutex); - if (ERaftKvServer::ready_cond_vars_[op_pair->op_count()] != - nullptr) { - ERaftKvServer::ready_cond_vars_[op_pair->op_count()] - ->notify_one(); - } + std::lock_guard lg(ERaftKvServer::ready_mutex_); + ERaftKvServer::is_ok_ = true; + ERaftKvServer::ready_cond_vars_[op_pair->op_count()] + ->notify_one(); } } } diff --git a/src/set_command_handler.cc b/src/set_command_handler.cc index 2df6f9cd..72d55a3a 100644 --- a/src/set_command_handler.cc +++ b/src/set_command_handler.cc @@ -11,6 +11,8 @@ #include +#include + #include "command_handler.h" #include "key_encode.h" #include "util.h" @@ -20,7 +22,7 @@ EStatus SetCommandHandler::Execute(const std::vector& params, std::string leader_addr; uint16_t slot; leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot); - TraceLog("DEBUG: send request to leader ", leader_addr); + TraceLog("DEBUG: send set request to leader ", leader_addr); ClientContext op_context; eraftkv::ClientOperationReq op_req; eraftkv::ClientOperationResp op_resp; @@ -30,6 +32,7 @@ EStatus SetCommandHandler::Execute(const std::vector& params, kv_pair_->set_key(encode_key); kv_pair_->set_value(encode_val); kv_pair_->set_op_type(eraftkv::ClientOpType::Put); + kv_pair_->set_op_count(RandomNumber::Between(1, 10000)); std::string reply_buf; if (cli->kv_stubs_[leader_addr] != nullptr) { auto status_ = cli->kv_stubs_[leader_addr]->ProcessRWOperation( diff --git a/utils/run-kdb-tests.sh b/utils/run-kdb-tests.sh index 7b237d75..4c4af38d 100755 --- a/utils/run-kdb-tests.sh +++ b/utils/run-kdb-tests.sh @@ -6,22 +6,4 @@ redis-cli -h 172.18.0.6 -p 12306 shardgroup query sleep 1 # test mode raft interval is 1s redis-cli -h 172.18.0.6 -p 12306 info -redis-cli -h 172.18.0.6 -p 12306 set a h -redis-cli -h 172.18.0.6 -p 12306 set b e -redis-cli -h 172.18.0.6 -p 12306 set c l -redis-cli -h 172.18.0.6 -p 12306 set d l -redis-cli -h 172.18.0.6 -p 12306 set e o - -# sleep 1 - -redis-cli -h 172.18.0.6 -p 12306 get a -redis-cli -h 172.18.0.6 -p 12306 get b -redis-cli -h 172.18.0.6 -p 12306 get c -redis-cli -h 172.18.0.6 -p 12306 get d -redis-cli -h 172.18.0.6 -p 12306 get e - -redis-cli -h 172.18.0.6 -p 12306 del e - -# sleep 1 - -redis-cli -h 172.18.0.6 -p 12306 get e +cat /eraft/utils/test_commands.txt | redis-cli -h 172.18.0.6 -p 12306 diff --git a/utils/test_commands.txt b/utils/test_commands.txt new file mode 100644 index 00000000..10c0666f --- /dev/null +++ b/utils/test_commands.txt @@ -0,0 +1,12 @@ +set a h +set b e +set c l +set d l +set e o +get a +get b +get c +get d +get e +del e +get e