Skip to content

Lab‐2A

M1ng edited this page Jan 14, 2024 · 7 revisions

Raft结构体定义

// Raft
// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// Persistent state on all servers
	currentTerm int         // 见过的最大任期 (initialized to 0 on first boot, increases monotonically)
	votedFor    int         // 记录在currentTerm任期投票给谁了 (or null if none)
	log         []*LogEntry // 日志记录集 (first index is 1)

	// Volatile state on all servers
	commitIndex int // 已知的最大已提交索引
	lastApplied int // 当前应用到状态机的索引 (initialized to 0, increases monotonically)

	// Volatile state on leaders(成为leader时重置)
	nextIndex  []int //	for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
	matchIndex []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)

	// others on all servers
	role           string    // 角色
	leaderId       int       // leader的id
	lastActiveTime time.Time // 最近一次活跃时间(刷新时机:收到leader AppendEntries心跳、给其他candidates投票、请求其他节点投票)
	heartBeatTime  time.Time // 最近一次心跳时间,作为leader,每隔固定毫秒向手下发送心跳包的时间
}

关于commitIndex和lastApplied的区别

打个比方,一个县有三个社区(server), 每个社区都有一个自己的居委会(raft), 负责商议各项提案(request)是否执行。现在这个县收到一个上级(client)的提案(request)。由master居委会(master raft)所在的社区接收这个提案,居委会将其记录在案(log),然后组织三个居委会开始讨论商议(appendentry),只要两个及以上的居委会表示赞成,那么这个县就决定执行这个提案,但是决议不代表这个事就完成了,还要发动社区的人去做事的(commit != execute)。 为了号召相应社区的群众去完成这个提案,必须在社区公告上对该提案进行公示通知(记录commitIndex),意思说群众们,咱们社区又有事要干了。但是完成事情是要时间的,社区(server)只能按照公告栏上提案被公示的时间顺序(commitIndex表示的是最后公示的提案)来一件一件完成相应的提案(lastApplied的表示这个社区刚做完的提案)。所以一个关系是,社区做完的事永远慢于社区需要做的事(lastApplied <= commitIndex)。只有等社区完成了(而不是仅仅通过了)这个提案,上级才能看的到。


RPC相关结构体

// RequestVoteArgs
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	// For Lab-2A:
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

// RequestVoteReply
// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool
}

// AppendEntriesArgs
// Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
// term			leader's term
// leaderId 	so follower can redirect clients index of log entry immediately preceding new ones
// prevLogindex	index of log entry immediately preceding new ones
// prevLogTerm	term of prevLogIndex entry
// entries		log entries to store (empty for heartbeat; may send more than one for efficiency)
// leaderCommit	leader's commitIndex

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []*LogEntry
	LeaderCommit int
}

// AppendEntriesReply
// term 		currentTerm, for leader to update itself
// success		true if follower contained entry matching prevLogIndex and prevLogTerm
type AppendEntriesReply struct {
	Term    int
	Success bool
}

补充

这边没什么好说的,完全按照论文来定义就行。唯一注意变量大写开头,为了供RPC相关包调用,必须公有。

Make程序入口

// Make
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (2A, 2B, 2C).
	rf.role = ROLE_FOLLOWER
	rf.votedFor = -1
	rf.leaderId = -1
	rf.lastActiveTime = time.Now()
	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// election逻辑
	go rf.electionLoop()
	// leader逻辑
	go rf.appendEntriesLoop()

	DPrintf("Raftnode[%d]启动", me)

	return rf
}

说明

Make函数由server.go和config.go来调用,用于构造raft节点实例并初始化。每个rf实例初始化后立即调用两个goroutine用于控制选举和心跳,两个都是循环函数。

func (rf *Raft) electionLoop()

这个函数负责选举逻辑,rf.role == ROLE_CANDIDATES && elapses >= timeout 时会执行主要逻辑代码。这个循环代码每个节点都会执行。

关键点

  1. time.Sleep(1 * time.Millisecond) 这个写在开头是为了避免这个goroutine独占CPU。
  2. 并发RPC请求vote之前释放锁rf.mu.Unlock(),这个课上有讲,一方面影响性能另一方面会有概率导致死锁(例如网不好丢包了,程序持锁卡在这,其他用锁的地方也会卡住..连锁反应最后死锁)。
  3. voteResultChan := make(chan *VoteResult, len(rf.peers))构造管道来接受每个goroutine的RPC-reply很巧妙,管道是同步的。
  4. VOTE_END这个地方注意再上锁,因为下面要操作共享变量。
  5. 下面这个判断很重要:(因为第二点说了不能一把锁把整个代码全包住,过程中会释放一下,所以到这里就得再判断一次,看看有没有变动过,比如说别的Candidates请求先来了自己变成别人follower了,那么后面就没必要再走了)
// RPC完成后lock住做state的再次检查,确保还是之前的状态,如果角色改变了,则忽略本轮投票结果
if rf.role != ROLE_CANDIDATES {
	return
}
  1. 选举代码最后记得rf.heartBeatTime = time.Unix(0, 0)相当于赋值 1970年1月1日01:00:00 UTC 具体作用时间讲解部分再说。
  2. 注意candidates请求vote的时间随机性,避免多个结点同时成为candidates导致票数分散无法选举。
  3. 注意requestVote得到大多数投票后立即结束等待剩余RPC。
  4. 注意成为leader后尽快appendEntries心跳,否则其他节点又会成为candidates。
  5. RequestVoteReply中发现了更高的任期,记得切回follower状态,不再担任领导人。

func (rf *Raft) appendEntriesLoop()

心跳的具体逻辑,由leader执行,其他节点过不了判定走不通。lab-2A只做心跳,暂时不考虑log同步。

  1. time.Sleep(1 * time.Millisecond) 这个写在开头是为了避免这个goroutine独占CPU。
  2. 注意角色判定if rf.role != ROLE_LEADER {return},leader才能发送心跳包。
  3. AppendEntriesReply中发现了更高的任期,记得切回follower状态,不再担任领导人。
  4. 这个函数的RPC前就没必要再释放锁了,因为RPC后就没代码了,暂时不需要根据reply进行后续处理。

RequestVote和AppendEntries

RequestVote

用于投票选举环节,各节点处理接受到的RequestVote RPC请求并返回RequestVoteReply

If votedFor is null or candidateld, and candidate's log is at least as up-to-date as receiver's log, grant vote.

lastLogTerm := 0
if len(rf.log) != 0 {
	lastLogTerm = rf.log[len(rf.log)-1].Term
}
if args.LastLogTerm < lastLogTerm || args.LastLogIndex < len(rf.log) {return}

上面这段代码细说一下。为什么投不投还得判断日志的term和index?

如果一个follower落后了leader若干条日志(没有漏一整个任期),那么下次选举中, 按照领导者选举里的规则,它有可能当选leader。它在当选新leader后就永远也无 法补上之前缺失的那部分日志,从而造成状态机之间的不一致。 所以需要对领导者选举增加一个限制,保证被选出来的leader一定包含了之前各任期的所有被提交的日志条目。

RPC中包含了candidate的日志信息,如果投票者自己的日志比candidate的还新,它会拒绝掉该投票请求。Raft通过比较两份日志中最后一条日志条目的索引值和任期号来定义谁的日志比较新:

  • 如果两份日志最后条目的任期号不同,那么任期号大的日志更“新"。
  • 如果两份日志最后条目的任期号相同,那么日志较长的那个更“新”

日志操作lab-2A不实现。

AppendEntries

用于选举后定期到心跳环节,各节点处理接受到的AppendEntries RPC请求并返回AppendEntriesReply

共同点

  1. 读取/操作共享变量 加锁、放锁
  2. 任期不如自身大,则拒绝并返回currentTerm
  3. 发现更大的任期,则转为该任期的follower
  4. 最后记得刷新rf.lastActiveTime = time.Now()

关于时间

论文中涉及两个重要时间:选举时间 & 心跳时间,具体逻辑看论文,这里说一下代码里怎么实现的。

选举时间

> lastActiveTime time.Time // 最近活跃时间(刷新时机:收到leader AppendEntries心跳、给其他candidates投票、请求其他节点投票)

这个时间用来协助各结点判断什么时候可以翻身做主人。怎么做到的呢?如下:

//electionLoop
                        now := time.Now()
			timeout := time.Duration(200+rand.Int31n(150)) * time.Millisecond // 注意随机化防止多个结点共同选举分散选票
			elapses := now.Sub(rf.lastActiveTime)
			// follower -> candidates
			if rf.role == ROLE_FOLLOWER {
				if elapses >= timeout {
					rf.role = ROLE_CANDIDATES
					DPrintf("RaftNode[%d] Follower -> Candidate", rf.me)
				}
			}
			// 请求vote
			if rf.role == ROLE_CANDIDATES && elapses >= timeout {...}

其中lastActiveTime在electionLoop、RequestVote、AppendEntries函数中都会得到刷新(逻辑走通的前提下)

心跳时间

heartBeatTime  time.Time // 最近一次心跳时间,作为leader,每隔固定毫秒向手下发送心跳包的时间

这个时间在领导人选出来时候赋值,具体在electionLoop代码最后:rf.heartBeatTime = time.Unix(0, 0) // 令appendEntries广播立即执行。相当于赋值 1970年1月1日01:00:00 UTC 。

这个值的使用和刷新在appendEntriesLoop代码中:

// nowTime - heartBeatTime >= 100ms 则广播1次
			now := time.Now()
			if now.Sub(rf.heartBeatTime) < 100*time.Millisecond {
				return
			}
			rf.heartBeatTime = time.Now()

可以看出来,领导人选好后,heartBeatTime被赋值1970年1月1日01:00:00 UTC,那么appendEntriesLoop这个进程运行到判断肯定 >= 100ms 立即就开始心跳了,跳完一次刷新时间然后 100ms 后会再通过判断然后再跳一次..

间与可用性限制

raft算法整体不依赖客观时间,也就是说,哪怕因为网络或其他因素,造成后发的RPC先到,也不会影响raft的正确性。(这点和Spanner不同)

只要整个系统满足下面的时间要求,Raft就可以选举出并维持一个稳定的leader:

广播时间(broadcastTime)<< 选举超时时间(electionTimeout)<<平均故障时间(MTBF)

广播时间和平均故障时间是由系统决定的,但是选举超时时间是我们自己选择的。Raft的RPC需要接受并将信息落盘,所以广播时间大约是0.5ms到20ms,取决于存储的技术。 因此,选举超时时间可能需要在10ms到500ms之间。大多数服务器的平均故障间隔时间都在几个月甚至更长。·