Commit 92216c5: add clean code
LLiuJJ committed Sep 4, 2024 (parent 0127d01)

Showing 2 changed files with 120 additions and 61 deletions.
raftcore/raft.go: 59 additions & 61 deletions
@@ -523,7 +523,6 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
first_log := rf.logs.GetFirst()

// TODO: send kv leveldb snapshot

snap_shot_req := &pb.InstallSnapshotRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
@@ -543,69 +542,68 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
rf.mu.Lock()
logger.ELogger().Sugar().Debugf("send snapshot to %s with resp %s", peer.addr, snapshot_resp.String())

if snapshot_resp != nil {
if rf.role == NodeRoleLeader && rf.curTerm == snap_shot_req.Term {
if snapshot_resp.Term > rf.curTerm {
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm = snapshot_resp.Term
rf.votedFor = -1
rf.PersistRaftState()
} else {
logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snap_shot_req.LastIncludedIndex)
rf.matchIdx[peer.id] = int(snap_shot_req.LastIncludedIndex)
rf.nextIdx[peer.id] = int(snap_shot_req.LastIncludedIndex) + 1
}
}
if snapshot_resp != nil && rf.role == NodeRoleLeader &&
rf.curTerm == snap_shot_req.Term && snapshot_resp.Term > rf.curTerm {
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm = snapshot_resp.Term
rf.votedFor = -1
rf.PersistRaftState()
rf.mu.Unlock()
return
}
logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snap_shot_req.LastIncludedIndex)
rf.matchIdx[peer.id] = int(snap_shot_req.LastIncludedIndex)
rf.nextIdx[peer.id] = int(snap_shot_req.LastIncludedIndex) + 1
rf.mu.Unlock()
} else {
first_index := rf.logs.GetFirst().Index
logger.ELogger().Sugar().Debugf("first log index %d", first_index)
new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false)
entries := make([]*pb.Entry, len(new_ents))
copy(entries, new_ents)

append_ent_req := &pb.AppendEntriesRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
PrevLogIndex: int64(prev_log_index),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index)).Term),
Entries: entries,
LeaderCommit: rf.commitIdx,
}
rf.mu.RUnlock()
return
}

// send empty ae to peers
resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), append_ent_req)
if err != nil {
logger.ELogger().Sugar().Errorf("send append entries to %s failed %v\n", peer.addr, err.Error())
}
if rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term {
if resp != nil {
// deal with appendRnt resp
if resp.Success {
logger.ELogger().Sugar().Debugf("send heart beat to %s success", peer.addr)
rf.matchIdx[peer.id] = int(append_ent_req.PrevLogIndex) + len(append_ent_req.Entries)
rf.nextIdx[peer.id] = rf.matchIdx[peer.id] + 1
rf.advanceCommitIndexForLeader()
} else {
// there is a new leader in group
if resp.Term > rf.curTerm {
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm = resp.Term
rf.votedFor = VOTE_FOR_NO_ONE
rf.PersistRaftState()
} else if resp.Term == rf.curTerm {
rf.nextIdx[peer.id] = int(resp.ConflictIndex)
if resp.ConflictTerm != -1 {
for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- {
if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) {
rf.nextIdx[peer.id] = int(i + 1)
break
}
}
}
}
first_index := rf.logs.GetFirst().Index
logger.ELogger().Sugar().Debugf("first log index %d", first_index)
new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false)
entries := make([]*pb.Entry, len(new_ents))
copy(entries, new_ents)

append_ent_req := &pb.AppendEntriesRequest{
Term: rf.curTerm,
LeaderId: int64(rf.id),
PrevLogIndex: int64(prev_log_index),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index)).Term),
Entries: entries,
LeaderCommit: rf.commitIdx,
}
rf.mu.RUnlock()

// send the AppendEntries request (possibly an empty heartbeat) to the peer
resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), append_ent_req)
if err != nil {
logger.ELogger().Sugar().Errorf("send append entries to %s failed %v\n", peer.addr, err.Error())
// the RPC failed, so there is no response to inspect
return
}
if rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term && resp != nil && resp.Success {
// handle the AppendEntries response
logger.ELogger().Sugar().Debugf("send heart beat to %s success", peer.addr)
rf.matchIdx[peer.id] = int(append_ent_req.PrevLogIndex) + len(append_ent_req.Entries)
rf.nextIdx[peer.id] = rf.matchIdx[peer.id] + 1
rf.advanceCommitIndexForLeader()
return
}

// there is a new leader in group
if resp.Term > rf.curTerm {
rf.SwitchRaftNodeRole(NodeRoleFollower)
rf.curTerm = resp.Term
rf.votedFor = VOTE_FOR_NO_ONE
rf.PersistRaftState()
return
}

if resp.Term == rf.curTerm {
rf.nextIdx[peer.id] = int(resp.ConflictIndex)
if resp.ConflictTerm != -1 {
for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- {
if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) {
rf.nextIdx[peer.id] = int(i + 1)
break
}
}
}
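
For reference, the raft.go change above is essentially a guard-clause refactor: the nested response handling in replicateOneRound is flattened into early returns. The standalone sketch below illustrates that style on a simplified snapshot-response handler; all names (handleSnapshotResp, node, snapshotResp) and fields are hypothetical stand-ins, not the repository's actual types, and the conditions are simplified relative to the committed code.

package sketch

type nodeRole int

const (
    roleFollower nodeRole = iota
    roleLeader
)

type snapshotResp struct {
    Term int64
}

// node is a hypothetical stand-in for the Raft state touched by the handler.
type node struct {
    role     nodeRole
    curTerm  int64
    votedFor int64
    matchIdx map[int]int
    nextIdx  map[int]int
}

// handleSnapshotResp shows the early-return (guard clause) style: reject
// stale or missing responses first, step down on a higher term, and only
// then advance the peer's replication indexes.
func (n *node) handleSnapshotResp(peerID int, reqTerm, lastIncludedIndex int64, resp *snapshotResp) {
    if resp == nil || n.role != roleLeader || n.curTerm != reqTerm {
        return // nothing useful to process
    }
    if resp.Term > n.curTerm {
        // a peer reported a newer term: become a follower and reset the vote
        n.role = roleFollower
        n.curTerm = resp.Term
        n.votedFor = -1
        return
    }
    // the snapshot was accepted; the peer is caught up to lastIncludedIndex
    n.matchIdx[peerID] = int(lastIncludedIndex)
    n.nextIdx[peerID] = int(lastIncludedIndex) + 1
}
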
tests/integration_test.go: 61 additions & 0 deletions
@@ -141,6 +141,67 @@ func TestBasicClusterRW(t *testing.T) {
common.RemoveDir("./data")
}

func TestClusterSingleShardRwBench(t *testing.T) {
// start metaserver cluster
go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 0)
go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 1)
go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 2)
time.Sleep(time.Second * 5)
// start shardserver cluster
go RunShardKvServer(map[int]string{0: "127.0.0.1:6088", 1: "127.0.0.1:6089", 2: "127.0.0.1:6090"}, 0, 1, "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
go RunShardKvServer(map[int]string{0: "127.0.0.1:6088", 1: "127.0.0.1:6089", 2: "127.0.0.1:6090"}, 1, 1, "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
go RunShardKvServer(map[int]string{0: "127.0.0.1:6088", 1: "127.0.0.1:6089", 2: "127.0.0.1:6090"}, 2, 1, "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
time.Sleep(time.Second * 5)
// init meta server
AddServerGroup("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090", 1, "127.0.0.1:6088,127.0.0.1:6089,127.0.0.1:6090")
MoveSlotToServerGroup("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090", 0, 9, 1)
time.Sleep(time.Second * 20)

// write (Put) benchmark
shardkvcli := shardkvserver.MakeKvClient("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")

N := 1000
KEY_SIZE := 64
VAL_SIZE := 64
bench_kvs := map[string]string{}
for i := 0; i < N; i++ {
k := strconv.Itoa(i) + "-" + common.RandStringRunes(KEY_SIZE)
v := common.RandStringRunes(VAL_SIZE)
bench_kvs[k] = v
}
timecost := []int64{}

for key, val := range bench_kvs {
start := time.Now()
shardkvcli.Put(key, val)
elapsed := time.Since(start)
timecost = append(timecost, elapsed.Milliseconds())
}

sum := 0.0
avg := 0.0
max := 0.0
min := 9999999999999999.0

for _, cost := range timecost {
sum += float64(cost)
if cost > int64(max) {
max = float64(cost)
}
if cost < int64(min) {
min = float64(cost)
}
}
avg = sum / float64(len(timecost))
logger.ELogger().Sugar().Debugf("total request: %d", N)
logger.ELogger().Sugar().Debugf("total time cost: %f", sum)
logger.ELogger().Sugar().Debugf("avg time cost: %f", avg)
logger.ELogger().Sugar().Debugf("max time cost: %f", max)
logger.ELogger().Sugar().Debugf("min time cost: %f", min)
time.Sleep(time.Second * 2)
common.RemoveDir("./data")
}
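
The benchmark above aggregates per-request latencies by hand; the helper sketched below performs the same sum / average / max / min reduction in one place. It is a self-contained illustration with a hypothetical name (latencyStats), not code from this repository.

package main

import "fmt"

// latencyStats summarizes per-request costs in milliseconds.
// It returns zeros for empty input instead of dividing by zero.
func latencyStats(costsMs []int64) (sum, avg, max, min float64) {
    if len(costsMs) == 0 {
        return 0, 0, 0, 0
    }
    max = float64(costsMs[0])
    min = float64(costsMs[0])
    for _, c := range costsMs {
        v := float64(c)
        sum += v
        if v > max {
            max = v
        }
        if v < min {
            min = v
        }
    }
    avg = sum / float64(len(costsMs))
    return sum, avg, max, min
}

func main() {
    sum, avg, max, min := latencyStats([]int64{12, 7, 25, 9})
    fmt.Printf("sum=%.0fms avg=%.2fms max=%.0fms min=%.0fms\n", sum, avg, max, min)
}
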

func TestClusterRwBench(t *testing.T) {
// start metaserver cluster
go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 0)
