From 847095b5b304691f2b91a79ae3351223d3bca9eb Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Wed, 15 Jan 2025 11:20:02 -0700 Subject: [PATCH] kgo: redirect back to the leader on KIP-392 case 3 failure See embedded comments. I have a test in kfake that needs to be pushed as a followup PR since kfake depends on franz-go. Closes #885. --- pkg/kgo/consumer.go | 4 ++++ pkg/kgo/source.go | 37 +++++++++++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index eb15ec00..60222497 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1955,6 +1955,10 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker] // If we are fetching from a follower, we can list // offsets against the follower itself. The replica // being non-negative signals that. + // + // Note this is not actually true (i.e. KIP-392 lies), + // but we keep this logic in case we can revert + // to using non-leaders someday. brokerID = offset.replica } if tryBroker := findBroker(brokers, brokerID); tryBroker != nil { diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 12732e90..bf19b244 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -234,6 +234,7 @@ type cursorOffsetNext struct { type cursorOffsetPreferred struct { cursorOffsetNext preferredReplica int32 + ooor bool } // Moves a cursor from one source to another. This is done while handling @@ -268,12 +269,13 @@ func (cs cursorPreferreds) String() string { type pnext struct { p int32 next int32 + ooor bool } ts := make(map[string][]pnext) for _, c := range cs { t := c.from.topic p := c.from.partition - ts[t] = append(ts[t], pnext{p, c.preferredReplica}) + ts[t] = append(ts[t], pnext{p, c.preferredReplica, c.ooor}) } tsorted := make([]string, 0, len(ts)) for t, ps := range ts { @@ -303,9 +305,17 @@ func (cs cursorPreferreds) String() string { for j, p := range ps { if j < len(ps)-1 { - fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next) + if p.ooor { + fmt.Fprintf(sb, "%d=>%d[ooor], ", p.p, p.next) + } else { + fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next) + } } else { - fmt.Fprintf(sb, "%d=>%d", p.p, p.next) + if p.ooor { + fmt.Fprintf(sb, "%d=>%d[ooor]", p.p, p.next) + } else { + fmt.Fprintf(sb, "%d=>%d", p.p, p.next) + } } } @@ -1065,6 +1075,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe preferreds = append(preferreds, cursorOffsetPreferred{ *partOffset, preferred, + false, }) continue } @@ -1134,6 +1145,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // KIP-392 (case 3) specifies that if we are consuming // from a follower, then if our offset request is before // the low watermark, we list offsets from the follower. + // However, Kafka does not actually implement handling + // ListOffsets from anything from the leader, so we + // need to redirect ourselves back to the leader. // // KIP-392 (case 4) specifies that if we are consuming // a follower and our request is larger than the high @@ -1187,7 +1201,22 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe addList(-1, true) case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3 - addList(s.nodeID, false) + // KIP-392 specifies that we should list offsets against the follower, + // but that actually is not supported and the Java client redirects + // back to the leader. The leader then does *not* direct the client + // back to the follower because the follower is not an in sync + // replica. If we did not redirect back to the leader, we would spin + // loop receiving offset_out_of_range from the follower for Fetch, and + // then not_leader_or_follower from the follower for ListOffsets + // (even though it is a follower). So, we just set the preferred replica + // back to the follower. We go directly back to fetching with the + // hope that the offset is available on the leader, and if not, we'll + // just get an OOOR error again and fall into case 1 just above. + preferreds = append(preferreds, cursorOffsetPreferred{ + *partOffset, + partOffset.from.leader, + true, + }) default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4 if kip320 {