Skip to content

Lab‐3B

M1ng edited this page Jan 27, 2024 · 4 revisions

介绍

kv层传递通知raft落地snapshot并截断日志,本质上是实现了日志压缩快照两个功能,涉及的全是都是已apply的日志,能保证一致性的安全。

  • 快照功能本质是对kv层的数据库(Map)、raft层的raftState(currentTerm、votedFor、log、lastIncludedIndex和lastIncludedTerm)持久化。目的是为了kv层数据的快速恢复,当然也为了raft层状态参数和日志的快速恢复。这里也要认识到,只有快照了,才能放心的对日志进行丢弃,因为快照相当于对先前存储状态的总结(感觉叫压缩也行)。
  • 日志压缩其实就是主动丢弃最近一次应用索引(lastIncludedIndex)之前的所有日志,减少rf.LogEntry日志的大小,提高系统的吞吐量。

总的来说这两个概念不用分的太清楚,说快照是一种压缩也行,也是一种持久化,日志删减基于快照的实现。

主动快照

主动快照涉及的流程。主动快照是由kv层调用raft层的TakeSnapshot函数实现的,通过snapshotLoop循环定期发起,各个节点都会自主发起。

..

被动快照

被动快照涉及的流程。被动快照是由raft层的Leader节点在appendEntriesLoop函数中发起的,通过心跳包送至各个follower节点,对于follower来说是被动的。

..

代码方面

snapshootLoop函数中的死锁问题:

func (kv *KVServer) snapshotLoop() {
	for !kv.killed() {
		var snapshot []byte
		var lastAppliedIndex int
		// 锁内dump snapshot
		func() {
			// 如果raft log超过了maxraftstate大小,那么对kvStore快照下来
			if kv.maxraftstate != -1 && kv.rf.CheckLogSize(kv.maxraftstate) { // 这里调用ExceedLogSize不要加kv锁,否则会死锁
				// 锁内快照,离开锁通知raft处理
				kv.mu.Lock()
				w := new(bytes.Buffer)
				e := labgob.NewEncoder(w)
				e.Encode(kv.kvStore) // kv数据库
				e.Encode(kv.seqMap)  // 当前各客户端最大请求编号,也要随着snapshot走
				snapshot = w.Bytes()
				lastAppliedIndex = kv.lastAppliedIndex
				kv.mu.Unlock()
			}
		}()
		// 锁外通知raft层截断,否则有死锁
		if snapshot != nil {
			// 通知raft落地snapshot并截断日志(都是已提交的日志,不会因为主从切换截断,放心操作)
			kv.rf.TakeSnapshot(snapshot, lastAppliedIndex)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

这里说一下可能会到=导致死锁的一种操作:raft层持有rf.mu向applyCh写入可能阻塞,此时如果kv层出现一种代码逻辑是先拿到了kv.mu然后再去拿rf.mu的话,此时肯定无法拿到rf.mu(因为raft层持有rf.mu并阻塞在chan),而此刻kv层如果正在处理前一条log并试图加kv.mu,那么也无法拿到kv.mu,就会死锁。

解决办法就是kv层不要拿着kv.mu去请求rf.mu,一定要在kv.mu的锁外操作raft,谨记这一点即可。

关于rf.CheckLogSize函数

rf.CheckLogSize函数判断的是调用时刻raft状态机的持久化大小是否超过了限定,但是实际执行持久化的时候,状态机的日志部分只是针对最新的kv.lastAppliedIndex情况进行截断压缩。

换句话说,判断时候的raft日志大小是可能包含apply以外的日志(提交的、未提交的),实际截断时候只从传入的kv.lastAppliedIndex截断。

AppendEntries中的相关调整

由于快照和日志压缩的原因,AppendEntries的逻辑也要做相应的调整,多了以下2个情况的判别:

if args.PrevLogIndex < rf.lastIncludedIndex {    //1. 如果prevLogIndex在快照内,只能从index=1开始同步了
		reply.ConflictIndex = 1
		return
	} else if args.PrevLogIndex == rf.lastIncludedIndex { //2. prevLogIndex正好等于快照的最后一个log
		if args.PrevLogTerm != rf.lastIncludedTerm { // 冲突了,那么从index=1开始同步吧
			reply.ConflictIndex = 1
			return
		}
	} else { // prevLogIndex在快照之后,那么进一步判定
     ...

appendEntriesLoop函数的改造

func (rf *Raft) appendEntriesLoop() {
	for !rf.killed() {
		time.Sleep(10 * time.Millisecond)

		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()

			// 只有leader才向外广播心跳
			if rf.role != ROLE_LEADER {
				return
			}

			// 100ms广播1次
			now := time.Now()
			if now.Sub(rf.heartBeatTime) < 100*time.Millisecond {
				return
			}
			rf.heartBeatTime = time.Now()

			// 向所有follower发送心跳
			for peerId := 0; peerId < len(rf.peers); peerId++ {
				if peerId == rf.me {
					continue
				}

				// 如果nextIndex在leader的snapshot内,那么直接同步snapshot
				if rf.nextIndex[peerId] <= rf.lastIncludedIndex {
					rf.doInstallSnapshot(peerId)
				} else { // 否则同步日志
					rf.doAppendEntries(peerId)
				}
			}
		}()
	}
}

主要是辨别什么时候doInstallSnapshot,什么时候doAppendEntries(比较rf.nextIndex[peerId]和rf.lastIncludedIndex)

如果nextIndex[peerId]在leader的snapshot范围内,那么直接走同步snapshot的分支,否则走常规的日志同步分支。

Q&A

为什么要实现两种快照方式?

  • 系统容错性要求每个节点都能独立判断是否需要快照,不完全依赖leader的指令,这样可以提高容错能力。
  • 再有,网络抖动或者临时断开时,follower可能会丢失leader发的快照信息,从而自己的日志会超出快照限制而需要自行触发。
  • 系统容错性要求每个节点都能独立判断是否需要快照,不完全依赖leader的指令,这样可以提高容错能力。

所以即使leader已经主动推送快照,follower出于各种不稳定因素以及高可用角度考虑,还是需要自己独立地检查并执行快照逻辑, 而不是仅仅被动等待leader的快照指令。

什么时候向应用层安装快照?(kvStore/seqMap)

rf.installSnapshotToApplication()这个函数负责raft层往应用层安装快照(本质是恢复kvStore和seqMap)

raft.go中在Make和InstallSnapshot这两个函数中调用了installSnapshotToApplication函数:

  • Make函数中调用是为了节点宕机重启考虑,好处是省去了日志重放带来的时间开销,直接快速的恢复宕机前的kvStore和seqMap
  • InstallSnapshot函数中调用是为了让follower节点快速跟上leader节点的压缩情况以及更新最新的kvStore和seqMap

什么函数、什么时候执行持久化?

SaveStateAndSnapshot这个函数是(内存)持久化的最终体现,是框架自带的持久化函数

// Save both Raft state and K/V snapshot as a single atomic action,
// to help avoid them getting out of sync.
func (ps *Persister) SaveStateAndSnapshot(state []byte, snapshot []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.raftstate = state
	ps.snapshot = snapshot
}

持久化的内容包括两个部分:

  • raftstate:raft层的currentTerm、votedFor、log、lastIncludedIndex和lastIncludedTerm
  • snapshot:kv层传下来的,lastAppliedIndex之后的kvStore、seqMap

执行时机:

  • TakeSnapshot函数内(主动快照)
  • InstallSnapshot函数内(被动快照)

⚠️注意:snapshot是kv层传入的最近一次lastAppliedIndex之后的kvStore、seqMap,而raftstate则保存的是执行时刻的raft状态参数。TakeSnapshot函数内,保存之前日志被删减了;InstallSnapshot函数内则要根据情况判断是否对日志删减。

为什么InstallSnapshot函数中会出现leader快照不如本地长的情况?

args.LastIncludedIndex <= rf.lastIncludedIndex

这个情况是因为各个节点的snapshotLoop发起快照的时机不同,可能是leader快照发起的时机比follower快照发起的时机晚,此时follower节点的lastIncludedIndex可能比leader节点的lastIncludedIndex大,从而导致leader快照不如本地长。

并不要总以为leader的任何参数都要领先follower,因为这里涉及的是已经apply的日志且执行快照的时间点不同,所以要考虑这种情况解决方案就是直接丢弃就行。