consuming: reset to nearest if we receive OOOR while fetching #628

Merged: 1 commit, Dec 6, 2023
36 changes: 32 additions & 4 deletions pkg/kgo/source.go
@@ -152,6 +152,11 @@ type cursorOffset struct {
// details.
lastConsumedEpoch int32

+// If we receive OFFSET_OUT_OF_RANGE and we *know* we previously
+// consumed an offset, we reset to the nearest offset after our prior
+// known valid consumed offset.
+lastConsumedTime time.Time
+
// The current high watermark of the partition. Uninitialized (0) means
// we do not know the HWM, or there is no lag.
hwm int64
@@ -506,6 +511,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
pCursor.from.setOffset(cursorOffset{
offset: lastReturnedRecord.Offset + 1,
lastConsumedEpoch: lastReturnedRecord.LeaderEpoch,
+lastConsumedTime: lastReturnedRecord.Timestamp,
hwm: p.HighWatermark,
})
}
@@ -1067,23 +1073,44 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// In all cases except case 4, we also have to check if
// no reset offset was configured. If so, we ignore
// trying to reset and instead keep our failed partition.
-addList := func(replica int32) {
+addList := func(replica int32, log bool) {
if s.cl.cfg.resetOffset.noReset {
keep = true
+} else if !partOffset.from.lastConsumedTime.IsZero() {
+reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
+replica: replica,
+Offset: NewOffset().AfterMilli(partOffset.from.lastConsumedTime.UnixMilli()),
+})
+if log {
+s.cl.cfg.logger.Log(LogLevelWarn, "received OFFSET_OUT_OF_RANGE, resetting to the nearest offset; either you were consuming too slowly and the broker has deleted the segment you were in the middle of consuming, or the broker has lost data and has not yet transferred leadership",
+"broker", logID(s.nodeID),
+"topic", topic,
+"partition", partition,
+"prior_offset", partOffset.offset,
+)
+}
} else {
reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
replica: replica,
Offset: s.cl.cfg.resetOffset,
})
+if log {
+s.cl.cfg.logger.Log(LogLevelInfo, "received OFFSET_OUT_OF_RANGE on the first fetch, resetting to the configured ConsumeResetOffset",
+"broker", logID(s.nodeID),
+"topic", topic,
+"partition", partition,
+"prior_offset", partOffset.offset,
+)
+}
}
}

switch {
case s.nodeID == partOffset.from.leader: // non KIP-392 case
-addList(-1)
+addList(-1, true)

case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
-addList(s.nodeID)
+addList(s.nodeID, false)

default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
if kip320 {
@@ -1098,7 +1125,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// If the broker does not support offset for leader epoch but
// does support follower fetching for some reason, we have to
// fallback to listing.
-addList(-1)
+addList(-1, true)
}
}

Expand Down Expand Up @@ -1630,6 +1657,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a
// topic is compacted.
o.offset = record.Offset + 1
o.lastConsumedEpoch = record.LeaderEpoch
+o.lastConsumedTime = record.Timestamp
}

///////////////////////////////
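
For context on how this change surfaces to applications, below is a minimal franz-go consumer sketch showing where ConsumeResetOffset still applies after this patch. The broker address and topic name are placeholders, the comments summarize behavior as read from the diff above, and this is an illustrative sketch rather than code from this PR.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// ConsumeResetOffset governs where to start when there is no prior
	// consumed record (and where to reset on OFFSET_OUT_OF_RANGE before
	// anything has been consumed). With this patch, once a record *has*
	// been consumed, OFFSET_OUT_OF_RANGE instead resets to the offset
	// nearest the last consumed record's timestamp (NewOffset().AfterMilli).
	// Configuring kgo.NoResetOffset() keeps the failed partition rather
	// than resetting in either case.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("events"),       // placeholder topic
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			// r.Timestamp is what the client now remembers as
			// lastConsumedTime for a potential nearest-offset reset.
			_ = r
		})
	}
}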