Skip to content

Lab‐2B

M1ng edited this page Jan 18, 2024 · 8 revisions

论文笔记

个人认为Lab-2b的日志复制部分细节很多,故在此详细记录

日志复制

Start函数详解

调用者

服务端使用Raft的组件(比如kv服务器)会调用这个函数(实验中,由config.go和test_test.go调用进行测试)。也就是客户端的请求会转发到Leader节点,由Leader节点的Start函数处理。

处理逻辑

  1. Start函数会检查自己是否是Leader。如果不是Leader,直接返回false。
  2. 如果是Leader,会为这个命令生成一个新的日志条目,然后启动日志复制流程,将日志条目复制给其他Follower。

返回值

Start函数会返回3个值:

  1. 命令的索引值index,如果命令被提交,表示在日志中的位置
  2. 当前的任期号term
  3. 一个布尔值,表示是否认为自己是Leader

在Raft中的作用

  1. Start函数实现了Leader接收客户端请求的入口逻辑。
  2. 它将客户端的命令转换为日志条目,作为一个新的日志复制过程的开始。
  3. Start函数启动了一次新的日志复制,将日志条目复制给其他服务器。
  4. 它实现了Raft算法中Leader的核心功能之一。
/![日志复制](https://i0.imgs.ovh/2024/01/16/h7lEd.png)/ Start
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	//只能往leader上附加日志,其他角色不接受该操作
	if rf.role != ROLE_LEADER {
		return -1, -1, false
	}

	log := LogEntry{
		Command: command,
		Term:    rf.currentTerm,
	}
	rf.log = append(rf.log, log)
	index = len(rf.log)
	term = rf.currentTerm
	rf.persist()

	DPrintf("Leader-Node[%d] add new Command[%d], logIndex[%d] currentTerm[%d]", rf.me, log.Command, index, term)
	return index, term, isLeader
}

electionLoop函数修改

此函数内部新增如下内容:nextIndex设置为leader当前最大日志长度+1,matchIndex置为0

	// for lab-2b. 新选举出来的leader要刷新nextIndex和matchIndex
		rf.nextIndex = make([]int, len(rf.peers))
		for i := 0; i < len(rf.peers); i++ {
			rf.nextIndex[i] = len(rf.log) + 1
		}
		rf.matchIndex = make([]int, len(rf.peers))
		for i := 0; i < len(rf.peers); i++ {
			rf.matchIndex[i] = 0
		}

appendEntriesLoop函数修改(核心)

for peerId := 0; peerId < len(rf.peers); peerId++ {
				if peerId == rf.me {
					continue
				}

				//初始化RPC请求参数并填充
				args := AppendEntriesArgs{}
				args.Term = rf.currentTerm
				args.LeaderId = rf.me
				args.PrevLogIndex = rf.nextIndex[peerId] - 1
				if args.PrevLogIndex > 0 {
					args.PrevLogTerm = rf.log[args.PrevLogIndex-1].Term
				}
				args.Entries = make([]LogEntry, 0)
				args.Entries = append(args.Entries, rf.log[rf.nextIndex[peerId]-1:]...)
				args.LeaderCommit = rf.commitIndex
				DPrintf("RaftNode[%d] appendEntries starts,  currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] args.Entries[%d] commitIndex[%d]",
					rf.me, rf.currentTerm, peerId, len(rf.log), rf.nextIndex[peerId], rf.matchIndex[peerId], len(args.Entries), rf.commitIndex)

				go func(id int, args *AppendEntriesArgs) {
					reply := AppendEntriesReply{}
					if ok := rf.sendAppendEntries(id, args, &reply); ok {
						rf.mu.Lock()
						defer rf.mu.Unlock()
						if reply.Term > rf.currentTerm { // 变成follower
							rf.role = ROLE_FOLLOWER
							rf.leaderId = -1
							rf.currentTerm = reply.Term
							rf.votedFor = -1
							rf.persist()
                                                        return // 身份变更,及时中断退出
						}
						if reply.Success {
							// leader中该peerId对应的nextIndex和matchIndex增加
							rf.nextIndex[id] = args.PrevLogIndex + len(args.Entries) + 1
							rf.matchIndex[id] = rf.nextIndex[id] - 1

							// 计算所有服务器的matchIndex
							// 排序后取中位数
							// 不包括自己在内
							sortedMatchIndex := make([]int, 0)
							sortedMatchIndex = append(sortedMatchIndex, len(rf.log))
							for i := 0; i < len(rf.peers); i++ {
								if i == rf.me {
									continue
								}
								sortedMatchIndex = append(sortedMatchIndex, rf.matchIndex[i])
							}
							sort.Ints(sortedMatchIndex)

							// 更新commitIndex
							// 如果中位数大于当前commitIndex且term匹配才更新,保证一致性
							// 不同term仅完成复制,不更新commitIndex
							newCommitIndex := sortedMatchIndex[len(rf.peers)/2]
							if newCommitIndex > rf.commitIndex &&
								rf.log[newCommitIndex-1].Term == rf.currentTerm {
								rf.commitIndex = newCommitIndex
							}
							// 如果复制失败,则重置nextIndex,注意不能为0
						} else {
							rf.nextIndex[id] -= 1
							if rf.nextIndex[id] < 1 {
								rf.nextIndex[id] = 1
							}
						}
						DPrintf("RaftNode[%d] appendEntries ends, peerTerm[%d] myCurrentTerm[%d] myRole[%s]", rf.me, reply.Term, rf.currentTerm, rf.role)
					}
				}(peerId, &args)
			}

重点关注if reply.Success {...}内部

  1. leader中该peerId对应的nextIndex和matchIndex增加
  2. sort.Ints(sortedMatchIndex)利用排序取中位数法得到大多数节点的复制进度
  3. 严格遵守论文fig.8! term匹配才更新, 不同term仅完成复制,不更新commitIndex,具体原因见图片笔记
  4. 复制失败,则重置nextIndex -= 1,注意不能为0

AppendEntries函数修改(核心)

// applyLogLoop 定期检查是否有新的日志需要提交给应用层
// for lab-2b
	// 如果本地没有前一个日志(PrevLogIndex)的话,那么false
	if len(rf.log) < args.PrevLogIndex {
		return
	}
	// 如果本地有前一个日志的话,那么term必须相同,否则false
	if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
		return
	}

	for i, logEntry := range args.Entries {
		index := args.PrevLogIndex + i + 1
		if index > len(rf.log) {
			rf.log = append(rf.log, logEntry)
		} else {
			if rf.log[index-1].Term != logEntry.Term {
				rf.log = rf.log[:index-1]         // 删除当前以及后续所有log
				rf.log = append(rf.log, logEntry) // 把新log加入进来
			} // term一样啥也不用做,继续向后比对Log
		}
	}

	rf.persist()

	// 更新提交下标
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = args.LeaderCommit
		if len(rf.log) < rf.commitIndex {
			rf.commitIndex = len(rf.log)
		}
	}
	reply.Success = true

这里新增代码逻辑主要用于处理日志复制以及相关属性更新。for循环中的强行覆盖复制符合论文描述,具体细节不再阐述。

applyLogLoop

Lab-2b新增函数,在Make函数中通过go rf.applyLogLoop(applyCh)启动。定时把节点中已经提交的日志apply到应用层。代码中的server.go和config.go会需要使用applyCh

// applyLogLoop 定期检查是否有新的日志需要提交给应用层
// 它会一直运行直到这个 Raft 节点被 kill
func (rf *Raft) applyLogLoop(applyCh chan ApplyMsg) {
	for !rf.killed() {
		time.Sleep(10 * time.Millisecond)

		var appliedMsgs = make([]ApplyMsg, 0)

		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()

			for rf.commitIndex > rf.lastApplied {
				rf.lastApplied += 1
				appliedMsgs = append(appliedMsgs, ApplyMsg{
					CommandValid: true,
					Command:      rf.log[rf.lastApplied-1].Command,
					CommandIndex: rf.lastApplied,
					CommandTerm:  rf.log[rf.lastApplied-1].Term,
				})
				DPrintf("RaftNode[%d] applyLog, currentTerm[%d] lastApplied[%d] commitIndex[%d]", rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex)
			}
		}()
		// 释放锁,向应用层提交日志
		for _, msg := range appliedMsgs {
			applyCh <- msg
		}
	}
}