kgo: redirect back to the leader on KIP-392 case 3 failure
See embedded comments.

Closes #885.
twmb committed Jan 15, 2025
1 parent 293b7c4 commit 0b6cdd1
Showing 2 changed files with 22 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/kgo/consumer.go
@@ -1955,6 +1955,10 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker]
 // If we are fetching from a follower, we can list
 // offsets against the follower itself. The replica
 // being non-negative signals that.
+//
+// Note this is not actually true (i.e. KIP-392 lies),
+// but we keep this logic in case we can revert
+// to using non-leaders someday.
 brokerID = offset.replica
 }
 if tryBroker := findBroker(brokers, brokerID); tryBroker != nil {
19 changes: 18 additions & 1 deletion pkg/kgo/source.go
@@ -1134,6 +1134,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 // KIP-392 (case 3) specifies that if we are consuming
 // from a follower, then if our offset request is before
 // the low watermark, we list offsets from the follower.
+// However, Kafka does not actually implement handling
+// ListOffsets from anything but the leader, so we
+// need to redirect ourselves back to the leader.
 //
 // KIP-392 (case 4) specifies that if we are consuming
 // a follower and our request is larger than the high
@@ -1187,7 +1190,21 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 addList(-1, true)

 case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
-addList(s.nodeID, false)
+// KIP-392 specifies that we should list offsets against the follower,
+// but that is not actually supported, and the Java client redirects
+// back to the leader. The leader then does *not* direct the client
+// back to the follower because the follower is not an in-sync
+// replica. If we did not redirect back to the leader, we would spin
+// loop receiving offset_out_of_range from the follower for Fetch, and
+// then not_leader_or_follower from the follower for ListOffsets
+// (even though it is a follower). So, we just set the preferred replica
+// back to the leader. We go directly back to fetching with the
+// hope that the offset is available on the leader, and if not, we'll
+// just get an OOOR error again and fall into case 1 just above.
+preferreds = append(preferreds, cursorOffsetPreferred{
+*partOffset,
+partOffset.from.leader,
+})

 default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
 if kip320 {

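The redirect described in the case 3 comment can be sketched in isolation. This is a minimal illustration and not franz-go code: the `action` type, its constants, and `onOffsetOutOfRange` are hypothetical names. The sketch also assumes that the branch just above case 3 (the `addList(-1, true)` call) covers fetches that were already served by the leader; the diff does not show that condition.

```go
package main

import "fmt"

// action is a hypothetical label for the client's next step after an
// offset_out_of_range Fetch error. It is not a franz-go type.
type action int

const (
	listFromLeader    action = iota // list offsets against the leader
	refetchFromLeader               // keep the offset, fetch from the leader
	other                           // cases this sketch does not model
)

// onOffsetOutOfRange decides what to do after a Fetch against broker
// fetchedFrom failed with offset_out_of_range. leader is the partition
// leader's node ID.
func onOffsetOutOfRange(fetchedFrom, leader int32, offset, logStartOffset int64) action {
	switch {
	case fetchedFrom == leader:
		// Assumption: already on the leader, so list offsets there.
		return listFromLeader
	case offset < logStartOffset:
		// KIP-392 case 3: the follower cannot serve ListOffsets, so set
		// the preferred replica back to the leader and fetch again. If
		// the leader also lacks the offset, the next error lands in the
		// branch above.
		return refetchFromLeader
	default:
		// KIP-392 case 4 and anything else is out of scope here.
		return other
	}
}

func main() {
	// Follower (node 2) reports the offset is below its log start.
	fmt.Println(onOffsetOutOfRange(2, 1, 5, 100) == refetchFromLeader)
}
```

The point of returning to Fetch rather than ListOffsets is that the retry costs at most one extra round trip: either the leader has the offset, or it returns the same error and the leader branch resolves it.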