Skip to content

Commit

Permalink
sync response to client
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Aug 13, 2024
1 parent 5f5125a commit d3e71cb
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 19 deletions.
20 changes: 15 additions & 5 deletions example/include/eraftkv_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ using eraftkv::ERaftKv;
using grpc::ServerContext;
using grpc::Status;

#define DEAFULT_READ_WRITE_TIMEOUT_SECONDS 5


enum ServerRoleEnum { DataServer, MetaServer };

Expand Down Expand Up @@ -114,8 +116,16 @@ class ERaftKvServer : public eraftkv::ERaftKv::Service {
RocksDBSingleLogStorageImpl* log_db =
new RocksDBSingleLogStorageImpl(options_.log_db_path);
RocksDBStorageImpl* kv_db = new RocksDBStorageImpl(options_.kv_db_path);
raft_context_ =
RaftServer::RunMainLoop(raft_config, log_db, kv_db, net_rpc);
response_ready_singals_ = new std::map<int, std::condition_variable*>();
response_ready_mutex_ = new std::mutex();
is_ok_to_response_ = new bool(false);
raft_context_ = RaftServer::RunMainLoop(raft_config,
log_db,
kv_db,
net_rpc,
response_ready_singals_,
response_ready_mutex_,
is_ok_to_response_);
}

ERaftKvServer() {}
Expand Down Expand Up @@ -190,11 +200,11 @@ class ERaftKvServer : public eraftkv::ERaftKv::Service {
*/
ERaftKvServerOptions options_;

static std::map<int, std::condition_variable*> ready_cond_vars_;
static std::map<int, std::condition_variable*>* response_ready_singals_;

static std::mutex ready_mutex_;
static std::mutex* response_ready_mutex_;

static bool is_ok_;
static bool* is_ok_to_response_;

private:
/**
Expand Down
5 changes: 2 additions & 3 deletions example/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ void Client::UpdateKvServerLeaderStubByPartitionKey(std::string partition_key) {
}

bool Client::PutKV(std::string k, std::string v) {
this->UpdateKvServerLeaderStubByPartitionKey(k);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
eraftkv::ClientOperationResp op_resp;
Expand All @@ -250,7 +249,6 @@ bool Client::PutKV(std::string k, std::string v) {
}

std::pair<std::string, std::string> Client::GetKV(std::string k) {
this->UpdateKvServerLeaderStubByPartitionKey(k);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
eraftkv::ClientOperationResp op_resp;
Expand All @@ -271,8 +269,9 @@ std::pair<std::string, std::string> Client::GetKV(std::string k) {
}

void Client::RunBench(int64_t N) {
auto partition_key = StringUtil::RandStr(256);
this->UpdateKvServerLeaderStubByPartitionKey(partition_key);
for (int i = 0; i < N; i++) {
auto partition_key = StringUtil::RandStr(256);
auto value = StringUtil::RandStr(256);
this->PutKV(partition_key, value);
}
Expand Down
8 changes: 6 additions & 2 deletions example/src/eraftkv_ctl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,15 @@ int main(int argc, char* argv[]) {
case PutKV: {
auto partition_key = std::string(std::string(argv[3]));
auto value = std::string(std::string(argv[4]));
eraftkv_ctl.PutKV(partition_key, value);
eraftkv_ctl
.UpdateKvServerLeaderStubByPartitionKey(partition_key);
eraftkv_ctl.PutKV(partition_key, value);
break;
}
case GetKV: {
eraftkv_ctl.GetKV(std::string(argv[3]));
eraftkv_ctl
.UpdateKvServerLeaderStubByPartitionKey(std::string(argv[3]));
eraftkv_ctl.GetKV(std::string(argv[3]));
break;
}
default:
Expand Down
21 changes: 21 additions & 0 deletions example/src/eraftkv_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@

RaftServer* ERaftKvServer::raft_context_ = nullptr;

std::map<int, std::condition_variable*>*
ERaftKvServer::response_ready_singals_ = nullptr;

std::mutex* ERaftKvServer::response_ready_mutex_ = nullptr;

bool* ERaftKvServer::is_ok_to_response_ = nullptr;


/**
* @brief
*
Expand Down Expand Up @@ -141,9 +149,22 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
}
case eraftkv::ClientOpType::Put:
case eraftkv::ClientOpType::Del: {
std::mutex map_mutex_;
{
std::condition_variable* new_signal = new std::condition_variable();
std::lock_guard<std::mutex> lg(map_mutex_);
(*ERaftKvServer::response_ready_singals_)[rand_seq] = new_signal;
kv_op.set_op_sign(rand_seq);
}
raft_context_->Propose(
kv_op.SerializeAsString(), &log_index, &log_term, &success);
{
auto end_time =
std::chrono::system_clock::now() +
std::chrono::seconds(DEAFULT_READ_WRITE_TIMEOUT_SECONDS);
std::unique_lock<std::mutex> ul(*response_ready_mutex_);
(*ERaftKvServer::response_ready_singals_)[rand_seq]->wait_until(
ul, end_time, []() { return *is_ok_to_response_; });
auto res = resp->add_ops();
res->set_key(kv_op.key());
res->set_value(kv_op.value());
Expand Down
18 changes: 14 additions & 4 deletions raftcore/include/eraft/raft_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,14 @@ class RaftServer {
* @param raft_config
* @return EStatus
*/
static RaftServer* RunMainLoop(RaftConfig raft_config,
LogStore* log_store,
Storage* store,
Network* net);
static RaftServer* RunMainLoop(
RaftConfig raft_config,
LogStore* log_store,
Storage* store,
Network* net,
std::map<int, std::condition_variable*>* response_ready_singals,
std::mutex* response_ready_mutex,
bool* is_ok_to_response);

/**
* @brief
Expand Down Expand Up @@ -561,6 +565,12 @@ class RaftServer {
*/
std::condition_variable apply_ready_cv_;

std::map<int, std::condition_variable*>* response_ready_singals_;

std::mutex* response_ready_mutex_;

bool* is_ok_to_response_;

private:
/**
* @brief
Expand Down
23 changes: 19 additions & 4 deletions raftcore/src/raft_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,18 @@ EStatus RaftServer::ResetRandomElectionTimeout() {
* @param net
* @return RaftServer*
*/
RaftServer* RaftServer::RunMainLoop(RaftConfig raft_config,
LogStore* log_store,
Storage* store,
Network* net) {
RaftServer* RaftServer::RunMainLoop(
RaftConfig raft_config,
LogStore* log_store,
Storage* store,
Network* net,
std::map<int, std::condition_variable*>* response_ready_singals,
std::mutex* response_ready_mutex,
bool* is_ok_to_response) {
RaftServer* svr = new RaftServer(raft_config, log_store, store, net);
svr->response_ready_singals_ = response_ready_singals;
svr->response_ready_mutex_ = response_ready_mutex;
svr->is_ok_to_response_ = is_ok_to_response;
std::thread th(&RaftServer::RunCycle, svr);
th.detach();
std::thread th1(&RaftServer::RunApply, svr);
Expand Down Expand Up @@ -351,6 +358,14 @@ EStatus RaftServer::ApplyEntries() {
break;
}
}
{
// notify to response
std::lock_guard<std::mutex> lg(*response_ready_mutex_);
*is_ok_to_response_ = true;
if ((*response_ready_singals_)[op_pair->op_sign()] != nullptr) {
(*response_ready_singals_)[op_pair->op_sign()]->notify_one();
}
}
delete op_pair;
if (this->log_store_->LogCount() > this->snap_threshold_log_count_) {
this->SnapshotingStart(ety->id());
Expand Down
2 changes: 1 addition & 1 deletion scripts/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -xe
/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 add_group 1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090
/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 query_groups
/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 set_slot 1 0-9
/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 run_bench 100
/eraft/build/example/eraftkv-ctl 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 run_bench 1000

0 comments on commit d3e71cb

Please sign in to comment.