diff --git a/raftcore/raft.go b/raftcore/raft.go
index fb7369bd..46712c68 100644
--- a/raftcore/raft.go
+++ b/raftcore/raft.go
@@ -523,7 +523,6 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
 		first_log := rf.logs.GetFirst()
 
 		// TODO: send kv leveldb snapshot
-
 		snap_shot_req := &pb.InstallSnapshotRequest{
 			Term:     rf.curTerm,
 			LeaderId: int64(rf.id),
@@ -543,69 +542,68 @@ func (rf *Raft) replicateOneRound(peer *RaftPeerNode) {
 
 		rf.mu.Lock()
 		logger.ELogger().Sugar().Debugf("send snapshot to %s with resp %s", peer.addr, snapshot_resp.String())
-		if snapshot_resp != nil {
-			if rf.role == NodeRoleLeader && rf.curTerm == snap_shot_req.Term {
-				if snapshot_resp.Term > rf.curTerm {
-					rf.SwitchRaftNodeRole(NodeRoleFollower)
-					rf.curTerm = snapshot_resp.Term
-					rf.votedFor = -1
-					rf.PersistRaftState()
-				} else {
-					logger.ELogger().Sugar().Debugf("set peer %d matchIdx %d", peer.id, snap_shot_req.LastIncludedIndex)
-					rf.matchIdx[peer.id] = int(snap_shot_req.LastIncludedIndex)
-					rf.nextIdx[peer.id] = int(snap_shot_req.LastIncludedIndex) + 1
-				}
-			}
+		if snapshot_resp != nil && rf.role == NodeRoleLeader &&
+			rf.curTerm == snap_shot_req.Term && snapshot_resp.Term > rf.curTerm {
+			rf.SwitchRaftNodeRole(NodeRoleFollower)
+			rf.curTerm = snapshot_resp.Term
+			rf.votedFor = -1
+			rf.PersistRaftState()
+			rf.mu.Unlock()
+			return
 		}
+		if snapshot_resp != nil && rf.role == NodeRoleLeader && rf.curTerm == snap_shot_req.Term { // only advance on a real reply while still leader of the sending term
+			rf.matchIdx[peer.id], rf.nextIdx[peer.id] = int(snap_shot_req.LastIncludedIndex), int(snap_shot_req.LastIncludedIndex)+1
+		}
 		rf.mu.Unlock()
-	} else {
-		first_index := rf.logs.GetFirst().Index
-		logger.ELogger().Sugar().Debugf("first log index %d", first_index)
-		new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false)
-		entries := make([]*pb.Entry, len(new_ents))
-		copy(entries, new_ents)
-
-		append_ent_req := &pb.AppendEntriesRequest{
-			Term:         rf.curTerm,
-			LeaderId:     int64(rf.id),
-			PrevLogIndex: int64(prev_log_index),
-			PrevLogTerm:  int64(rf.logs.GetEntry(int64(prev_log_index)).Term),
-			Entries:      entries,
-			LeaderCommit: rf.commitIdx,
-		}
-		rf.mu.RUnlock()
+		return
+	}
-
-		// send empty ae to peers
-		resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), append_ent_req)
-		if err != nil {
-			logger.ELogger().Sugar().Errorf("send append entries to %s failed %v\n", peer.addr, err.Error())
-		}
-		if rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term {
-			if resp != nil {
-				// deal with appendRnt resp
-				if resp.Success {
-					logger.ELogger().Sugar().Debugf("send heart beat to %s success", peer.addr)
-					rf.matchIdx[peer.id] = int(append_ent_req.PrevLogIndex) + len(append_ent_req.Entries)
-					rf.nextIdx[peer.id] = rf.matchIdx[peer.id] + 1
-					rf.advanceCommitIndexForLeader()
-				} else {
-					// there is a new leader in group
-					if resp.Term > rf.curTerm {
-						rf.SwitchRaftNodeRole(NodeRoleFollower)
-						rf.curTerm = resp.Term
-						rf.votedFor = VOTE_FOR_NO_ONE
-						rf.PersistRaftState()
-					} else if resp.Term == rf.curTerm {
-						rf.nextIdx[peer.id] = int(resp.ConflictIndex)
-						if resp.ConflictTerm != -1 {
-							for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- {
-								if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) {
-									rf.nextIdx[peer.id] = int(i + 1)
-									break
-								}
-							}
-						}
-					}
+	first_index := rf.logs.GetFirst().Index
+	logger.ELogger().Sugar().Debugf("first log index %d", first_index)
+	new_ents, _ := rf.logs.EraseBefore(int64(prev_log_index)+1, false)
+	entries := make([]*pb.Entry, len(new_ents))
+	copy(entries, new_ents)
+
+	append_ent_req := &pb.AppendEntriesRequest{
+		Term:         rf.curTerm,
+		LeaderId:     int64(rf.id),
+		PrevLogIndex: int64(prev_log_index),
+		PrevLogTerm:  int64(rf.logs.GetEntry(int64(prev_log_index)).Term),
+		Entries:      entries,
+		LeaderCommit: rf.commitIdx,
+	}
+	rf.mu.RUnlock()
+
+	// send append entries (empty = heartbeat) to the peer
+	resp, err := (*peer.raftServiceCli).AppendEntries(context.Background(), append_ent_req)
+	if err != nil {
+		logger.ELogger().Sugar().Errorf("send append entries to %s failed %v\n", peer.addr, err.Error())
+	}
+	if rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term && resp != nil && resp.Success {
+		// deal with appendRnt resp
+		logger.ELogger().Sugar().Debugf("send heart beat to %s success", peer.addr)
+		rf.matchIdx[peer.id] = int(append_ent_req.PrevLogIndex) + len(append_ent_req.Entries)
+		rf.nextIdx[peer.id] = rf.matchIdx[peer.id] + 1
+		rf.advanceCommitIndexForLeader()
+		return
+	}
+
+	// there is a new leader in group
+	if resp != nil && rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term && resp.Term > rf.curTerm { // resp is nil when the RPC errored
+		rf.SwitchRaftNodeRole(NodeRoleFollower)
+		rf.curTerm = resp.Term
+		rf.votedFor = VOTE_FOR_NO_ONE
+		rf.PersistRaftState()
+		return
+	}
+
+	if resp != nil && rf.role == NodeRoleLeader && rf.curTerm == append_ent_req.Term && resp.Term == rf.curTerm { // conflict back-off only from a same-term reject
+		rf.nextIdx[peer.id] = int(resp.ConflictIndex)
+		if resp.ConflictTerm != -1 {
+			for i := append_ent_req.PrevLogIndex; i >= int64(first_index); i-- {
+				if rf.logs.GetEntry(i).Term == uint64(resp.ConflictTerm) {
+					rf.nextIdx[peer.id] = int(i + 1)
+					break
 				}
 			}
 		}
 	}
diff --git a/tests/integration_test.go b/tests/integration_test.go
index 8126f213..2faa332e 100644
--- a/tests/integration_test.go
+++ b/tests/integration_test.go
@@ -141,6 +141,67 @@ func TestBasicClusterRW(t *testing.T) {
 	common.RemoveDir("./data")
 }
 
+func TestClusterSingleShardRwBench(t *testing.T) {
+	// start metaserver cluster
+	go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 0)
+	go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 1)
+	go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 2)
+	time.Sleep(time.Second * 5)
+	// start shardserver cluster
+	go RunShardKvServer(map[int]string{0: "127.0.0.1:6088", 1: "127.0.0.1:6089", 2: "127.0.0.1:6090"}, 0, 1, "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
+	go RunShardKvServer(map[int]string{0: "127.0.0.1:6088", 1: "127.0.0.1:6089", 2: "127.0.0.1:6090"}, 1, 1, "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
+	go RunShardKvServer(map[int]string{0: "127.0.0.1:6088", 1: "127.0.0.1:6089", 2: "127.0.0.1:6090"}, 2, 1, "127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
+	time.Sleep(time.Second * 5)
+	// init meta server
+	AddServerGroup("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090", 1, "127.0.0.1:6088,127.0.0.1:6089,127.0.0.1:6090")
+	MoveSlotToServerGroup("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090", 0, 9, 1)
+	time.Sleep(time.Second * 20)
+
+	// R-W test
+	shardkvcli := shardkvserver.MakeKvClient("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")
+
+	N := 1000
+	KEY_SIZE := 64
+	VAL_SIZE := 64
+	bench_kvs := map[string]string{}
+	for i := 0; i < N; i++ {
+		k := strconv.Itoa(i) + "-" + common.RandStringRunes(KEY_SIZE)
+		v := common.RandStringRunes(VAL_SIZE)
+		bench_kvs[k] = v
+	}
+	timecost := []int64{}
+
+	for key, val := range bench_kvs {
+		start := time.Now()
+		shardkvcli.Put(key, val)
+		elapsed := time.Since(start)
+		timecost = append(timecost, elapsed.Milliseconds())
+	}
+
+	sum := 0.0
+	avg := 0.0
+	max := 0.0
+	min := 9999999999999999.0
+
+	for _, cost := range timecost {
+		sum += float64(cost)
+		if cost > int64(max) {
+			max = float64(cost)
+		}
+		if cost < int64(min) {
+			min = float64(cost)
+		}
+	}
+	avg = sum / float64(len(timecost))
+	logger.ELogger().Sugar().Debugf("total request: %d", N)
+	logger.ELogger().Sugar().Debugf("total time cost: %f", sum)
+	logger.ELogger().Sugar().Debugf("avg time cost: %f", avg)
+	logger.ELogger().Sugar().Debugf("max time cost: %f", max)
+	logger.ELogger().Sugar().Debugf("min time cost: %f", min)
+	time.Sleep(time.Second * 2)
+	common.RemoveDir("./data")
+}
+
 func TestClusterRwBench(t *testing.T) {
 	// start metaserver cluster
 	go RunMetaServer(map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}, 0)