-
Notifications
You must be signed in to change notification settings - Fork 0
Lab‐3B
kv层传递通知raft落地snapshot并截断日志,本质上是实现了日志压缩和快照两个功能,涉及的全是都是已apply的日志,能保证一致性的安全。
- 快照功能本质是对kv层的数据库(Map)、raft层的raftState(currentTerm、votedFor、log、lastIncludedIndex和lastIncludedTerm)持久化。目的是为了kv层数据的快速恢复,当然也为了raft层状态参数和日志的快速恢复。这里也要认识到,只有快照了,才能放心的对日志进行丢弃,因为快照相当于对先前存储状态的总结(感觉叫压缩也行)。
- 日志压缩其实就是主动丢弃最近一次应用索引(lastIncludedIndex)之前的所有日志,减少rf.LogEntry日志的大小,提高系统的吞吐量。
总的来说这两个概念不用分的太清楚,说快照是一种压缩也行,也是一种持久化,日志删减基于快照的实现。
主动快照涉及的流程。主动快照是由kv层调用raft层的TakeSnapshot函数实现的,通过snapshotLoop循环定期发起,各个节点都会自主发起。
被动快照涉及的流程。被动快照是由raft层的Leader节点在appendEntriesLoop函数中发起的,通过心跳包送至各个follower节点,对于follower来说是被动的。
func (kv *KVServer) snapshotLoop() {
for !kv.killed() {
var snapshot []byte
var lastAppliedIndex int
// 锁内dump snapshot
func() {
// 如果raft log超过了maxraftstate大小,那么对kvStore快照下来
if kv.maxraftstate != -1 && kv.rf.CheckLogSize(kv.maxraftstate) { // 这里调用ExceedLogSize不要加kv锁,否则会死锁
// 锁内快照,离开锁通知raft处理
kv.mu.Lock()
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.kvStore) // kv数据库
e.Encode(kv.seqMap) // 当前各客户端最大请求编号,也要随着snapshot走
snapshot = w.Bytes()
lastAppliedIndex = kv.lastAppliedIndex
kv.mu.Unlock()
}
}()
// 锁外通知raft层截断,否则有死锁
if snapshot != nil {
// 通知raft落地snapshot并截断日志(都是已提交的日志,不会因为主从切换截断,放心操作)
kv.rf.TakeSnapshot(snapshot, lastAppliedIndex)
}
time.Sleep(10 * time.Millisecond)
}
}
这里说一下可能会到=导致死锁的一种操作:raft层持有rf.mu向applyCh写入可能阻塞,此时如果kv层出现一种代码逻辑是先拿到了kv.mu然后再去拿rf.mu的话,此时肯定无法拿到rf.mu(因为raft层持有rf.mu并阻塞在chan),而此刻kv层如果正在处理前一条log并试图加kv.mu,那么也无法拿到kv.mu,就会死锁。
解决办法就是kv层不要拿着kv.mu去请求rf.mu,一定要在kv.mu的锁外操作raft,谨记这一点即可。
rf.CheckLogSize函数判断的是调用时刻raft状态机的持久化大小是否超过了限定,但是实际执行持久化的时候,状态机的日志部分只是针对最新的kv.lastAppliedIndex情况进行截断压缩。
换句话说,判断时候的raft日志大小是可能包含apply以外的日志(提交的、未提交的),实际截断时候只从传入的kv.lastAppliedIndex截断。
由于快照和日志压缩的原因,AppendEntries的逻辑也要做相应的调整,多了以下2个情况的判别:
if args.PrevLogIndex < rf.lastIncludedIndex { //1. 如果prevLogIndex在快照内,只能从index=1开始同步了
reply.ConflictIndex = 1
return
} else if args.PrevLogIndex == rf.lastIncludedIndex { //2. prevLogIndex正好等于快照的最后一个log
if args.PrevLogTerm != rf.lastIncludedTerm { // 冲突了,那么从index=1开始同步吧
reply.ConflictIndex = 1
return
}
} else { // prevLogIndex在快照之后,那么进一步判定
...
func (rf *Raft) appendEntriesLoop() {
for !rf.killed() {
time.Sleep(10 * time.Millisecond)
func() {
rf.mu.Lock()
defer rf.mu.Unlock()
// 只有leader才向外广播心跳
if rf.role != ROLE_LEADER {
return
}
// 100ms广播1次
now := time.Now()
if now.Sub(rf.heartBeatTime) < 100*time.Millisecond {
return
}
rf.heartBeatTime = time.Now()
// 向所有follower发送心跳
for peerId := 0; peerId < len(rf.peers); peerId++ {
if peerId == rf.me {
continue
}
// 如果nextIndex在leader的snapshot内,那么直接同步snapshot
if rf.nextIndex[peerId] <= rf.lastIncludedIndex {
rf.doInstallSnapshot(peerId)
} else { // 否则同步日志
rf.doAppendEntries(peerId)
}
}
}()
}
}
主要是辨别什么时候doInstallSnapshot,什么时候doAppendEntries(比较rf.nextIndex[peerId]和rf.lastIncludedIndex)
如果nextIndex[peerId]在leader的snapshot范围内,那么直接走同步snapshot的分支,否则走常规的日志同步分支。
为什么要实现两种快照方式?
- 系统容错性要求每个节点都能独立判断是否需要快照,不完全依赖leader的指令,这样可以提高容错能力。
- 再有,网络抖动或者临时断开时,follower可能会丢失leader发的快照信息,从而自己的日志会超出快照限制而需要自行触发。
- 系统容错性要求每个节点都能独立判断是否需要快照,不完全依赖leader的指令,这样可以提高容错能力。
所以即使leader已经主动推送快照,follower出于各种不稳定因素以及高可用角度考虑,还是需要自己独立地检查并执行快照逻辑, 而不是仅仅被动等待leader的快照指令。
什么时候向应用层安装快照?(kvStore/seqMap)
rf.installSnapshotToApplication()这个函数负责raft层往应用层安装快照(本质是恢复kvStore和seqMap)
raft.go中在Make和InstallSnapshot这两个函数中调用了installSnapshotToApplication函数:
- Make函数中调用是为了节点宕机重启考虑,好处是省去了日志重放带来的时间开销,直接快速的恢复宕机前的kvStore和seqMap
- InstallSnapshot函数中调用是为了让follower节点快速跟上leader节点的压缩情况以及更新最新的kvStore和seqMap
什么函数、什么时候执行持久化?
SaveStateAndSnapshot这个函数是(内存)持久化的最终体现,是框架自带的持久化函数
// Save both Raft state and K/V snapshot as a single atomic action,
// to help avoid them getting out of sync.
func (ps *Persister) SaveStateAndSnapshot(state []byte, snapshot []byte) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.raftstate = state
ps.snapshot = snapshot
}
持久化的内容包括两个部分:
- raftstate:raft层的currentTerm、votedFor、log、lastIncludedIndex和lastIncludedTerm
- snapshot:kv层传下来的,lastAppliedIndex之后的kvStore、seqMap
执行时机:
- TakeSnapshot函数内(主动快照)
- InstallSnapshot函数内(被动快照)
为什么InstallSnapshot函数中会出现leader快照不如本地长的情况?
args.LastIncludedIndex <= rf.lastIncludedIndex
这个情况是因为各个节点的snapshotLoop发起快照的时机不同,可能是leader快照发起的时机比follower快照发起的时机晚,此时follower节点的lastIncludedIndex可能比leader节点的lastIncludedIndex大,从而导致leader快照不如本地长。
并不要总以为leader的任何参数都要领先follower,因为这里涉及的是已经apply的日志且执行快照的时间点不同,所以要考虑这种情况解决方案就是直接丢弃就行。