Skip to content

Commit

Permalink
Merge pull request #177 from eraft-io/feature_20230701_multishard
Browse files Browse the repository at this point in the history
Feature 20230701 multishard
  • Loading branch information
LLiuJJ authored Jul 11, 2023
2 parents 4dd3818 + 9d76198 commit 70d7343
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 58 deletions.
71 changes: 43 additions & 28 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ PacketLength Client::_HandlePacket(const char *start, std::size_t bytes) {
return static_cast<PacketLength>(bytes);
}

Client::Client(std::string meta_addrs) : leader_addr_("") {
Client::Client(std::string meta_addrs)
: leader_addr_(""), meta_addrs_(meta_addrs) {
// init stub to meta server node
auto meta_node_addrs = StringUtil::Split(meta_addrs, ',');
for (auto meta_node_addr : meta_node_addrs) {
TraceLog("DEBUG: init rpc link to ", meta_node_addr);
auto chan_ =
grpc::CreateChannel(meta_node_addr, grpc::InsecureChannelCredentials());
std::unique_ptr<ERaftKv::Stub> stub_(ERaftKv::NewStub(chan_));
this->stubs_[meta_node_addr] = std::move(stub_);
this->meta_stubs_[meta_node_addr] = std::move(stub_);
}
// sync config
SyncClusterConfig();
Expand All @@ -73,42 +74,56 @@ std::string Client::GetLeaderAddr(std::string partion_key) {
if (key_slot == sl.id()) {
// find sg leader addr
for (auto server : sg.servers()) {
if (server.id() == sg.leader_id()) {
ClientContext context;
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ChangeType::MetaMembersQuery);
eraftkv::ClusterConfigChangeResp resp;
auto status = stubs_[server.address()]->ClusterConfigChange(
&context, req, &resp);
for (int i = 0; i < resp.shard_group(0).servers_size(); i++) {
if (resp.shard_group(0).leader_id() ==
resp.shard_group(0).servers(i).id()) {
leader_address = resp.shard_group(0).servers(i).address();
}
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(50);
context.set_deadline(deadline);
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ChangeType::MetaMembersQuery);
eraftkv::ClusterConfigChangeResp resp;
auto status = stubs_[server.address()]->ClusterConfigChange(
&context, req, &resp);
if (!status.ok()) {
continue;
}
for (int i = 0; i < resp.shard_group(0).servers_size(); i++) {
if (resp.shard_group(0).leader_id() ==
resp.shard_group(0).servers(i).id()) {
leader_address = resp.shard_group(0).servers(i).address();
}
}
}
}
}
}

return leader_address;
}

EStatus Client::SyncClusterConfig() {
ClientContext context;
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ChangeType::ShardsQuery);
auto status_ = this->stubs_.begin()->second->ClusterConfigChange(
&context, req, &cluster_conf_);
for (auto sg : cluster_conf_.shard_group()) {
for (auto server : sg.servers()) {
auto chan_ = grpc::CreateChannel(server.address(),
grpc::InsecureChannelCredentials());
std::unique_ptr<ERaftKv::Stub> stub_(ERaftKv::NewStub(chan_));
this->stubs_[server.address()] = std::move(stub_);
for (auto it = this->meta_stubs_.begin(); it != this->meta_stubs_.end();
it++) {
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(50);
context.set_deadline(deadline);
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ChangeType::ShardsQuery);
auto status_ =
it->second->ClusterConfigChange(&context, req, &cluster_conf_);
if (!status_.ok()) {
continue;
}
for (auto sg : cluster_conf_.shard_group()) {
for (auto server : sg.servers()) {
auto chan_ = grpc::CreateChannel(server.address(),
grpc::InsecureChannelCredentials());
std::unique_ptr<ERaftKv::Stub> stub_(ERaftKv::NewStub(chan_));
this->stubs_[server.address()] = std::move(stub_);
}
}
if (status_.ok()) {
return EStatus::kOk;
}
}

return status_.ok() ? EStatus::kOk : EStatus::kError;
return EStatus::kOk;
}
6 changes: 5 additions & 1 deletion src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ class Client : public StreamSocket {

std::map<std::string, std::unique_ptr<ERaftKv::Stub> > stubs_;

std::map<std::string, std::unique_ptr<ERaftKv::Stub> > meta_stubs_;

std::string leader_addr_;

eraftkv::ClusterConfigChangeResp cluster_conf_;

std::string meta_addrs_;

public:
Client(std::string kv_addrs);
Client(std::string meta_addrs);

std::string GetLeaderAddr(std::string partion_key);

Expand Down
71 changes: 42 additions & 29 deletions src/info_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,53 @@
*/

#include "command_handler.h"
#include "util.h"

EStatus InfoCommandHandler::Execute(const std::vector<std::string>& params,
Client* cli) {
ClientContext context;
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ChangeType::MetaMembersQuery);
eraftkv::ClusterConfigChangeResp resp;
for (auto it = cli->meta_stubs_.begin(); it != cli->meta_stubs_.end(); it++) {
ClientContext context;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(1);
context.set_deadline(deadline);
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ChangeType::MetaMembersQuery);
eraftkv::ClusterConfigChangeResp resp;
auto status = it->second->ClusterConfigChange(&context, req, &resp);
if (!status.ok()) {
continue;
}
std::string info_str;
for (int i = 0; i < resp.shard_group(0).servers_size(); i++) {
info_str += "meta server: \r\n";
info_str += "server_id: ";
info_str += std::to_string(resp.shard_group(0).servers(i).id());
info_str += ",server_address: ";
info_str += resp.shard_group(0).servers(i).address();
resp.shard_group(0).servers(i).server_status() ==
eraftkv::ServerStatus::Up
? info_str += ",status: Running"
: info_str += ",status: Down";
resp.shard_group(0).leader_id() == resp.shard_group(0).servers(i).id()
? info_str += ",Role: Leader"
: info_str += ",Role: Follower";
info_str += "\r\n";
}
std::string reply_buf;
reply_buf += "$";
reply_buf += std::to_string(info_str.size());
reply_buf += "\r\n";
reply_buf += info_str;
reply_buf += "\r\n";

auto status =
cli->stubs_.begin()->second->ClusterConfigChange(&context, req, &resp);
std::string info_str;
for (int i = 0; i < resp.shard_group(0).servers_size(); i++) {
info_str += "server_id: ";
info_str += std::to_string(resp.shard_group(0).servers(i).id());
info_str += ",server_address: ";
info_str += resp.shard_group(0).servers(i).address();
resp.shard_group(0).servers(i).server_status() == eraftkv::ServerStatus::Up
? info_str += ",status: Running"
: info_str += ",status: Down";
resp.shard_group(0).leader_id() == resp.shard_group(0).servers(i).id()
? info_str += ",Role: Leader"
: info_str += ",Role: Follower";
info_str += "\r\n";
}
std::string reply_buf;
reply_buf += "$";
reply_buf += std::to_string(info_str.size());
reply_buf += "\r\n";
reply_buf += info_str;
reply_buf += "\r\n";
cli->reply_.PushData(reply_buf.c_str(), reply_buf.size());
cli->SendPacket(cli->reply_);
cli->reply_.PushData(reply_buf.c_str(), reply_buf.size());
cli->SendPacket(cli->reply_);

cli->_Reset();
cli->_Reset();
if (status.ok()) {
break;
}
}
return EStatus::kOk;
}

Expand Down

0 comments on commit 70d7343

Please sign in to comment.