Skip to content

Lab‐2C

M1ng edited this page Jan 18, 2024 · 6 revisions

Lab-2C考察持久化的实现,按框架代码的逻辑最终实现的是持久化到内存,核心代码已经有了,只需要定义两个函数然后在合适位置调用即可。

持久化和读取代码

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
	// Your code here (2C).
	// 不用加锁,外层逻辑会锁
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)
	data := w.Bytes()
	DPrintf("RaftNode[%d] persist starts, currentTerm[%d] voteFor[%d] log[%v]", rf.me, rf.currentTerm, rf.votedFor, rf.log)
	rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	rf.mu.Lock()
	defer rf.mu.Unlock()
	d.Decode(&rf.currentTerm)
	d.Decode(&rf.votedFor)
	d.Decode(&rf.log)
}

说明:

  • 按论文要求,当currentTerm、voteFor、log[]发生更新后,调用persister()将它们持久化下来
  • 核心是通过persister结构体来实现的,框架中已给出:persister.go,本质是通过persister结构体以及相关方法存储在内存中

至于为什么这里只持久化这三个信息呢?先回顾一下raft结构体:

// Raft
// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// Persistent state on all servers
	currentTerm int         // 见过的最大任期 (initialized to 0 on first boot, increases monotonically)
	votedFor    int         // 记录在currentTerm任期投票给谁了 (or null if none)
	log         []*LogEntry // 日志记录集 (first index is 1)

	// Volatile state on all servers
	commitIndex int // 已知的最大已提交索引
	lastApplied int // 当前应用到状态机的索引 (initialized to 0, increases monotonically)

	// Volatile state on leaders(成为leader时重置)
	nextIndex  []int //	for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
	matchIndex []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)

	// others on all servers
	role           string    // 角色
	leaderId       int       // leader的id
	lastActiveTime time.Time // 最近一次活跃时间(刷新时机:收到leader AppendEntries心跳、给其他candidates投票、请求其他节点投票)
	heartBeatTime  time.Time // 最近一次心跳时间,作为leader,每隔固定毫秒向手下发送心跳包的时间
}

再看一下make函数内部的一些代码:

func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	rf.role = ROLE_FOLLOWER
	rf.votedFor = -1
	rf.leaderId = -1
	rf.lastActiveTime = time.Now()
	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
        ...

这里可以看出来make函数初始化了大部分数据,nextIndex、matchIndex其实是不必要的因为这个是leader结点负责存储,再说如果是leader挂了那么再恢复后已经有新leader来指挥了,所以没必要。重要的就是currentTerm、voteFor、log[]这三个,节点恢复初始化时候会把这三个信息填进去,其实就是借尸还魂。

关于persister(额外说明)

type config struct {
	...
	rafts     []*Raft
	applyErr  []string // from apply channel readers
	connected []bool   // whether each server is on the net
	saved     []*Persister
	endnames  [][]string            // the port file names each sends to
	...
}

可见Persister结构体的应用主要在config.go文件中,在func (cfg *config) crash1(i int)func (cfg *config) start1(i int)有具体的使用情况。

总的来说,框架代码中的config结构体里维护着raft各个节点以及对应的persister,在宕机处理和恢复逻辑代码中都会调用persister包含的相关方法实现数据恢复。

回退优化(重点)

在执行 go test -run TestFigure8Unreliable2C 这个案例时候过不了,分析是日志同步协调太慢,导致单测超时失败,这个Case通过制造网络分区产生了2个leader,然后不断的向2个leader都写入日志,导致产生了2个很长的歧义链。当网络分区恢复后,2个歧义链之间日志同步过程,会因为不断的prevLogTerm冲突导致nextIndex不断回退,因为每次只回退1位,进而耗时非常长。需要对log复制逻辑进行优化。

优化参考:https://thesquareplanet.com/blog/students-guide-to-raft/#an-aside-on-optimizations

原文翻译

Raft 论文包含了一些可选的有趣功能。在 6.824 中,我们要求学生实现其中的两项:**日志压缩(第7节)**和加速日志回退(第8页左上角)。前者是为了避免日志无限增长,后者对于快速使陈旧的follower赶上很有用。

加速日志回退优化非常不明确,可能是因为作者认为大多数部署不需要它。从文本中很难确定从客户端发回的冲突索引和任期应该如何被领导者用来确定 nextIndex。我们认为作者可能希望你遵循的协议是:

  • 如果一个追随者在其日志中没有 prevLogIndex,它应该返回 conflictIndex = len(log)conflictTerm = None

  • 如果一个追随者在其日志中有 prevLogIndex,但任期不匹配,它应该返回 conflictTerm = log[prevLogIndex].Term,然后在其日志中搜索第一个条目的任期等与conflictTerm 的索引。

  • 在收到冲突响应时,领导者应该先在其日志中搜索 conflictTerm。如果它在日志中找到一个条目的任期等于该任期,它应该将 nextIndex 设置为该任期在其日志中的最后一个条目之后的一个。

  • 如果它没有找到任期等于该任期的条目,它应该设置 nextIndex = conflictIndex

总结

  1. 如果follower日志比leader短,那么leader可以直接从follower末尾的index开始尝试传日志

  2. 如果在leader中也有这个term的日志,则从leader日志中该term最后一次出现的位置开始尝试同步,避免给follower错过这个term的任何一条日志

  3. 如果冲突term在leader里压根不在,则从follower日志该term首次出现的下标开始同步,因为leader压根没有这个term的日志,相当于对follower截断

AppendEntries

// AppendEntries
// 用于各节点处理接受到的AppendEntries RPC请求并返回AppendEntriesReply
// Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] commitIndex[%d] Entries[%v]",
		rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, len(rf.log), args.PrevLogIndex, args.PrevLogTerm, rf.commitIndex, args.Entries)

	reply.Term = rf.currentTerm
	reply.Success = false
	reply.ConflictIndex = -1
	reply.ConflictTerm = -1

	defer func() {
		DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s] logIndex[%d] prevLogIndex[%d] prevLogTerm[%d] Success[%v] commitIndex[%d] log[%v]",
			rf.me, rf.leaderId, args.Term, rf.currentTerm, rf.role, len(rf.log), args.PrevLogIndex, args.PrevLogTerm, reply.Success, rf.commitIndex, len(rf.log))
	}()

	if args.Term < rf.currentTerm {
		return
	}

	// 发现更大的任期,则转为该任期的follower
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.role = ROLE_FOLLOWER
		rf.votedFor = -1
		rf.persist()
		// 继续向下走
	}

	// 更新当前新的leader
	rf.leaderId = args.LeaderId
	// 刷新最近一次活跃时间
	rf.lastActiveTime = time.Now()

	// for lab-2b
	// 如果本地没有前一个日志(PrevLogIndex)的话,那么false
	if len(rf.log) < args.PrevLogIndex {
		// 直接移动返回len(rf.log),省的一个个移动
		reply.ConflictIndex = len(rf.log)
		return
	}
	// 如果本地有前一个日志的话,那么term必须相同,否则false
	// 回退优化改进,返回当前发生冲突的Term:ConflictTerm 和 follower中该term的首次出现位置:ConflictIndex
        // appendEntriesLoop收到reply后会根据优化逻辑调整args.PrevLogIndex、rf.nextIndex的值
	if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
		reply.ConflictTerm = rf.log[args.PrevLogIndex-1].Term
		for index := 1; index <= args.PrevLogIndex; index++ { // 标记冲突term在follower节点的首次出现位置,最差就是PrevLogIndex
			if rf.log[index-1].Term == reply.ConflictTerm {
				reply.ConflictIndex = index
				break
			}
		}
		return
	}

	for i, logEntry := range args.Entries {
		index := args.PrevLogIndex + i + 1
		if index > len(rf.log) {
			rf.log = append(rf.log, logEntry)
		} else {
			if rf.log[index-1].Term != logEntry.Term {
				rf.log = rf.log[:index-1]         // 删除当前以及后续所有log
				rf.log = append(rf.log, logEntry) // 把新log加入进来
			} // term一样啥也不用做,继续向后比对Log
		}
	}

	rf.persist()

	// 更新提交下标
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = args.LeaderCommit
		if len(rf.log) < rf.commitIndex {
			rf.commitIndex = len(rf.log)
		}
	}
	reply.Success = true
}

appendEntriesLoop

func (rf *Raft) appendEntriesLoop() {
	for !rf.killed() {
		time.Sleep(1 * time.Millisecond)

		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()

			// 只有leader才向外广播心跳
			if rf.role != ROLE_LEADER {
				return
			}

			// nowTime - heartBeatTime >= 100ms 则广播1次
			now := time.Now()
			if now.Sub(rf.heartBeatTime) < 100*time.Millisecond {
				return
			}
			rf.heartBeatTime = time.Now()

			// 并发RPC心跳(跳过自身)
			for peerId := 0; peerId < len(rf.peers); peerId++ {
				if peerId == rf.me {
					continue
				}

				//初始化RPC请求参数并填充
				args := AppendEntriesArgs{}
				args.Term = rf.currentTerm
				args.LeaderId = rf.me
				args.PrevLogIndex = rf.nextIndex[peerId] - 1
				if args.PrevLogIndex > 0 {
					args.PrevLogTerm = rf.log[args.PrevLogIndex-1].Term
				}
				args.Entries = make([]LogEntry, 0)
				args.Entries = append(args.Entries, rf.log[rf.nextIndex[peerId]-1:]...)
				args.LeaderCommit = rf.commitIndex
				DPrintf("RaftNode[%d] appendEntries starts,  currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] args.Entries[%d] commitIndex[%d]",
					rf.me, rf.currentTerm, peerId, len(rf.log), rf.nextIndex[peerId], rf.matchIndex[peerId], len(args.Entries), rf.commitIndex)

				go func(id int, args *AppendEntriesArgs) {
					reply := AppendEntriesReply{}
					if ok := rf.sendAppendEntries(id, args, &reply); ok {
						rf.mu.Lock()
						defer rf.mu.Unlock()
						if reply.Term > rf.currentTerm { // 变成follower
							rf.role = ROLE_FOLLOWER
							rf.leaderId = -1
							rf.currentTerm = reply.Term
							rf.votedFor = -1
							rf.persist()
							return
						}
						if reply.Success {
							// leader中该peerId对应的nextIndex和matchIndex增加
							rf.nextIndex[id] = args.PrevLogIndex + len(args.Entries) + 1
							rf.matchIndex[id] = rf.nextIndex[id] - 1

							// 计算所有服务器的matchIndex
							// 排序后取中位数
							// 不包括自己在内
							sortedMatchIndex := make([]int, 0)
							sortedMatchIndex = append(sortedMatchIndex, len(rf.log))
							for i := 0; i < len(rf.peers); i++ {
								if i == rf.me {
									continue
								}
								sortedMatchIndex = append(sortedMatchIndex, rf.matchIndex[i])
							}
							sort.Ints(sortedMatchIndex)

							// 更新commitIndex
							// 如果中位数大于当前commitIndex且term匹配才更新,保证一致性
							// 不同term仅完成复制,不更新commitIndex
							newCommitIndex := sortedMatchIndex[len(rf.peers)/2]
							if newCommitIndex > rf.commitIndex &&
								rf.log[newCommitIndex-1].Term == rf.currentTerm {
								rf.commitIndex = newCommitIndex
							}
							// 如果复制失败,则重置nextIndex,注意不能为0
						} else {
							// 回退优化,参考:https://thesquareplanet.com/blog/students-guide-to-raft/#an-aside-on-optimizations
							if reply.ConflictTerm != -1 { // follower的prevLogIndex位置term不同
								conflictTermIndex := -1
								for index := args.PrevLogIndex; index >= 1; index-- { // 找最后一个conflictTerm
									if rf.log[index-1].Term == reply.ConflictTerm {
										conflictTermIndex = index
										break
									}
								}
								if conflictTermIndex != -1 { // leader也存在冲突term的日志,则从term最后一次出现之后的日志开始尝试同步,因为leader/follower可能在该term的日志有部分相同
									rf.nextIndex[id] = conflictTermIndex
								} else { // leader并没有term的日志,那么把follower日志中该term首次出现的位置作为尝试同步的位置,即截断follower在此term的所有日志
									rf.nextIndex[id] = reply.ConflictIndex
								}
							} else {
								// follower没有发现prevLogIndex term冲突, 日志长度不够
								// 这时候我们将返回的conflictIndex设置为nextIndex即可
								rf.nextIndex[id] = reply.ConflictIndex
								if rf.nextIndex[id] < 1 {
									rf.nextIndex[id] = 1
								}
							}
							DPrintf("RaftNode[%d] appendEntries ends, peerTerm[%d] myCurrentTerm[%d] myRole[%s]", rf.me, reply.Term, rf.currentTerm, rf.role)
						}
					}
				}(peerId, &args)
			}
		}()
	}
}
Clone this wiki locally