diff --git a/CMakeLists.txt b/CMakeLists.txt index 28268072..75d677b7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -168,6 +168,7 @@ list(APPEND eraftvdb_sources src/info_command_handler.cc) list(APPEND eraftvdb_sources src/set_command_handler.cc) list(APPEND eraftvdb_sources src/get_command_handler.cc) list(APPEND eraftvdb_sources src/unknow_command_handler.cc) +list(APPEND eraftvdb_sources src/util.cc) add_executable(eraft-vdb ${eraftvdb_sources}) target_link_libraries(eraft-vdb @@ -237,6 +238,7 @@ add_executable(eraftkv_server_test src/rocksdb_storage_impl.cc src/log_entry_cache.cc src/grpc_network_impl.cc + src/util.cc ) target_link_libraries(eraftkv_server_test PUBLIC ${GTEST_LIBRARIES} @@ -310,6 +312,7 @@ add_executable(grpc_network_impl_test src/log_storage_impl.cc src/rocksdb_storage_impl.cc src/log_entry_cache.cc + src/util.cc ) target_link_libraries(grpc_network_impl_test PUBLIC ${GTEST_LIBRARIES} @@ -325,7 +328,9 @@ add_executable(eraftkv-ctl src/eraftkv_ctl.cc src/eraftkv.pb.cc src/eraftkv.grpc.pb.cc - ) + src/util.cc +) + target_link_libraries(eraftkv-ctl PUBLIC gRPC::grpc++ ${Protobuf_LIBRARY} diff --git a/Makefile b/Makefile index 529f4e2f..cb3e174e 100644 --- a/Makefile +++ b/Makefile @@ -51,28 +51,35 @@ rm-net: docker network rm mytestnetwork run-demo: - docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 - sleep 5 - docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 - docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 - sleep 10 - docker 
run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 + docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.10 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 + sleep 2 + docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.11 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 + docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.12 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 + sleep 1 + docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 + sleep 3 + docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 + docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 + sleep 16 + docker run --name metaserver-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta_server_test + sleep 2 + docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 
172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 stop-demo: - docker stop kvserver-node1 kvserver-node2 kvserver-node3 vdbserver-node + docker stop kvserver-node1 kvserver-node2 kvserver-node3 vdbserver-node metaserver-node1 metaserver-node2 metaserver-node3 run-demo-bench: docker run --name kvserver-bench --network mytestnetwork --ip 172.18.0.5 --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv-ctl 172.18.0.2:8088 bench 64 64 10 run-vdb-tests: chmod +x utils/run-vdb-tests.sh - docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh + docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh run-metaserver-tests: docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 - sleep 10 + sleep 6 docker run --name metaserver-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta_server_test sleep 2 docker stop metaserver-node1 metaserver-node2 metaserver-node3 diff --git a/README.md b/README.md index e7d2e0c8..96f1077d 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,14 @@ ERaftKV is a persistent distributed KV storage system, uses the Raft protocol to # 
Getting Started +## Build + +Execute follower build command on the machine with docker installed. + +``` +sudo make build-dev +``` + ## Run demo in docker - step 1, create docker sub net @@ -34,23 +42,59 @@ docker network create --subnet=172.18.0.0/16 mytestnetwork f57ad3d454f27f4b84efca3ce61bf4764bd30ce3d4971b85477daf05c6ae28a3 ``` -- step 2, run cluster +- step 2, run cluster in shard mode ``` sudo make run-demo ``` command output ``` -docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -b11928b1281d6693a1fb12d12ab6fbc1f4b13c509d983fb3f04551fdcdff5d32 -sleep 4s -docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -7588f7ab8176a518ef6100157cfa6cc966ce719401b6f1909f7944230ef4266b -docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -8192b50bfc5f00ab0ce43fe95013fa1c80b40a5383ed260333c3032bf7e62203 -sleep 10s -docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -32064172894ca5afb6bc20546121817da8c75438e36c54550b373d8236690653 +docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.10 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 +eca081a545a9eb8dbf9b05c2a307f38c74b4fea2910776e85c806c1b70cedf20 +sleep 2 +docker run --name kvserver-node2 --network mytestnetwork --ip 
172.18.0.11 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 +74d14edf114f47889b50f0ed20ea810af7cd383de26ebdd3d1e36078290674e7 +docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.12 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 +36bd437e67d00f019732e95e31b7f7ab9c19739a0f10676f31e9c0a7fad98a6c +sleep 1 +docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 +f8a1382542f41d14e645ddeb285e8b93afc4367b8537e5bc4030487116d8f5cd +sleep 3 +docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 +7f5385341bc1f990f50020bd09526eaba3eeec56ab3c67fab325d313ab4ceaea +docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 +666732b5a9b10cd828e9c0829bd97159b3ac9a7d39d1d3f2dcbbc2e5af654373 +sleep 16 +docker run --name metaserver-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta_server_test +[==========] Running 1 test from 1 test suite. +[----------] Global test environment set-up. 
+[----------] 1 test from EraftMetaServerTests +[ RUN ] EraftMetaServerTests.TestMetaBasicOp +DEBUG: cluster config resp -> success: true +shard_group { + id: 1 + servers { + address: "172.18.0.10:8088" + } + servers { + id: 1 + address: "172.18.0.11:8089" + } + servers { + id: 2 + address: "172.18.0.12:8090" + } +} + +[ OK ] EraftMetaServerTests.TestMetaBasicOp (4028 ms) +[----------] 1 test from EraftMetaServerTests (4028 ms total) + +[----------] Global test environment tear-down +[==========] 1 test from 1 test suite ran. (4028 ms total) +[ PASSED ] 1 test. +sleep 2 +docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 +run server success! ``` - step 3, run eraft vdb tests @@ -61,15 +105,32 @@ sudo make run-vdb-tests command output ``` chmod +x utils/run-vdb-tests.sh -docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh +docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh + redis-cli -h 172.18.0.6 -p 12306 info -server_id: 0,server_address: 172.18.0.2:8088,status: Running,Role: Follower -server_id: 1,server_address: 172.18.0.3:8089,status: Running,Role: Leader -server_id: 2,server_address: 172.18.0.4:8090,status: Running,Role: Follower -+ redis-cli -h 172.18.0.6 -p 12306 set a testvalue +server_id: 0,server_address: 172.18.0.10:8088,status: Running,Role: Leader +server_id: 1,server_address: 172.18.0.11:8089,status: Running,Role: Follower +server_id: 2,server_address: 172.18.0.12:8090,status: Running,Role: Follower ++ redis-cli -h 172.18.0.6 -p 12306 set a h +OK ++ redis-cli -h 172.18.0.6 -p 12306 set b e +OK ++ redis-cli -h 172.18.0.6 -p 12306 set c l +OK ++ redis-cli -h 
172.18.0.6 -p 12306 set d l +OK ++ redis-cli -h 172.18.0.6 -p 12306 set e o OK ++ sleep 1 + redis-cli -h 172.18.0.6 -p 12306 get a -"testvalue" +"h" ++ redis-cli -h 172.18.0.6 -p 12306 get b +"e" ++ redis-cli -h 172.18.0.6 -p 12306 get c +"l" ++ redis-cli -h 172.18.0.6 -p 12306 get d +"l" ++ redis-cli -h 172.18.0.6 -p 12306 get e +"o" ``` - step 4, clean all @@ -106,50 +167,7 @@ make image ``` # Documentation - -## Run ERaftKV server group demo in physical machine - -- how to set up demo cluster? - -``` -./build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090 -./build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090 -./build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090 - -``` - -- how to join an new node? - -``` -./build/eraftkv 3 /tmp/kv_db4 /tmp/log_db4 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090,127.0.0.1:8091 - -./build/eraftkv-ctl addnode 3 127.0.0.1:8091 -``` - -## example usage - -- put kv -``` -./eraftkv-ctl [leader_address] put [key] [value] -``` -- get kv -``` -./eraftkv-ctl [leader_address] get [key] -``` - -- addnode to raft group -``` -./eraftkv-ctl [leader_address] addnode [node id] [node address] -``` -- remove node from raft group -``` -./eraftkv-ctl [leader_address] removenode [node id] -``` - -- run kv benchmark -``` -./eraftkv-ctl [leader_address] bench [key size] [value size] [op count] -``` +[ERaftKV Documentation](doc/eraft-vdb.md) # Contributing diff --git a/protocol/eraftkv.proto b/protocol/eraftkv.proto index 1f890b1c..d3defa42 100644 --- a/protocol/eraftkv.proto +++ b/protocol/eraftkv.proto @@ -168,7 +168,9 @@ message ClientOperationReq { } message ClientOperationResp { - repeated KvOpPair ops = 2; + repeated KvOpPair ops = 1; + ErrorCode error_code = 2; + int64 leader_addr = 3; } service ERaftKv { diff --git a/src/client.cc b/src/client.cc index b0681eac..341d2038 100644 --- a/src/client.cc +++ b/src/client.cc @@ -41,15 +41,18 @@ 
PacketLength Client::_HandlePacket(const char *start, std::size_t bytes) { return static_cast(bytes); } -Client::Client(std::string kv_addrs) : leader_addr_("") { - // init stub to kv server node - auto kv_node_addrs = StringUtil::Split(kv_addrs, ','); - for (auto kv_node_addr : kv_node_addrs) { +Client::Client(std::string meta_addrs) : leader_addr_("") { + // init stub to meta server node + auto meta_node_addrs = StringUtil::Split(meta_addrs, ','); + for (auto meta_node_addr : meta_node_addrs) { + TraceLog("DEBUG: init rpc link to ", meta_node_addr); auto chan_ = - grpc::CreateChannel(kv_node_addr, grpc::InsecureChannelCredentials()); + grpc::CreateChannel(meta_node_addr, grpc::InsecureChannelCredentials()); std::unique_ptr stub_(ERaftKv::NewStub(chan_)); - this->stubs_[kv_node_addr] = std::move(stub_); + this->stubs_[meta_node_addr] = std::move(stub_); } + // sync config + SyncClusterConfig(); _Reset(); } @@ -60,19 +63,52 @@ void Client::_Reset() { void Client::OnConnect() {} -std::string Client::GetLeaderAddr() { +std::string Client::GetLeaderAddr(std::string partion_key) { + std::string leader_address; + int64_t key_slot = -1; + key_slot = HashUtil::CRC64(0, partion_key.c_str(), partion_key.size()) % 1024; + TraceLog("DEBUG: partion key " + partion_key + " with slot ", key_slot); + for (auto sg : cluster_conf_.shard_group()) { + for (auto sl : sg.slots()) { + if (key_slot == sl.id()) { + // find sg leader addr + for (auto server : sg.servers()) { + if (server.id() == sg.leader_id()) { + ClientContext context; + eraftkv::ClusterConfigChangeReq req; + req.set_change_type(eraftkv::ChangeType::MetaMembersQuery); + eraftkv::ClusterConfigChangeResp resp; + auto status = stubs_[server.address()]->ClusterConfigChange( + &context, req, &resp); + for (int i = 0; i < resp.shard_group(0).servers_size(); i++) { + if (resp.shard_group(0).leader_id() == + resp.shard_group(0).servers(i).id()) { + leader_address = resp.shard_group(0).servers(i).address(); + } + } + } + } + } + 
} + } + + return leader_address; +} + +EStatus Client::SyncClusterConfig() { ClientContext context; eraftkv::ClusterConfigChangeReq req; req.set_change_type(eraftkv::ChangeType::ShardsQuery); - eraftkv::ClusterConfigChangeResp resp; - auto status = - this->stubs_.begin()->second->ClusterConfigChange(&context, req, &resp); - std::string leader_addr = ""; - for (int i = 0; i < resp.shard_group(0).servers_size(); i++) { - if (resp.shard_group(0).leader_id() == - resp.shard_group(0).servers(i).id()) { - leader_addr = resp.shard_group(0).servers(i).address(); + auto status_ = this->stubs_.begin()->second->ClusterConfigChange( + &context, req, &cluster_conf_); + for (auto sg : cluster_conf_.shard_group()) { + for (auto server : sg.servers()) { + auto chan_ = grpc::CreateChannel(server.address(), + grpc::InsecureChannelCredentials()); + std::unique_ptr stub_(ERaftKv::NewStub(chan_)); + this->stubs_[server.address()] = std::move(stub_); } } - return leader_addr; + + return status_.ok() ? EStatus::kOk : EStatus::kError; } diff --git a/src/client.h b/src/client.h index 2ff0f1a8..ca428d99 100644 --- a/src/client.h +++ b/src/client.h @@ -17,6 +17,7 @@ #include "command_handler.h" #include "eraftkv.grpc.pb.h" #include "eraftkv.pb.h" +#include "estatus.h" #include "proto_parser.h" #include "stream_socket.h" #include "unbounded_buffer.h" @@ -43,10 +44,14 @@ class Client : public StreamSocket { std::string leader_addr_; + eraftkv::ClusterConfigChangeResp cluster_conf_; + public: Client(std::string kv_addrs); - std::string GetLeaderAddr(); + std::string GetLeaderAddr(std::string partion_key); + + EStatus SyncClusterConfig(); void _Reset(); diff --git a/src/epoller.cc b/src/epoller.cc index 7a9b7a43..54111106 100644 --- a/src/epoller.cc +++ b/src/epoller.cc @@ -1,6 +1,6 @@ /** * @file epoller.cc - * @author ERaftGroup + * @author https://github.com/loveyacper/Qedis * @brief * @version 0.1 * @date 2023-06-17 diff --git a/src/eraftkv.cc b/src/eraftkv.cc index 040e9593..a42e8f70 100644 
--- a/src/eraftkv.cc +++ b/src/eraftkv.cc @@ -43,6 +43,7 @@ * @param argc * @param argv (eg: eraftkv 0 /tmp/kv_db0 /tmp/log_db0 * 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090) + * eraftkv [node id] [kv data path] [log data path] [meta server addrs] * @return int */ int main(int argc, char* argv[]) { diff --git a/src/eraftkv.pb.cc b/src/eraftkv.pb.cc index 59ef735f..d6886906 100644 --- a/src/eraftkv.pb.cc +++ b/src/eraftkv.pb.cc @@ -477,6 +477,8 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_eraftkv_2eproto::offsets[] PRO ~0u, // no _oneof_case_ ~0u, // no _weak_field_map_ PROTOBUF_FIELD_OFFSET(::eraftkv::ClientOperationResp, ops_), + PROTOBUF_FIELD_OFFSET(::eraftkv::ClientOperationResp, error_code_), + PROTOBUF_FIELD_OFFSET(::eraftkv::ClientOperationResp, leader_addr_), }; static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = { { 0, -1, sizeof(::eraftkv::RequestVoteReq)}, @@ -565,30 +567,32 @@ const char descriptor_table_protodef_eraftkv_2eproto[] PROTOBUF_SECTION_VARIABLE "\t\022\r\n\005value\030\003 \001(\t\022\017\n\007success\030\004 \001(\010\022\020\n\010op_" "count\030\005 \001(\003\"J\n\022ClientOperationReq\022\024\n\014op_" "timestamp\030\001 \001(\004\022\036\n\003kvs\030\002 \003(\0132\021.eraftkv.K" - "vOpPair\"5\n\023ClientOperationResp\022\036\n\003ops\030\002 " - "\003(\0132\021.eraftkv.KvOpPair*O\n\tErrorCode\022\033\n\027R" - "EQUEST_NOT_LEADER_NODE\020\000\022\020\n\014NODE_IS_DOWN" - "\020\001\022\023\n\017REQUEST_TIMEOUT\020\002*1\n\tEntryType\022\n\n\006" - "Normal\020\000\022\016\n\nConfChange\020\001\022\010\n\004NoOp\020\002*A\n\nSl" - "otStatus\022\013\n\007Running\020\000\022\r\n\tMigrating\020\001\022\r\n\t" - "Importing\020\002\022\010\n\004Init\020\003* \n\014ServerStatus\022\006\n" - "\002Up\020\000\022\010\n\004Down\020\001*\222\001\n\nChangeType\022\017\n\013Cluste" - "rInit\020\000\022\r\n\tShardJoin\020\001\022\016\n\nShardLeave\020\002\022\017" - 
"\n\013ShardsQuery\020\003\022\014\n\010SlotMove\020\004\022\016\n\nServerJ" - "oin\020\005\022\017\n\013ServerLeave\020\006\022\024\n\020MetaMembersQue" - "ry\020\007*2\n\020HandleServerType\022\016\n\nMetaServer\020\000" - "\022\016\n\nDataServer\020\001*=\n\014ClientOpType\022\010\n\004Noop" - "\020\000\022\007\n\003Put\020\001\022\007\n\003Get\020\002\022\007\n\003Del\020\003\022\010\n\004Scan\020\0042" - "\367\002\n\007ERaftKv\022@\n\013RequestVote\022\027.eraftkv.Req" - "uestVoteReq\032\030.eraftkv.RequestVoteResp\022F\n" - "\rAppendEntries\022\031.eraftkv.AppendEntriesRe" - "q\032\032.eraftkv.AppendEntriesResp\0227\n\010Snapsho" - "t\022\024.eraftkv.SnapshotReq\032\025.eraftkv.Snapsh" - "otResp\022O\n\022ProcessRWOperation\022\033.eraftkv.C" - "lientOperationReq\032\034.eraftkv.ClientOperat" - "ionResp\022X\n\023ClusterConfigChange\022\037.eraftkv" - ".ClusterConfigChangeReq\032 .eraftkv.Cluste" - "rConfigChangeRespb\006proto3" + "vOpPair\"r\n\023ClientOperationResp\022\036\n\003ops\030\001 " + "\003(\0132\021.eraftkv.KvOpPair\022&\n\nerror_code\030\002 \001" + "(\0162\022.eraftkv.ErrorCode\022\023\n\013leader_addr\030\003 " + "\001(\003*O\n\tErrorCode\022\033\n\027REQUEST_NOT_LEADER_N" + "ODE\020\000\022\020\n\014NODE_IS_DOWN\020\001\022\023\n\017REQUEST_TIMEO" + "UT\020\002*1\n\tEntryType\022\n\n\006Normal\020\000\022\016\n\nConfCha" + "nge\020\001\022\010\n\004NoOp\020\002*A\n\nSlotStatus\022\013\n\007Running" + "\020\000\022\r\n\tMigrating\020\001\022\r\n\tImporting\020\002\022\010\n\004Init" + "\020\003* \n\014ServerStatus\022\006\n\002Up\020\000\022\010\n\004Down\020\001*\222\001\n" + "\nChangeType\022\017\n\013ClusterInit\020\000\022\r\n\tShardJoi" + "n\020\001\022\016\n\nShardLeave\020\002\022\017\n\013ShardsQuery\020\003\022\014\n\010" + "SlotMove\020\004\022\016\n\nServerJoin\020\005\022\017\n\013ServerLeav" + "e\020\006\022\024\n\020MetaMembersQuery\020\007*2\n\020HandleServe" + 
"rType\022\016\n\nMetaServer\020\000\022\016\n\nDataServer\020\001*=\n" + "\014ClientOpType\022\010\n\004Noop\020\000\022\007\n\003Put\020\001\022\007\n\003Get\020" + "\002\022\007\n\003Del\020\003\022\010\n\004Scan\020\0042\367\002\n\007ERaftKv\022@\n\013Requ" + "estVote\022\027.eraftkv.RequestVoteReq\032\030.eraft" + "kv.RequestVoteResp\022F\n\rAppendEntries\022\031.er" + "aftkv.AppendEntriesReq\032\032.eraftkv.AppendE" + "ntriesResp\0227\n\010Snapshot\022\024.eraftkv.Snapsho" + "tReq\032\025.eraftkv.SnapshotResp\022O\n\022ProcessRW" + "Operation\022\033.eraftkv.ClientOperationReq\032\034" + ".eraftkv.ClientOperationResp\022X\n\023ClusterC" + "onfigChange\022\037.eraftkv.ClusterConfigChang" + "eReq\032 .eraftkv.ClusterConfigChangeRespb\006" + "proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_eraftkv_2eproto_deps[1] = { }; @@ -613,7 +617,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_era static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_eraftkv_2eproto_once; static bool descriptor_table_eraftkv_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_eraftkv_2eproto = { - &descriptor_table_eraftkv_2eproto_initialized, descriptor_table_protodef_eraftkv_2eproto, "eraftkv.proto", 2865, + &descriptor_table_eraftkv_2eproto_initialized, descriptor_table_protodef_eraftkv_2eproto, "eraftkv.proto", 2926, &descriptor_table_eraftkv_2eproto_once, descriptor_table_eraftkv_2eproto_sccs, descriptor_table_eraftkv_2eproto_deps, 16, 0, schemas, file_default_instances, TableStruct_eraftkv_2eproto::offsets, file_level_metadata_eraftkv_2eproto, 16, file_level_enum_descriptors_eraftkv_2eproto, file_level_service_descriptors_eraftkv_2eproto, @@ -5194,11 +5198,17 @@ ClientOperationResp::ClientOperationResp(const ClientOperationResp& from) _internal_metadata_(nullptr), ops_(from.ops_) { _internal_metadata_.MergeFrom(from._internal_metadata_); 
+ ::memcpy(&leader_addr_, &from.leader_addr_, + static_cast(reinterpret_cast(&error_code_) - + reinterpret_cast(&leader_addr_)) + sizeof(error_code_)); // @@protoc_insertion_point(copy_constructor:eraftkv.ClientOperationResp) } void ClientOperationResp::SharedCtor() { ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_ClientOperationResp_eraftkv_2eproto.base); + ::memset(&leader_addr_, 0, static_cast( + reinterpret_cast(&error_code_) - + reinterpret_cast(&leader_addr_)) + sizeof(error_code_)); } ClientOperationResp::~ClientOperationResp() { @@ -5225,6 +5235,9 @@ void ClientOperationResp::Clear() { (void) cached_has_bits; ops_.Clear(); + ::memset(&leader_addr_, 0, static_cast( + reinterpret_cast(&error_code_) - + reinterpret_cast(&leader_addr_)) + sizeof(error_code_)); _internal_metadata_.Clear(); } @@ -5235,16 +5248,31 @@ const char* ClientOperationResp::_InternalParse(const char* ptr, ::PROTOBUF_NAME ptr = ::PROTOBUF_NAMESPACE_ID::internal::ReadTag(ptr, &tag); CHK_(ptr); switch (tag >> 3) { - // repeated .eraftkv.KvOpPair ops = 2; - case 2: - if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) { + // repeated .eraftkv.KvOpPair ops = 1; + case 1: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 10)) { ptr -= 1; do { ptr += 1; ptr = ctx->ParseMessage(_internal_add_ops(), ptr); CHK_(ptr); if (!ctx->DataAvailable(ptr)) break; - } while (::PROTOBUF_NAMESPACE_ID::internal::ExpectTag<18>(ptr)); + } while (::PROTOBUF_NAMESPACE_ID::internal::ExpectTag<10>(ptr)); + } else goto handle_unusual; + continue; + // .eraftkv.ErrorCode error_code = 2; + case 2: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 16)) { + ::PROTOBUF_NAMESPACE_ID::uint64 val = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); + CHK_(ptr); + _internal_set_error_code(static_cast<::eraftkv::ErrorCode>(val)); + } else goto handle_unusual; + continue; + // int64 leader_addr = 3; + case 3: + if 
(PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 24)) { + leader_addr_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); + CHK_(ptr); } else goto handle_unusual; continue; default: { @@ -5273,12 +5301,25 @@ ::PROTOBUF_NAMESPACE_ID::uint8* ClientOperationResp::_InternalSerialize( ::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0; (void) cached_has_bits; - // repeated .eraftkv.KvOpPair ops = 2; + // repeated .eraftkv.KvOpPair ops = 1; for (unsigned int i = 0, n = static_cast(this->_internal_ops_size()); i < n; i++) { target = stream->EnsureSpace(target); target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite:: - InternalWriteMessage(2, this->_internal_ops(i), target, stream); + InternalWriteMessage(1, this->_internal_ops(i), target, stream); + } + + // .eraftkv.ErrorCode error_code = 2; + if (this->error_code() != 0) { + target = stream->EnsureSpace(target); + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteEnumToArray( + 2, this->_internal_error_code(), target); + } + + // int64 leader_addr = 3; + if (this->leader_addr() != 0) { + target = stream->EnsureSpace(target); + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(3, this->_internal_leader_addr(), target); } if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { @@ -5297,13 +5338,26 @@ size_t ClientOperationResp::ByteSizeLong() const { // Prevent compiler warnings about cached_has_bits being unused (void) cached_has_bits; - // repeated .eraftkv.KvOpPair ops = 2; + // repeated .eraftkv.KvOpPair ops = 1; total_size += 1UL * this->_internal_ops_size(); for (const auto& msg : this->ops_) { total_size += ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(msg); } + // int64 leader_addr = 3; + if (this->leader_addr() != 0) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int64Size( + this->_internal_leader_addr()); + } + + // .eraftkv.ErrorCode error_code = 2; + if 
(this->error_code() != 0) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::EnumSize(this->_internal_error_code()); + } + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { return ::PROTOBUF_NAMESPACE_ID::internal::ComputeUnknownFieldsSize( _internal_metadata_, total_size, &_cached_size_); @@ -5336,6 +5390,12 @@ void ClientOperationResp::MergeFrom(const ClientOperationResp& from) { (void) cached_has_bits; ops_.MergeFrom(from.ops_); + if (from.leader_addr() != 0) { + _internal_set_leader_addr(from._internal_leader_addr()); + } + if (from.error_code() != 0) { + _internal_set_error_code(from._internal_error_code()); + } } void ClientOperationResp::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) { @@ -5360,6 +5420,8 @@ void ClientOperationResp::InternalSwap(ClientOperationResp* other) { using std::swap; _internal_metadata_.Swap(&other->_internal_metadata_); ops_.InternalSwap(&other->ops_); + swap(leader_addr_, other->leader_addr_); + swap(error_code_, other->error_code_); } ::PROTOBUF_NAMESPACE_ID::Metadata ClientOperationResp::GetMetadata() const { diff --git a/src/eraftkv.pb.h b/src/eraftkv.pb.h index 267270ed..ecef6f89 100644 --- a/src/eraftkv.pb.h +++ b/src/eraftkv.pb.h @@ -3087,9 +3087,11 @@ class ClientOperationResp : // accessors ------------------------------------------------------- enum : int { - kOpsFieldNumber = 2, + kOpsFieldNumber = 1, + kLeaderAddrFieldNumber = 3, + kErrorCodeFieldNumber = 2, }; - // repeated .eraftkv.KvOpPair ops = 2; + // repeated .eraftkv.KvOpPair ops = 1; int ops_size() const; private: int _internal_ops_size() const; @@ -3107,12 +3109,32 @@ class ClientOperationResp : const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::eraftkv::KvOpPair >& ops() const; + // int64 leader_addr = 3; + void clear_leader_addr(); + ::PROTOBUF_NAMESPACE_ID::int64 leader_addr() const; + void set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value); + private: + ::PROTOBUF_NAMESPACE_ID::int64 
_internal_leader_addr() const; + void _internal_set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value); + public: + + // .eraftkv.ErrorCode error_code = 2; + void clear_error_code(); + ::eraftkv::ErrorCode error_code() const; + void set_error_code(::eraftkv::ErrorCode value); + private: + ::eraftkv::ErrorCode _internal_error_code() const; + void _internal_set_error_code(::eraftkv::ErrorCode value); + public: + // @@protoc_insertion_point(class_scope:eraftkv.ClientOperationResp) private: class _Internal; ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::eraftkv::KvOpPair > ops_; + ::PROTOBUF_NAMESPACE_ID::int64 leader_addr_; + int error_code_; mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; friend struct ::TableStruct_eraftkv_2eproto; }; @@ -5162,7 +5184,7 @@ ClientOperationReq::kvs() const { // ClientOperationResp -// repeated .eraftkv.KvOpPair ops = 2; +// repeated .eraftkv.KvOpPair ops = 1; inline int ClientOperationResp::_internal_ops_size() const { return ops_.size(); } @@ -5201,6 +5223,46 @@ ClientOperationResp::ops() const { return ops_; } +// .eraftkv.ErrorCode error_code = 2; +inline void ClientOperationResp::clear_error_code() { + error_code_ = 0; +} +inline ::eraftkv::ErrorCode ClientOperationResp::_internal_error_code() const { + return static_cast< ::eraftkv::ErrorCode >(error_code_); +} +inline ::eraftkv::ErrorCode ClientOperationResp::error_code() const { + // @@protoc_insertion_point(field_get:eraftkv.ClientOperationResp.error_code) + return _internal_error_code(); +} +inline void ClientOperationResp::_internal_set_error_code(::eraftkv::ErrorCode value) { + + error_code_ = value; +} +inline void ClientOperationResp::set_error_code(::eraftkv::ErrorCode value) { + _internal_set_error_code(value); + // @@protoc_insertion_point(field_set:eraftkv.ClientOperationResp.error_code) +} + +// int64 leader_addr = 3; +inline void 
ClientOperationResp::clear_leader_addr() { + leader_addr_ = PROTOBUF_LONGLONG(0); +} +inline ::PROTOBUF_NAMESPACE_ID::int64 ClientOperationResp::_internal_leader_addr() const { + return leader_addr_; +} +inline ::PROTOBUF_NAMESPACE_ID::int64 ClientOperationResp::leader_addr() const { + // @@protoc_insertion_point(field_get:eraftkv.ClientOperationResp.leader_addr) + return _internal_leader_addr(); +} +inline void ClientOperationResp::_internal_set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value) { + + leader_addr_ = value; +} +inline void ClientOperationResp::set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value) { + _internal_set_leader_addr(value); + // @@protoc_insertion_point(field_set:eraftkv.ClientOperationResp.leader_addr) +} + #ifdef __GNUC__ #pragma GCC diagnostic pop #endif // __GNUC__ diff --git a/src/eraftkv_server.cc b/src/eraftkv_server.cc index 89646c7e..06bc8cd1 100644 --- a/src/eraftkv_server.cc +++ b/src/eraftkv_server.cc @@ -107,6 +107,12 @@ grpc::Status ERaftKvServer::ProcessRWOperation( int64_t log_term; bool success; TraceLog("DEBUG: ", " recv rw op with ts ", req->op_timestamp()); + // no leader reject + if (!raft_context_->IsLeader()) { + resp->set_error_code(eraftkv::ErrorCode::REQUEST_NOT_LEADER_NODE); + resp->set_leader_addr(raft_context_->GetLeaderId()); + return grpc::Status::OK; + } for (auto kv_op : req->kvs()) { if (kv_op.op_type() == eraftkv::ClientOpType::Put) { std::mutex map_mutex_; diff --git a/src/eraftmetaserver_test.cc b/src/eraftmetaserver_test.cc index e3b8d795..06fc6a87 100644 --- a/src/eraftmetaserver_test.cc +++ b/src/eraftmetaserver_test.cc @@ -67,7 +67,6 @@ TEST(EraftMetaServerTests, TestMetaBasicOp) { } ASSERT_FALSE(leader_address.empty()); - // TraceLog("DEBUG: leader ", leader_address); auto leader_chan = grpc::CreateChannel(leader_address, grpc::InsecureChannelCredentials()); @@ -76,19 +75,19 @@ TEST(EraftMetaServerTests, TestMetaBasicOp) { ClientContext add_sg_context; eraftkv::ClusterConfigChangeReq req; 
req.set_handle_server_type(eraftkv::HandleServerType::MetaServer); - req.set_shard_id(888); - req.mutable_shard_group()->set_id(888); + req.set_shard_id(1); + req.mutable_shard_group()->set_id(1); req.mutable_shard_group()->set_leader_id(0); auto svr0 = req.mutable_shard_group()->add_servers(); - svr0->set_address("127.0.0.1:8080"); + svr0->set_address("172.18.0.10:8088"); svr0->set_id(0); svr0->set_server_status(eraftkv::ServerStatus::Up); auto svr1 = req.mutable_shard_group()->add_servers(); - svr1->set_address("127.0.0.1:8081"); + svr1->set_address("172.18.0.11:8089"); svr1->set_id(1); svr1->set_server_status(eraftkv::ServerStatus::Up); auto svr2 = req.mutable_shard_group()->add_servers(); - svr2->set_address("127.0.0.1:8080"); + svr2->set_address("172.18.0.12:8090"); svr2->set_id(2); svr2->set_server_status(eraftkv::ServerStatus::Up); req.set_change_type(eraftkv::ChangeType::ShardJoin); @@ -107,7 +106,34 @@ TEST(EraftMetaServerTests, TestMetaBasicOp) { ASSERT_EQ(status2.ok(), true); ASSERT_EQ(query_resp.shard_group_size(), 1); ASSERT_EQ(query_resp.shard_group(0).servers_size(), 3); - ASSERT_EQ(query_resp.shard_group(0).id(), 888); + ASSERT_EQ(query_resp.shard_group(0).id(), 1); + TraceLog("DEBUG: cluster config resp -> ", query_resp.DebugString()); + + ClientContext move_context; + eraftkv::ClusterConfigChangeReq move_req; + move_req.set_change_type(eraftkv::ChangeType::SlotMove); + move_req.set_shard_id(1); + auto to_move_sg = move_req.mutable_shard_group(); + to_move_sg->set_id(1); + + for (int64_t i = 0; i < 1024; i++) { + auto new_slot = to_move_sg->add_slots(); + new_slot->set_id(i); + new_slot->set_slot_status(eraftkv::SlotStatus::Running); + } + + eraftkv::ClusterConfigChangeResp move_resp; + auto status3 = + leader_stub->ClusterConfigChange(&move_context, move_req, &move_resp); + ASSERT_EQ(status3.ok(), true); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + ClientContext query_context_; + auto status4 = + 
leader_stub->ClusterConfigChange(&query_context_, query_req, &query_resp); + // TraceLog("DEBUG: cluster config after move resp -> ", + // query_resp.DebugString()); + ASSERT_EQ(status4.ok(), true); } int main(int argc, char** argv) { diff --git a/src/get_command_handler.cc b/src/get_command_handler.cc index 343ccaa2..a15848b7 100644 --- a/src/get_command_handler.cc +++ b/src/get_command_handler.cc @@ -14,9 +14,7 @@ EStatus GetCommandHandler::Execute(const std::vector<std::string>& params, Client* cli) { std::string leader_addr; - if (cli->leader_addr_ == "") { - leader_addr = cli->GetLeaderAddr(); - } + leader_addr = cli->GetLeaderAddr(params[1]); ClientContext op_context; eraftkv::ClientOperationReq op_req; auto kv_pair_ = op_req.add_kvs(); diff --git a/src/info_command_handler.cc b/src/info_command_handler.cc index 7838f6fa..f3fd0ffe 100644 --- a/src/info_command_handler.cc +++ b/src/info_command_handler.cc @@ -15,7 +15,7 @@ EStatus InfoCommandHandler::Execute(const std::vector<std::string>& params, Client* cli) { ClientContext context; eraftkv::ClusterConfigChangeReq req; - req.set_change_type(eraftkv::ChangeType::ShardsQuery); + req.set_change_type(eraftkv::ChangeType::MetaMembersQuery); eraftkv::ClusterConfigChangeResp resp; auto status = diff --git a/src/raft_server.cc b/src/raft_server.cc index cb130ab5..e49d09d6 100644 --- a/src/raft_server.cc +++ b/src/raft_server.cc @@ -916,4 +916,8 @@ std::vector RaftServer::GetNodes() { */ int64_t RaftServer::GetLeaderId() { return leader_id_; +} + +bool RaftServer::IsLeader() { + return leader_id_ == id_; } \ No newline at end of file diff --git a/src/raft_server.h b/src/raft_server.h index 4ae5c33e..c39e007b 100644 --- a/src/raft_server.h +++ b/src/raft_server.h @@ -400,6 +400,14 @@ class RaftServer { */ int64_t GetLeaderId(); + /** + * @brief return true if node is leader + * + * @return true + * @return false + */ + bool IsLeader(); + /** * @brief * diff --git a/src/rocksdb_storage_impl.cc b/src/rocksdb_storage_impl.cc index 
9eb48bca..eb3b2cd3 100644 --- a/src/rocksdb_storage_impl.cc +++ b/src/rocksdb_storage_impl.cc @@ -119,10 +119,10 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft, eraftkv::ClusterConfigChangeReq* conf_change_req = new eraftkv::ClusterConfigChangeReq(); conf_change_req->ParseFromString(ety->data()); + raft->log_store_->PersisLogMetaState(raft->commit_idx_, ety->id()); + raft->last_applied_idx_ = ety->id(); switch (conf_change_req->change_type()) { case eraftkv::ChangeType::ServerJoin: { - raft->log_store_->PersisLogMetaState(raft->commit_idx_, ety->id()); - raft->last_applied_idx_ = ety->id(); if (conf_change_req->server().id() != raft->id_) { RaftNode* new_node = new RaftNode(conf_change_req->server().id(), @@ -157,53 +157,69 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft, break; } case eraftkv::ChangeType::ServerLeave: { - if (conf_change_req->handle_server_type() == - eraftkv::HandleServerType::DataServer) { - raft->log_store_->PersisLogMetaState(raft->commit_idx_, - ety->id()); - raft->last_applied_idx_ = ety->id(); - auto to_remove_serverid = conf_change_req->server().id(); - for (auto iter = raft->nodes_.begin(); iter != raft->nodes_.end(); - iter++) { - if ((*iter)->id == to_remove_serverid && - conf_change_req->server().id() != raft->id_) { - (*iter)->node_state = NodeStateEnum::Down; - } + auto to_remove_serverid = conf_change_req->server().id(); + for (auto iter = raft->nodes_.begin(); iter != raft->nodes_.end(); + iter++) { + if ((*iter)->id == to_remove_serverid && + conf_change_req->server().id() != raft->id_) { + (*iter)->node_state = NodeStateEnum::Down; } } break; } case eraftkv::ChangeType::ShardJoin: { - if (conf_change_req->handle_server_type() == - eraftkv::HandleServerType::MetaServer) { - std::string key; - key.append(SG_META_PREFIX); - EncodeDecodeTool::PutFixed64( - &key, static_cast(conf_change_req->shard_id())); - auto sg = conf_change_req->shard_group(); - std::string val = sg.SerializeAsString(); - 
raft->store_->PutKV(key, val); - } + std::string key; + key.append(SG_META_PREFIX); + EncodeDecodeTool::PutFixed64( + &key, static_cast<uint64_t>(conf_change_req->shard_id())); + auto sg = conf_change_req->shard_group(); + std::string val = sg.SerializeAsString(); + raft->store_->PutKV(key, val); - } + break; } case eraftkv::ChangeType::ShardLeave: { - if (conf_change_req->handle_server_type() == - eraftkv::HandleServerType::MetaServer) { - std::string key; - key.append(SG_META_PREFIX); - EncodeDecodeTool::PutFixed64( - &key, static_cast<uint64_t>(conf_change_req->shard_id())); - raft->store_->DelKV(key); - } + std::string key; + key.append(SG_META_PREFIX); + EncodeDecodeTool::PutFixed64( + &key, static_cast<uint64_t>(conf_change_req->shard_id())); + raft->store_->DelKV(key); - } + break; } case eraftkv::ChangeType::SlotMove: { + auto sg = conf_change_req->shard_group(); + std::string key = SG_META_PREFIX; + EncodeDecodeTool::PutFixed64( + &key, static_cast<uint64_t>(conf_change_req->shard_id())); + auto value = raft->store_->GetKV(key); + if (!value.empty()) { + eraftkv::ShardGroup* old_sg = new eraftkv::ShardGroup(); + old_sg->ParseFromString(value); + // move slot to new sg + if (sg.id() == old_sg->id()) { + for (auto new_slot : sg.slots()) { + // check if slot already exists + bool slot_already_exists = false; + for (auto old_slot : old_sg->slots()) { + if (old_slot.id() == new_slot.id()) { + slot_already_exists = true; + } + } + // add slot to sg + if (!slot_already_exists) { + auto add_slot = old_sg->add_slots(); + add_slot->CopyFrom(new_slot); + } + } + // write back to db + EStatus st = + raft->store_->PutKV(key, old_sg->SerializeAsString()); + assert(st == EStatus::kOk); + } + } break; } case eraftkv::ChangeType::ShardsQuery: { break; } default: { - raft->log_store_->PersisLogMetaState(raft->commit_idx_, ety->id()); - raft->last_applied_idx_ = ety->id(); break; } } diff --git a/src/set_command_handler.cc b/src/set_command_handler.cc index 7b6662be..a74c6d9c 100644 --- a/src/set_command_handler.cc +++ 
b/src/set_command_handler.cc @@ -12,14 +12,13 @@ #include #include "command_handler.h" +#include "util.h" EStatus SetCommandHandler::Execute(const std::vector<std::string>& params, Client* cli) { - std::string leader_addr; - if (cli->leader_addr_ == "") { - leader_addr = cli->GetLeaderAddr(); - } + std::string leader_addr = cli->GetLeaderAddr(params[1]); + TraceLog("DEBUG: send request to leader ", leader_addr); ClientContext op_context; eraftkv::ClientOperationReq op_req; eraftkv::ClientOperationResp op_resp; diff --git a/src/util.cc b/src/util.cc index 14a91b37..3945c8d5 100644 --- a/src/util.cc +++ b/src/util.cc @@ -32,6 +32,176 @@ */ #include "util.h" +/* Redis uses the CRC64 variant with "Jones" coefficients and init value of 0. + * + * Specification of this CRC64 variant follows: + * Name: crc-64-jones + * Width: 64 bites + * Poly: 0xad93d23594c935a9 + * Reflected In: True + * Xor_In: 0xffffffffffffffff + * Reflected_Out: True + * Xor_Out: 0x0 + * Check("123456789"): 0xe9c6d914c4b8d9ca + * + * Copyright (c) 2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. */ + +static const uint64_t crc64_tab[256] = { + UINT64_C(0x0000000000000000), UINT64_C(0x7ad870c830358979), + UINT64_C(0xf5b0e190606b12f2), UINT64_C(0x8f689158505e9b8b), + UINT64_C(0xc038e5739841b68f), UINT64_C(0xbae095bba8743ff6), + UINT64_C(0x358804e3f82aa47d), UINT64_C(0x4f50742bc81f2d04), + UINT64_C(0xab28ecb46814fe75), UINT64_C(0xd1f09c7c5821770c), + UINT64_C(0x5e980d24087fec87), UINT64_C(0x24407dec384a65fe), + UINT64_C(0x6b1009c7f05548fa), UINT64_C(0x11c8790fc060c183), + UINT64_C(0x9ea0e857903e5a08), UINT64_C(0xe478989fa00bd371), + UINT64_C(0x7d08ff3b88be6f81), UINT64_C(0x07d08ff3b88be6f8), + UINT64_C(0x88b81eabe8d57d73), UINT64_C(0xf2606e63d8e0f40a), + UINT64_C(0xbd301a4810ffd90e), UINT64_C(0xc7e86a8020ca5077), + UINT64_C(0x4880fbd87094cbfc), UINT64_C(0x32588b1040a14285), + UINT64_C(0xd620138fe0aa91f4), UINT64_C(0xacf86347d09f188d), + UINT64_C(0x2390f21f80c18306), UINT64_C(0x594882d7b0f40a7f), + UINT64_C(0x1618f6fc78eb277b), UINT64_C(0x6cc0863448deae02), + UINT64_C(0xe3a8176c18803589), UINT64_C(0x997067a428b5bcf0), + UINT64_C(0xfa11fe77117cdf02), UINT64_C(0x80c98ebf2149567b), + UINT64_C(0x0fa11fe77117cdf0), UINT64_C(0x75796f2f41224489), + UINT64_C(0x3a291b04893d698d), 
UINT64_C(0x40f16bccb908e0f4), + UINT64_C(0xcf99fa94e9567b7f), UINT64_C(0xb5418a5cd963f206), + UINT64_C(0x513912c379682177), UINT64_C(0x2be1620b495da80e), + UINT64_C(0xa489f35319033385), UINT64_C(0xde51839b2936bafc), + UINT64_C(0x9101f7b0e12997f8), UINT64_C(0xebd98778d11c1e81), + UINT64_C(0x64b116208142850a), UINT64_C(0x1e6966e8b1770c73), + UINT64_C(0x8719014c99c2b083), UINT64_C(0xfdc17184a9f739fa), + UINT64_C(0x72a9e0dcf9a9a271), UINT64_C(0x08719014c99c2b08), + UINT64_C(0x4721e43f0183060c), UINT64_C(0x3df994f731b68f75), + UINT64_C(0xb29105af61e814fe), UINT64_C(0xc849756751dd9d87), + UINT64_C(0x2c31edf8f1d64ef6), UINT64_C(0x56e99d30c1e3c78f), + UINT64_C(0xd9810c6891bd5c04), UINT64_C(0xa3597ca0a188d57d), + UINT64_C(0xec09088b6997f879), UINT64_C(0x96d1784359a27100), + UINT64_C(0x19b9e91b09fcea8b), UINT64_C(0x636199d339c963f2), + UINT64_C(0xdf7adabd7a6e2d6f), UINT64_C(0xa5a2aa754a5ba416), + UINT64_C(0x2aca3b2d1a053f9d), UINT64_C(0x50124be52a30b6e4), + UINT64_C(0x1f423fcee22f9be0), UINT64_C(0x659a4f06d21a1299), + UINT64_C(0xeaf2de5e82448912), UINT64_C(0x902aae96b271006b), + UINT64_C(0x74523609127ad31a), UINT64_C(0x0e8a46c1224f5a63), + UINT64_C(0x81e2d7997211c1e8), UINT64_C(0xfb3aa75142244891), + UINT64_C(0xb46ad37a8a3b6595), UINT64_C(0xceb2a3b2ba0eecec), + UINT64_C(0x41da32eaea507767), UINT64_C(0x3b024222da65fe1e), + UINT64_C(0xa2722586f2d042ee), UINT64_C(0xd8aa554ec2e5cb97), + UINT64_C(0x57c2c41692bb501c), UINT64_C(0x2d1ab4dea28ed965), + UINT64_C(0x624ac0f56a91f461), UINT64_C(0x1892b03d5aa47d18), + UINT64_C(0x97fa21650afae693), UINT64_C(0xed2251ad3acf6fea), + UINT64_C(0x095ac9329ac4bc9b), UINT64_C(0x7382b9faaaf135e2), + UINT64_C(0xfcea28a2faafae69), UINT64_C(0x8632586aca9a2710), + UINT64_C(0xc9622c4102850a14), UINT64_C(0xb3ba5c8932b0836d), + UINT64_C(0x3cd2cdd162ee18e6), UINT64_C(0x460abd1952db919f), + UINT64_C(0x256b24ca6b12f26d), UINT64_C(0x5fb354025b277b14), + UINT64_C(0xd0dbc55a0b79e09f), UINT64_C(0xaa03b5923b4c69e6), + UINT64_C(0xe553c1b9f35344e2), 
UINT64_C(0x9f8bb171c366cd9b), + UINT64_C(0x10e3202993385610), UINT64_C(0x6a3b50e1a30ddf69), + UINT64_C(0x8e43c87e03060c18), UINT64_C(0xf49bb8b633338561), + UINT64_C(0x7bf329ee636d1eea), UINT64_C(0x012b592653589793), + UINT64_C(0x4e7b2d0d9b47ba97), UINT64_C(0x34a35dc5ab7233ee), + UINT64_C(0xbbcbcc9dfb2ca865), UINT64_C(0xc113bc55cb19211c), + UINT64_C(0x5863dbf1e3ac9dec), UINT64_C(0x22bbab39d3991495), + UINT64_C(0xadd33a6183c78f1e), UINT64_C(0xd70b4aa9b3f20667), + UINT64_C(0x985b3e827bed2b63), UINT64_C(0xe2834e4a4bd8a21a), + UINT64_C(0x6debdf121b863991), UINT64_C(0x1733afda2bb3b0e8), + UINT64_C(0xf34b37458bb86399), UINT64_C(0x8993478dbb8deae0), + UINT64_C(0x06fbd6d5ebd3716b), UINT64_C(0x7c23a61ddbe6f812), + UINT64_C(0x3373d23613f9d516), UINT64_C(0x49aba2fe23cc5c6f), + UINT64_C(0xc6c333a67392c7e4), UINT64_C(0xbc1b436e43a74e9d), + UINT64_C(0x95ac9329ac4bc9b5), UINT64_C(0xef74e3e19c7e40cc), + UINT64_C(0x601c72b9cc20db47), UINT64_C(0x1ac40271fc15523e), + UINT64_C(0x5594765a340a7f3a), UINT64_C(0x2f4c0692043ff643), + UINT64_C(0xa02497ca54616dc8), UINT64_C(0xdafce7026454e4b1), + UINT64_C(0x3e847f9dc45f37c0), UINT64_C(0x445c0f55f46abeb9), + UINT64_C(0xcb349e0da4342532), UINT64_C(0xb1eceec59401ac4b), + UINT64_C(0xfebc9aee5c1e814f), UINT64_C(0x8464ea266c2b0836), + UINT64_C(0x0b0c7b7e3c7593bd), UINT64_C(0x71d40bb60c401ac4), + UINT64_C(0xe8a46c1224f5a634), UINT64_C(0x927c1cda14c02f4d), + UINT64_C(0x1d148d82449eb4c6), UINT64_C(0x67ccfd4a74ab3dbf), + UINT64_C(0x289c8961bcb410bb), UINT64_C(0x5244f9a98c8199c2), + UINT64_C(0xdd2c68f1dcdf0249), UINT64_C(0xa7f41839ecea8b30), + UINT64_C(0x438c80a64ce15841), UINT64_C(0x3954f06e7cd4d138), + UINT64_C(0xb63c61362c8a4ab3), UINT64_C(0xcce411fe1cbfc3ca), + UINT64_C(0x83b465d5d4a0eece), UINT64_C(0xf96c151de49567b7), + UINT64_C(0x76048445b4cbfc3c), UINT64_C(0x0cdcf48d84fe7545), + UINT64_C(0x6fbd6d5ebd3716b7), UINT64_C(0x15651d968d029fce), + UINT64_C(0x9a0d8ccedd5c0445), UINT64_C(0xe0d5fc06ed698d3c), + UINT64_C(0xaf85882d2576a038), 
UINT64_C(0xd55df8e515432941), + UINT64_C(0x5a3569bd451db2ca), UINT64_C(0x20ed197575283bb3), + UINT64_C(0xc49581ead523e8c2), UINT64_C(0xbe4df122e51661bb), + UINT64_C(0x3125607ab548fa30), UINT64_C(0x4bfd10b2857d7349), + UINT64_C(0x04ad64994d625e4d), UINT64_C(0x7e7514517d57d734), + UINT64_C(0xf11d85092d094cbf), UINT64_C(0x8bc5f5c11d3cc5c6), + UINT64_C(0x12b5926535897936), UINT64_C(0x686de2ad05bcf04f), + UINT64_C(0xe70573f555e26bc4), UINT64_C(0x9ddd033d65d7e2bd), + UINT64_C(0xd28d7716adc8cfb9), UINT64_C(0xa85507de9dfd46c0), + UINT64_C(0x273d9686cda3dd4b), UINT64_C(0x5de5e64efd965432), + UINT64_C(0xb99d7ed15d9d8743), UINT64_C(0xc3450e196da80e3a), + UINT64_C(0x4c2d9f413df695b1), UINT64_C(0x36f5ef890dc31cc8), + UINT64_C(0x79a59ba2c5dc31cc), UINT64_C(0x037deb6af5e9b8b5), + UINT64_C(0x8c157a32a5b7233e), UINT64_C(0xf6cd0afa9582aa47), + UINT64_C(0x4ad64994d625e4da), UINT64_C(0x300e395ce6106da3), + UINT64_C(0xbf66a804b64ef628), UINT64_C(0xc5bed8cc867b7f51), + UINT64_C(0x8aeeace74e645255), UINT64_C(0xf036dc2f7e51db2c), + UINT64_C(0x7f5e4d772e0f40a7), UINT64_C(0x05863dbf1e3ac9de), + UINT64_C(0xe1fea520be311aaf), UINT64_C(0x9b26d5e88e0493d6), + UINT64_C(0x144e44b0de5a085d), UINT64_C(0x6e963478ee6f8124), + UINT64_C(0x21c640532670ac20), UINT64_C(0x5b1e309b16452559), + UINT64_C(0xd476a1c3461bbed2), UINT64_C(0xaeaed10b762e37ab), + UINT64_C(0x37deb6af5e9b8b5b), UINT64_C(0x4d06c6676eae0222), + UINT64_C(0xc26e573f3ef099a9), UINT64_C(0xb8b627f70ec510d0), + UINT64_C(0xf7e653dcc6da3dd4), UINT64_C(0x8d3e2314f6efb4ad), + UINT64_C(0x0256b24ca6b12f26), UINT64_C(0x788ec2849684a65f), + UINT64_C(0x9cf65a1b368f752e), UINT64_C(0xe62e2ad306bafc57), + UINT64_C(0x6946bb8b56e467dc), UINT64_C(0x139ecb4366d1eea5), + UINT64_C(0x5ccebf68aecec3a1), UINT64_C(0x2616cfa09efb4ad8), + UINT64_C(0xa97e5ef8cea5d153), UINT64_C(0xd3a62e30fe90582a), + UINT64_C(0xb0c7b7e3c7593bd8), UINT64_C(0xca1fc72bf76cb2a1), + UINT64_C(0x45775673a732292a), UINT64_C(0x3faf26bb9707a053), + UINT64_C(0x70ff52905f188d57), 
UINT64_C(0x0a2722586f2d042e), + UINT64_C(0x854fb3003f739fa5), UINT64_C(0xff97c3c80f4616dc), + UINT64_C(0x1bef5b57af4dc5ad), UINT64_C(0x61372b9f9f784cd4), + UINT64_C(0xee5fbac7cf26d75f), UINT64_C(0x9487ca0fff135e26), + UINT64_C(0xdbd7be24370c7322), UINT64_C(0xa10fceec0739fa5b), + UINT64_C(0x2e675fb4576761d0), UINT64_C(0x54bf2f7c6752e8a9), + UINT64_C(0xcdcf48d84fe75459), UINT64_C(0xb71738107fd2dd20), + UINT64_C(0x387fa9482f8c46ab), UINT64_C(0x42a7d9801fb9cfd2), + UINT64_C(0x0df7adabd7a6e2d6), UINT64_C(0x772fdd63e7936baf), + UINT64_C(0xf8474c3bb7cdf024), UINT64_C(0x829f3cf387f8795d), + UINT64_C(0x66e7a46c27f3aa2c), UINT64_C(0x1c3fd4a417c62355), + UINT64_C(0x935745fc4798b8de), UINT64_C(0xe98f353477ad31a7), + UINT64_C(0xa6df411fbfb21ca3), UINT64_C(0xdc0731d78f8795da), + UINT64_C(0x536fa08fdfd90e51), UINT64_C(0x29b7d047efec8728), +}; + bool DirectoryTool::IsDir(const std::string dirpath) { return fs::is_directory(dirpath); }; @@ -105,3 +275,13 @@ DirectoryTool::DirectoryTool() {} * */ DirectoryTool::~DirectoryTool() {} + +uint64_t HashUtil::CRC64(uint64_t crc, const char* s, uint64_t l) { + uint64_t j; + + for (j = 0; j < l; j++) { + uint8_t byte = s[j]; + crc = crc64_tab[(uint8_t)crc ^ byte] ^ (crc >> 8); + } + return crc; +} diff --git a/src/util.h b/src/util.h index d60968ce..396d697a 100644 --- a/src/util.h +++ b/src/util.h @@ -144,7 +144,9 @@ class StringUtil { } return elems; } +}; - StringUtil() = delete; - ~StringUtil() = delete; +class HashUtil { + public: + static uint64_t CRC64(uint64_t crc, const char* s, uint64_t l); }; diff --git a/utils/run-vdb-tests.sh b/utils/run-vdb-tests.sh index adf5776c..8e151198 100755 --- a/utils/run-vdb-tests.sh +++ b/utils/run-vdb-tests.sh @@ -2,5 +2,16 @@ set -xe redis-cli -h 172.18.0.6 -p 12306 info -redis-cli -h 172.18.0.6 -p 12306 set a testvalue +redis-cli -h 172.18.0.6 -p 12306 set a h +redis-cli -h 172.18.0.6 -p 12306 set b e +redis-cli -h 172.18.0.6 -p 12306 set c l +redis-cli -h 172.18.0.6 -p 12306 set d l +redis-cli -h 
172.18.0.6 -p 12306 set e o + +sleep 1 # test mode raft interval is 1s +redis-cli -h 172.18.0.6 -p 12306 get a +redis-cli -h 172.18.0.6 -p 12306 get b +redis-cli -h 172.18.0.6 -p 12306 get c +redis-cli -h 172.18.0.6 -p 12306 get d +redis-cli -h 172.18.0.6 -p 12306 get e