diff --git a/README.md b/README.md index 066ff74a..c53fa5d9 100644 --- a/README.md +++ b/README.md @@ -50,49 +50,21 @@ sudo make run-demo command output ``` docker run --name kvserver-node1 --network mytestnetwork --ip 172.18.0.10 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 0 /tmp/kv_db0 /tmp/log_db0 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 -eca081a545a9eb8dbf9b05c2a307f38c74b4fea2910776e85c806c1b70cedf20 +bef74b85fcf9c0a2dedb15399b1f53010791e329f0c60d69fcd097e0843cbb86 sleep 2 docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.11 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /tmp/kv_db1 /tmp/log_db1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 -74d14edf114f47889b50f0ed20ea810af7cd383de26ebdd3d1e36078290674e7 +333c02093fcb8c974cc1dc491fc7d2e19f474e3fda354fc130cf6be6d8920c85 docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.12 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /tmp/kv_db2 /tmp/log_db2 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 -36bd437e67d00f019732e95e31b7f7ab9c19739a0f10676f31e9c0a7fad98a6c +9856291dd34776cea94ab957780f7a244cb387bd0d74388b5a9d440175d6d28e sleep 1 docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -f8a1382542f41d14e645ddeb285e8b93afc4367b8537e5bc4030487116d8f5cd +09f9f12bc74212d1ae09a89bfecbc5a991f1b46cd9e8ba43fc278f775dd6176d sleep 3 docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -7f5385341bc1f990f50020bd09526eaba3eeec56ab3c67fab325d313ab4ceaea +3b98b3f317e834263ddb81c0bc5b245ac31b69cd47f495415a3d70e951c13900 docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 -666732b5a9b10cd828e9c0829bd97159b3ac9a7d39d1d3f2dcbbc2e5af654373 +10269f84d95e9f82f75d3c60f0d7b0dc0efe5efe643366e615b7644fb8851f04 sleep 16 -docker run --name metaserver-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta_server_test -[==========] Running 1 test from 1 test suite. -[----------] Global test environment set-up. -[----------] 1 test from EraftMetaServerTests -[ RUN ] EraftMetaServerTests.TestMetaBasicOp -DEBUG: cluster config resp -> success: true -shard_group { - id: 1 - servers { - address: "172.18.0.10:8088" - } - servers { - id: 1 - address: "172.18.0.11:8089" - } - servers { - id: 2 - address: "172.18.0.12:8090" - } -} - -[ OK ] EraftMetaServerTests.TestMetaBasicOp (4028 ms) -[----------] 1 test from EraftMetaServerTests (4028 ms total) - -[----------] Global test environment tear-down -[==========] 1 test from 1 test suite ran. (4028 ms total) -[ PASSED ] 1 test. -sleep 2 docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-vdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 run server success! ``` @@ -106,6 +78,9 @@ command output ``` chmod +x utils/run-vdb-tests.sh docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.9 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh + +... (config change log) + + redis-cli -h 172.18.0.6 -p 12306 info server_id: 0,server_address: 172.18.0.10:8088,status: Running,Role: Leader server_id: 1,server_address: 172.18.0.11:8089,status: Running,Role: Follower diff --git a/protocol/eraftkv.proto b/protocol/eraftkv.proto index ca57418b..c14b2c67 100644 --- a/protocol/eraftkv.proto +++ b/protocol/eraftkv.proto @@ -155,8 +155,8 @@ enum ClientOpType { message KvOpPair { ClientOpType op_type = 1; - string key = 2; - string value = 3; + bytes key = 2; + bytes value = 3; bool success = 4; int64 op_count = 5; } diff --git a/src/client.cc b/src/client.cc index 7bcb14b2..54fe1c98 100644 --- a/src/client.cc +++ b/src/client.cc @@ -93,7 +93,8 @@ std::string Client::GetMetaLeaderAddr() { return leader_address; } -std::string Client::GetShardLeaderAddr(std::string partion_key) { +std::string Client::GetShardLeaderAddrAndSlot(std::string partion_key, + uint16_t *slot) { std::string leader_address; int64_t key_slot = -1; key_slot = HashUtil::CRC64(0, partion_key.c_str(), partion_key.size()) % 1024; diff --git a/src/client.h b/src/client.h index f2f86a18..0039f87a 100644 --- a/src/client.h +++ b/src/client.h @@ -35,7 +35,7 @@ class Client : public StreamSocket { friend class ShardGroupCommandHandler; private: - PacketLength _HandlePacket(const char *msg, std::size_t len) override; + PacketLength _HandlePacket(const char* msg, std::size_t len) override; UnboundedBuffer reply_; @@ -60,7 +60,8 @@ class Client : public StreamSocket { * @param partion_key * @return std::string */ - std::string GetShardLeaderAddr(std::string partion_key); + std::string GetShardLeaderAddrAndSlot(std::string partion_key, + uint16_t* slot); /** * @brief Get the meta server leader address diff --git a/src/eraftkv.pb.cc b/src/eraftkv.pb.cc index b185b011..d7432a75 100644 --- a/src/eraftkv.pb.cc +++ b/src/eraftkv.pb.cc @@ -564,7 +564,7 @@ const char descriptor_table_protodef_eraftkv_2eproto[] PROTOBUF_SECTION_VARIABLE "group\030\002 \003(\0132\023.eraftkv.ShardGroup\022\026\n\016conf" "ig_version\030\003 \001(\003\"q\n\010KvOpPair\022&\n\007op_type\030" "\001 \001(\0162\025.eraftkv.ClientOpType\022\013\n\003key\030\002 \001(" - "\t\022\r\n\005value\030\003 \001(\t\022\017\n\007success\030\004 \001(\010\022\020\n\010op_" + "\014\022\r\n\005value\030\003 \001(\014\022\017\n\007success\030\004 \001(\010\022\020\n\010op_" "count\030\005 \001(\003\"J\n\022ClientOperationReq\022\024\n\014op_" "timestamp\030\001 \001(\004\022\036\n\003kvs\030\002 \003(\0132\021.eraftkv.K" "vOpPair\"r\n\023ClientOperationResp\022\036\n\003ops\030\001 " @@ -4732,21 +4732,19 @@ const char* KvOpPair::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::i _internal_set_op_type(static_cast<::eraftkv::ClientOpType>(val)); } else goto handle_unusual; continue; - // string key = 2; + // bytes key = 2; case 2: if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 18)) { auto str = _internal_mutable_key(); ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParser(str, ptr, ctx); - CHK_(::PROTOBUF_NAMESPACE_ID::internal::VerifyUTF8(str, "eraftkv.KvOpPair.key")); CHK_(ptr); } else goto handle_unusual; continue; - // string value = 3; + // bytes value = 3; case 3: if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 26)) { auto str = _internal_mutable_value(); ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParser(str, ptr, ctx); - CHK_(::PROTOBUF_NAMESPACE_ID::internal::VerifyUTF8(str, "eraftkv.KvOpPair.value")); CHK_(ptr); } else goto handle_unusual; continue; @@ -4797,23 +4795,15 @@ ::PROTOBUF_NAMESPACE_ID::uint8* KvOpPair::_InternalSerialize( 1, this->_internal_op_type(), target); } - // string key = 2; + // bytes key = 2; if (this->key().size() > 0) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String( - this->_internal_key().data(), static_cast(this->_internal_key().length()), - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE, - "eraftkv.KvOpPair.key"); - target = stream->WriteStringMaybeAliased( + target = stream->WriteBytesMaybeAliased( 2, this->_internal_key(), target); } - // string value = 3; + // bytes value = 3; if (this->value().size() > 0) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::VerifyUtf8String( - this->_internal_value().data(), static_cast(this->_internal_value().length()), - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::SERIALIZE, - "eraftkv.KvOpPair.value"); - target = stream->WriteStringMaybeAliased( + target = stream->WriteBytesMaybeAliased( 3, this->_internal_value(), target); } @@ -4845,17 +4835,17 @@ size_t KvOpPair::ByteSizeLong() const { // Prevent compiler warnings about cached_has_bits being unused (void) cached_has_bits; - // string key = 2; + // bytes key = 2; if (this->key().size() > 0) { total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::BytesSize( this->_internal_key()); } - // string value = 3; + // bytes value = 3; if (this->value().size() > 0) { total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::BytesSize( this->_internal_value()); } diff --git a/src/eraftkv.pb.h b/src/eraftkv.pb.h index c710e66d..8c1e141f 100644 --- a/src/eraftkv.pb.h +++ b/src/eraftkv.pb.h @@ -2759,13 +2759,13 @@ class KvOpPair : kSuccessFieldNumber = 4, kOpCountFieldNumber = 5, }; - // string key = 2; + // bytes key = 2; void clear_key(); const std::string& key() const; void set_key(const std::string& value); void set_key(std::string&& value); void set_key(const char* value); - void set_key(const char* value, size_t size); + void set_key(const void* value, size_t size); std::string* mutable_key(); std::string* release_key(); void set_allocated_key(std::string* key); @@ -2775,13 +2775,13 @@ class KvOpPair : std::string* _internal_mutable_key(); public: - // string value = 3; + // bytes value = 3; void clear_value(); const std::string& value() const; void set_value(const std::string& value); void set_value(std::string&& value); void set_value(const char* value); - void set_value(const char* value, size_t size); + void set_value(const void* value, size_t size); std::string* mutable_value(); std::string* release_value(); void set_allocated_value(std::string* value); @@ -4957,7 +4957,7 @@ inline void KvOpPair::set_op_type(::eraftkv::ClientOpType value) { // @@protoc_insertion_point(field_set:eraftkv.KvOpPair.op_type) } -// string key = 2; +// bytes key = 2; inline void KvOpPair::clear_key() { key_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); } @@ -4992,7 +4992,7 @@ inline void KvOpPair::set_key(const char* value) { key_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value)); // @@protoc_insertion_point(field_set_char:eraftkv.KvOpPair.key) } -inline void KvOpPair::set_key(const char* value, size_t size) { +inline void KvOpPair::set_key(const void* value, size_t size) { key_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(reinterpret_cast(value), size)); @@ -5017,7 +5017,7 @@ inline void KvOpPair::set_allocated_key(std::string* key) { // @@protoc_insertion_point(field_set_allocated:eraftkv.KvOpPair.key) } -// string value = 3; +// bytes value = 3; inline void KvOpPair::clear_value() { value_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); } @@ -5052,7 +5052,7 @@ inline void KvOpPair::set_value(const char* value) { value_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value)); // @@protoc_insertion_point(field_set_char:eraftkv.KvOpPair.value) } -inline void KvOpPair::set_value(const char* value, size_t size) { +inline void KvOpPair::set_value(const void* value, size_t size) { value_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(reinterpret_cast(value), size)); diff --git a/src/get_command_handler.cc b/src/get_command_handler.cc index 81f3d135..ba6fecd3 100644 --- a/src/get_command_handler.cc +++ b/src/get_command_handler.cc @@ -10,26 +10,34 @@ */ #include "command_handler.h" +#include "key_encode.h" EStatus GetCommandHandler::Execute(const std::vector& params, Client* cli) { std::string leader_addr; - leader_addr = cli->GetShardLeaderAddr(params[1]); + uint16_t slot; + leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot); ClientContext op_context; eraftkv::ClientOperationReq op_req; auto kv_pair_ = op_req.add_kvs(); - kv_pair_->set_key(params[1]); + kv_pair_->set_key(EncodeStringKey(slot, params[1])); kv_pair_->set_op_type(eraftkv::ClientOpType::Get); eraftkv::ClientOperationResp op_resp; std::string reply_buf; + std::string* user_val = new std::string(); if (cli->kv_stubs_[leader_addr] != nullptr) { auto status_ = cli->kv_stubs_[leader_addr]->ProcessRWOperation( &op_context, op_req, &op_resp); if (status_.ok()) { reply_buf += "$"; - reply_buf += std::to_string(op_resp.ops(0).value().size()); + // decode string val + uint8_t flag; + uint32_t expire; + std::string enc_val = op_resp.ops(0).value(); + DecodeStringVal(enc_val, &flag, &expire, user_val); + reply_buf += std::to_string(user_val->size()); reply_buf += "\r\n"; - reply_buf += op_resp.ops(0).value(); + reply_buf += *user_val; reply_buf += "\r\n"; } else { reply_buf += "-ERR Server error\r\n"; @@ -40,6 +48,7 @@ EStatus GetCommandHandler::Execute(const std::vector& params, cli->reply_.PushData(reply_buf.c_str(), reply_buf.size()); cli->SendPacket(cli->reply_); cli->_Reset(); + delete user_val; return EStatus::kOk; } diff --git a/src/key_encode.h b/src/key_encode.h new file mode 100644 index 00000000..706f30b7 --- /dev/null +++ b/src/key_encode.h @@ -0,0 +1,83 @@ +/** + * @file key_encode.h + * @author jay_jieliu@outlook.com + * @brief + * @version 0.1 + * @date 2023-07-24 + * + * @copyright Copyright (c) 2023 + * + */ + +#include +#include + +#include "util.h" + +enum Flag { + TYPE_STRING = 0x01, + TYPE_HASH = 0x02, + TYPE_LIST = 0x03, + TYPE_SET = 0x04, + TYPE_ZSET = 0x05, +}; + +/** + * @brief + * + * | slot (2B) | user_key (n B) | + * @return std::string + */ +static std::string EncodeStringKey(uint16_t key_slot, std::string user_key) { + std::string dst; + EncodeDecodeTool::PutFixed16(&dst, key_slot); + dst.append(user_key); + return dst; +} + +/** + * @brief decode an encode key + * + * @param enc_key + * @param key_slot + * @param user_key + */ +static void DecodeStringKey(std::string enc_key, + uint16_t* key_slot, + std::string* user_key) { + std::vector enc_key_seq = {enc_key.begin(), enc_key.end()}; + *key_slot = EncodeDecodeTool::DecodeFixed16(&enc_key_seq[0]); + *user_key = std::string(enc_key_seq.begin() + 2, enc_key_seq.end()); +} + +/** + * @brief + * + * | flags (1B) | expire (4B) | user_value (nB) | + * @return std::string + */ +static std::string EncodeStringVal(uint32_t expire, std::string user_val) { + std::string dst; + dst.append(Flag::TYPE_STRING, 1); + EncodeDecodeTool::PutFixed32(&dst, expire); + dst.append(user_val); + return dst; +} + +/** + * @brief decode an encode value + * + * @param enc_val + * @param flag + * @param expire + * @param user_val + */ +static void DecodeStringVal(std::string enc_val, + uint8_t* flag, + uint32_t* expire, + std::string* user_val) { + std::vector enc_val_seq = {enc_val.begin(), enc_val.end()}; + *flag = enc_val_seq[0]; + *expire = EncodeDecodeTool::DecodeFixed32(&enc_val_seq[1]); + *user_val = std::string(enc_val_seq.begin() + 5, enc_val_seq.end()); +} diff --git a/src/set_command_handler.cc b/src/set_command_handler.cc index f9199075..6673e32d 100644 --- a/src/set_command_handler.cc +++ b/src/set_command_handler.cc @@ -12,19 +12,21 @@ #include #include "command_handler.h" +#include "key_encode.h" #include "util.h" EStatus SetCommandHandler::Execute(const std::vector& params, Client* cli) { std::string leader_addr; - leader_addr = cli->GetShardLeaderAddr(params[1]); + uint16_t slot; + leader_addr = cli->GetShardLeaderAddrAndSlot(params[1], &slot); TraceLog("DEBUG: send request to leader ", leader_addr); ClientContext op_context; eraftkv::ClientOperationReq op_req; eraftkv::ClientOperationResp op_resp; auto kv_pair_ = op_req.add_kvs(); - kv_pair_->set_key(params[1]); - kv_pair_->set_value(params[2]); + kv_pair_->set_key(EncodeStringKey(slot, params[1])); + kv_pair_->set_value(EncodeStringVal(0, params[2])); kv_pair_->set_op_type(eraftkv::ClientOpType::Put); std::string reply_buf; if (cli->kv_stubs_[leader_addr] != nullptr) { diff --git a/src/shardgroup_command_handler.cc b/src/shardgroup_command_handler.cc index 9ec2126f..49104f4a 100644 --- a/src/shardgroup_command_handler.cc +++ b/src/shardgroup_command_handler.cc @@ -1,6 +1,28 @@ +// MIT License + +// Copyright (c) 2023 ERaftGroup + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + /** * @file shardgroup_command_handler.cc - * @author your name (you@domain.com) + * @author jay_jieliu@outlook.com * @brief * @version 0.1 * @date 2023-07-23 diff --git a/src/util.h b/src/util.h index 396d697a..e66afa6f 100644 --- a/src/util.h +++ b/src/util.h @@ -74,6 +74,40 @@ class EncodeDecodeTool { std::memcpy(&result, buffer, sizeof(uint64_t)); return result; } + + static void EncodeFixed16(char* dst, uint16_t value) { + uint8_t* const buffer = reinterpret_cast(dst); + std::memcpy(buffer, &value, sizeof(uint16_t)); + } + + static void PutFixed16(std::string* dst, uint16_t value) { + char buf[sizeof(value)]; + EncodeFixed16(buf, value); + dst->append(buf, sizeof(buf)); + } + + static uint16_t DecodeFixed16(const uint8_t* buffer) { + uint16_t result; + std::memcpy(&result, buffer, sizeof(uint16_t)); + return result; + } + + static void EncodeFixed32(char* dst, uint32_t value) { + uint8_t* const buffer = reinterpret_cast(dst); + std::memcpy(buffer, &value, sizeof(uint32_t)); + } + + static void PutFixed32(std::string* dst, uint32_t value) { + char buf[sizeof(value)]; + EncodeFixed32(buf, value); + dst->append(buf, sizeof(buf)); + } + + static uint32_t DecodeFixed32(const uint8_t* buffer) { + uint32_t result; + std::memcpy(&result, buffer, sizeof(uint32_t)); + return result; + } }; template diff --git a/utils/format-code.sh b/utils/format-code.sh index 61c3e028..0c3d1fa4 100644 --- a/utils/format-code.sh +++ b/utils/format-code.sh @@ -66,3 +66,5 @@ clang-format -style=file --sort-includes -i ${SRCPATH}/src/get_command_handler.c clang-format -style=file --sort-includes -i ${SRCPATH}/src/unknow_command_handler.cc clang-format -style=file --sort-includes -i ${SRCPATH}/src/shardgroup_command_handler.cc clang-format -style=file --sort-includes -i ${SRCPATH}/src/eraftmetaserver_test.cc +clang-format -style=file --sort-includes -i ${SRCPATH}/src/key_encode.h +