Skip to content

Commit

Permalink
add snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Aug 20, 2023
1 parent 4a8f1af commit d9687b0
Show file tree
Hide file tree
Showing 19 changed files with 248 additions and 127 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ grpc/
grpc
rocksdb/
rocksdb
protobuf/
protobuf/
spdlog/
spdlog
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,6 @@ FROM eraft/eraftkv:v0.0.4
# RUN ldconfig

RUN apt-get install -y redis-tools telnet

RUN git clone --branch v1.9.2 https://github.com/gabime/spdlog.git && cd spdlog && mkdir build && cd build \
&& cmake .. && make -j && make install && rm -rf build
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ rm-net:
docker network rm mytestnetwork

run-demo:
docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.10 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 /tmp/snap_db0 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090
docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.10 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 /tmp/snap_db0 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 /tmp/log1.log
sleep 2
docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.11 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 /tmp/snap_db1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090
docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.12 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 /tmp/snap_db2 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090
docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.11 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 /tmp/snap_db1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 /tmp/log2.log
docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.12 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 /tmp/snap_db2 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 /tmp/log3.log
sleep 1
docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
sleep 3
docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
sleep 16
docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-kdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-kdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 /tmp/eraft-kdb.log

stop-demo:
docker stop kvserver-node1 kvserver-node2 kvserver-node3 vdbserver-node metaserver-node1 metaserver-node2 metaserver-node3
Expand Down
5 changes: 3 additions & 2 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "client.h"

#include <spdlog/spdlog.h>
#include <stdint.h>
#include <sys/time.h>

Expand Down Expand Up @@ -50,7 +51,7 @@ Client::Client(std::string meta_addrs)
// init stub to meta server node
auto meta_node_addrs = StringUtil::Split(meta_addrs, ',');
for (auto meta_node_addr : meta_node_addrs) {
TraceLog("DEBUG: init rpc link to ", meta_node_addr);
SPDLOG_INFO("init rpc link to {} ", meta_node_addr);
auto chan_ =
grpc::CreateChannel(meta_node_addr, grpc::InsecureChannelCredentials());
std::unique_ptr<ERaftKv::Stub> stub_(ERaftKv::NewStub(chan_));
Expand Down Expand Up @@ -100,7 +101,7 @@ std::string Client::GetShardLeaderAddrAndSlot(std::string partion_key,
std::string leader_address;
int64_t key_slot = -1;
key_slot = HashUtil::CRC64(0, partion_key.c_str(), partion_key.size()) % 1024;
TraceLog("DEBUG: partion key " + partion_key + " with slot ", key_slot);
SPDLOG_DEBUG("partion key {} with slot {} ", partion_key, key_slot);
*slot = key_slot;
for (auto sg : cluster_conf_.shard_group()) {
for (auto sl : sg.slots()) {
Expand Down
4 changes: 3 additions & 1 deletion src/del_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
#include "command_handler.h"
#include "key_encode.h"

#include <spdlog/spdlog.h>

EStatus DelCommandHandler::Execute(const std::vector<std::string>& params,
Client* cli) {
std::string leader_addr;
uint16_t slot;
leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
TraceLog("DEBUG: send del request to leader ", leader_addr);
SPDLOG_DEBUG("send del request to leader {} ", leader_addr);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
eraftkv::ClientOperationResp op_resp;
Expand Down
31 changes: 31 additions & 0 deletions src/eraft_vdb_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

#include "eraft_vdb_server.h"

#include <spdlog/common.h>
#include <spdlog/sinks/daily_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <spdlog/spdlog.h>

#include "client.h"
#include "server.h"
#include "socket.h"
Expand Down Expand Up @@ -57,7 +63,32 @@ bool ERaftVdbServer::_Recycle() {
int main(int argc, char *argv[]) {
std::string addr = std::string(argv[1]);
std::string kv_svr_addrs = std::string(argv[2]);
std::string log_file_path = std::string(argv[3]);
ERaftVdbServer svr(addr, kv_svr_addrs);

auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
console_sink->set_level(spdlog::level::trace);
console_sink->set_pattern("[%H:%M:%S %z] [%@] %v");

auto file_sink = std::make_shared<spdlog::sinks::daily_file_sink_st>(
log_file_path, 23, 59);
file_sink->set_level(spdlog::level::trace);
file_sink->set_pattern("[%H:%M:%S %z] [%@] %v");

spdlog::sinks_init_list sink_list = {file_sink, console_sink};

spdlog::logger logger("multi_sink", sink_list.begin(), sink_list.end());
logger.set_level(spdlog::level::trace);
logger.warn("this should appear in both console and file");
logger.info(
"this message should not appear in the console, only in the file");

spdlog::set_default_logger(std::make_shared<spdlog::logger>(
"multi_sink", spdlog::sinks_init_list({console_sink, file_sink})));

SPDLOG_INFO("eraftkdb server start with addr " + addr + " kv_svr_addrs " +
kv_svr_addrs);

svr.MainLoop(false);
return 0;
}
30 changes: 30 additions & 0 deletions src/eraftkv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/

#include <rocksdb/db.h>
#include <spdlog/common.h>
#include <spdlog/sinks/daily_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/stdout_sinks.h>
#include <spdlog/spdlog.h>

#include <iostream>

Expand All @@ -44,6 +49,7 @@
* @param argv (eg: eraftkv 0 /tmp/kv_db0 /tmp/log_db0
* 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090)
* eraftkv [node id] [kv data path] [log data path] [meta server addrs]
* [log_file_path]
* @return int
*/
int main(int argc, char* argv[]) {
Expand All @@ -54,7 +60,31 @@ int main(int argc, char* argv[]) {
options_.log_db_path = std::string(argv[3]);
options_.snap_db_path = std::string(argv[4]);
options_.peer_addrs = std::string(argv[5]);
std::string log_file_path = std::string(argv[6]);
ERaftKvServer server(options_);

auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
console_sink->set_level(spdlog::level::trace);
console_sink->set_pattern("[%H:%M:%S %z] [%@] %v");

auto file_sink = std::make_shared<spdlog::sinks::daily_file_sink_st>(
log_file_path, 23, 59);
file_sink->set_level(spdlog::level::trace);
file_sink->set_pattern("[%H:%M:%S %z] [%@] %v");

spdlog::sinks_init_list sink_list = {file_sink, console_sink};

spdlog::logger logger("multi_sink", sink_list.begin(), sink_list.end());
logger.set_level(spdlog::level::trace);
logger.warn("this should appear in both console and file");
logger.info(
"this message should not appear in the console, only in the file");

spdlog::set_default_logger(std::make_shared<spdlog::logger>(
"multi_sink", spdlog::sinks_init_list({console_sink, file_sink})));
SPDLOG_INFO("eraftkv server start with peer_addrs " + options_.peer_addrs +
" kv_db_path " + options_.kv_db_path);

server.BuildAndRunRpcServer();
return 0;
}
26 changes: 13 additions & 13 deletions src/eraftkv_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "eraftkv_server.h"

#include <grpcpp/grpcpp.h>
#include <spdlog/spdlog.h>

#include "consts.h"

Expand Down Expand Up @@ -110,25 +111,26 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
int64_t log_index;
int64_t log_term;
bool success;
TraceLog("DEBUG: ", " recv rw op with ts ", req->op_timestamp());
SPDLOG_DEBUG("recv rw op with ts {} ", req->op_timestamp());
// no leader reject
if (!raft_context_->IsLeader()) {
resp->set_error_code(eraftkv::ErrorCode::REQUEST_NOT_LEADER_NODE);
resp->set_leader_addr(raft_context_->GetLeaderId());
return grpc::Status::OK;
}
// snapshot reject
if (raft_context_->IsSnapshoting()) {
SPDLOG_WARN("node is snapshoting, reject request");
resp->set_error_code(eraftkv::ErrorCode::NODE_IS_SNAPSHOTING);
return grpc::Status::OK;
}
for (auto kv_op : req->kvs()) {
int rand_seq = static_cast<int>(RandomNumber::Between(1, 100000));
TraceLog("DEBUG: ",
" recv rw op type ",
kv_op.op_type(),
" op count ",
rand_seq);
SPDLOG_DEBUG("recv rw op type {} op count {}", kv_op.op_type(), rand_seq);
switch (kv_op.op_type()) {
case eraftkv::ClientOpType::Get: {
auto val = raft_context_->store_->GetKV(kv_op.key());
TraceLog(
"DEBUG: ", " get key ", kv_op.key(), " with value ", val.first);
SPDLOG_DEBUG(" get key {} with value {}", kv_op.key(), val.first);
auto res = resp->add_ops();
res->set_key(kv_op.key());
res->set_value(val.first);
Expand All @@ -154,7 +156,6 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
ul, []() { return ERaftKvServer::is_ok_; });
// ERaftKvServer::ready_cond_vars_.erase(rand_seq);
ERaftKvServer::is_ok_ = false;
TraceLog("DEBUG: ", " send resp ");
auto res = resp->add_ops();
res->set_key(kv_op.key());
res->set_value(kv_op.value());
Expand Down Expand Up @@ -182,9 +183,8 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
eraftkv::ClusterConfigChangeResp* resp) {
int64_t log_index;
int64_t log_term;
TraceLog("DEBUG: ",
" recv config change req with change_type ",
req->change_type());
SPDLOG_INFO("recv config change req with change_type {} ",
req->change_type());
// return cluster topology, Currently, only single raft group are supported
auto conf_change_req = const_cast<eraftkv::ClusterConfigChangeReq*>(req);
switch (conf_change_req->change_type()) {
Expand Down Expand Up @@ -255,7 +255,7 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
}

EStatus ERaftKvServer::TakeSnapshot(int64_t log_idx) {
return raft_context_->SnaoshotingStart(log_idx, options_.kv_db_path);
return raft_context_->SnapshotingStart(log_idx, options_.kv_db_path);
}

/**
Expand Down
3 changes: 0 additions & 3 deletions src/eraftmetaserver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ TEST(EraftMetaServerTests, TestMetaBasicOp) {
ASSERT_EQ(query_resp.shard_group_size(), 1);
ASSERT_EQ(query_resp.shard_group(0).servers_size(), 3);
ASSERT_EQ(query_resp.shard_group(0).id(), 1);
TraceLog("DEBUG: cluster config resp -> ", query_resp.DebugString());

ClientContext move_context;
eraftkv::ClusterConfigChangeReq move_req;
Expand All @@ -131,8 +130,6 @@ TEST(EraftMetaServerTests, TestMetaBasicOp) {
ClientContext query_context_;
auto status4 =
leader_stub->ClusterConfigChange(&query_context_, query_req, &query_resp);
// TraceLog("DEBUG: cluster config after move resp -> ",
// query_resp.DebugString());
ASSERT_EQ(status4.ok(), true);
}

Expand Down
4 changes: 3 additions & 1 deletion src/get_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*
*/

#include <spdlog/spdlog.h>

#include "command_handler.h"
#include "key_encode.h"

Expand All @@ -17,7 +19,7 @@ EStatus GetCommandHandler::Execute(const std::vector<std::string>& params,
std::string leader_addr;
uint16_t slot;
leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
TraceLog("DEBUG: send get request to leader ", leader_addr);
SPDLOG_DEBUG("send get request to leader {}", leader_addr);
ClientContext op_context;
eraftkv::ClientOperationReq op_req;
auto kv_pair_ = op_req.add_kvs();
Expand Down
21 changes: 15 additions & 6 deletions src/grpc_network_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include <grpcpp/grpcpp.h>
#include <gtest/gtest.h>
#include <spdlog/spdlog.h>
#include <unistd.h>

#include <iostream>
Expand All @@ -60,7 +61,7 @@ using grpc::Status;
EStatus GRpcNetworkImpl::SendRequestVote(RaftServer* raft,
RaftNode* target_node,
eraftkv::RequestVoteReq* req) {
TraceLog("DEBUG: ", " send req vote to ", target_node->address);
SPDLOG_DEBUG("send req vote to {}", target_node->address);
ERaftKv::Stub* stub_ = GetPeerNodeConnection(target_node->id);
if (stub_ == nullptr) {
return EStatus::kNotFound;
Expand Down Expand Up @@ -93,6 +94,9 @@ EStatus GRpcNetworkImpl::SendRequestVote(RaftServer* raft,
EStatus GRpcNetworkImpl::SendAppendEntries(RaftServer* raft,
RaftNode* target_node,
eraftkv::AppendEntriesReq* req) {
SPDLOG_INFO("send append entries request to {} req {}",
target_node->address,
req->DebugString());
// 1.send entries with grpc message to target_node
ERaftKv::Stub* stub_ = GetPeerNodeConnection(target_node->id);
if (stub_ == nullptr) {
Expand All @@ -106,7 +110,7 @@ EStatus GRpcNetworkImpl::SendAppendEntries(RaftServer* raft,
ClientContext context;
auto status = stub_->AppendEntries(&context, *req, resp);
if (!status.ok()) {
TraceLog("ERROR: ", " send append req to FAILED! ", target_node->address);
SPDLOG_DEBUG(" send append req to {} failed! ", target_node->address);
target_node->node_state = NodeStateEnum::LostConnection;
} else {
target_node->node_state = NodeStateEnum::Running;
Expand All @@ -132,13 +136,18 @@ EStatus GRpcNetworkImpl::SendAppendEntries(RaftServer* raft,
EStatus GRpcNetworkImpl::SendSnapshot(RaftServer* raft,
RaftNode* target_node,
eraftkv::SnapshotReq* req) {
SPDLOG_INFO("send snapshot request to {} req {}",
target_node->address,
req->DebugString());
ERaftKv::Stub* stub_ = GetPeerNodeConnection(target_node->id);
if (stub_ == nullptr) {
return EStatus::kNotFound;
}
eraftkv::SnapshotResp* resp = new eraftkv::SnapshotResp;
ClientContext context;
auto status = stub_->Snapshot(&context, *req, resp);
resp->set_term(0);
resp->set_offset(0);
ClientContext context;
auto status = stub_->Snapshot(&context, *req, resp);
if (raft->HandleSnapshotResp(target_node, req, resp) == EStatus::kOk) {
return EStatus::kOk;
} else {
Expand All @@ -163,7 +172,7 @@ EStatus GRpcNetworkImpl::InitPeerNodeConnections(
grpc::CreateChannel(itr.second, grpc::InsecureChannelCredentials());
auto stub_(ERaftKv::NewStub(chan_));
this->peer_node_connections_[itr.first] = std::move(stub_);
TraceLog("DEBUG: ", "init peer connection ", itr.second);
SPDLOG_DEBUG("init peer connection {} ", itr.second);
}
return EStatus::kOk;
}
Expand All @@ -173,7 +182,7 @@ EStatus GRpcNetworkImpl::InsertPeerNodeConnection(int64_t peer_id,
auto chan_ = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
auto stub_(ERaftKv::NewStub(chan_));
this->peer_node_connections_[peer_id] = std::move(stub_);
TraceLog("DEBUG: ", "insert peer connection to ", addr);
SPDLOG_DEBUG("insert peer connection to {}", addr);
return EStatus::kOk;
}

Expand Down
Loading

0 comments on commit d9687b0

Please sign in to comment.