Commit 0f4ad28

kgo: mark ErrFirstReadEOF non-retryable after 3 tries
On very busy systems, it is possible for connections to be cut immediately after opening even if things are otherwise healthy and TLS and SASL are configured properly. We now track EOF errors happening 3x in a row (on a given connection type), and only at that point mark the error as non-retryable. This looks a bit weird with the deferred track closure, but doing it this way ensures we reset the EOF count on any success. Updates b2620e2.
1 parent b077119 · commit 0f4ad28
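The "deferred track closure" mentioned above is easier to see in isolation. The sketch below is illustrative only (eofStreak, handle, and track are hypothetical stand-ins, not names from the diff): the closure runs at most once, the EOF path records a failure with track(true), and the deferred track(false) only takes effect when nothing was recorded, which is what resets the streak on any success.

```go
package main

import "fmt"

// eofStreak is a hypothetical, simplified stand-in for the per-connection
// counter added in this commit: three EOFs in a row trips it, anything else
// resets it.
type eofStreak struct{ n int }

func (s *eofStreak) inc(sawEOF bool) bool {
	if !sawEOF {
		s.n = 0
		return false
	}
	s.n++
	if s.n == 3 {
		s.n = 0
		return true
	}
	return false
}

// handle mimics the shape of the real handleResp: a once-guarded closure is
// deferred with false so the success path resets the streak, while the error
// path can consume it first with track(true).
func handle(streak *eofStreak, gotEOF bool) {
	tracked := false
	track := func(sawEOF bool) bool {
		if tracked {
			return false
		}
		tracked = true
		return streak.inc(sawEOF)
	}
	defer track(false) // no-op if the EOF path already called track(true)

	if gotEOF {
		fmt.Println("first-read EOF; non-retryable:", track(true))
	}
}

func main() {
	s := &eofStreak{}
	handle(s, true)  // first-read EOF; non-retryable: false
	handle(s, true)  // first-read EOF; non-retryable: false
	handle(s, true)  // first-read EOF; non-retryable: true
	handle(s, false) // success: streak resets
}
```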

2 files changed (+75, -6 lines)
pkg/kgo/broker.go

Lines changed: 66 additions & 3 deletions

```diff
@@ -157,6 +157,8 @@ type broker struct {
 	cxnGroup  *brokerCxn
 	cxnSlow   *brokerCxn
 
+	countFirstRespEOF countFirstRespEOF
+
 	reapMu sync.Mutex // held when modifying a brokerCxn
 
 	// reqs manages incoming message requests.
@@ -537,6 +539,7 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) }
 func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerCxn, error) {
 	var (
 		pcxn         = &b.cxnNormal
+		incEOF       = b.countFirstRespEOF.incNormal
 		isProduceCxn bool
 		isFetchCxn   bool
 		reqKey       = req.Key()
@@ -545,14 +548,18 @@ func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerC
 	switch {
 	case reqKey == 0:
 		pcxn = &b.cxnProduce
+		incEOF = b.countFirstRespEOF.incProduce
 		isProduceCxn = true
 	case reqKey == 1:
 		pcxn = &b.cxnFetch
+		incEOF = b.countFirstRespEOF.incFetch
 		isFetchCxn = true
 	case reqKey == 11 || reqKey == 14: // join || sync
 		pcxn = &b.cxnGroup
+		incEOF = b.countFirstRespEOF.incGroup
 	case isTimeout:
 		pcxn = &b.cxnSlow
+		incEOF = b.countFirstRespEOF.incSlow
 	}
 
 	if *pcxn != nil && !(*pcxn).dead.Load() {
@@ -582,6 +589,7 @@ doConnect:
 
 		addr:   b.addr,
 		conn:   conn,
+		incEOF: incEOF,
 		deadCh: make(chan struct{}),
 	}
 	if err = cxn.init(isProduceCxn, tries); err != nil {
@@ -696,7 +704,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
 	if !errors.Is(err, ErrClientClosed) && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "operation was canceled") {
 		if errors.Is(err, io.EOF) {
 			b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker due to an immediate EOF, which often means the client is using TLS when the broker is not expecting it (is TLS misconfigured?)", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
-			return nil, &ErrFirstReadEOF{kind: firstReadDial, err: err}
+			return nil, &ErrFirstReadEOF{kind: firstReadDial, err: err, fail: true}
 		}
 		b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
 	}
@@ -732,6 +740,7 @@ type brokerCxn struct {
 	reading atomic.Bool
 
 	successes uint64
+	incEOF    func(bool) bool
 
 	// resps manages reading kafka responses.
 	resps ring[promisedResp]
@@ -807,7 +816,7 @@
 			return &errApiVersionsReset{err}
 		} else if errors.Is(err, io.EOF) {
 			cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker received EOF during api versions discovery, which often happens when the broker requires TLS and the client is not using it (is TLS misconfigured?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
-			err = &ErrFirstReadEOF{kind: firstReadTLS, err: err}
+			err = &ErrFirstReadEOF{kind: firstReadTLS, err: err, fail: true}
 		}
 		return err
 	}
@@ -1534,6 +1543,20 @@ start:
 }
 
 func (cxn *brokerCxn) handleResp(pr promisedResp) {
+	var (
+		tracked  bool
+		at3      bool
+		trackEOF = func(inc bool) bool {
+			if tracked {
+				return at3
+			}
+			tracked = true
+			at3 = cxn.incEOF(inc)
+			return at3
+		}
+	)
+	defer trackEOF(false)
+
 	rawResp, err := cxn.readResponse(
 		pr.ctx,
 		pr.resp.Key(),
@@ -1553,7 +1576,7 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) {
 		} else {
 			cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is SASL missing?)", "req", kmsg.Key(pr.resp.Key()).Name(), "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
 			if err == io.EOF { // specifically avoid checking errors.Is to ensure this is not already wrapped
-				err = &ErrFirstReadEOF{kind: firstReadSASL, err: err}
+				err = &ErrFirstReadEOF{kind: firstReadSASL, err: err, fail: trackEOF(true)}
 			}
 		}
 	}
@@ -1599,3 +1622,43 @@
 
 	pr.promise(pr.resp, readErr)
 }
+
+// For EOF errors, we bubble them up as "fatal" after three repeated failures
+// in a row against a broker. We have five different potential connections to
+// the broker; we count the failures individually per connection. However, a
+// success on ANY connection resets the counter globally: first read EOF errors
+// are meant to signal invalid tls / sasl, so a success anywhere should imply a
+// success everywhere.
+//
+// Anyway, point is, to count the connections failing 3x in a row individually,
+// we use a bitfield, with two bits assigned per connection. If both bits are 1,
+// it's failed 3x and rather than storing `11`, we return failure and reset the
+// whole counter to 0.
+type countFirstRespEOF struct {
+	c atomic.Uint32
+}
+
+func (c *countFirstRespEOF) incShift(inc bool, shift uint32) (at3 bool) {
+	if !inc { // if we are not incrementing, EOF did not happen; we reset the count and return false (no overflow)
+		c.c.Store(0)
+		return false
+	}
+	for {
+		prior := c.c.Load()
+
+		next := prior + 1<<shift
+		if (next>>shift)&3 == 3 {
+			c.c.Store(0)
+			return true // the increment would overflow to 3, reset the counter and return true (we hit the EOF limit)
+		}
+		if c.c.CompareAndSwap(prior, next) {
+			return false
+		}
+	}
+}
+
+func (c *countFirstRespEOF) incNormal(inc bool) (at3 bool)  { return c.incShift(inc, 0) }
+func (c *countFirstRespEOF) incProduce(inc bool) (at3 bool) { return c.incShift(inc, 2) }
+func (c *countFirstRespEOF) incFetch(inc bool) (at3 bool)   { return c.incShift(inc, 4) }
+func (c *countFirstRespEOF) incGroup(inc bool) (at3 bool)   { return c.incShift(inc, 6) }
+func (c *countFirstRespEOF) incSlow(inc bool) (at3 bool)    { return c.incShift(inc, 8) }
```
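To make the bit arithmetic above concrete: each connection type owns two bits of the uint32 (shifts 0, 2, 4, 6, 8), so three consecutive increments of any one field reach binary 11, at which point the whole counter is zeroed and failure is reported. Below is a minimal, non-atomic sketch of the same arithmetic for illustration only; the real type is the atomic countFirstRespEOF shown in the diff.

```go
package main

import "fmt"

// incShift mirrors countFirstRespEOF.incShift (simplified: no atomics, single
// goroutine). Each connection type owns two bits of c; a third consecutive EOF
// on any one connection type reports true and zeroes the whole counter, and
// any success zeroes it as well.
func incShift(c *uint32, inc bool, shift uint32) bool {
	if !inc {
		*c = 0
		return false
	}
	next := *c + 1<<shift
	if (next>>shift)&3 == 3 {
		*c = 0
		return true
	}
	*c = next
	return false
}

func main() {
	var c uint32
	const produceShift = 2 // matches incProduce in the diff

	fmt.Println(incShift(&c, true, produceShift)) // false (count 1)
	fmt.Println(incShift(&c, true, produceShift)) // false (count 2)
	fmt.Println(incShift(&c, true, produceShift)) // true  (third EOF in a row)
	fmt.Println(c)                                // 0: the counter was reset
}
```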

pkg/kgo/errors.go

Lines changed: 9 additions & 3 deletions

```diff
@@ -64,10 +64,15 @@ func isRetryableBrokerErr(err error) bool {
 	// we can retry that. Same for ErrClosed.
 	if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
 		// If the FIRST read is EOF, that is usually not a good sign,
-		// often it's from bad SASL. We err on the side of pessimism
-		// and do not retry.
+		// often it's from bad SASL. If we have failed EOF 3x in a row,
+		// We err on the side of pessimism and do not retry.
 		if ee := (*ErrFirstReadEOF)(nil); errors.As(err, &ee) {
-			return false
+			if ee.fail {
+				fmt.Println("NOT RETRYABLE, AT 3")
+				return false
+			} else {
+				fmt.Println("retryable, NOT AT 3")
+			}
 		}
 		return true
 	}
@@ -240,6 +245,7 @@ var (
 type ErrFirstReadEOF struct {
 	kind uint8
 	err  error
+	fail bool
 }
 
 type errProducerIDLoadFail struct {
```
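Because ErrFirstReadEOF's fields are unexported, the retry decision above is easiest to see with a local stand-in. The sketch below mirrors the new logic (minus the debug prints): a first-read EOF stays retryable until the fail flag reports three in a row. The firstReadEOF type and retryable function here are illustrative only, not part of kgo.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// firstReadEOF is a local stand-in for kgo's ErrFirstReadEOF.
type firstReadEOF struct {
	err  error
	fail bool // set once a connection has seen three first-read EOFs in a row
}

func (e *firstReadEOF) Error() string { return "first read EOF: " + e.err.Error() }
func (e *firstReadEOF) Unwrap() error { return e.err }

// retryable mirrors the updated branch in isRetryableBrokerErr: first-read
// EOFs are now retried until the failure has repeated three times.
func retryable(err error) bool {
	if errors.Is(err, io.EOF) {
		if ee := (*firstReadEOF)(nil); errors.As(err, &ee) {
			return !ee.fail
		}
		return true
	}
	return false
}

func main() {
	fmt.Println(retryable(&firstReadEOF{err: io.EOF}))             // true: first or second EOF
	fmt.Println(retryable(&firstReadEOF{err: io.EOF, fail: true})) // false: third EOF in a row
}
```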
