diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d212974..f7f854ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -167,19 +167,20 @@ list(APPEND eraftvdb_sources src/eraftkv.pb.cc) list(APPEND eraftvdb_sources src/info_command_handler.cc) list(APPEND eraftvdb_sources src/set_command_handler.cc) list(APPEND eraftvdb_sources src/get_command_handler.cc) +list(APPEND eraftvdb_sources src/del_command_handler.cc) list(APPEND eraftvdb_sources src/shardgroup_command_handler.cc) list(APPEND eraftvdb_sources src/unknow_command_handler.cc) list(APPEND eraftvdb_sources src/util.cc) -add_executable(eraft-vdb ${eraftvdb_sources}) -target_link_libraries(eraft-vdb +add_executable(eraft-kdb ${eraftvdb_sources}) +target_link_libraries(eraft-kdb pthread rocksdb gRPC::grpc++ ${Protobuf_LIBRARY} stdc++fs ) -target_include_directories(eraft-vdb PUBLIC ${eraftkv_INCLUDE_DIR}) +target_include_directories(eraft-kdb PUBLIC ${eraftkv_INCLUDE_DIR}) # build eraftkv set(eraftkv_sources) diff --git a/Makefile b/Makefile index a4331b32..2900e9cc 100644 --- a/Makefile +++ b/Makefile @@ -61,7 +61,7 @@ run-demo: docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 sleep 16 - docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 + docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-kdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 stop-demo: docker stop kvserver-node1 kvserver-node2 kvserver-node3 vdbserver-node metaserver-node1 metaserver-node2 metaserver-node3 @@ -69,9 +69,13 @@ stop-demo: run-demo-bench: docker run --name kvserver-bench --network mytestnetwork --ip 172.18.0.5 --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv-ctl 172.18.0.2:8088 bench 64 64 10 -run-vdb-tests: - chmod +x utils/run-vdb-tests.sh - docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh +init-kdb-meta: + chmod +x utils/init-kdb-meta.sh + docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/utils/init-kdb-meta.sh + +run-kdb-tests: + chmod +x utils/run-kdb-tests.sh + docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-kdb-tests.sh run-metaserver-tests: docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 diff --git a/README.md b/README.md index c53fa5d9..00790cbe 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,16 @@ ERaftKDB is a distributed database that supports the Redis RESP protocol, and us ![eraft-kdb](doc/eraft-kdb.png) +## Redis Command Support Plan + +| Command | Status | +| -------- 
| ------- | +| get | [x] | +| set | [x] | +| del | [x] | +| scan | [ ] | + + ## ERaftKV ERaftKV is a persistent distributed KV storage system, uses the Raft protocol to ensure data consistency, At the same time, it supports sharding to form multi shard large-scale data storage clusters. @@ -65,26 +75,43 @@ docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d -- docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 10269f84d95e9f82f75d3c60f0d7b0dc0efe5efe643366e615b7644fb8851f04 sleep 16 -docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 +docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-kdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 run server success! ``` -- step 3, run eraft vdb tests +- step 3, run eraft kdb tests ``` -sudo make run-vdb-tests +sudo make init-kdb-meta +sudo make run-kdb-tests ``` command output -``` -chmod +x utils/run-vdb-tests.sh -docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh - -... (config change log) +``` +chmod +x utils/run-kdb-tests.sh +docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v /Users/colin/Documents/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-kdb-tests.sh ++ redis-cli -h 172.18.0.6 -p 12306 shardgroup query +1) "shardgroup" +2) "1" +3) "servers" +4) "0" +5) "172.18.0.10:8088" +6) "1" +7) "172.18.0.11:8089" +8) "2" +9) "172.18.0.12:8090" ++ redis-cli -h 172.18.0.6 -p 12306 shardgroup join 1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 +OK ++ redis-cli -h 172.18.0.6 -p 12306 shardgroup move 1 0-1023 +OK ++ sleep 1 + redis-cli -h 172.18.0.6 -p 12306 info -server_id: 0,server_address: 172.18.0.10:8088,status: Running,Role: Leader -server_id: 1,server_address: 172.18.0.11:8089,status: Running,Role: Follower -server_id: 2,server_address: 172.18.0.12:8090,status: Running,Role: Follower +meta server: +server_id: 0,server_address: 172.18.0.2:8088,status: Running,Role: Leader +meta server: +server_id: 1,server_address: 172.18.0.3:8089,status: Running,Role: Follower +meta server: +server_id: 2,server_address: 172.18.0.4:8090,status: Running,Role: Follower + redis-cli -h 172.18.0.6 -p 12306 set a h OK + redis-cli -h 172.18.0.6 -p 12306 set b e @@ -106,6 +133,8 @@ OK "l" + redis-cli -h 172.18.0.6 -p 12306 get e "o" ++ redis-cli -h 172.18.0.6 -p 12306 get nil_test +(nil) ``` - step 4, clean all diff --git a/doc/eraft-kdb.md b/doc/eraft-kdb.md index 32b71c3b..b3ad34ad 100644 --- a/doc/eraft-kdb.md +++ b/doc/eraft-kdb.md @@ -93,7 +93,7 @@ shard_group { [==========] 1 test from 1 test suite ran. (4028 ms total) [ PASSED ] 1 test. 
sleep 2
-docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
+docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-kdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
run server success!
```
diff --git a/protocol/eraftkv.proto b/protocol/eraftkv.proto
index dfe5e929..5411257e 100644
--- a/protocol/eraftkv.proto
+++ b/protocol/eraftkv.proto
@@ -143,6 +143,8 @@ message ClusterConfigChangeResp {
   bool success = 1;
   repeated ShardGroup shard_group = 2;
   int64 config_version = 3;
+  ErrorCode error_code = 4;
+  int64 leader_addr = 5;
 }
 
 enum ClientOpType {
diff --git a/src/client.cc b/src/client.cc
index 389a6cfd..243b98b1 100644
--- a/src/client.cc
+++ b/src/client.cc
@@ -33,6 +33,8 @@ PacketLength Client::_HandlePacket(const char *start, std::size_t bytes) {
     handler = new SetCommandHandler();
   } else if (parser_.GetParams()[0] == "get") {
     handler = new GetCommandHandler();
+  } else if (parser_.GetParams()[0] == "del") {
+    handler = new DelCommandHandler();
   } else if (parser_.GetParams()[0] == "shardgroup") {
     handler = new ShardGroupCommandHandler();
   } else {
diff --git a/src/client.h b/src/client.h
index 0039f87a..62308546 100644
--- a/src/client.h
+++ b/src/client.h
@@ -31,6 +31,7 @@ class Client : public StreamSocket {
   friend class InfoCommandHandler;
   friend class SetCommandHandler;
   friend class GetCommandHandler;
+  friend class DelCommandHandler;
   friend class UnKnowCommandHandler;
   friend class ShardGroupCommandHandler;
diff --git a/src/command_handler.h b/src/command_handler.h
index 0eec9b06..65c441cc 100644
--- a/src/command_handler.h
+++ b/src/command_handler.h
@@ -50,6 +50,14 @@ class GetCommandHandler : public CommandHandler {
  public:
   EStatus Execute(const std::vector<std::string>& params, Client* cli);
 
   ~GetCommandHandler();
 };
 
+class DelCommandHandler : public CommandHandler {
+ public:
+  EStatus Execute(const std::vector<std::string>& params, Client* cli);
+
+  DelCommandHandler();
+  ~DelCommandHandler();
+};
+
 class UnKnowCommandHandler : public CommandHandler {
  public:
   EStatus Execute(const std::vector<std::string>& params, Client* cli);
diff --git a/src/del_command_handler.cc b/src/del_command_handler.cc
new file mode 100644
index 00000000..3ff6234f
--- /dev/null
+++ b/src/del_command_handler.cc
@@ -0,0 +1,48 @@
+/**
+ * @file del_command_handler.cc
+ * @author jay_jieliu@outlook.com
+ * @brief
+ * @version 0.1
+ * @date 2023-07-28
+ *
+ * @copyright Copyright (c) 2023
+ *
+ */
+
+#include "command_handler.h"
+#include "key_encode.h"
+
+EStatus DelCommandHandler::Execute(const std::vector<std::string>& params,
+                                   Client* cli) {
+  std::string leader_addr;
+  uint16_t slot;
+  leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot);
+  TraceLog("DEBUG: send del request to leader ", leader_addr);
+  ClientContext op_context;
+  eraftkv::ClientOperationReq op_req;
+  eraftkv::ClientOperationResp op_resp;
+  auto kv_pair_ = op_req.add_kvs();
+  std::string encode_key = EncodeStringKey(slot, params[1]);
+  kv_pair_->set_key(encode_key);
+  kv_pair_->set_op_type(eraftkv::ClientOpType::Del);
+  std::string reply_buf;
+  if (cli->kv_stubs_[leader_addr] != nullptr) {
+    auto status_ = cli->kv_stubs_[leader_addr]->ProcessRWOperation(
+        &op_context, op_req, &op_resp);
+    if (status_.ok()) {
+      reply_buf += "+OK\r\n";
+    } else {
+      reply_buf += "-ERR Server error\r\n";
+    }
+  } else {
+    reply_buf += "-ERR Server error\r\n";
+  }
+ 
cli->reply_.PushData(reply_buf.c_str(), reply_buf.size()); + cli->SendPacket(cli->reply_); + cli->_Reset(); + return EStatus::kOk; +} + +DelCommandHandler::DelCommandHandler() {} + +DelCommandHandler::~DelCommandHandler() {} diff --git a/src/eraft_vdb_server.h b/src/eraft_vdb_server.h index 8bf47ef5..81f7572e 100644 --- a/src/eraft_vdb_server.h +++ b/src/eraft_vdb_server.h @@ -12,8 +12,6 @@ #include "server.h" -#define KV_SERVER_ADDRS "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090" - class ERaftVdbServer : public Server { public: ERaftVdbServer(std::string addr, std::string kv_svr_addrs); diff --git a/src/eraftkv.pb.cc b/src/eraftkv.pb.cc index d7432a75..4fc49722 100644 --- a/src/eraftkv.pb.cc +++ b/src/eraftkv.pb.cc @@ -454,6 +454,8 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_eraftkv_2eproto::offsets[] PRO PROTOBUF_FIELD_OFFSET(::eraftkv::ClusterConfigChangeResp, success_), PROTOBUF_FIELD_OFFSET(::eraftkv::ClusterConfigChangeResp, shard_group_), PROTOBUF_FIELD_OFFSET(::eraftkv::ClusterConfigChangeResp, config_version_), + PROTOBUF_FIELD_OFFSET(::eraftkv::ClusterConfigChangeResp, error_code_), + PROTOBUF_FIELD_OFFSET(::eraftkv::ClusterConfigChangeResp, leader_addr_), ~0u, // no _has_bits_ PROTOBUF_FIELD_OFFSET(::eraftkv::KvOpPair, _internal_metadata_), ~0u, // no _extensions_ @@ -494,9 +496,9 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOB { 100, -1, sizeof(::eraftkv::ShardGroup)}, { 109, -1, sizeof(::eraftkv::ClusterConfigChangeReq)}, { 122, -1, sizeof(::eraftkv::ClusterConfigChangeResp)}, - { 130, -1, sizeof(::eraftkv::KvOpPair)}, - { 140, -1, sizeof(::eraftkv::ClientOperationReq)}, - { 147, -1, sizeof(::eraftkv::ClientOperationResp)}, + { 132, -1, sizeof(::eraftkv::KvOpPair)}, + { 142, -1, sizeof(::eraftkv::ClientOperationReq)}, + { 149, -1, sizeof(::eraftkv::ClientOperationResp)}, }; static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { @@ -559,40 +561,41 @@ const char descriptor_table_protodef_eraftkv_2eproto[] PROTOBUF_SECTION_VARIABLE "_id\030\003 \001(\003\022\037\n\006server\030\004 \001(\0132\017.eraftkv.Serv" "er\022\026\n\016config_version\030\005 \001(\003\022\020\n\010op_count\030\006" " \001(\003\022\022\n\ncommand_id\030\007 \001(\003\022(\n\013shard_group\030" - "\010 \001(\0132\023.eraftkv.ShardGroup\"l\n\027ClusterCon" - "figChangeResp\022\017\n\007success\030\001 \001(\010\022(\n\013shard_" - "group\030\002 \003(\0132\023.eraftkv.ShardGroup\022\026\n\016conf" - "ig_version\030\003 \001(\003\"q\n\010KvOpPair\022&\n\007op_type\030" - "\001 \001(\0162\025.eraftkv.ClientOpType\022\013\n\003key\030\002 \001(" - "\014\022\r\n\005value\030\003 \001(\014\022\017\n\007success\030\004 \001(\010\022\020\n\010op_" - "count\030\005 \001(\003\"J\n\022ClientOperationReq\022\024\n\014op_" - "timestamp\030\001 \001(\004\022\036\n\003kvs\030\002 \003(\0132\021.eraftkv.K" - "vOpPair\"r\n\023ClientOperationResp\022\036\n\003ops\030\001 " - "\003(\0132\021.eraftkv.KvOpPair\022&\n\nerror_code\030\002 \001" - "(\0162\022.eraftkv.ErrorCode\022\023\n\013leader_addr\030\003 " - "\001(\003*O\n\tErrorCode\022\033\n\027REQUEST_NOT_LEADER_N" - "ODE\020\000\022\020\n\014NODE_IS_DOWN\020\001\022\023\n\017REQUEST_TIMEO" - "UT\020\002*1\n\tEntryType\022\n\n\006Normal\020\000\022\016\n\nConfCha" - "nge\020\001\022\010\n\004NoOp\020\002*A\n\nSlotStatus\022\013\n\007Running" - "\020\000\022\r\n\tMigrating\020\001\022\r\n\tImporting\020\002\022\010\n\004Init" - "\020\003* 
\n\014ServerStatus\022\006\n\002Up\020\000\022\010\n\004Down\020\001*\216\001\n" - "\nChangeType\022\017\n\013ClusterInit\020\000\022\r\n\tShardJoi" - "n\020\001\022\016\n\nShardLeave\020\002\022\017\n\013ShardsQuery\020\003\022\014\n\010" - "SlotMove\020\004\022\016\n\nServerJoin\020\005\022\017\n\013ServerLeav" - "e\020\006\022\020\n\014MembersQuery\020\007*2\n\020HandleServerTyp" - "e\022\016\n\nMetaServer\020\000\022\016\n\nDataServer\020\001*=\n\014Cli" - "entOpType\022\010\n\004Noop\020\000\022\007\n\003Put\020\001\022\007\n\003Get\020\002\022\007\n" - "\003Del\020\003\022\010\n\004Scan\020\0042\367\002\n\007ERaftKv\022@\n\013RequestV" - "ote\022\027.eraftkv.RequestVoteReq\032\030.eraftkv.R" - "equestVoteResp\022F\n\rAppendEntries\022\031.eraftk" - "v.AppendEntriesReq\032\032.eraftkv.AppendEntri" - "esResp\0227\n\010Snapshot\022\024.eraftkv.SnapshotReq" - "\032\025.eraftkv.SnapshotResp\022O\n\022ProcessRWOper" - "ation\022\033.eraftkv.ClientOperationReq\032\034.era" - "ftkv.ClientOperationResp\022X\n\023ClusterConfi" - "gChange\022\037.eraftkv.ClusterConfigChangeReq" - "\032 .eraftkv.ClusterConfigChangeRespb\006prot" - "o3" + "\010 \001(\0132\023.eraftkv.ShardGroup\"\251\001\n\027ClusterCo" + "nfigChangeResp\022\017\n\007success\030\001 \001(\010\022(\n\013shard" + "_group\030\002 \003(\0132\023.eraftkv.ShardGroup\022\026\n\016con" + "fig_version\030\003 \001(\003\022&\n\nerror_code\030\004 \001(\0162\022." + "eraftkv.ErrorCode\022\023\n\013leader_addr\030\005 \001(\003\"q" + "\n\010KvOpPair\022&\n\007op_type\030\001 \001(\0162\025.eraftkv.Cl" + "ientOpType\022\013\n\003key\030\002 \001(\t\022\r\n\005value\030\003 \001(\t\022\017" + "\n\007success\030\004 \001(\010\022\020\n\010op_count\030\005 \001(\003\"J\n\022Cli" + "entOperationReq\022\024\n\014op_timestamp\030\001 \001(\004\022\036\n" + "\003kvs\030\002 \003(\0132\021.eraftkv.KvOpPair\"r\n\023ClientO" + "perationResp\022\036\n\003ops\030\001 \003(\0132\021.eraftkv.KvOp" + "Pair\022&\n\nerror_code\030\002 \001(\0162\022.eraftkv.Error" + "Code\022\023\n\013leader_addr\030\003 \001(\003*O\n\tErrorCode\022\033" + "\n\027REQUEST_NOT_LEADER_NODE\020\000\022\020\n\014NODE_IS_D" + "OWN\020\001\022\023\n\017REQUEST_TIMEOUT\020\002*1\n\tEntryType\022" + "\n\n\006Normal\020\000\022\016\n\nConfChange\020\001\022\010\n\004NoOp\020\002*A\n" + "\nSlotStatus\022\013\n\007Running\020\000\022\r\n\tMigrating\020\001\022" + "\r\n\tImporting\020\002\022\010\n\004Init\020\003* \n\014ServerStatus" + "\022\006\n\002Up\020\000\022\010\n\004Down\020\001*\216\001\n\nChangeType\022\017\n\013Clu" + "sterInit\020\000\022\r\n\tShardJoin\020\001\022\016\n\nShardLeave\020" + "\002\022\017\n\013ShardsQuery\020\003\022\014\n\010SlotMove\020\004\022\016\n\nServ" + "erJoin\020\005\022\017\n\013ServerLeave\020\006\022\020\n\014MembersQuer" + "y\020\007*2\n\020HandleServerType\022\016\n\nMetaServer\020\000\022" + "\016\n\nDataServer\020\001*=\n\014ClientOpType\022\010\n\004Noop\020" + "\000\022\007\n\003Put\020\001\022\007\n\003Get\020\002\022\007\n\003Del\020\003\022\010\n\004Scan\020\0042\367" + "\002\n\007ERaftKv\022@\n\013RequestVote\022\027.eraftkv.Requ" + "estVoteReq\032\030.eraftkv.RequestVoteResp\022F\n\r" + "AppendEntries\022\031.eraftkv.AppendEntriesReq" + "\032\032.eraftkv.AppendEntriesResp\0227\n\010Snapshot" + "\022\024.eraftkv.SnapshotReq\032\025.eraftkv.Snapsho" + "tResp\022O\n\022ProcessRWOperation\022\033.eraftkv.Cl" + "ientOperationReq\032\034.eraftkv.ClientOperati" + "onResp\022X\n\023ClusterConfigChange\022\037.eraftkv." 
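+    // The serialized descriptor above/below is regenerated by protoc, not
+    // edited by hand: ClusterConfigChangeResp gains error_code (field 4) and
+    // leader_addr (field 5), and KvOpPair.key/value switch from bytes to
+    // string, which is why the size recorded below grows from 2922 to 2984.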
+ "ClusterConfigChangeReq\032 .eraftkv.Cluster" + "ConfigChangeRespb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_eraftkv_2eproto_deps[1] = { }; @@ -617,7 +620,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_era static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_eraftkv_2eproto_once; static bool descriptor_table_eraftkv_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_eraftkv_2eproto = { - &descriptor_table_eraftkv_2eproto_initialized, descriptor_table_protodef_eraftkv_2eproto, "eraftkv.proto", 2922, + &descriptor_table_eraftkv_2eproto_initialized, descriptor_table_protodef_eraftkv_2eproto, "eraftkv.proto", 2984, &descriptor_table_eraftkv_2eproto_once, descriptor_table_eraftkv_2eproto_sccs, descriptor_table_eraftkv_2eproto_deps, 16, 0, schemas, file_default_instances, TableStruct_eraftkv_2eproto::offsets, file_level_metadata_eraftkv_2eproto, 16, file_level_enum_descriptors_eraftkv_2eproto, file_level_service_descriptors_eraftkv_2eproto, @@ -4416,17 +4419,17 @@ ClusterConfigChangeResp::ClusterConfigChangeResp(const ClusterConfigChangeResp& _internal_metadata_(nullptr), shard_group_(from.shard_group_) { _internal_metadata_.MergeFrom(from._internal_metadata_); - ::memcpy(&config_version_, &from.config_version_, - static_cast(reinterpret_cast(&success_) - - reinterpret_cast(&config_version_)) + sizeof(success_)); + ::memcpy(&success_, &from.success_, + static_cast(reinterpret_cast(&leader_addr_) - + reinterpret_cast(&success_)) + sizeof(leader_addr_)); // @@protoc_insertion_point(copy_constructor:eraftkv.ClusterConfigChangeResp) } void ClusterConfigChangeResp::SharedCtor() { ::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_ClusterConfigChangeResp_eraftkv_2eproto.base); - ::memset(&config_version_, 0, static_cast( - reinterpret_cast(&success_) - - reinterpret_cast(&config_version_)) + sizeof(success_)); + ::memset(&success_, 0, static_cast( + reinterpret_cast(&leader_addr_) - + reinterpret_cast(&success_)) + sizeof(leader_addr_)); } ClusterConfigChangeResp::~ClusterConfigChangeResp() { @@ -4453,9 +4456,9 @@ void ClusterConfigChangeResp::Clear() { (void) cached_has_bits; shard_group_.Clear(); - ::memset(&config_version_, 0, static_cast( - reinterpret_cast(&success_) - - reinterpret_cast(&config_version_)) + sizeof(success_)); + ::memset(&success_, 0, static_cast( + reinterpret_cast(&leader_addr_) - + reinterpret_cast(&success_)) + sizeof(leader_addr_)); _internal_metadata_.Clear(); } @@ -4492,6 +4495,21 @@ const char* ClusterConfigChangeResp::_InternalParse(const char* ptr, ::PROTOBUF_ CHK_(ptr); } else goto handle_unusual; continue; + // .eraftkv.ErrorCode error_code = 4; + case 4: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 32)) { + ::PROTOBUF_NAMESPACE_ID::uint64 val = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); + CHK_(ptr); + _internal_set_error_code(static_cast<::eraftkv::ErrorCode>(val)); + } else goto handle_unusual; + continue; + // int64 leader_addr = 5; + case 5: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 40)) { + leader_addr_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr); + CHK_(ptr); + } else goto handle_unusual; + continue; default: { handle_unusual: if ((tag & 7) == 4 || tag == 0) { @@ -4538,6 +4556,19 @@ ::PROTOBUF_NAMESPACE_ID::uint8* ClusterConfigChangeResp::_InternalSerialize( target = 
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(3, this->_internal_config_version(), target); } + // .eraftkv.ErrorCode error_code = 4; + if (this->error_code() != 0) { + target = stream->EnsureSpace(target); + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteEnumToArray( + 4, this->_internal_error_code(), target); + } + + // int64 leader_addr = 5; + if (this->leader_addr() != 0) { + target = stream->EnsureSpace(target); + target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(5, this->_internal_leader_addr(), target); + } + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::InternalSerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target, stream); @@ -4561,6 +4592,17 @@ size_t ClusterConfigChangeResp::ByteSizeLong() const { ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::MessageSize(msg); } + // bool success = 1; + if (this->success() != 0) { + total_size += 1 + 1; + } + + // .eraftkv.ErrorCode error_code = 4; + if (this->error_code() != 0) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::EnumSize(this->_internal_error_code()); + } + // int64 config_version = 3; if (this->config_version() != 0) { total_size += 1 + @@ -4568,9 +4610,11 @@ size_t ClusterConfigChangeResp::ByteSizeLong() const { this->_internal_config_version()); } - // bool success = 1; - if (this->success() != 0) { - total_size += 1 + 1; + // int64 leader_addr = 5; + if (this->leader_addr() != 0) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int64Size( + this->_internal_leader_addr()); } if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { @@ -4605,11 +4649,17 @@ void ClusterConfigChangeResp::MergeFrom(const ClusterConfigChangeResp& from) { (void) cached_has_bits; shard_group_.MergeFrom(from.shard_group_); + if (from.success() != 0) { + _internal_set_success(from._internal_success()); + } + if (from.error_code() != 0) { + _internal_set_error_code(from._internal_error_code()); + } if (from.config_version() != 0) { _internal_set_config_version(from._internal_config_version()); } - if (from.success() != 0) { - _internal_set_success(from._internal_success()); + if (from.leader_addr() != 0) { + _internal_set_leader_addr(from._internal_leader_addr()); } } @@ -4635,8 +4685,10 @@ void ClusterConfigChangeResp::InternalSwap(ClusterConfigChangeResp* other) { using std::swap; _internal_metadata_.Swap(&other->_internal_metadata_); shard_group_.InternalSwap(&other->shard_group_); - swap(config_version_, other->config_version_); swap(success_, other->success_); + swap(error_code_, other->error_code_); + swap(config_version_, other->config_version_); + swap(leader_addr_, other->leader_addr_); } ::PROTOBUF_NAMESPACE_ID::Metadata ClusterConfigChangeResp::GetMetadata() const { @@ -4732,19 +4784,21 @@ const char* KvOpPair::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::i _internal_set_op_type(static_cast<::eraftkv::ClientOpType>(val)); } else goto handle_unusual; continue; - // bytes key = 2; + // string key = 2; case 2: if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) { auto str = _internal_mutable_key(); ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParser(str, ptr, ctx); + CHK_(::PROTOBUF_NAMESPACE_ID::internal::VerifyUTF8(str, "eraftkv.KvOpPair.key")); CHK_(ptr); } else goto handle_unusual; continue; - // bytes value = 3; + // string value = 3; case 3: 
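+          // proto3 "string" fields must hold valid UTF-8 (unlike "bytes"), so
+          // the bytes -> string change adds VerifyUTF8 checks to the parser.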
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 26)) {
          auto str = _internal_mutable_value();
          ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParser(str, ptr, ctx);
+          CHK_(::PROTOBUF_NAMESPACE_ID::internal::VerifyUTF8(str, "eraftkv.KvOpPair.value"));
          CHK_(ptr);
        } else goto handle_unusual;
        continue;
@@ -4795,15 +4849,23 @@ ::PROTOBUF_NAMESPACE_ID::uint8* KvOpPair::_InternalSerialize(
        1, this->_internal_op_type(), target);
  }
 
-  // bytes key = 2;
+  // string key = 2;
  if (this->key().size() > 0) {
-    target = stream->WriteBytesMaybeAliased(
+    ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
+        this->_internal_key().data(), static_cast<int>(this->_internal_key().length()),
+        ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
+        "eraftkv.KvOpPair.key");
+    target = stream->WriteStringMaybeAliased(
        2, this->_internal_key(), target);
  }
 
-  // bytes value = 3;
+  // string value = 3;
  if (this->value().size() > 0) {
-    target = stream->WriteBytesMaybeAliased(
+    ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String(
+        this->_internal_value().data(), static_cast<int>(this->_internal_value().length()),
+        ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE,
+        "eraftkv.KvOpPair.value");
+    target = stream->WriteStringMaybeAliased(
        3, this->_internal_value(), target);
  }
 
@@ -4835,17 +4897,17 @@ size_t KvOpPair::ByteSizeLong() const {
  // Prevent compiler warnings about cached_has_bits being unused
  (void) cached_has_bits;
 
-  // bytes key = 2;
+  // string key = 2;
  if (this->key().size() > 0) {
    total_size += 1 +
-      ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::BytesSize(
+      ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize(
        this->_internal_key());
  }
 
-  // bytes value = 3;
+  // string value = 3;
  if (this->value().size() > 0) {
    total_size += 1 +
-      ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::BytesSize(
+      ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize(
        this->_internal_value());
  }
 
diff --git a/src/eraftkv.pb.h b/src/eraftkv.pb.h
index 8c1e141f..7b584569 100644
--- a/src/eraftkv.pb.h
+++ b/src/eraftkv.pb.h
@@ -2595,8 +2595,10 @@ class ClusterConfigChangeResp :
  enum : int {
    kShardGroupFieldNumber = 2,
-    kConfigVersionFieldNumber = 3,
    kSuccessFieldNumber = 1,
+    kErrorCodeFieldNumber = 4,
+    kConfigVersionFieldNumber = 3,
+    kLeaderAddrFieldNumber = 5,
  };
  // repeated .eraftkv.ShardGroup shard_group = 2;
  int shard_group_size() const;
@@ -2616,6 +2618,24 @@
  const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::eraftkv::ShardGroup >&
      shard_group() const;
 
+  // bool success = 1;
+  void clear_success();
+  bool success() const;
+  void set_success(bool value);
+  private:
+  bool _internal_success() const;
+  void _internal_set_success(bool value);
+  public:
+
+  // .eraftkv.ErrorCode error_code = 4;
+  void clear_error_code();
+  ::eraftkv::ErrorCode error_code() const;
+  void set_error_code(::eraftkv::ErrorCode value);
+  private:
+  ::eraftkv::ErrorCode _internal_error_code() const;
+  void _internal_set_error_code(::eraftkv::ErrorCode value);
+  public:
+
  // int64 config_version = 3;
  void clear_config_version();
  ::PROTOBUF_NAMESPACE_ID::int64 config_version() const;
@@ -2625,13 +2645,13 @@ class ClusterConfigChangeResp :
  void _internal_set_config_version(::PROTOBUF_NAMESPACE_ID::int64 value);
  public:
 
-  // bool success = 1;
-  void clear_success();
-  bool success() const;
-  void set_success(bool value);
+  // int64 leader_addr = 5;
+  void clear_leader_addr();
+  ::PROTOBUF_NAMESPACE_ID::int64 
leader_addr() const; + void set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value); private: - bool _internal_success() const; - void _internal_set_success(bool value); + ::PROTOBUF_NAMESPACE_ID::int64 _internal_leader_addr() const; + void _internal_set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value); public: // @@protoc_insertion_point(class_scope:eraftkv.ClusterConfigChangeResp) @@ -2640,8 +2660,10 @@ class ClusterConfigChangeResp : ::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_; ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::eraftkv::ShardGroup > shard_group_; - ::PROTOBUF_NAMESPACE_ID::int64 config_version_; bool success_; + int error_code_; + ::PROTOBUF_NAMESPACE_ID::int64 config_version_; + ::PROTOBUF_NAMESPACE_ID::int64 leader_addr_; mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_; friend struct ::TableStruct_eraftkv_2eproto; }; @@ -2759,13 +2781,13 @@ class KvOpPair : kSuccessFieldNumber = 4, kOpCountFieldNumber = 5, }; - // bytes key = 2; + // string key = 2; void clear_key(); const std::string& key() const; void set_key(const std::string& value); void set_key(std::string&& value); void set_key(const char* value); - void set_key(const void* value, size_t size); + void set_key(const char* value, size_t size); std::string* mutable_key(); std::string* release_key(); void set_allocated_key(std::string* key); @@ -2775,13 +2797,13 @@ class KvOpPair : std::string* _internal_mutable_key(); public: - // bytes value = 3; + // string value = 3; void clear_value(); const std::string& value() const; void set_value(const std::string& value); void set_value(std::string&& value); void set_value(const char* value); - void set_value(const void* value, size_t size); + void set_value(const char* value, size_t size); std::string* mutable_value(); std::string* release_value(); void set_allocated_value(std::string* value); @@ -4933,6 +4955,46 @@ inline void ClusterConfigChangeResp::set_config_version(::PROTOBUF_NAMESPACE_ID: // @@protoc_insertion_point(field_set:eraftkv.ClusterConfigChangeResp.config_version) } +// .eraftkv.ErrorCode error_code = 4; +inline void ClusterConfigChangeResp::clear_error_code() { + error_code_ = 0; +} +inline ::eraftkv::ErrorCode ClusterConfigChangeResp::_internal_error_code() const { + return static_cast< ::eraftkv::ErrorCode >(error_code_); +} +inline ::eraftkv::ErrorCode ClusterConfigChangeResp::error_code() const { + // @@protoc_insertion_point(field_get:eraftkv.ClusterConfigChangeResp.error_code) + return _internal_error_code(); +} +inline void ClusterConfigChangeResp::_internal_set_error_code(::eraftkv::ErrorCode value) { + + error_code_ = value; +} +inline void ClusterConfigChangeResp::set_error_code(::eraftkv::ErrorCode value) { + _internal_set_error_code(value); + // @@protoc_insertion_point(field_set:eraftkv.ClusterConfigChangeResp.error_code) +} + +// int64 leader_addr = 5; +inline void ClusterConfigChangeResp::clear_leader_addr() { + leader_addr_ = PROTOBUF_LONGLONG(0); +} +inline ::PROTOBUF_NAMESPACE_ID::int64 ClusterConfigChangeResp::_internal_leader_addr() const { + return leader_addr_; +} +inline ::PROTOBUF_NAMESPACE_ID::int64 ClusterConfigChangeResp::leader_addr() const { + // @@protoc_insertion_point(field_get:eraftkv.ClusterConfigChangeResp.leader_addr) + return _internal_leader_addr(); +} +inline void ClusterConfigChangeResp::_internal_set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 value) { + + leader_addr_ = value; +} +inline void ClusterConfigChangeResp::set_leader_addr(::PROTOBUF_NAMESPACE_ID::int64 
value) {
+  _internal_set_leader_addr(value);
+  // @@protoc_insertion_point(field_set:eraftkv.ClusterConfigChangeResp.leader_addr)
+}
+
 // -------------------------------------------------------------------
 
 // KvOpPair
 
@@ -4957,7 +5019,7 @@ inline void KvOpPair::set_op_type(::eraftkv::ClientOpType value) {
   // @@protoc_insertion_point(field_set:eraftkv.KvOpPair.op_type)
 }
 
-// bytes key = 2;
+// string key = 2;
 inline void KvOpPair::clear_key() {
   key_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
 }
@@ -4992,7 +5054,7 @@ inline void KvOpPair::set_key(const char* value) {
   key_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value));
   // @@protoc_insertion_point(field_set_char:eraftkv.KvOpPair.key)
 }
-inline void KvOpPair::set_key(const void* value, size_t size) {
+inline void KvOpPair::set_key(const char* value, size_t size) {
   key_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
       ::std::string(reinterpret_cast<const char*>(value), size));
@@ -5017,7 +5079,7 @@ inline void KvOpPair::set_allocated_key(std::string* key) {
   // @@protoc_insertion_point(field_set_allocated:eraftkv.KvOpPair.key)
 }
 
-// bytes value = 3;
+// string value = 3;
 inline void KvOpPair::clear_value() {
   value_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
 }
@@ -5052,7 +5114,7 @@ inline void KvOpPair::set_value(const char* value) {
   value_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value));
   // @@protoc_insertion_point(field_set_char:eraftkv.KvOpPair.value)
 }
-inline void KvOpPair::set_value(const void* value, size_t size) {
+inline void KvOpPair::set_value(const char* value, size_t size) {
   value_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
       ::std::string(reinterpret_cast<const char*>(value), size));
diff --git a/src/eraftkv_server.cc b/src/eraftkv_server.cc
index c68323b5..a0dbc7b8 100644
--- a/src/eraftkv_server.cc
+++ b/src/eraftkv_server.cc
@@ -54,7 +54,6 @@ grpc::Status ERaftKvServer::RequestVote(ServerContext* context,
                                        eraftkv::RequestVoteResp* resp) {
   if (raft_context_->HandleRequestVoteReq(nullptr, req, resp) == EStatus::kOk) {
     return grpc::Status::OK;
-
   } else {
     return grpc::Status::CANCELLED;
   }
@@ -73,7 +72,6 @@ grpc::Status ERaftKvServer::AppendEntries(ServerContext* context,
   if (raft_context_->HandleAppendEntriesReq(nullptr, req, resp) == EStatus::kOk) {
     return grpc::Status::OK;
-
   } else {
     return grpc::Status::CANCELLED;
   }
@@ -114,39 +112,48 @@ grpc::Status ERaftKvServer::ProcessRWOperation(
     return grpc::Status::OK;
   }
   for (auto kv_op : req->kvs()) {
-    if (kv_op.op_type() == eraftkv::ClientOpType::Put) {
-      std::mutex map_mutex_;
-      {
-        op_count_ += 1;
-        std::condition_variable* new_var = new std::condition_variable();
-        std::lock_guard<std::mutex> lg(map_mutex_);
-        ERaftKvServer::ready_cond_vars_[op_count_] = new_var;
-        kv_op.set_op_count(op_count_);
+    TraceLog("DEBUG: ", " recv rw op type ", kv_op.op_type());
+    switch (kv_op.op_type()) {
+      case eraftkv::ClientOpType::Get: {
+        auto val = raft_context_->store_->GetKV(kv_op.key());
+        TraceLog(
+            "DEBUG: ", " get key ", kv_op.key(), " with value ", val.first);
+        auto res = resp->add_ops();
+        res->set_key(kv_op.key());
+        res->set_value(val.first);
+        res->set_success(val.second);
+        res->set_op_type(eraftkv::ClientOpType::Get);
+        res->set_op_count(op_count_);
+        break;
       }
-      raft_context_->Propose(
-          kv_op.SerializeAsString(), &log_index, &log_term, &success);
-      {
-        std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
-        ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul,
-                                                         [] { return true; });
-        ERaftKvServer::ready_cond_vars_.erase(op_count_);
+      case eraftkv::ClientOpType::Put:
+      case eraftkv::ClientOpType::Del: {
+        std::mutex map_mutex_;
+        {
+          op_count_ += 1;
+          std::condition_variable* new_var = new std::condition_variable();
+          std::lock_guard<std::mutex> lg(map_mutex_);
+          ERaftKvServer::ready_cond_vars_[op_count_] = new_var;
+          kv_op.set_op_count(op_count_);
+        }
+        raft_context_->Propose(
+            kv_op.SerializeAsString(), &log_index, &log_term, &success);
+        {
+          std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
+          ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul,
+                                                           [] { return true; });
+          ERaftKvServer::ready_cond_vars_.erase(op_count_);
+        }
+        auto res = resp->add_ops();
+        res->set_key(kv_op.key());
+        res->set_value(kv_op.value());
+        res->set_success(true);
+        res->set_op_type(kv_op.op_type());
+        res->set_op_count(op_count_);
+        break;
       }
-      auto res = resp->add_ops();
-      res->set_key(kv_op.key());
-      res->set_value(kv_op.value());
-      res->set_success(true);
-      res->set_op_type(eraftkv::ClientOpType::Put);
-      res->set_op_count(op_count_);
-    }
-    if (kv_op.op_type() == eraftkv::ClientOpType::Get) {
-      auto val = raft_context_->store_->GetKV(kv_op.key());
-      TraceLog("DEBUG: ", " get key ", kv_op.key(), " with value ", val);
-      auto res = resp->add_ops();
-      res->set_key(kv_op.key());
-      res->set_value(val);
-      res->set_success(true);
-      res->set_op_type(eraftkv::ClientOpType::Get);
-      res->set_op_count(op_count_);
+      default:
+        break;
     }
   }
   return grpc::Status::OK;
@@ -163,60 +170,69 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
     eraftkv::ClusterConfigChangeResp* resp) {
   int64_t log_index;
   int64_t log_term;
-  bool    success;
   TraceLog("DEBUG: ",
           " recv config change req with change_type ",
           req->change_type());
-  // return cluster topology, Currently, only single raft group are supported
   auto conf_change_req = const_cast<eraftkv::ClusterConfigChangeReq*>(req);
-  if (conf_change_req->change_type() == eraftkv::ChangeType::ShardsQuery) {
-    resp->set_success(true);
-    auto kvs = raft_context_->store_->PrefixScan(SG_META_PREFIX, 0, 256);
-    for (auto kv : kvs) {
-      eraftkv::ShardGroup* sg = new eraftkv::ShardGroup();
-      sg->ParseFromString(kv.second);
+  switch (conf_change_req->change_type()) {
+    case eraftkv::ChangeType::ShardsQuery: {
+      resp->set_success(true);
+      auto kvs = raft_context_->store_->PrefixScan(SG_META_PREFIX, 0, 256);
+      for (auto kv : kvs) {
+        eraftkv::ShardGroup* sg = new eraftkv::ShardGroup();
+        sg->ParseFromString(kv.second);
+        auto new_sg = resp->add_shard_group();
+        new_sg->CopyFrom(*sg);
+        delete sg;
+      }
+      break;
+    }
+    case eraftkv::ChangeType::MembersQuery: {
+      resp->set_success(true);
       auto new_sg = resp->add_shard_group();
-      new_sg->CopyFrom(*sg);
-      delete sg;
+      new_sg->set_id(0);
+      for (auto node : raft_context_->GetNodes()) {
+        auto g_server = new_sg->add_servers();
+        g_server->set_id(node->id);
+        g_server->set_address(node->address);
+        node->node_state == NodeStateEnum::Running
+            ? g_server->set_server_status(eraftkv::ServerStatus::Up)
+            : g_server->set_server_status(eraftkv::ServerStatus::Down);
+      }
+      new_sg->set_leader_id(raft_context_->GetLeaderId());
+      break;
    }
-    return grpc::Status::OK;
-  }
+    default: {
+      // not the leader: reject the request and report the leader's id
+      if (!raft_context_->IsLeader()) {
+        resp->set_error_code(eraftkv::ErrorCode::REQUEST_NOT_LEADER_NODE);
+        resp->set_leader_addr(raft_context_->GetLeaderId());
+        return grpc::Status::OK;
+      }
+      std::mutex map_mutex_;
+      {
+        op_count_ += 1;
+        std::condition_variable* new_var = new std::condition_variable();
+        std::lock_guard<std::mutex> lg(map_mutex_);
+        conf_change_req->set_op_count(op_count_);
+      }
+      bool success;
+      raft_context_->ProposeConfChange(conf_change_req->SerializeAsString(),
+                                       &log_index,
+                                       &log_term,
+                                       &success);
-  if (conf_change_req->change_type() == eraftkv::ChangeType::MembersQuery) {
-    resp->set_success(true);
-    auto new_sg = resp->add_shard_group();
-    new_sg->set_id(0);
-    for (auto node : raft_context_->GetNodes()) {
-      auto g_server = new_sg->add_servers();
-      g_server->set_id(node->id);
-      g_server->set_address(node->address);
-      node->node_state == NodeStateEnum::Running
-          ? g_server->set_server_status(eraftkv::ServerStatus::Up)
-          : g_server->set_server_status(eraftkv::ServerStatus::Down);
+      {
+        std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
+        ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul,
+                                                         [] { return true; });
+        ERaftKvServer::ready_cond_vars_.erase(op_count_);
+      }
+      break;
    }
-    new_sg->set_leader_id(raft_context_->GetLeaderId());
-    return grpc::Status::OK;
-  }
-
-  std::mutex map_mutex_;
-  {
-    op_count_ += 1;
-    std::condition_variable* new_var = new std::condition_variable();
-    std::lock_guard<std::mutex> lg(map_mutex_);
-    conf_change_req->set_op_count(op_count_);
-  }
-
-  raft_context_->ProposeConfChange(
-      conf_change_req->SerializeAsString(), &log_index, &log_term, &success);
-
-  {
-    std::unique_lock<std::mutex> ul(ERaftKvServer::ready_mutex_);
-    ERaftKvServer::ready_cond_vars_[op_count_]->wait(ul, [] { return true; });
-    ERaftKvServer::ready_cond_vars_.erase(op_count_);
-  }
-
-  return success ? grpc::Status::OK : grpc::Status::CANCELLED;
+  return grpc::Status::OK;
 }
 
 /**
diff --git a/src/eraftkv_server_test.cc b/src/eraftkv_server_test.cc
index 06c5b089..4beb93ad 100644
--- a/src/eraftkv_server_test.cc
+++ b/src/eraftkv_server_test.cc
@@ -47,4 +47,4 @@ int main(int argc, char **argv) {
   testing::InitGoogleTest(&argc, argv);
 
   return RUN_ALL_TESTS();
-}
\ No newline at end of file
+}
diff --git a/src/get_command_handler.cc b/src/get_command_handler.cc
index 18838d1b..10b9b8f3 100644
--- a/src/get_command_handler.cc
+++ b/src/get_command_handler.cc
@@ -29,14 +29,19 @@ EStatus GetCommandHandler::Execute(const std::vector<std::string>& params,
     auto status_ = cli->kv_stubs_[leader_addr]->ProcessRWOperation(
        &op_context, op_req, &op_resp);
     if (status_.ok()) {
-      reply_buf += "$";
-      char flag;
-      uint32_t expire;
-      DecodeStringVal(op_resp.mutable_ops(0)->mutable_value(), &flag, &expire, user_val);
-      reply_buf += std::to_string(user_val->size());
-      reply_buf += "\r\n";
-      reply_buf += *user_val;
-      reply_buf += "\r\n";
+      if (op_resp.ops(0).success()) {
+        reply_buf += "$";
+        char flag;
+        uint32_t expire;
+        DecodeStringVal(
+            op_resp.mutable_ops(0)->mutable_value(), &flag, &expire, user_val);
+        reply_buf += std::to_string(user_val->size());
+        reply_buf += "\r\n";
+        reply_buf += *user_val;
+        reply_buf += "\r\n";
+      } else {
+        reply_buf += "$-1\r\n";
+      }
     } else {
       reply_buf += "-ERR Server error\r\n";
     }
diff --git a/src/key_encode.h b/src/key_encode.h
index fe869c0d..851d465b 100644
--- a/src/key_encode.h
+++ b/src/key_encode.h
@@ -12,8 +12,8 @@
 #include <stdint.h>
 #include <string>
 
-#include "util.h"
 #include "proto_parser.h"
+#include "util.h"
 
 /**
  * @brief
@@ -49,9 +49,9 @@ static std::string EncodeStringKey(uint16_t key_slot, std::string user_key) {
 static void DecodeStringKey(std::string  enc_key,
                             uint16_t*    key_slot,
                             std::string* user_key) {
-  ProtoParser parser_;
-  const char *const end = enc_key.c_str() + enc_key.size();
-  const char *ptr = enc_key.c_str();
+  ProtoParser       parser_;
+  const char* const end = enc_key.c_str() + enc_key.size();
+  const char*       ptr = enc_key.c_str();
   parser_.ParseRequest(ptr, end);
   auto params = parser_.GetParams();
   *key_slot = static_cast<uint16_t>(std::stoi(params[0]));
@@ -61,7 +61,6 @@ static void DecodeStringKey(std::string enc_key,
 /**
  * @brief
  * | flag | expire | user val |
- * *3\r\n$[flags len]\r\n[flags]$[expire len]\r\n[expire]$[user val len]\r\n[user val]
  * @return std::string
  */
 static std::string EncodeStringVal(uint32_t expire, std::string user_val) {
@@ -95,13 +94,13 @@ static std::string EncodeStringVal(uint32_t expire, std::string user_val) {
  * @param expire
  * @param user_val
  */
-static void DecodeStringVal(std::string* enc_val,
+static void DecodeStringVal(std::string* enc_val,
                             char*        flag,
                             uint32_t*    expire,
                             std::string* user_val) {
-  ProtoParser parser_;
-  const char *const end = enc_val->c_str() + enc_val->size();
-  const char *ptr = enc_val->c_str();
+  ProtoParser       parser_;
+  const char* const end = enc_val->c_str() + enc_val->size();
+  const char*       ptr = enc_val->c_str();
   parser_.ParseRequest(ptr, end);
   auto params = parser_.GetParams();
   *flag = params[0].at(0);
diff --git a/src/raft_server.cc b/src/raft_server.cc
index e49d09d6..b7476643 100644
--- a/src/raft_server.cc
+++ b/src/raft_server.cc
@@ -63,11 +63,10 @@ RaftServer::RaftServer(RaftConfig raft_config,
     , heartbeat_tick_count_(0)
     , election_tick_count_(0)
     , max_entries_per_append_req_(100)
-    , tick_interval_(1000)
+    , tick_interval_(50)
     , granted_votes_(0)
     , open_auto_apply_(true)
     , election_running_(true) {
-
   this->log_store_ = log_store;
   this->store_ = store;
   this->net_ = net;
@@ -158,7 +157,7 @@ EStatus RaftServer::RunCycle() {
     if (open_auto_apply_) {
       this->ApplyEntries();
     }
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    std::this_thread::sleep_for(std::chrono::milliseconds(tick_interval_));
   }
   return EStatus::kOk;
 }
diff --git a/src/rocksdb_storage_impl.cc b/src/rocksdb_storage_impl.cc
index eb3b2cd3..f3de5da0 100644
--- a/src/rocksdb_storage_impl.cc
+++ b/src/rocksdb_storage_impl.cc
@@ -105,6 +105,25 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft,
          }
          break;
        }
+        case eraftkv::ClientOpType::Del: {
+          if (DelKV(op_pair->key()) == EStatus::kOk) {
+            raft->log_store_->PersisLogMetaState(raft->commit_idx_,
+                                                 ety->id());
+            raft->last_applied_idx_ = ety->id();
+            if (raft->role_ == NodeRaftRoleEnum::Leader) {
+              std::mutex map_mutex;
+              {
+                std::lock_guard<std::mutex> lg(map_mutex);
+                if (ERaftKvServer::ready_cond_vars_[op_pair->op_count()] !=
+                    nullptr) {
+                  ERaftKvServer::ready_cond_vars_[op_pair->op_count()]
+                      ->notify_one();
+                }
+              }
+            }
+          }
+          break;
+        }
        default: {
          raft->log_store_->PersisLogMetaState(raft->commit_idx_, ety->id());
          raft->last_applied_idx_ = ety->id();
@@ -114,7 +133,6 @@
      delete op_pair;
      break;
    }
-
    case eraftkv::EntryType::ConfChange: {
      eraftkv::ClusterConfigChangeReq* conf_change_req =
          new eraftkv::ClusterConfigChangeReq();
@@ -170,28 +188,27 @@
        case eraftkv::ChangeType::ShardJoin: {
          std::string key;
          key.append(SG_META_PREFIX);
-          EncodeDecodeTool::PutFixed64(
-              &key, static_cast<uint64_t>(conf_change_req->shard_id()));
+          key.append(std::to_string(conf_change_req->shard_id()));
          auto sg = conf_change_req->shard_group();
          std::string val = sg.SerializeAsString();
          raft->store_->PutKV(key, val);
+          break;
        }
        case eraftkv::ChangeType::ShardLeave: {
          std::string key;
          key.append(SG_META_PREFIX);
-          EncodeDecodeTool::PutFixed64(
-              &key, static_cast<uint64_t>(conf_change_req->shard_id()));
+          key.append(std::to_string(conf_change_req->shard_id()));
          raft->store_->DelKV(key);
+          break;
        }
        case eraftkv::ChangeType::SlotMove: {
          auto sg = conf_change_req->shard_group();
          std::string key = SG_META_PREFIX;
-          EncodeDecodeTool::PutFixed64(
-              &key, static_cast<uint64_t>(conf_change_req->shard_id()));
+          key.append(std::to_string(conf_change_req->shard_id()));
          auto value = raft->store_->GetKV(key);
-          if (!value.empty()) {
+          if (!value.first.empty()) {
            eraftkv::ShardGroup* old_sg = new eraftkv::ShardGroup();
-            old_sg->ParseFromString(value);
+            old_sg->ParseFromString(value.first);
            // move slot to new sg
            if (sg.id() == old_sg->id()) {
              for (auto new_slot : sg.slots()) {
@@ -368,10 +385,11 @@ EStatus RocksDBStorageImpl::PutKV(std::string key, std::string val) {
 * @param key
 * @return std::string
 */
-std::string RocksDBStorageImpl::GetKV(std::string key) {
+std::pair<std::string, bool> RocksDBStorageImpl::GetKV(std::string key) {
  std::string value;
  auto status = kv_db_->Get(rocksdb::ReadOptions(), "U:" + key, &value);
-  return status.IsNotFound() ? "" : value;
"" : value; + return std::make_pair(std::move(value), + !status.IsNotFound()); } /** @@ -387,7 +405,7 @@ std::map RocksDBStorageImpl::PrefixScan( int64_t offset, int64_t limit) { auto iter = kv_db_->NewIterator(rocksdb::ReadOptions()); - iter->Seek(prefix); + iter->Seek("U:" + prefix); while (iter->Valid() && offset > 0) { offset -= 1; iter->Next(); @@ -413,7 +431,8 @@ std::map RocksDBStorageImpl::PrefixScan( * @return EStatus */ EStatus RocksDBStorageImpl::DelKV(std::string key) { - auto status = kv_db_->Delete(rocksdb::WriteOptions(), key); + TraceLog("DEBUG: ", " del key ", key); + auto status = kv_db_->Delete(rocksdb::WriteOptions(), "U:" + key); return status.ok() ? EStatus::kOk : EStatus::kDelFromRocksDBErr; } @@ -431,7 +450,7 @@ RocksDBStorageImpl::RocksDBStorageImpl(std::string db_path) { } /** - * @brief Destroy the Rocks D B Storage Impl:: RocksDB Storage Impl object + * @brief Destroy the Rocks DB Storage Impl:: RocksDB Storage Impl object * */ RocksDBStorageImpl::~RocksDBStorageImpl() { diff --git a/src/rocksdb_storage_impl.h b/src/rocksdb_storage_impl.h index 043bb1e1..db990dc7 100644 --- a/src/rocksdb_storage_impl.h +++ b/src/rocksdb_storage_impl.h @@ -157,7 +157,7 @@ class RocksDBStorageImpl : public Storage { * @param key * @return std::string */ - std::string GetKV(std::string key); + std::pair GetKV(std::string key); /** * @brief diff --git a/src/rocksdb_storage_impl_tests.cc b/src/rocksdb_storage_impl_tests.cc index c6c4a701..4481397f 100644 --- a/src/rocksdb_storage_impl_tests.cc +++ b/src/rocksdb_storage_impl_tests.cc @@ -42,8 +42,8 @@ TEST(RockDBStorageImplTest, PutGet) { std::string not_exist_key = "not_exist"; RocksDBStorageImpl* kv_store = new RocksDBStorageImpl("/tmp/testdb"); ASSERT_EQ(kv_store->PutKV(testk, testv), EStatus::kOk); - ASSERT_EQ(kv_store->GetKV(testk), testv); - ASSERT_EQ(kv_store->GetKV(""), std::string("")); + ASSERT_EQ(kv_store->GetKV(testk).first, testv); + ASSERT_EQ(kv_store->GetKV("").first, std::string("")); delete kv_store; DirectoryTool::DeleteDir("/tmp/testdb"); } diff --git a/src/set_command_handler.cc b/src/set_command_handler.cc index c3fe9cb8..2df6f9cd 100644 --- a/src/set_command_handler.cc +++ b/src/set_command_handler.cc @@ -25,8 +25,8 @@ EStatus SetCommandHandler::Execute(const std::vector& params, eraftkv::ClientOperationReq op_req; eraftkv::ClientOperationResp op_resp; auto kv_pair_ = op_req.add_kvs(); - std::string encode_key = EncodeStringKey(slot, params[1]); - std::string encode_val = EncodeStringVal(0, params[2]); + std::string encode_key = EncodeStringKey(slot, params[1]); + std::string encode_val = EncodeStringVal(0, params[2]); kv_pair_->set_key(encode_key); kv_pair_->set_value(encode_val); kv_pair_->set_op_type(eraftkv::ClientOpType::Put); diff --git a/src/shardgroup_command_handler.cc b/src/shardgroup_command_handler.cc index 49104f4a..bd23438e 100644 --- a/src/shardgroup_command_handler.cc +++ b/src/shardgroup_command_handler.cc @@ -109,7 +109,9 @@ EStatus ShardGroupCommandHandler::Execute( cli->SendPacket(cli->reply_); cli->_Reset(); } else if (params[1] == "move") { - ClientContext context; + auto slot_range_args = StringUtil::Split(params[3], '-'); + ClientContext context; + std::string reply_buf; eraftkv::ClusterConfigChangeReq req; int shard_id = stoi(params[2]); int slot_id = stoi(params[3]); @@ -117,16 +119,41 @@ EStatus ShardGroupCommandHandler::Execute( req.set_shard_id(shard_id); auto to_move_sg = req.mutable_shard_group(); to_move_sg->set_id(shard_id); - auto new_slot = to_move_sg->add_slots(); - 
-    new_slot->set_id(slot_id);
-    new_slot->set_slot_status(eraftkv::SlotStatus::Running);
+    // support moving a range of slots: shardgroup move 1 [startslot-endslot]
+    if (slot_range_args.size() == 2) {
+      try {
+        int64_t start_slot =
+            static_cast<int64_t>(std::stoi(slot_range_args[0]));
+        int64_t end_slot = static_cast<int64_t>(std::stoi(slot_range_args[1]));
+        for (int64_t i = start_slot; i <= end_slot; i++) {
+          auto new_slot = to_move_sg->add_slots();
+          new_slot->set_id(i);
+          new_slot->set_slot_status(eraftkv::SlotStatus::Running);
+        }
+      } catch (const std::invalid_argument& e) {
+        reply_buf += "-ERR slot invalid_argument\r\n";
+        cli->reply_.PushData(reply_buf.c_str(), reply_buf.size());
+        cli->SendPacket(cli->reply_);
+        cli->_Reset();
+        return EStatus::kOk;
+      } catch (const std::out_of_range& e) {
+        reply_buf += "-ERR slot out_of_range\r\n";
+        cli->reply_.PushData(reply_buf.c_str(), reply_buf.size());
+        cli->SendPacket(cli->reply_);
+        cli->_Reset();
+        return EStatus::kOk;
+      }
+    } else {
+      auto new_slot = to_move_sg->add_slots();
+      new_slot->set_id(slot_id);
+      new_slot->set_slot_status(eraftkv::SlotStatus::Running);
+    }
    eraftkv::ClusterConfigChangeResp resp;
-    auto st = leader_stub->ClusterConfigChange(&context, req, &resp);
-    std::string reply_buf;
+    auto st = leader_stub->ClusterConfigChange(&context, req, &resp);
    if (st.ok()) {
      reply_buf += "+OK\r\n";
    } else {
-      reply_buf += "-ERR Server error\r\n";
+      reply_buf += "-ERR server error\r\n";
    }
    cli->reply_.PushData(reply_buf.c_str(), reply_buf.size());
    cli->SendPacket(cli->reply_);
diff --git a/src/storage.h b/src/storage.h
index 6285b33a..4fd94ff4 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -33,6 +33,7 @@
 #pragma once
 
 #include <string>
+#include <utility>
 
 #include "estatus.h"
 #include "raft_server.h"
@@ -167,7 +168,7 @@ class Storage {
   * @param key
   * @return std::string
   */
-  virtual std::string GetKV(std::string key) = 0;
+  virtual std::pair<std::string, bool> GetKV(std::string key) = 0;
 
  /**
   * @brief
diff --git a/utils/init-kdb-meta.sh b/utils/init-kdb-meta.sh
new file mode 100755
index 00000000..6b545bea
--- /dev/null
+++ b/utils/init-kdb-meta.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -xe
+
+redis-cli -h 172.18.0.6 -p 12306 shardgroup join 1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090
+
+# for i in {1..1023}; do redis-cli -h 172.18.0.6 -p 12306 shardgroup move 1 $i; done
+
+redis-cli -h 172.18.0.6 -p 12306 shardgroup move 1 0-1023
diff --git a/utils/run-vdb-tests.sh b/utils/run-kdb-tests.sh
similarity index 71%
rename from utils/run-vdb-tests.sh
rename to utils/run-kdb-tests.sh
index fc539210..7b237d75 100755
--- a/utils/run-vdb-tests.sh
+++ b/utils/run-kdb-tests.sh
@@ -2,9 +2,6 @@ set -xe
 
 redis-cli -h 172.18.0.6 -p 12306 shardgroup query
 
-redis-cli -h 172.18.0.6 -p 12306 shardgroup join 1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090
-
-for i in {1..1023}; do redis-cli -h 172.18.0.6 -p 12306 shardgroup move 1 $i; done
 
 sleep 1 # test mode raft interval is 1s
 
@@ -15,10 +12,16 @@ redis-cli -h 172.18.0.6 -p 12306 set c l
 redis-cli -h 172.18.0.6 -p 12306 set d l
 redis-cli -h 172.18.0.6 -p 12306 set e o
 
-sleep 1 # test mode raft interval is 1s
+# sleep 1
 
 redis-cli -h 172.18.0.6 -p 12306 get a
 redis-cli -h 172.18.0.6 -p 12306 get b
 redis-cli -h 172.18.0.6 -p 12306 get c
 redis-cli -h 172.18.0.6 -p 12306 get d
 redis-cli -h 172.18.0.6 -p 12306 get e
+
+redis-cli -h 172.18.0.6 -p 12306 del e
+
+# sleep 1
+
+redis-cli -h 172.18.0.6 -p 12306 get e
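
A note on the storage API change threaded through `storage.h`, `rocksdb_storage_impl.*`, and `get_command_handler.cc` above: `GetKV` now returns `std::pair<std::string, bool>` so that a missing key is distinguishable from a key stored with an empty value, which is what lets `get nil_test` reply `(nil)`. A minimal sketch of the new contract, with a `std::map` standing in for RocksDB (the map, `main`, and the inlined RESP formatting are illustrative assumptions, not code from this patch):

```
#include <iostream>
#include <map>
#include <string>
#include <utility>

// Stand-in store: a std::map instead of RocksDB, for illustration only.
static std::map<std::string, std::string> db;

// Mirrors the new GetKV contract: .second == false means "key absent", so an
// empty stored value is no longer mistaken for a miss (the old API returned
// "" in both cases).
std::pair<std::string, bool> GetKV(const std::string& key) {
  auto it = db.find(key);
  if (it == db.end()) {
    return std::make_pair(std::string(), false);
  }
  return std::make_pair(it->second, true);
}

int main() {
  db["a"] = "";  // present, but empty
  const char* keys[] = {"a", "nil_test"};
  for (const char* key : keys) {
    auto val = GetKV(key);
    // RESP framing as in get_command_handler.cc: a hit becomes a bulk string,
    // a miss becomes the nil bulk string "$-1\r\n" (shown by redis-cli as (nil)).
    if (val.second) {
      std::cout << "$" << val.first.size() << "\r\n" << val.first << "\r\n";
    } else {
      std::cout << "$-1\r\n";
    }
  }
  return 0;
}
```

The same flag drives the Get branch of `ProcessRWOperation` (`res->set_success(val.second)`), so the handler no longer has to guess a miss from an empty payload.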