Skip to content

Commit

Permalink
run string command synchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Aug 14, 2023
1 parent 5419d7b commit 682a61a
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 166 deletions.
2 changes: 1 addition & 1 deletion src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ PacketLength Client::_HandlePacket(const char *start, std::size_t bytes) {
}

Client::Client(std::string meta_addrs)
: leader_addr_(""), meta_addrs_(meta_addrs) {
: leader_addr_(""), meta_addrs_(meta_addrs), op_count_(0) {
// init stub to meta server node
auto meta_node_addrs = StringUtil::Split(meta_addrs, ',');
for (auto meta_node_addr : meta_node_addrs) {
Expand Down
2 changes: 2 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Client : public StreamSocket {

EStatus SyncClusterConfig();

std::atomic<int> op_count_;

void _Reset();

void OnConnect() override;
Expand Down
1 change: 1 addition & 0 deletions src/del_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ EStatus DelCommandHandler::Execute(const std::vector<std::string>& params,
std::string encode_key = EncodeStringKey(slot, params[1]);
kv_pair_->set_key(encode_key);
kv_pair_->set_op_type(eraftkv::ClientOpType::Del);
kv_pair_->set_op_count(RandomNumber::Between(1,10000));
std::string reply_buf;
if (cli->kv_stubs_[leader_addr] != nullptr) {
auto status_ = cli->kv_stubs_[leader_addr]->ProcessRWOperation(
Expand Down
54 changes: 31 additions & 23 deletions src/eraftkv_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ std::map<int, std::condition_variable*> ERaftKvServer::ready_cond_vars_;

std::mutex ERaftKvServer::ready_mutex_;

bool ERaftKvServer::is_ok_ = false;

/**
* @brief
*
Expand Down Expand Up @@ -112,7 +114,12 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
return grpc::Status::OK;
}
for (auto kv_op : req->kvs()) {
TraceLog("DEBUG: ", " recv rw op type ", kv_op.op_type());
int rand_seq = static_cast<int>(RandomNumber::Between(1, 100000));
TraceLog("DEBUG: ",
" recv rw op type ",
kv_op.op_type(),
" op count ",
rand_seq);
switch (kv_op.op_type()) {
case eraftkv::ClientOpType::Get: {
auto val = raft_context_->store_->GetKV(kv_op.key());
Expand All @@ -123,34 +130,35 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
res->set_value(val.first);
res->set_success(val.second);
res->set_op_type(eraftkv::ClientOpType::Get);
res->set_op_count(op_count_);
res->set_op_count(kv_op.op_count());
break;
}
case eraftkv::ClientOpType::Put:
case eraftkv::ClientOpType::Del: {
std::mutex map_mutex_;
{
op_count_ += 1;
std::condition_variable* new_var = new std::condition_variable();
std::lock_guard<std::mutex> lg(map_mutex_);
ERaftKvServer::ready_cond_vars_[op_count_] = new_var;
kv_op.set_op_count(op_count_);
ERaftKvServer::ready_cond_vars_[rand_seq] = new_var;
kv_op.set_op_count(rand_seq);
}
raft_context_->Propose(
kv_op.SerializeAsString(), &log_index, &log_term, &success);
{
std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul,
[] { return true; });
ERaftKvServer::ready_cond_vars_.erase(op_count_);
ERaftKvServer::ready_cond_vars_[rand_seq]->wait(
ul, []() { return ERaftKvServer::is_ok_; });
// ERaftKvServer::ready_cond_vars_.erase(rand_seq);
ERaftKvServer::is_ok_ = false;
TraceLog("DEBUG: ", " send resp ");
auto res = resp->add_ops();
res->set_key(kv_op.key());
res->set_value(kv_op.value());
res->set_success(true);
res->set_op_type(kv_op.op_type());
res->set_op_count(rand_seq);
break;
}
auto res = resp->add_ops();
res->set_key(kv_op.key());
res->set_value(kv_op.value());
res->set_success(true);
res->set_op_type(kv_op.op_type());
res->set_op_count(op_count_);
break;
}
default:
break;
Expand Down Expand Up @@ -186,7 +194,7 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
new_sg->CopyFrom(*sg);
delete sg;
}
break;
return grpc::Status::OK;
}
case eraftkv::ChangeType::MembersQuery: {
resp->set_success(true);
Expand All @@ -201,7 +209,7 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
: g_server->set_server_status(eraftkv::ServerStatus::Down);
}
new_sg->set_leader_id(raft_context_->GetLeaderId());
break;
return grpc::Status::OK;
}
default: {
// no leader reject
Expand All @@ -210,12 +218,12 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
resp->set_leader_addr(raft_context_->GetLeaderId());
return grpc::Status::OK;
}
int rand_seq = static_cast<int>(RandomNumber::Between(1, 10000));
std::mutex map_mutex_;
{
op_count_ += 1;
std::condition_variable* new_var = new std::condition_variable();
std::lock_guard<std::mutex> lg(map_mutex_);
conf_change_req->set_op_count(op_count_);
std::condition_variable* new_var = new std::condition_variable();
conf_change_req->set_op_count(rand_seq);
}
bool success;
raft_context_->ProposeConfChange(conf_change_req->SerializeAsString(),
Expand All @@ -225,9 +233,9 @@ grpc::Status ERaftKvServer::ClusterConfigChange(

{
std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul,
[] { return true; });
ERaftKvServer::ready_cond_vars_.erase(op_count_);
ERaftKvServer::ready_cond_vars_[rand_seq]->wait(ul,
[] { return true; });
ERaftKvServer::ready_cond_vars_.erase(rand_seq);
}
break;
}
Expand Down
3 changes: 3 additions & 0 deletions src/eraftkv_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,15 @@ class ERaftKvServer : public eraftkv::ERaftKv::Service {

static std::mutex ready_mutex_;

static bool is_ok_;

private:
/**
* @brief
*
*/
static RaftServer* raft_context_;


int op_count_;
};
1 change: 1 addition & 0 deletions src/get_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ EStatus GetCommandHandler::Execute(const std::vector<std::string>& params,
std::string leader_addr;
uint16_t slot;
leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
TraceLog("DEBUG: send get request to leader ", leader_addr);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
auto kv_pair_ = op_req.add_kvs();
Expand Down
79 changes: 19 additions & 60 deletions src/raft_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ RaftServer::RaftServer(RaftConfig raft_config,
}
}

RaftServer::~RaftServer() {
delete this->log_store_;
delete this->net_;
delete this->store_;
}

EStatus RaftServer::ResetRandomElectionTimeout() {
// make rand election timeout in (election_timeout, 2 * election_timout)
auto rand_tick =
Expand All @@ -105,9 +111,22 @@ RaftServer* RaftServer::RunMainLoop(RaftConfig raft_config,
RaftServer* svr = new RaftServer(raft_config, log_store, store, net);
std::thread th(&RaftServer::RunCycle, svr);
th.detach();
std::thread th1(&RaftServer::RunApply, svr);
th1.detach();
return svr;
}


EStatus RaftServer::RunApply() {
while (true) {
if (open_auto_apply_) {
this->ApplyEntries();
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
return EStatus::kOk;
}

/**
* @brief raft core cycle
*
Expand Down Expand Up @@ -154,9 +173,6 @@ EStatus RaftServer::RunCycle() {
}
ResetRandomElectionTimeout();
}
if (open_auto_apply_) {
this->ApplyEntries();
}
std::this_thread::sleep_for(std::chrono::milliseconds(tick_interval_));
}
return EStatus::kOk;
Expand Down Expand Up @@ -847,63 +863,6 @@ int64_t RaftServer::GetFirstEntryIdx() {
return 0;
}

/**
* @brief
*
* @return EStatus
*/
EStatus RaftServer::RestoreSnapshotAfterRestart() {
return EStatus::kOk;
}

/**
* @brief
*
* @param last_included_term
* @param last_included_index
* @return EStatus
*/
EStatus RaftServer::BeginLoadSnapshot(int64_t last_included_term,
int64_t last_included_index) {
return EStatus::kOk;
}

/**
* @brief
*
* @return EStatus
*/
EStatus RaftServer::EndLoadSnapshot() {
return EStatus::kOk;
}

/**
* @brief
*
* @return EStatus
*/
EStatus RaftServer::ProposeReadReq() {
return EStatus::kOk;
}

/**
* @brief Get the Logs Count Can Snapshot object
*
* @return int64_t
*/
int64_t RaftServer::GetLogsCountCanSnapshot() {
return 0;
}

/**
* @brief
*
* @return EStatus
*/
EStatus RaftServer::RestoreLog() {
return EStatus::kOk;
}

std::vector<RaftNode*> RaftServer::GetNodes() {
return nodes_;
}
Expand Down
49 changes: 4 additions & 45 deletions src/raft_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ class RaftServer {
Storage* store,
Network* net);

~RaftServer();

/**
* @brief raft core cycle
*
* @return EStatus
*/
EStatus RunCycle();

EStatus RunApply();

/**
* @brief
*
Expand Down Expand Up @@ -260,51 +264,6 @@ class RaftServer {
*/
int64_t GetFirstEntryIdx();

/**
* @brief
*
* @return EStatus
*/
EStatus RestoreSnapshotAfterRestart();

/**
* @brief
*
* @param last_included_term
* @param last_included_index
* @return EStatus
*/
EStatus BeginLoadSnapshot(int64_t last_included_term,
int64_t last_included_index);

/**
* @brief
*
* @return EStatus
*/
EStatus EndLoadSnapshot();

/**
* @brief
*
* @return EStatus
*/
EStatus ProposeReadReq();

/**
* @brief Get the Logs Count Can Snapshot object
*
* @return int64_t
*/
int64_t GetLogsCountCanSnapshot();

/**
* @brief
*
* @return EStatus
*/
EStatus RestoreLog();

/**
* @brief
*
Expand Down
Loading

0 comments on commit 682a61a

Please sign in to comment.