Skip to content

3: 详细设计

Colin edited this page Mar 30, 2023 · 10 revisions

ERaftKV RPC

注意:设计文档里面代码都是伪代码

消息定义

  • 投票请求
message RequestVoteReq 
{
    bool prevote;

    int64 term;

    int64 candidtate_id;

    int64 last_log_idx;

    int64 last_log_term;
}
  • 投票响应
message RequestVoteResp
{
    bool prevote;

    int64 request_term;

    int64 term;

    bool vote_granted;
}
  • 日志项
enum EntryType {
  Normal = 0;
  JoinNode = 1;
  LeaveNode = 2;
  JoinGroup = 3;
  LeaveGroup = 4;
  NoOp = 5;
}

message Entry {
  int64     term = 1;
  int64     id = 2;
  EntryType e_type = 3;
  int64     data_size = 4;
  bytes     data = 5;
}
  • 追加日志请求
message AppendEntriesReq {
  int64          leader_id = 1;
  int64          message_index = 2;
  int64          term = 3;
  int64          prev_log_index = 4;
  int64          prev_log_term = 5;
  int64          leader_commit = 6;
  int64          entries_size = 7;
  repeated Entry entries = 8;
}
  • 追加日志的响应
message AppendEntriesResp {
  string message_token = 1;
  int64  term = 2;
  bool   success = 3;
  int64  current_index = 4;
}
  • 发送快照的请求
message SnapshotBlock {
  int64 offset = 1;
  bytes data = 2;
  int64 block_size = 3;
  bool  is_last_chunk = 4;
}

message SnapshotReq {
  int64         term = 1;
  int64         leader_id = 2;
  string        message_index = 3;
  int64         snapshot_index = 4;
  int64         snapshot_term = 5;
  SnapshotBlock block = 6;
}
  • 发送快照的响应
message SnapshotResp {
  int64  term = 1;
  string message_index = 2;
  int64  offset = 3;
  bool   success = 4;
  bool   is_last_chunk = 5;
}
  • KeyRange 定义

enum KeyRangeStatus {
  Running = 0;
  Migrating = 1;
  Importing = 2;
  Init = 3;
}

message KeyRange {
  KeyRangeStatus key_range_status = 1;
  int64          shard_id = 2;
  int64          status_modify_time = 3;
  string         start = 4;
  string         end = 5;
}

  • 服务定义

enum ServerStatus {
  Up = 0;
  Down = 1;
}

message Server {
  string       id = 1;
  string       address = 2;
  ServerStatus server_status = 3;
}

  • 集群分片定义

message ShardGroup {
  int64           id = 1;
  KeyRange        key_range = 2;
  repeated Server servers = 3;
}

  • 增删节点到分片的请求和响应
enum ClusterConfigChangeType {
  AddServer = 0;
  RemoveServer = 1;
}

message ClusterConfigChangeReq {
  ClusterConfigChangeType change_type = 1;
  int64                   shard_id = 2;
  Server                  server = 3;
  int64                   config_version = 4;
}

message ClusterConfigChangeResp {
  bool                success = 1;
  repeated ShardGroup shard_group = 2;
  int64               config_version = 3;
}

  • 客户数据读写请求
enum ClientOpType {
  Put = 0;
  Get = 1;
  Del = 2;
  Scan = 3;
}

message KvOpPair {
  ClientOpType op_type = 1;
  string       key = 2;
  string       value = 3;
  uint64       cursor = 4;
}

message ClientOperationReq {
  uint64            op_timestamp = 1;
  repeated KvOpPair kvs = 2;
}

message ClientOperationResp {
  bool              success = 1;
  repeated KvOpPair ops = 2;
}

服务定义


service ERaftKv {
  rpc RequestVote(RequestVoteReq) returns (RequestVoteResp);
  rpc AppendEntries(AppendEntriesReq) returns (RequestVoteResp);
  rpc Snapshot(SnapshotReq) returns (SnapshotResp);

  rpc ProcessRWOperation(ClientOperationReq) returns (ClientOperationResp);
  rpc ClusterConfigChange(ClusterConfigChangeReq)
      returns (ClusterConfigChangeResp);
}

ERaftKV Raft 服务层数据结构以及操作接口

  • raft_server 核心
enum RaftStateEnum { Follower, PreCandidate, Candidate, Leader };


/**
 * @brief
 *
 */
class Network {
 public:
  /**
   * @brief Destroy the Network object
   *
   */
  virtual ~Network() {}

  /**
   * @brief
   *
   * @param raft
   * @param target_node
   * @param req
   * @return EStatus
   */
  virtual EStatus SendRequestVote(RaftServer*              raft,
                                  RaftNode*                target_node,
                                  eraftkv::RequestVoteReq* req) = 0;

  /**
   * @brief
   *
   * @param raft
   * @param target_node
   * @param req
   * @return EStatus
   */
  virtual EStatus SendAppendEntries(RaftServer*                raft,
                                    RaftNode*                  target_node,
                                    eraftkv::AppendEntriesReq* req) = 0;

  /**
   * @brief
   *
   * @param raft
   * @param target_node
   * @param req
   * @return EStatus
   */
  virtual EStatus SendSnapshot(RaftServer*           raft,
                               RaftNode*             target_node,
                               eraftkv::SnapshotReq* req) = 0;
}

/**
 * @brief
 *
 */
class Event {
 public:
  /**
   * @brief Destroy the Event object
   *
   */
  virtual ~Event() {}

  /**
   * @brief
   *
   * @param raft
   * @param state
   */
  virtual void RaftStateChangeEvent(RaftServer* raft, RaftStateEnum* state) = 0;

  /**
   * @brief 
   * 
   * @param raft 
   * @param node 
   * @param ety 
   */
  virtual void RaftGroupMembershipChangeEvent(RaftServer*     raft,
                                              RaftNode*       node,
                                              eraftkv::Entry* ety) = 0;
}


/**
 * @brief
 *
 */
class Storage {

 public:
  /**
   * @brief Destroy the Storage object
   *
   */
  virtual ~Storage() {}


  /**
   * @brief Get the Node Address object
   *
   * @param raft
   * @param id
   * @return std::string
   */
  virtual std::string GetNodeAddress(RaftServer* raft, std::string id) = 0;

  /**
   * @brief
   *
   * @param raft
   * @param id
   * @param address
   * @return EStatus
   */
  virtual EStatus SaveNodeAddress(RaftServer* raft,
                                  std::string id,
                                  std::string address) = 0;

  /**
   * @brief
   *
   * @param raft
   * @param snapshot_index
   * @param snapshot_term
   * @return EStatus
   */
  virtual EStatus ApplyLog(RaftServer* raft,
                           int64_t     snapshot_index,
                           int64_t     snapshot_term) = 0;

  /**
   * @brief Get the Snapshot Block object
   *
   * @param raft
   * @param node
   * @param offset
   * @param block
   * @return EStatus
   */
  virtual EStatus GetSnapshotBlock(RaftServer*             raft,
                                   RaftNode*               node,
                                   int64_t                 offset,
                                   eraftkv::SnapshotBlock* block) = 0;

  /**
   * @brief
   *
   * @param raft
   * @param snapshot_index
   * @param offset
   * @param block
   * @return EStatus
   */
  virtual EStatus StoreSnapshotBlock(RaftServer*             raft,
                                     int64_t                 snapshot_index,
                                     int64_t                 offset,
                                     eraftkv::SnapshotBlock* block) = 0;

  /**
   * @brief
   *
   * @param raft
   * @return EStatus
   */
  virtual EStatus ClearSnapshot(RaftServer* raft) = 0;

  /**
   * @brief
   *
   * @return EStatus
   */
  virtual EStatus CreateDBSnapshot() = 0;

  /**
   * @brief
   *
   * @param raft
   * @param term
   * @param vote
   * @return EStatus
   */
  virtual EStatus SaveRaftMeta(RaftServer* raft,
                               int64_t     term,
                               int64_t     vote) = 0;

  /**
   * @brief
   *
   * @param raft
   * @param term
   * @param vote
   * @return EStatus
   */
  virtual EStatus ReadRaftMeta(RaftServer* raft,
                               int64_t*    term,
                               int64_t*    vote) = 0;
};

/**
 * @brief
 *
 */
class LogStore {
 public:
  /**
   * @brief Destroy the Log Store object
   *
   */
  virtual ~LogStore() {}

  /**
   * @brief
   *
   */
  virtual void Init() = 0;

  /**
   * @brief
   *
   */
  virtual void Free() = 0;

  /**
   * @brief Append add new entries
   *
   * @param ety
   * @return EStatus
   */
  virtual EStatus Append(eraftkv::Entry* ety) = 0;

  /**
   * @brief EraseBefore erase all entries before the given index
   *
   * @param first_index
   * @return EStatus
   */
  virtual EStatus EraseBefore(int64_t first_index) = 0;

  /**
   * @brief EraseAfter erase all entries after the given index
   *
   * @param from_index
   * @return EStatus
   */
  virtual EStatus EraseAfter(int64_t from_index) = 0;

  /**
   * @brief Get get the given index entry
   *
   * @param index
   * @return eraftkv::Entry*
   */
  virtual eraftkv::Entry* Get(int64_t index) = 0;

  /**
   * @brief Gets get the given index range entry
   *
   * @param start_index
   * @param end_index
   * @return std::vector<eraftkv::Entry*>
   */
  virtual std::vector<eraftkv::Entry*> Gets(int64_t start_index,
                                            int64_t end_index) = 0;

  /**
   * @brief FirstIndex get the first index in the entry
   *
   * @return int64_t
   */
  virtual int64_t FirstIndex() = 0;

  /**
   * @brief LastIndex get the last index in the entry
   *
   * @return int64_t
   */
  virtual int64_t LastIndex() = 0;

  /**
   * @brief LogCount get the number of entries
   *
   * @return int64_t
   */
  virtual int64_t LogCount() = 0;
}


/**
 * @brief
 *
 */
class RaftServer {

 public:
  /**
   * @brief Construct a new Raft Server object
   *
   * @param raft_config
   */
  RaftServer(RaftConfig* raft_config) {}

  /**
   * @brief Get the Entries To Be Send object
   *
   * @param node
   * @param index
   * @param count
   * @return std::vector<eraftkv::Entry*>
   */
  std::vector<eraftkv::Entry*> GetEntriesToBeSend(RaftNode* node,
                                                  int64_t   index,
                                                  int64_t   count) {}

  /**
   * @brief
   *
   * @param term
   * @param vote
   * @return EStatus
   */
  EStatus SaveMetaData(int64 term, int64 vote) {}

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus ReadMetaData() {}

  /**
   * @brief
   *
   * @param id
   * @param is_self
   * @return RaftNode*
   */
  RaftNode* JoinNode(int64 id, bool is_self) {}

  /**
   * @brief
   *
   * @param node
   * @return EStatus
   */
  EStatus RemoveNode(RaftNode* node) {}

  /**
   * @brief raft core cycle
   *
   * @return EStatus
   */
  EStatus RunCycle() {
    // 1. only one node, to become the leader
    // call BecomeLeader

    // 2. is leader, send append entries to other node
    // call net_-> SendAppendEntries

    // 3. if not leader, inter election
    // call ElectionStart
  }

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus ApplyEntries() {}

  /**
   * @brief
   *
   * @param from_node
   * @param req
   * @param resp
   * @return EStatus
   */
  EStatus HandleRequestVoteReq(RaftNode*                 from_node,
                               eraftkv::RequestVoteReq*  req,
                               eraftkv::RequestVoteResp* resp) {
    // 1.  deal request vote with raft paper description.
  }

  /**
   * @brief
   *
   * @param from_node
   * @param resp
   * @return EStatus
   */
  EStatus HandleRequestVoteResp(RaftNode*                 from_node,
                                eraftkv::RequestVoteResp* resp) {
    // 1.if resp term > this node, call become follower.

    // 2.get majority vote, become candidate or leader.
  }

  /**
   * @brief
   *
   * @param from_node
   * @param req
   * @param resp
   * @return EStatus
   */
  EStatus HandleAppendEntriesReq(RaftNode*                   from_node,
                                 eraftkv::AppendEntriesReq*  req,
                                 eraftkv::AppendEntriesResp* resp) {
    // 1. deal append entries req with raft paper description.
  }

  /**
   * @brief
   *
   * @param from_node
   * @param resp
   * @return EStatus
   */
  EStatus HandleAppendEntriesResp(RaftNode*                   from_node,
                                  eraftkv::AppendEntriesResp* resp) {
    // 1.deal append entries resp with raft paper description.
  }


  /**
   * @brief
   *
   * @param from_node
   * @param req
   * @param resp
   * @return EStatus
   */
  EStatus HandleSnapshotReq(RaftNode*              from_node,
                            eraftkv::SnapshotReq*  req,
                            eraftkv::SnapshotResp* resp) {
    // 1. deal snapshot req with raft paper description.
  }


  /**
   * @brief
   *
   * @param from_node
   * @param resp
   * @return EStatus
   */
  EStatus HandleSnapshotResp(RaftNode* from_node, eraftkv::SnapshotResp* resp) {
    // 1. deal snapshot resp with raft paper description.
  }

  /**
   * @brief
   *
   * @param from_node
   * @param ety
   * @param ety_index
   * @return EStatus
   */
  EStatus HandleApplyConfigChange(RaftNode*       from_node,
                                  eraftkv::Entry* ety,
                                  int64_t         ety_index) {}

  /**
   * @brief
   *
   * @param ety
   * @return EStatus
   */
  EStatus ProposeEntry(eraftkv::Entry* ety) {
    // 1.append entry to log
  }


  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus BecomeLeader() {
    // 1.call net_->SendAppendEntries()
  }

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus BecomeFollower() {
    // 1.reset election time out
  }

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus BecomeCandidate() {
    // 1. set status canditate

    // 2. incr current node term + 1
    // call net_->SendRequestVote();
  }

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus BecomePreCandidate() {
    // 1. set status pre canditate

    // 2. set request vote without current node term + 1
    // call net_->SendRequestVote();
  }

  /**
   * @brief
   *
   * @param is_prevote
   * @return EStatus
   */
  EStatus ElectionStart(bool is_prevote) {
    // 1. set random election timeout

    // 2. if is_prevote = true, BecomePreCandidate else BecomeCandidate
  }

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus BeginSnapshot() {
    // 1. apply all commited log
    // 2. set up snapshot status
  }

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus EndSnapshot() {
    // 1. call net_->SendSnapshot()
  }

  /**
   * @brief
   *
   * @return true
   * @return false
   */
  bool SnapshotRunning() {}

  /**
   * @brief Get the Last Applied Entry object
   *
   * @return Entry*
   */
  Entry* GetLastAppliedEntry() {}

  /**
   * @brief Get the First Entry Idx object
   *
   * @return int64
   */
  int64 GetFirstEntryIdx() {}

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus RestoreSnapshotAfterRestart() {}

  /**
   * @brief
   *
   * @param last_included_term
   * @param last_included_index
   * @return EStatus
   */
  EStatus BeginLoadSnapshot(int64 last_included_term,
                            int64 last_included_index) {}

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus EndLoadSnapshot() {}

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus ProposeReadReq() {}

  /**
   * @brief Get the Logs Count Can Snapshot object
   *
   * @return int64
   */
  int64 GetLogsCountCanSnapshot() {}

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus RestoreLog() {}

 private:
  /**
   * @brief
   *
   */
  std::string id_;

  /**
   * @brief
   *
   */
  int64_t current_term_;
  /**
   * @brief
   *
   */
  int64_t voted_for_;
  /**
   * @brief
   *
   */
  int64_t commit_idx_;
  /**
   * @brief
   *
   */
  int64_t last_applied_idx_;
  /**
   * @brief
   *
   */
  int64_t last_applied_term_;
  /**
   * @brief
   *
   */
  RaftStateEnum state_;
  /**
   * @brief
   *
   */
  int64_t leader_id_;
  /**
   * @brief
   *
   */
  int64_t message_index_;
  /**
   * @brief
   *
   */
  int64_t last_acked_message_index_;
  /**
   * @brief
   *
   */
  int64_t node_count_;
  /**
   * @brief
   *
   */
  std::vector<RaftNode> nodes_;
  /**
   * @brief
   *
   */
  RaftConfig config_;
  /**
   * @brief
   *
   */
  Network* net_;
  /**
   * @brief
   *
   */
  Storage* store_;
  /**
   * @brief
   *
   */
  LogStore* log_store_;
};

ERaftKV KvServer 层数据结构以及操作接口

/**
 * @brief
 *
 */
struct ERaftKvServerOptions {
  std::string svr_version;
  std::string svr_addr;
  std::string kv_db_path;
  std::string log_db_path;

  int64_t tick_interval;
  int64_t request_timeout;
  int64_t election_timeout;

  int64_t response_timeout;

  int64_t ae_max_count;
  int64_t ae_max_size;

  int64_t snap_max_count;
  int64_t snap_max_size;

  int64_t grpc_max_recv_msg_size;
  int64_t grpc_max_send_msg_size;
}

class ERaftKvServer : public grpc::EraftKv::Service {

  /**
   * @brief Construct a new ERaftKvServer object
   *
   * @param config
   */
  ERaftKvServer(ERaftKvServerOptions config) {
    // init raft lib
    RaftConfig raft_config;
    raft_config.net_impl = new GRpcNetworkImpl();
    raft_config.store_impl = new RocksDBStorageImpl();
    raft_config.log_impl = new RocksDBLogStorageImpl();
    raft_context_ = new RaftServer(raft_config);
  };


  /**
   * @brief
   *
   * @param req
   * @param resp
   * @return grpc::Status
   */
  grpc::Status RequestVote(RequestVoteReq* req, RequestVoteResp* resp){
      //
      // call raft_context_->HandleRequestVoteReq()
      //
  };

  /**
   * @brief
   *
   * @param req
   * @param resp
   * @return grpc::Status
   */
  grpc::Status AppendEntries(AppendEntriesReq* req, RequestVoteResp* resp){
      // 1.call raft_context_->HandleAppendEntriesReq()
  };

  /**
   * @brief
   *
   * @param req
   * @param resp
   * @return grpc::Status
   */
  grpc::Status Snapshot(SnapshotReq* req, SnapshotResp* resp){
      //  raftcore
      // 1.call raft_context_->HandleSnapshotReq();
  };

  /**
   * @brief
   *
   * @param req
   * @param resp
   * @return grpc::Status
   */
  grpc::Status ProcessRWOperation(ClientOperationReq*  req,
                                  ClientOperationResp* resp){
      // 1. req into log entry
      // 2. call raft_context_->ProposeEntry()
      // 3. wait commit
  };

  /**
   * @brief
   *
   * @return grpc::Status
   */
  grpc::Status ClusterConfigChange(ClusterConfigChangeReq,
                                   ClusterConfigChangeResp) {
    return EStatus::NotSupport();
  }

  /**
   * @brief
   *
   * @param interval
   * @return EStatus
   */
  EStatus InitTicker(int interval){
      // 1.set up raft_context_->RunCycle() run interval with periodic_caller_
  };

  /**
   * @brief
   *
   * @return EStatus
   */
  EStatus BuildAndRunRpcServer() {
    // set up rpc
    ERaftKvServer service;
    grpc::EnableDefaultHealthCheckService(true);
    grpc::ServerBuilder builder;
    builder.AddListeningPort(this->options_.svr_addr,
                             grpc::InsecureServerCredentials());
    builder.RegisterService(&service);
    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    server->Wait();
  };

 private:
  /**
   * @brief
   *
   */
  RaftServer* raft_context_;

  /**
   * @brief
   *
   */
  PeriodicCaller* periodic_caller_;

  /**
   * @brief
   *
   */
  ERaftKvServerOptions options_;
};