Skip to content

Commit

Permalink
Merge pull request #180 from eraft-io/feature_20230701_multishard
Browse files Browse the repository at this point in the history
fix encode bug
  • Loading branch information
LLiuJJ authored Jul 27, 2023
2 parents 43d5382 + ffadb01 commit d58d514
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 31 deletions.
4 changes: 2 additions & 2 deletions protocol/eraftkv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ enum ClientOpType {

message KvOpPair {
ClientOpType op_type = 1;
bytes key = 2;
bytes value = 3;
string key = 2;
string value = 3;
bool success = 4;
int64 op_count = 5;
}
Expand Down
1 change: 1 addition & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ std::string Client::GetShardLeaderAddrAndSlot(std::string partion_key,
int64_t key_slot = -1;
key_slot = HashUtil::CRC64(0, partion_key.c_str(), partion_key.size()) % 1024;
TraceLog("DEBUG: partion key " + partion_key + " with slot ", key_slot);
*slot = key_slot;
for (auto sg : cluster_conf_.shard_group()) {
for (auto sl : sg.slots()) {
if (key_slot == sl.id()) {
Expand Down
6 changes: 2 additions & 4 deletions src/get_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ EStatus GetCommandHandler::Execute(const std::vector<std::string>& params,
&op_context, op_req, &op_resp);
if (status_.ok()) {
reply_buf += "$";
// decode string val
uint8_t flag;
char flag;
uint32_t expire;
std::string enc_val = op_resp.ops(0).value();
DecodeStringVal(enc_val, &flag, &expire, user_val);
DecodeStringVal(op_resp.mutable_ops(0)->mutable_value(), &flag, &expire, user_val);
reply_buf += std::to_string(user_val->size());
reply_buf += "\r\n";
reply_buf += *user_val;
Expand Down
73 changes: 50 additions & 23 deletions src/key_encode.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,29 @@
#include <vector>

#include "util.h"

enum Flag {
TYPE_STRING = 0x01,
TYPE_HASH = 0x02,
TYPE_LIST = 0x03,
TYPE_SET = 0x04,
TYPE_ZSET = 0x05,
};
#include "proto_parser.h"

/**
* @brief
*
* | slot (2B) | user_key (n B) |
* | slot | user_key |
* @return std::string
*/
static std::string EncodeStringKey(uint16_t key_slot, std::string user_key) {
std::string dst;
EncodeDecodeTool::PutFixed16(&dst, key_slot);
dst.push_back('*');
dst.push_back('2');
dst.append("\r\n");
dst.push_back('$');
dst.append(std::to_string(std::to_string(key_slot).size()));
dst.append("\r\n");
dst.append(std::to_string(key_slot));
dst.append("\r\n");
dst.push_back('$');
dst.append(std::to_string(user_key.size()));
dst.append("\r\n");
dst.append(user_key);
dst.append("\r\n");
return dst;
}

Expand All @@ -45,22 +49,41 @@ static std::string EncodeStringKey(uint16_t key_slot, std::string user_key) {
static void DecodeStringKey(std::string enc_key,
uint16_t* key_slot,
std::string* user_key) {
std::vector<uint8_t> enc_key_seq = {enc_key.begin(), enc_key.end()};
*key_slot = EncodeDecodeTool::DecodeFixed16(&enc_key_seq[0]);
*user_key = std::string(enc_key_seq.begin() + 2, enc_key_seq.end());
ProtoParser parser_;
const char *const end = enc_key.c_str() + enc_key.size();
const char *ptr = enc_key.c_str();
parser_.ParseRequest(ptr, end);
auto params = parser_.GetParams();
*key_slot = static_cast<uint16_t>(std::stoi(params[0]));
*user_key = params[1];
}

/**
* @brief
*
* | flags (1B) | expire (4B) | user_value (nB) |
* | flag | expire | user val |
* *3\r\n$[flags len]\r\n[flags]$[expire len]\r\n[expire]$[user val len]\r\n[user val]
* @return std::string
*/
static std::string EncodeStringVal(uint32_t expire, std::string user_val) {
std::string dst;
dst.append(Flag::TYPE_STRING, 1);
EncodeDecodeTool::PutFixed32(&dst, expire);
dst.push_back('*');
dst.push_back('3');
dst.append("\r\n");
dst.push_back('$');
dst.push_back('1');
dst.append("\r\n");
dst.push_back('s');
dst.append("\r\n");
dst.push_back('$');
dst.append(std::to_string(std::to_string(expire).size()));
dst.append("\r\n");
dst.append(std::to_string(expire));
dst.append("\r\n");
dst.push_back('$');
dst.append(std::to_string(user_val.size()));
dst.append("\r\n");
dst.append(user_val);
dst.append("\r\n");
return dst;
}

Expand All @@ -72,12 +95,16 @@ static std::string EncodeStringVal(uint32_t expire, std::string user_val) {
* @param expire
* @param user_val
*/
static void DecodeStringVal(std::string enc_val,
uint8_t* flag,
static void DecodeStringVal(std::string* enc_val,
char* flag,
uint32_t* expire,
std::string* user_val) {
std::vector<uint8_t> enc_val_seq = {enc_val.begin(), enc_val.end()};
*flag = enc_val_seq[0];
*expire = EncodeDecodeTool::DecodeFixed32(&enc_val_seq[1]);
*user_val = std::string(enc_val_seq.begin() + 5, enc_val_seq.end());
ProtoParser parser_;
const char *const end = enc_val->c_str() + enc_val->size();
const char *ptr = enc_val->c_str();
parser_.ParseRequest(ptr, end);
auto params = parser_.GetParams();
*flag = params[0].at(0);
*expire = static_cast<uint32_t>(std::stoi(params[1]));
*user_val = params[2];
}
6 changes: 4 additions & 2 deletions src/set_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ EStatus SetCommandHandler::Execute(const std::vector<std::string>& params,
eraftkv::ClientOperationReq op_req;
eraftkv::ClientOperationResp op_resp;
auto kv_pair_ = op_req.add_kvs();
kv_pair_->set_key(EncodeStringKey(slot, params[1]));
kv_pair_->set_value(EncodeStringVal(0, params[2]));
std::string encode_key = EncodeStringKey(slot, params[1]);
std::string encode_val = EncodeStringVal(0, params[2]);
kv_pair_->set_key(encode_key);
kv_pair_->set_value(encode_val);
kv_pair_->set_op_type(eraftkv::ClientOpType::Put);
std::string reply_buf;
if (cli->kv_stubs_[leader_addr] != nullptr) {
Expand Down

0 comments on commit d58d514

Please sign in to comment.