
add lock
LLiuJJ committed Aug 20, 2023
1 parent d9687b0 commit 0b616d3
Showing 9 changed files with 42 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/del_command_handler.cc
@@ -19,7 +19,7 @@ EStatus DelCommandHandler::Execute(const std::vector<std::string>& params,
std::string leader_addr;
uint16_t slot;
leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
SPDLOG_DEBUG("send del request to leader {} ", leader_addr);
SPDLOG_INFO("send del request to leader {} ", leader_addr);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
eraftkv::ClientOperationResp op_resp;
6 changes: 3 additions & 3 deletions src/eraftkv.cc
@@ -64,18 +64,18 @@ int main(int argc, char* argv[]) {
ERaftKvServer server(options_);

auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
-console_sink->set_level(spdlog::level::trace);
+console_sink->set_level(spdlog::level::debug);
console_sink->set_pattern("[%H:%M:%S %z] [%@] %v");

auto file_sink = std::make_shared<spdlog::sinks::daily_file_sink_st>(
log_file_path, 23, 59);
-file_sink->set_level(spdlog::level::trace);
+file_sink->set_level(spdlog::level::debug);
file_sink->set_pattern("[%H:%M:%S %z] [%@] %v");

spdlog::sinks_init_list sink_list = {file_sink, console_sink};

spdlog::logger logger("multi_sink", sink_list.begin(), sink_list.end());
-logger.set_level(spdlog::level::trace);
+logger.set_level(spdlog::level::debug);
logger.warn("this should appear in both console and file");
logger.info(
"this message should not appear in the console, only in the file");
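Note on the logging changes in this file (and the SPDLOG_DEBUG → SPDLOG_INFO switches in the command handlers and server code below): the sinks and the multi-sink logger drop from trace to debug, and the call sites are promoted to info. By default spdlog compiles SPDLOG_DEBUG/SPDLOG_TRACE statements out unless SPDLOG_ACTIVE_LEVEL is lowered, which is likely why the messages were promoted rather than the levels lowered further. A minimal, self-contained sketch of the same multi-sink setup at debug level — the log path and messages are placeholders, not the project's:

#include <memory>

#include <spdlog/sinks/daily_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>

int main() {
  // Console sink and daily-rotating file sink, both filtering at debug.
  auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
  console_sink->set_level(spdlog::level::debug);

  auto file_sink = std::make_shared<spdlog::sinks::daily_file_sink_st>(
      "logs/eraftkv.log", 23, 59);  // placeholder path, rotates at 23:59
  file_sink->set_level(spdlog::level::debug);

  // One logger fanning out to both sinks; the logger level filters first,
  // then each sink applies its own level.
  spdlog::logger logger("multi_sink", {console_sink, file_sink});
  logger.set_level(spdlog::level::debug);

  logger.debug("passes: logger and sinks accept debug");
  logger.trace("dropped: below the debug threshold");
  return 0;
}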
18 changes: 12 additions & 6 deletions src/eraftkv_server.cc
@@ -35,6 +35,8 @@
#include <grpcpp/grpcpp.h>
#include <spdlog/spdlog.h>

+#include <chrono>

#include "consts.h"

RaftServer* ERaftKvServer::raft_context_ = nullptr;
@@ -111,7 +113,7 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
int64_t log_index;
int64_t log_term;
bool success;
SPDLOG_DEBUG("recv rw op with ts {} ", req->op_timestamp());
SPDLOG_INFO("recv rw op with ts {} ", req->op_timestamp());
// no leader reject
if (!raft_context_->IsLeader()) {
resp->set_error_code(eraftkv::ErrorCode::REQUEST_NOT_LEADER_NODE);
@@ -126,11 +128,11 @@
}
for (auto kv_op : req->kvs()) {
int rand_seq = static_cast<int>(RandomNumber::Between(1, 100000));
SPDLOG_DEBUG("recv rw op type {} op count {}", kv_op.op_type(), rand_seq);
SPDLOG_INFO("recv rw op type {} op count {}", kv_op.op_type(), rand_seq);
switch (kv_op.op_type()) {
case eraftkv::ClientOpType::Get: {
auto val = raft_context_->store_->GetKV(kv_op.key());
SPDLOG_DEBUG(" get key {} with value {}", kv_op.key(), val.first);
SPDLOG_INFO(" get key {} with value {}", kv_op.key(), val.first);
auto res = resp->add_ops();
res->set_key(kv_op.key());
res->set_value(val.first);
@@ -151,9 +153,13 @@
raft_context_->Propose(
kv_op.SerializeAsString(), &log_index, &log_term, &success);
{
+auto endTime =
+std::chrono::system_clock::now() + std::chrono::seconds(5);
std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
-ERaftKvServer::ready_cond_vars_[rand_seq]->wait(
-ul, []() { return ERaftKvServer::is_ok_; });
+// ERaftKvServer::ready_cond_vars_[rand_seq]->wait(
+//     ul, []() { return ERaftKvServer::is_ok_; });
+ERaftKvServer::ready_cond_vars_[rand_seq]->wait_until(
+ul, endTime, []() { return ERaftKvServer::is_ok_; });
// ERaftKvServer::ready_cond_vars_.erase(rand_seq);
ERaftKvServer::is_ok_ = false;
auto res = resp->add_ops();
@@ -162,8 +168,8 @@
res->set_success(true);
res->set_op_type(kv_op.op_type());
res->set_op_count(rand_seq);
-break;
}
+break;
}
default:
break;
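The substantive change in src/eraftkv_server.cc replaces an open-ended condition_variable wait with wait_until and a five-second deadline, so a read/write RPC can no longer block forever if the proposed entry is never applied (for example after a leadership change). The server keys one condition variable per request in ready_cond_vars_[rand_seq]; the sketch below shows only the timed-wait pattern, with a single condition variable and placeholder names rather than the server's actual members:

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex ready_mutex;
std::condition_variable ready_cv;
bool is_ok = false;  // set by the apply thread once the entry is committed

// Returns true if the apply thread signalled before the deadline,
// false if the wait timed out with the predicate still false.
bool WaitForApply(std::chrono::seconds timeout) {
  auto deadline = std::chrono::system_clock::now() + timeout;
  std::unique_lock<std::mutex> lock(ready_mutex);
  return ready_cv.wait_until(lock, deadline, [] { return is_ok; });
}

wait_until with a predicate returns whether the predicate held, so a timeout can be told apart from a real wake-up; in the hunk above that return value is discarded and the response is still marked successful, so a timed-out proposal looks the same as an applied one to the client.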
2 changes: 1 addition & 1 deletion src/get_command_handler.cc
@@ -19,7 +19,7 @@ EStatus GetCommandHandler::Execute(const std::vector<std::string>& params,
std::string leader_addr;
uint16_t slot;
leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
SPDLOG_DEBUG("send get request to leader {}", leader_addr);
SPDLOG_INFO("send get request to leader {}", leader_addr);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
auto kv_pair_ = op_req.add_kvs();
25 changes: 17 additions & 8 deletions src/raft_server.cc
@@ -76,7 +76,7 @@ RaftServer::RaftServer(RaftConfig raft_config,
this->net_ = net;
this->store_->ReadRaftMeta(this, &this->current_term_, &this->voted_for_);
this->log_store_->ReadMetaState(&this->commit_idx_, &this->last_applied_idx_);
-SPDLOG_DEBUG(
+SPDLOG_INFO(
" raft server init with current_term {} voted_for {} commit_idx {}",
current_term_,
voted_for_,
@@ -192,7 +192,8 @@ EStatus RaftServer::SendAppendEntries() {
}

for (auto& node : this->nodes_) {
-if (node->id == this->id_ || node->node_state == NodeStateEnum::Down) {
+if (node->id == this->id_ || node->node_state == NodeStateEnum::Down ||
+this->is_snapshoting_) {
continue;
}

@@ -211,9 +212,9 @@
snap_req->set_last_included_term(new_first_log_ent->term());
snap_req->set_data("snaptestdata");

SPDLOG_DEBUG("send snapshot to node {} with req {}",
node->id,
snap_req->DebugString());
SPDLOG_INFO("send snapshot to node {} with req {}",
node->id,
snap_req->DebugString());

this->net_->SendSnapshot(this, node, snap_req);

@@ -260,6 +261,7 @@
* @return EStatus
*/
EStatus RaftServer::ApplyEntries() {
+std::lock_guard<std::mutex> guard(raft_op_mutex_);
this->store_->ApplyLog(this, 0, 0);
return EStatus::kOk;
}
@@ -280,6 +282,7 @@ bool RaftServer::IsUpToDate(int64_t last_idx, int64_t term) {
EStatus RaftServer::HandleRequestVoteReq(RaftNode* from_node,
const eraftkv::RequestVoteReq* req,
eraftkv::RequestVoteResp* resp) {
+std::lock_guard<std::mutex> guard(raft_op_mutex_);
resp->set_term(current_term_);
resp->set_prevote(req->prevote());
SPDLOG_INFO("handle vote req " + req->DebugString());
@@ -347,9 +350,9 @@ EStatus RaftServer::HandleRequestVoteResp(RaftNode* from_node,
eraftkv::RequestVoteResp* resp) {
if (resp != nullptr) {

SPDLOG_DEBUG("send request vote revice resp {}, from node {}",
resp->DebugString(),
from_node->address);
SPDLOG_INFO("send request vote revice resp {}, from node {}",
resp->DebugString(),
from_node->address);

if (this->role_ == NodeRaftRoleEnum::PreCandidate &&
req->term() == this->current_term_ + 1 && resp->prevote()) {
@@ -403,6 +406,7 @@ EStatus RaftServer::Propose(std::string payload,
int64_t* new_log_index,
int64_t* new_log_term,
bool* is_success) {
+std::lock_guard<std::mutex> guard(raft_op_mutex_);
if (this->role_ != NodeRaftRoleEnum::Leader) {
*new_log_index = -1;
*new_log_term = -1;
Expand Down Expand Up @@ -443,6 +447,8 @@ EStatus RaftServer::Propose(std::string payload,
EStatus RaftServer::HandleAppendEntriesReq(RaftNode* from_node,
const eraftkv::AppendEntriesReq* req,
eraftkv::AppendEntriesResp* resp) {
+std::lock_guard<std::mutex> guard(raft_op_mutex_);

ResetRandomElectionTimeout();
election_tick_count_ = 0;

@@ -734,6 +740,7 @@ EStatus RaftServer::ProposeConfChange(std::string payload,
int64_t* new_log_index,
int64_t* new_log_term,
bool* is_success) {
+std::lock_guard<std::mutex> guard(raft_op_mutex_);
if (this->role_ != NodeRaftRoleEnum::Leader) {
*new_log_index = -1;
*new_log_term = -1;
@@ -873,6 +880,8 @@ EStatus RaftServer::ElectionStart(bool is_prevote) {
*/
EStatus RaftServer::SnapshotingStart(int64_t ety_idx, std::string snapdir) {

+std::lock_guard<std::mutex> guard(raft_op_mutex_);

this->is_snapshoting_ = true;
auto snap_index = this->log_store_->FirstIndex();

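src/raft_server.cc carries the change the commit message refers to: ApplyEntries, HandleRequestVoteReq, Propose, HandleAppendEntriesReq, ProposeConfChange and SnapshotingStart now each acquire raft_op_mutex_ (declared in raft_server.h below) for their whole body, and SendAppendEntries additionally skips replication while a snapshot is in progress. A minimal sketch of the locking pattern — the class, members and bodies here are simplified stand-ins, not the real RaftServer:

#include <mutex>
#include <string>
#include <vector>

class RaftServerSketch {
 public:
  // Every state-mutating entry point takes the same mutex, so a client
  // Propose cannot interleave with an incoming AppendEntries or a snapshot.
  void Propose(const std::string& payload) {
    std::lock_guard<std::mutex> guard(raft_op_mutex_);
    log_.push_back(payload);  // stands in for appending a Raft log entry
  }

  void HandleAppendEntriesReq(const std::vector<std::string>& entries) {
    std::lock_guard<std::mutex> guard(raft_op_mutex_);
    log_.insert(log_.end(), entries.begin(), entries.end());
  }

  void SnapshotingStart() {
    std::lock_guard<std::mutex> guard(raft_op_mutex_);
    log_.clear();  // stands in for compacting the log into a snapshot
  }

 private:
  std::mutex raft_op_mutex_;      // mirrors the member added in raft_server.h
  std::vector<std::string> log_;  // placeholder for the real log store
};

Because std::mutex is not recursive, any path that already holds raft_op_mutex_ must not re-enter another guarded method; that is presumably why the leader-side raft->SnapshotingStart(...) call is commented out in rocksdb_storage_impl.cc below — ApplyLog runs under the lock taken by ApplyEntries, and calling SnapshotingStart from there would deadlock.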
2 changes: 2 additions & 0 deletions src/raft_server.h
@@ -534,6 +534,8 @@ class RaftServer {
*/
bool is_snapshoting_;

+std::mutex raft_op_mutex_;

/**
* @brief
*
2 changes: 1 addition & 1 deletion src/rocksdb_storage_impl.cc
@@ -131,7 +131,7 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft,
if (raft->log_store_->LogCount() > raft->snap_threshold_log_count_) {
// to snapshot
if (raft->IsLeader()) {
-raft->SnapshotingStart(ety->id(), raft->snap_db_path_);
+// raft->SnapshotingStart(ety->id(), raft->snap_db_path_);
}
}
break;
2 changes: 1 addition & 1 deletion src/set_command_handler.cc
@@ -23,7 +23,7 @@ EStatus SetCommandHandler::Execute(const std::vector<std::string>& params,
std::string leader_addr;
uint16_t slot;
leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
SPDLOG_DEBUG("send set request to leader {}", leader_addr);
SPDLOG_INFO("send set request to leader {}", leader_addr);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
eraftkv::ClientOperationResp op_resp;
30 changes: 4 additions & 26 deletions utils/test_commands.txt
@@ -1,27 +1,5 @@
-set a h
-set b e
-set c l
-set d l
-set e o
-set ab h
-set bb e
-set cb l
-set db l
-set eb o
-set ac h
-set bc e
-set cc l
-set dc l
-set ec o
-set ad h
-set bd e
-del bd
-get bd

-get a
-get b
-get c
-get d
-get e
-del e
-get e
+set d d
+set f f
+set b f
+set u u
