Skip to content

Commit bfced92

Browse files
authored
Merge pull request #1188 from twmb/meta-more
Revert "kgo: avoid updating metadata before issuing ListOffsets / OffsetForLeaderEpoch
2 parents 80984f8 + 6c0f302 commit bfced92

File tree

4 files changed

+75
-23
lines changed

4 files changed

+75
-23
lines changed

pkg/kgo/consumer.go

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
10071007
} else { // else we guarded it
10081008
c.unguardSessionChange(session)
10091009
}
1010-
loadOffsets.loadWithSession(session)
1010+
loadOffsets.loadWithSession(session, "loading offsets in new session from assign") // odds are this assign came from a metadata update, so no reason to force a refresh with loadWithSessionNow
10111011

10121012
// If we started a new session or if we unguarded, we have one
10131013
// worker. This one worker allowed us to safely add our load
@@ -1287,6 +1287,8 @@ func (c *consumer) doOnMetadataUpdate() {
12871287
case c.g != nil:
12881288
c.g.findNewAssignments()
12891289
}
1290+
1291+
go c.loadSession().doOnMetadataUpdate()
12901292
}
12911293

12921294
go func() {
@@ -1299,6 +1301,23 @@ func (c *consumer) doOnMetadataUpdate() {
12991301
}
13001302
}
13011303

1304+
func (s *consumerSession) doOnMetadataUpdate() {
1305+
if s == nil || s == noConsumerSession { // no session started yet
1306+
return
1307+
}
1308+
1309+
s.listOrEpochMu.Lock()
1310+
defer s.listOrEpochMu.Unlock()
1311+
1312+
if s.listOrEpochMetaCh == nil {
1313+
return // nothing waiting to load epochs / offsets
1314+
}
1315+
select {
1316+
case s.listOrEpochMetaCh <- struct{}{}:
1317+
default:
1318+
}
1319+
}
1320+
13021321
type offsetLoadMap map[string]map[int32]offsetLoad
13031322

13041323
// offsetLoad is effectively an Offset, but also includes a potential replica
@@ -1446,11 +1465,20 @@ func (l *listOrEpochLoads) mergeFrom(src listOrEpochLoads) {
14461465

14471466
func (l listOrEpochLoads) isEmpty() bool { return len(l.List) == 0 && len(l.Epoch) == 0 }
14481467

1449-
func (l listOrEpochLoads) loadWithSession(s *consumerSession) {
1468+
func (l listOrEpochLoads) loadWithSession(s *consumerSession, why string) {
1469+
if !l.isEmpty() {
1470+
s.incWorker()
1471+
go s.listOrEpoch(l, false, why)
1472+
}
1473+
}
1474+
1475+
func (l listOrEpochLoads) loadWithSessionNow(s *consumerSession, why string) bool {
14501476
if !l.isEmpty() {
14511477
s.incWorker()
1452-
go s.listOrEpoch(l, false)
1478+
go s.listOrEpoch(l, true, why)
1479+
return true
14531480
}
1481+
return false
14541482
}
14551483

14561484
// A consumer session is responsible for an era of fetching records for a set
@@ -1489,6 +1517,7 @@ type consumerSession struct {
14891517
// assignPartitions).
14901518
listOrEpochMu sync.Mutex
14911519
listOrEpochLoadsWaiting listOrEpochLoads
1520+
listOrEpochMetaCh chan struct{} // non-nil if Loads is non-nil, signalled on meta update
14921521
listOrEpochLoadsLoading listOrEpochLoads
14931522
}
14941523

@@ -1740,7 +1769,7 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
17401769
// This function is responsible for issuing ListOffsets or
17411770
// OffsetForLeaderEpoch. These requests's responses are only handled within
17421771
// the context of a consumer session.
1743-
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, isReload bool) {
1772+
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
17441773
defer s.decWorker()
17451774

17461775
// It is possible for a metadata update to try to migrate partition
@@ -1752,34 +1781,36 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, isReload bool) {
17521781
return
17531782
}
17541783

1755-
s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during reload backoff into one
1784+
wait := true
1785+
if immediate {
1786+
s.c.cl.triggerUpdateMetadataNow(why)
1787+
} else {
1788+
wait = s.c.cl.triggerUpdateMetadata(false, why) // avoid trigger if within refresh interval
1789+
}
1790+
1791+
s.listOrEpochMu.Lock() // collapse any listOrEpochs that occur during meta update into one
17561792
if !s.listOrEpochLoadsWaiting.isEmpty() {
17571793
s.listOrEpochLoadsWaiting.mergeFrom(waiting)
17581794
s.listOrEpochMu.Unlock()
17591795
return
17601796
}
17611797
s.listOrEpochLoadsWaiting = waiting
1798+
s.listOrEpochMetaCh = make(chan struct{}, 1)
17621799
s.listOrEpochMu.Unlock()
17631800

1764-
// If this is a reload, we wait a bit to collect any other loads that
1765-
// are failing around the same time / new loads.
1766-
//
1767-
// We rely on the client list/epoch sharder to handle purging the cached
1768-
// metadata on any request / topic / partition error.
1769-
if isReload {
1770-
after := time.NewTimer(5 * time.Second)
1801+
if wait {
17711802
select {
17721803
case <-s.ctx.Done():
1773-
after.Stop()
17741804
return
1775-
case <-after.C:
1805+
case <-s.listOrEpochMetaCh:
17761806
}
17771807
}
17781808

17791809
s.listOrEpochMu.Lock()
17801810
loading := s.listOrEpochLoadsWaiting
17811811
s.listOrEpochLoadsLoading.mergeFrom(loading)
17821812
s.listOrEpochLoadsWaiting = listOrEpochLoads{}
1813+
s.listOrEpochMetaCh = nil
17831814
s.listOrEpochMu.Unlock()
17841815

17851816
brokerLoads := s.mapLoadsToBrokers(loading)
@@ -1803,7 +1834,23 @@ func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, isReload bool) {
18031834
defer func() {
18041835
if !reloads.isEmpty() {
18051836
s.incWorker()
1806-
go s.listOrEpoch(reloads, true)
1837+
go func() {
1838+
// Before we dec our worker, we must add the
1839+
// reloads back into the session's waiting loads.
1840+
// Doing so allows a concurrent stopSession to
1841+
// track the waiting loads, whereas if we did not
1842+
// add things back to the session, we could abandon
1843+
// loading these offsets and have a stuck cursor.
1844+
defer s.decWorker()
1845+
defer reloads.loadWithSession(s, "reload offsets from load failure")
1846+
after := time.NewTimer(time.Second)
1847+
defer after.Stop()
1848+
select {
1849+
case <-after.C:
1850+
case <-s.ctx.Done():
1851+
return
1852+
}
1853+
}()
18071854
}
18081855
}()
18091856

pkg/kgo/metadata.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,19 +136,20 @@ func (cl *Client) waitmeta(ctx context.Context, wait time.Duration, why string)
136136
cl.metawait.c.Broadcast()
137137
}
138138

139-
func (cl *Client) triggerUpdateMetadata(must bool, why string) {
139+
func (cl *Client) triggerUpdateMetadata(must bool, why string) bool {
140140
if !must {
141141
cl.metawait.mu.Lock()
142142
defer cl.metawait.mu.Unlock()
143143
if time.Since(cl.metawait.lastUpdate) < cl.cfg.metadataMinAge {
144-
return
144+
return false
145145
}
146146
}
147147

148148
select {
149149
case cl.updateMetadataCh <- why:
150150
default:
151151
}
152+
return true
152153
}
153154

154155
func (cl *Client) triggerUpdateMetadataNow(why string) {

pkg/kgo/source.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,12 +1023,16 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
10231023
// reload offsets *always* triggers a metadata update.
10241024
if updateWhy != nil {
10251025
why := updateWhy.reason(fmt.Sprintf("fetch had inner topic errors from broker %d", s.nodeID))
1026-
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
1027-
s.cl.triggerUpdateMetadata(false, why)
1028-
} else {
1029-
s.cl.triggerUpdateMetadataNow(why)
1026+
// loadWithSessionNow triggers a metadata update IF there are
1027+
// offsets to reload. If there are no offsets to reload, we
1028+
// trigger one here.
1029+
if !reloadOffsets.loadWithSessionNow(consumerSession, why) {
1030+
if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
1031+
s.cl.triggerUpdateMetadata(false, why)
1032+
} else {
1033+
s.cl.triggerUpdateMetadataNow(why)
1034+
}
10301035
}
1031-
reloadOffsets.loadWithSession(consumerSession)
10321036
}
10331037

10341038
if fetch.hasErrorsOrRecords() {

pkg/kgo/topics_and_partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -957,5 +957,5 @@ func (css *consumerSessionStopper) maybeRestart() {
957957
}
958958
session := css.cl.consumer.startNewSession(css.tpsPrior)
959959
defer session.decWorker()
960-
css.reloadOffsets.loadWithSession(session)
960+
css.reloadOffsets.loadWithSession(session, "resuming reload offsets after session stopped for cursor migrating in metadata")
961961
}

0 commit comments

Comments
 (0)