Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Jan 30, 2024
1 parent f47f2a8 commit bc1bbce
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 288 deletions.
6 changes: 3 additions & 3 deletions metaserver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
)

type MetaSvrCli struct {
endpoints []*raftcore.RaftClientEnd
endpoints []*raftcore.RaftPeerNode
leaderId int64
clientId int64
commandId int64
Expand All @@ -60,14 +60,14 @@ func MakeMetaSvrClient(targetId uint64, targetAddrs []string) *MetaSvrCli {
}

for _, addr := range targetAddrs {
cli := raftcore.MakeRaftClientEnd(addr, targetId)
cli := raftcore.MakeRaftPeerNode(addr, targetId)
mate_svr_cli.endpoints = append(mate_svr_cli.endpoints, cli)
}

return mate_svr_cli
}

func (meta_svr_cli *MetaSvrCli) GetRpcClis() []*raftcore.RaftClientEnd {
func (meta_svr_cli *MetaSvrCli) GetRpcClis() []*raftcore.RaftPeerNode {
return meta_svr_cli.endpoints
}

Expand Down
4 changes: 2 additions & 2 deletions metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type MetaServer struct {
}

func MakeMetaServer(peerMaps map[int]string, nodeId int) *MetaServer {
client_ends := []*raftcore.RaftClientEnd{}
client_ends := []*raftcore.RaftPeerNode{}
for id, addr := range peerMaps {
new_end := raftcore.MakeRaftClientEnd(addr, uint64(id))
new_end := raftcore.MakeRaftPeerNode(addr, uint64(id))
client_ends = append(client_ends, new_end)
}
newApplyCh := make(chan *pb.ApplyMsg)
Expand Down
48 changes: 21 additions & 27 deletions raftcore/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NodeToString(role NodeRole) string {
// raft stack definition
type Raft struct {
mu sync.RWMutex
peers []*RaftClientEnd // rpc client end
peers []*RaftPeerNode // rpc client end
id int64
dead int32
applyCh chan *pb.ApplyMsg
Expand All @@ -71,7 +71,6 @@ type Raft struct {
votedFor int64
grantedVotes int
logs *RaftLog
persister *RaftLog
commitIdx int64
lastApplied int64
nextIdx []int
Expand All @@ -85,7 +84,7 @@ type Raft struct {
baseElecTimeout uint64
}

func MakeRaft(peers []*RaftClientEnd, me int64, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft {
func MakeRaft(peers []*RaftPeerNode, me int64, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft {
rf := &Raft{
peers: peers,
id: me,
Expand All @@ -98,15 +97,14 @@ func MakeRaft(peers []*RaftClientEnd, me int64, newdbEng storage_eng.KvStore, ap
grantedVotes: 0,
isSnapshoting: false,
logs: MakePersistRaftLog(newdbEng),
persister: MakePersistRaftLog(newdbEng),
nextIdx: make([]int, len(peers)),
matchIdx: make([]int, len(peers)),
heartbeatTimer: time.NewTimer(time.Millisecond * time.Duration(heartbeatTimeOutMs)),
electionTimer: time.NewTimer(time.Millisecond * time.Duration(MakeAnRandomElectionTimeout(int(baseElectionTimeOutMs)))),
baseElecTimeout: baseElectionTimeOutMs,
heartBeatTimeout: heartbeatTimeOutMs,
}
rf.curTerm, rf.votedFor = rf.persister.ReadRaftState()
rf.curTerm, rf.votedFor = rf.logs.ReadRaftState()
rf.applyCond = sync.NewCond(&rf.mu)
last_log := rf.logs.GetLast()
for _, peer := range peers {
Expand All @@ -126,7 +124,7 @@ func MakeRaft(peers []*RaftClientEnd, me int64, newdbEng storage_eng.KvStore, ap
}

func (rf *Raft) PersistRaftState() {
rf.persister.PersistRaftState(rf.curTerm, rf.votedFor)
rf.logs.PersistRaftState(rf.curTerm, rf.votedFor)
}

func (rf *Raft) Kill() {
Expand All @@ -137,10 +135,6 @@ func (rf *Raft) IsKilled() bool {
return atomic.LoadInt32(&rf.dead) == 1
}

func (rf *Raft) GetFirstLogEnt() *pb.Entry {
return rf.logs.GetFirst()
}

func (rf *Raft) SwitchRaftNodeRole(role NodeRole) {
if rf.role == role {
return
Expand Down Expand Up @@ -257,9 +251,9 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen
resp.ConflictIndex = last_index + 1
} else {
first_index := rf.logs.GetFirst().Index
resp.ConflictTerm = int64(rf.logs.GetEntry(req.PrevLogIndex - int64(first_index)).Term)
resp.ConflictTerm = int64(rf.logs.GetEntry(req.PrevLogIndex).Term)
index := req.PrevLogIndex - 1
for index >= int64(first_index) && rf.logs.GetEntry(index-first_index).Term == uint64(resp.ConflictTerm) {
for index >= int64(first_index) && rf.logs.GetEntry(index).Term == uint64(resp.ConflictTerm) {
index--
}
resp.ConflictIndex = index
Expand All @@ -269,7 +263,7 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen

first_index := rf.logs.GetFirst().Index
for index, entry := range req.Entries {
if int(entry.Index-first_index) >= rf.logs.LogItemCount() || rf.logs.GetEntry(entry.Index-first_index).Term != entry.Term {
if int(entry.Index-first_index) >= rf.logs.LogItemCount() || rf.logs.GetEntry(entry.Index).Term != entry.Term {
rf.logs.EraseAfter(entry.Index-first_index, true)
for _, newEnt := range req.Entries[index:] {
rf.logs.Append(newEnt)
Expand All @@ -294,11 +288,11 @@ func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int,
if lastIncludedIndex > int(rf.logs.GetLast().Index) {
rf.logs.ReInitLogs()
} else {
rf.logs.EraseBefore(int64(lastIncludedIndex)-rf.logs.GetFirst().Index, true)
rf.logs.EraseBefore(int64(lastIncludedIndex), true)
rf.logs.SetEntFirstData([]byte{})
}
// update dummy entry with lastIncludedTerm and lastIncludedIndex
rf.logs.SetEntFirstTermAndIndex(int64(lastIncluedTerm), int64(lastIncludedIndex))
rf.logs.ResetFirstEntryTermAndIndex(int64(lastIncluedTerm), int64(lastIncludedIndex))

rf.lastApplied = int64(lastIncludedIndex)
rf.commitIdx = int64(lastIncludedIndex)
Expand All @@ -317,7 +311,7 @@ func (rf *Raft) Snapshot(index int, snapshot []byte) {
logger.ELogger().Sugar().Warnf("reject snapshot, current snapshotIndex is larger in cur term")
return
}
rf.logs.EraseBefore(int64(index)-int64(snapshot_index), true)
rf.logs.EraseBefore(int64(index), true)
rf.logs.SetEntFirstData([]byte{})
logger.ELogger().Sugar().Debugf("del log entry before idx %d", index)
rf.isSnapshoting = false
Expand Down Expand Up @@ -382,7 +376,7 @@ func (rf *Raft) advanceCommitIndexForLeader() {
new_commit_index := rf.matchIdx[n-(n/2+1)]
if new_commit_index > int(rf.commitIdx) {
if rf.MatchLog(rf.curTerm, int64(new_commit_index)) {
logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.id, rf.commitIdx, rf.curTerm)
logger.ELogger().Sugar().Debugf("leader advance commit lid %d index %d at term %d", rf.id, rf.commitIdx, rf.curTerm)
rf.commitIdx = int64(new_commit_index)
rf.applyCond.Signal()
}
Expand All @@ -400,7 +394,7 @@ func (rf *Raft) advanceCommitIndexForFollower(leaderCommit int) {

// MatchLog reports whether the log entry at the given index exists and has the given term.
func (rf *Raft) MatchLog(term, index int64) bool {
return index <= int64(rf.logs.GetLast().Index) && rf.logs.GetEntry(index-int64(rf.logs.GetFirst().Index)).Term == uint64(term)
return index <= int64(rf.logs.GetLast().Index) && rf.logs.GetEntry(index).Term == uint64(term)
}

// Election make a new election
Expand All @@ -420,7 +414,7 @@ func (rf *Raft) Election() {
if int64(peer.id) == rf.id {
continue
}
go func(peer *RaftClientEnd) {
go func(peer *RaftPeerNode) {
logger.ELogger().Sugar().Debugf("send request vote to %s %s", peer.addr, vote_req.String())

request_vote_resp, err := (*peer.raftServiceCli).RequestVote(context.Background(), vote_req)
Expand Down Expand Up @@ -473,7 +467,7 @@ func (rf *Raft) BroadcastHeartbeat() {
continue
}
logger.ELogger().Sugar().Debugf("send heart beat to %s", peer.addr)
go func(peer *RaftClientEnd) {
go func(peer *RaftPeerNode) {
rf.replicateOneRound(peer)
}(peer)
}
Expand Down Expand Up @@ -545,7 +539,7 @@ func (rf *Raft) CloseEndsConn() {
}

// Replicator runs a per-peer loop that performs log replication when signaled.
func (rf *Raft) Replicator(peer *RaftClientEnd) {
func (rf *Raft) Replicator(peer *RaftPeerNode) {
rf.replicatorCond[peer.id].L.Lock()
defer rf.replicatorCond[peer.id].L.Unlock()
for !rf.IsKilled() {
Expand All @@ -558,7 +552,7 @@ func (rf *Raft) Replicator(peer *RaftClientEnd) {
}

// replicateOneRound replicates log entries (or an install-snapshot) to the given peer in one round.
func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
rf.mu.RLock()
if rf.role != NodeRoleLeader {
rf.mu.RUnlock()
Expand Down Expand Up @@ -606,15 +600,15 @@ func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
} else {
first_index := rf.logs.GetFirst().Index
logger.ELogger().Sugar().Debugf("first log index %d", first_index)
_, new_ents := rf.logs.EraseBefore(int64(prev_log_index)+1-first_index, false)
new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false)
entries := make([]*pb.Entry, len(new_ents))
copy(entries, new_ents)

append_ent_req := &pb.AppendEntriesRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
PrevLogIndex: int64(prev_log_index),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index) - first_index).Term),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index)).Term),
Entries: entries,
LeaderCommit: rf.commitIdx,
}
Expand Down Expand Up @@ -644,7 +638,7 @@ func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
rf.nextIdx[peer.id] = int(resp.ConflictIndex)
if resp.ConflictTerm != -1 {
for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- {
if rf.logs.GetEntry(i-int64(first_index)).Term == uint64(resp.ConflictTerm) {
if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) {
rf.nextIdx[peer.id] = int(i + 1)
break
}
Expand All @@ -667,9 +661,9 @@ func (rf *Raft) Applier() {
rf.applyCond.Wait()
}

first_index, commit_index, last_applied := rf.logs.GetFirst().Index, rf.commitIdx, rf.lastApplied
commit_index, last_applied := rf.commitIdx, rf.lastApplied
entries := make([]*pb.Entry, commit_index-last_applied)
copy(entries, rf.logs.GetRange(last_applied+1-int64(first_index), commit_index+1-int64(first_index)))
copy(entries, rf.logs.GetRange(last_applied+1, commit_index))
logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commit_index, rf.curTerm)

rf.mu.Unlock()
Expand Down
101 changes: 0 additions & 101 deletions raftcore/raft_mem_log.go

This file was deleted.

Loading

0 comments on commit bc1bbce

Please sign in to comment.