Skip to content

Commit

Permalink
raft: advance commit index safely
Browse files Browse the repository at this point in the history
This change makes the commit index advancement in handleHeartbeat safe.
Previously, a follower would attempt to update the commit index to
whichever was sent in the MsgHeartbeat message. Out-of-bound indices
would crash the node.

It is always safe to advance a commit index if the follower's log is "in
sync" with the leader, i.e. when its log is guaranteed to be a prefix of
the leader's log. This is always true if the term of last entry in the
log matches the leader team, otherwise this guarantee is established
when the first MsgApp append message from the leader succeeds.

At the moment, the leader will never send a commit index that exceeds
the follower's log size. However, this may change in future. This change
is a defence-in-depth.

The newly added raftLog.leaderTerm field will be used for other safety
checks in the future, for example to establish that overriding a suffix
of entries in raftLog is safe.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 27, 2024
1 parent 026484c commit 87ac09e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
43 changes: 43 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,35 @@ type raftLog struct {
// they will be saved into storage.
unstable unstable

// leaderTerm is a term of the leader with whom our log is "consistent". The
// log is guaranteed to be a prefix of this term's leader log.
//
// The leaderTerm can be safely updated to `t` if:
// 1. the last entry in the log has term `t`, or, more generally,
// 2. the last successful append was sent by the leader `t`.
//
// This is due to the following safety property (see raft paper §5.3):
//
// Log Matching: if two logs contain an entry with the same index and term,
// then the logs are identical in all entries up through the given index.
//
// We use (1) to initialize leaderTerm, and (2) to maintain it on updates.
//
// NB: (2) does not imply (1). If our log is behind the leader's log, the last
// entry term can be below leaderTerm.
//
// NB: leaderTerm does not necessarily match this raft node's term. It only
// does for the leader. For followers and candidates, when we first learn or
// bump to a new term, we don't have a proof that our log is consistent with
// the new term's leader (current or prospective). The new leader may override
// any suffix of the log after the committed index. Only when the first append
// from the new leader succeeds, we can update leaderTerm.
//
// During normal operation, leaderTerm matches the node term though. During a
// leader change, it briefly lags behind, and matches again when the first
// append message succeeds.
leaderTerm uint64

// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
Expand Down Expand Up @@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc
if err != nil {
panic(err) // TODO(bdarnell)
}
lastTerm, err := storage.Term(lastIndex)
if err != nil {
panic(err) // TODO(pav-kv)
}
log.leaderTerm = lastTerm
log.unstable.offset = lastIndex + 1
log.unstable.offsetInProgress = lastIndex + 1
log.unstable.logger = logger
Expand All @@ -106,6 +140,15 @@ func (l *raftLog) String() string {

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
//
// TODO(pav-kv): pass in the term of the leader who sent this update. It is only
// safe to handle this append if this term is >= l.leaderTerm. It is only safe
// to override an uncommitted suffix of entries if term > l.leaderTerm.
//
// TODO(pav-kv): introduce a struct that consolidates the append metadata. The
// (prevEntryIndex, prevEntryTerm, leaderTerm) tuple must always be carried
// together, and safety of this append must be checked at the lowest layer here,
// rather than up in raft.go.
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if !l.matchTerm(index, logTerm) {
return 0, false
Expand Down
15 changes: 14 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,8 @@ func (r *raft) becomeLeader() {
// so the preceding log append does not count against the uncommitted log
// quota of the new leader. In other words, after the call to appendEntry,
// r.uncommittedSize is still 0.

r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

Expand Down Expand Up @@ -1735,6 +1737,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down Expand Up @@ -1770,7 +1773,16 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
// It is only safe to advance the commit index if our log is a prefix of the
// leader's log. Otherwise, entries at this index may mismatch.
//
// TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for
// handling safety. The raftLog can use leaderTerm for other safety checks.
// For example, unstable.truncateAndAppend currently may override a suffix of
// the log unconditionally, but it can only be done if m.Term > leaderTerm.
if m.Term == r.raftLog.leaderTerm {
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Expand All @@ -1785,6 +1797,7 @@ func (r *raft) handleSnapshot(m pb.Message) {
if r.restore(s) {
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
Expand Down
18 changes: 13 additions & 5 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) {
func TestHandleHeartbeat(t *testing.T) {
commit := uint64(2)
tests := []struct {
m pb.Message
wCommit uint64
m pb.Message
lastTerm uint64
wCommit uint64
}{
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit},

// Do not increase the commit index if the log is not guaranteed to be a
// prefix of the leader's log.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit},
// Do not increase the commit index beyond our log size.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1},
}

for i, tt := range tests {
storage := newTestMemoryStorage(withPeers(1, 2))
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}})
sm := newTestRaft(1, 5, 1, storage)
sm.becomeFollower(2, 2)
sm.raftLog.commitTo(commit)
Expand Down

0 comments on commit 87ac09e

Please sign in to comment.