Skip to content


M1ng edited this page Jan 14, 2024 · 7 revisions


// Raft
// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// Persistent state on all servers
	currentTerm int         // 见过的最大任期 (initialized to 0 on first boot, increases monotonically)
	votedFor    int         // 记录在currentTerm任期投票给谁了 (or null if none)
	log         []*LogEntry // 日志记录集 (first index is 1)

	// Volatile state on all servers
	commitIndex int // 已知的最大已提交索引
	lastApplied int // 当前应用到状态机的索引 (initialized to 0, increases monotonically)

	// Volatile state on leaders(成为leader时重置)
	nextIndex  []int //	for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
	matchIndex []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)

	// others on all servers
	role           string    // 角色
	leaderId       int       // leader的id
	lastActiveTime time.Time // 最近一次活跃时间(刷新时机:收到leader AppendEntries心跳、给其他candidates投票、请求其他节点投票)
	heartBeatTime  time.Time // 最近一次心跳时间,作为leader,每隔固定毫秒向手下发送心跳包的时间


打个比方,一个县有三个社区(server), 每个社区都有一个自己的居委会(raft), 负责商议各项提案(request)是否执行。现在这个县收到一个上级(client)的提案(request)。由master居委会(master raft)所在的社区接收这个提案,居委会将其记录在案(log),然后组织三个居委会开始讨论商议(appendentry),只要两个及以上的居委会表示赞成,那么这个县就决定执行这个提案,但是决议不代表这个事就完成了,还要发动社区的人去做事的(commit != execute)。 为了号召相应社区的群众去完成这个提案,必须在社区公告上对该提案进行公示通知(记录commitIndex),意思说群众们,咱们社区又有事要干了。但是完成事情是要时间的,社区(server)只能按照公告栏上提案被公示的时间顺序(commitIndex表示的是最后公示的提案)来一件一件完成相应的提案(lastApplied的表示这个社区刚做完的提案)。所以一个关系是,社区做完的事永远慢于社区需要做的事(lastApplied <= commitIndex)。只有等社区完成了(而不是仅仅通过了)这个提案,上级才能看的到。


// RequestVoteArgs
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	// For Lab-2A:
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int

// RequestVoteReply
// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool

// AppendEntriesArgs
// Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
// term			leader's term
// leaderId 	so follower can redirect clients index of log entry immediately preceding new ones
// prevLogindex	index of log entry immediately preceding new ones
// prevLogTerm	term of prevLogIndex entry
// entries		log entries to store (empty for heartbeat; may send more than one for efficiency)
// leaderCommit	leader's commitIndex

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []*LogEntry
	LeaderCommit int

// AppendEntriesReply
// term 		currentTerm, for leader to update itself
// success		true if follower contained entry matching prevLogIndex and prevLogTerm
type AppendEntriesReply struct {
	Term    int
	Success bool




// Make
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister = me

	// Your initialization code here (2A, 2B, 2C).
	rf.role = ROLE_FOLLOWER
	rf.votedFor = -1
	rf.leaderId = -1
	rf.lastActiveTime = time.Now()
	// initialize from state persisted before a crash

	// election逻辑
	go rf.electionLoop()
	// leader逻辑
	go rf.appendEntriesLoop()

	DPrintf("Raftnode[%d]启动", me)

	return rf



func (rf *Raft) electionLoop()

这个函数负责选举逻辑,rf.role == ROLE_CANDIDATES && elapses >= timeout 时会执行主要逻辑代码。这个循环代码每个节点都会执行。


  1. time.Sleep(1 * time.Millisecond) 这个写在开头是为了避免这个goroutine独占CPU。
  2. 并发RPC请求vote之前释放锁,这个课上有讲,一方面影响性能另一方面会有概率导致死锁(例如网不好丢包了,程序持锁卡在这,其他用锁的地方也会卡住..连锁反应最后死锁)。
  3. voteResultChan := make(chan *VoteResult, len(rf.peers))构造管道来接受每个goroutine的RPC-reply很巧妙,管道是同步的。
  4. VOTE_END这个地方注意再上锁,因为下面要操作共享变量。
  5. 下面这个判断很重要:(因为第二点说了不能一把锁把整个代码全包住,过程中会释放一下,所以到这里就得再判断一次,看看有没有变动过,比如说别的Candidates请求先来了自己变成别人follower了,那么后面就没必要再走了)
// RPC完成后lock住做state的再次检查,确保还是之前的状态,如果角色改变了,则忽略本轮投票结果
if rf.role != ROLE_CANDIDATES {
  1. 选举代码最后记得rf.heartBeatTime = time.Unix(0, 0)相当于赋值 1970年1月1日01:00:00 UTC 具体作用时间讲解部分再说。
  2. 注意candidates请求vote的时间随机性,避免多个结点同时成为candidates导致票数分散无法选举。
  3. 注意requestVote得到大多数投票后立即结束等待剩余RPC。
  4. 注意成为leader后尽快appendEntries心跳,否则其他节点又会成为candidates。
  5. RequestVoteReply中发现了更高的任期,记得切回follower状态,不再担任领导人。

func (rf *Raft) appendEntriesLoop()


  1. time.Sleep(1 * time.Millisecond) 这个写在开头是为了避免这个goroutine独占CPU。
  2. 注意角色判定if rf.role != ROLE_LEADER {return},leader才能发送心跳包。
  3. AppendEntriesReply中发现了更高的任期,记得切回follower状态,不再担任领导人。
  4. 这个函数的RPC前就没必要再释放锁了,因为RPC后就没代码了,暂时不需要根据reply进行后续处理。



用于投票选举环节,各节点处理接受到的RequestVote RPC请求并返回RequestVoteReply

If votedFor is null or candidateld, and candidate's log is at least as up-to-date as receiver's log, grant vote.

lastLogTerm := 0
if len(rf.log) != 0 {
	lastLogTerm = rf.log[len(rf.log)-1].Term
if args.LastLogTerm < lastLogTerm || args.LastLogIndex < len(rf.log) {return}


如果一个follower落后了leader若干条日志(没有漏一整个任期),那么下次选举中, 按照领导者选举里的规则,它有可能当选leader。它在当选新leader后就永远也无 法补上之前缺失的那部分日志,从而造成状态机之间的不一致。 所以需要对领导者选举增加一个限制,保证被选出来的leader一定包含了之前各任期的所有被提交的日志条目。


  • 如果两份日志最后条目的任期号不同,那么任期号大的日志更“新"。
  • 如果两份日志最后条目的任期号相同,那么日志较长的那个更“新”



用于选举后定期到心跳环节,各节点处理接受到的AppendEntries RPC请求并返回AppendEntriesReply


  1. 读取/操作共享变量 加锁、放锁
  2. 任期不如自身大,则拒绝并返回currentTerm
  3. 发现更大的任期,则转为该任期的follower
  4. 最后记得刷新rf.lastActiveTime = time.Now()


论文中涉及两个重要时间:选举时间 & 心跳时间,具体逻辑看论文,这里说一下代码里怎么实现的。


> lastActiveTime time.Time // 最近活跃时间(刷新时机:收到leader AppendEntries心跳、给其他candidates投票、请求其他节点投票)


                        now := time.Now()
			timeout := time.Duration(200+rand.Int31n(150)) * time.Millisecond // 注意随机化防止多个结点共同选举分散选票
			elapses := now.Sub(rf.lastActiveTime)
			// follower -> candidates
			if rf.role == ROLE_FOLLOWER {
				if elapses >= timeout {
					rf.role = ROLE_CANDIDATES
					DPrintf("RaftNode[%d] Follower -> Candidate",
			// 请求vote
			if rf.role == ROLE_CANDIDATES && elapses >= timeout {...}



heartBeatTime  time.Time // 最近一次心跳时间,作为leader,每隔固定毫秒向手下发送心跳包的时间

这个时间在领导人选出来时候赋值,具体在electionLoop代码最后:rf.heartBeatTime = time.Unix(0, 0) // 令appendEntries广播立即执行。相当于赋值 1970年1月1日01:00:00 UTC 。


// nowTime - heartBeatTime >= 100ms 则广播1次
			now := time.Now()
			if now.Sub(rf.heartBeatTime) < 100*time.Millisecond {
			rf.heartBeatTime = time.Now()

可以看出来,领导人选好后,heartBeatTime被赋值1970年1月1日01:00:00 UTC,那么appendEntriesLoop这个进程运行到判断肯定 >= 100ms 立即就开始心跳了,跳完一次刷新时间然后 100ms 后会再通过判断然后再跳一次..




广播时间(broadcastTime)<< 选举超时时间(electionTimeout)<<平均故障时间(MTBF)

广播时间和平均故障时间是由系统决定的,但是选举超时时间是我们自己选择的。Raft的RPC需要接受并将信息落盘,所以广播时间大约是0.5ms到20ms,取决于存储的技术。 因此,选举超时时间可能需要在10ms到500ms之间。大多数服务器的平均故障间隔时间都在几个月甚至更长。·