Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ type broker struct {
cxnGroup *brokerCxn
cxnSlow *brokerCxn

countFirstRespEOF countFirstRespEOF

reapMu sync.Mutex // held when modifying a brokerCxn

// reqs manages incoming message requests.
Expand Down Expand Up @@ -537,6 +539,7 @@ func (p bufPool) put(b []byte) { p.p.Put(&b) }
func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerCxn, error) {
var (
pcxn = &b.cxnNormal
incEOF = b.countFirstRespEOF.incNormal
isProduceCxn bool
isFetchCxn bool
reqKey = req.Key()
Expand All @@ -545,14 +548,18 @@ func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerC
switch {
case reqKey == 0:
pcxn = &b.cxnProduce
incEOF = b.countFirstRespEOF.incProduce
isProduceCxn = true
case reqKey == 1:
pcxn = &b.cxnFetch
incEOF = b.countFirstRespEOF.incFetch
isFetchCxn = true
case reqKey == 11 || reqKey == 14: // join || sync
pcxn = &b.cxnGroup
incEOF = b.countFirstRespEOF.incGroup
case isTimeout:
pcxn = &b.cxnSlow
incEOF = b.countFirstRespEOF.incSlow
}

if *pcxn != nil && !(*pcxn).dead.Load() {
Expand Down Expand Up @@ -582,6 +589,7 @@ doConnect:

addr: b.addr,
conn: conn,
incEOF: incEOF,
deadCh: make(chan struct{}),
}
if err = cxn.init(isProduceCxn, tries); err != nil {
Expand Down Expand Up @@ -696,7 +704,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
if !errors.Is(err, ErrClientClosed) && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "operation was canceled") {
if errors.Is(err, io.EOF) {
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker due to an immediate EOF, which often means the client is using TLS when the broker is not expecting it (is TLS misconfigured?)", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
return nil, &ErrFirstReadEOF{kind: firstReadDial, err: err}
return nil, &ErrFirstReadEOF{kind: firstReadDial, err: err, fail: true}
}
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
}
Expand Down Expand Up @@ -732,6 +740,7 @@ type brokerCxn struct {
reading atomic.Bool

successes uint64
incEOF func(bool) bool

// resps manages reading kafka responses.
resps ring[promisedResp]
Expand Down Expand Up @@ -807,7 +816,7 @@ start:
return &errApiVersionsReset{err}
} else if errors.Is(err, io.EOF) {
cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker received EOF during api versions discovery, which often happens when the broker requires TLS and the client is not using it (is TLS misconfigured?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
err = &ErrFirstReadEOF{kind: firstReadTLS, err: err}
err = &ErrFirstReadEOF{kind: firstReadTLS, err: err, fail: true}
}
return err
}
Expand Down Expand Up @@ -1534,6 +1543,20 @@ start:
}

func (cxn *brokerCxn) handleResp(pr promisedResp) {
var (
tracked bool
at3 bool
trackEOF = func(inc bool) bool {
if tracked {
return at3
}
tracked = true
at3 = cxn.incEOF(inc)
return at3
}
)
defer trackEOF(false)

rawResp, err := cxn.readResponse(
pr.ctx,
pr.resp.Key(),
Expand All @@ -1553,7 +1576,7 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) {
} else {
cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is SASL missing?)", "req", kmsg.Key(pr.resp.Key()).Name(), "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
if err == io.EOF { // specifically avoid checking errors.Is to ensure this is not already wrapped
err = &ErrFirstReadEOF{kind: firstReadSASL, err: err}
err = &ErrFirstReadEOF{kind: firstReadSASL, err: err, fail: trackEOF(true)}
}
}
}
Expand Down Expand Up @@ -1599,3 +1622,43 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) {

pr.promise(pr.resp, readErr)
}

// countFirstRespEOF counts consecutive "EOF on the first response" failures
// against a single broker. Such EOFs are bubbled up as fatal only after three
// failures in a row.
//
// A broker has five potential connections (normal, produce, fetch, group,
// slow) and failures are counted individually per connection. However, a
// success on ANY connection resets every counter: first read EOF errors are
// meant to signal invalid TLS / SASL, so a success anywhere implies a success
// everywhere.
//
// The per-connection counters are packed into one atomic word, two bits per
// connection. A counter only ever holds 0, 1, or 2: when the third failure
// arrives, rather than storing binary 11 we report the failure to the caller
// and reset the whole word to 0.
type countFirstRespEOF struct {
	c atomic.Uint32
}

// incShift records the outcome of a first read for the connection whose
// two-bit counter lives at bit offset shift.
//
// If inc is false, no EOF happened: every counter is reset and false is
// returned. If inc is true, the connection's counter is bumped; at3 is true
// when this is the third consecutive failure, in which case every counter is
// reset as well.
func (c *countFirstRespEOF) incShift(inc bool, shift uint32) (at3 bool) {
	const (
		mask  = 3 // two bits per connection
		limit = 3 // failures in a row before we report at3
	)
	if !inc {
		c.c.Store(0)
		return false
	}
	for {
		prior := c.c.Load()
		next := prior + 1<<shift
		hitLimit := (next>>shift)&mask == limit
		if hitLimit {
			next = 0 // never store the limit itself; start counting afresh
		}
		// Both the increment and the reset go through the CAS: if another
		// connection succeeded (resetting the word) or failed after our load,
		// we recompute from the fresh value rather than acting on a stale
		// streak.
		if c.c.CompareAndSwap(prior, next) {
			return hitLimit
		}
	}
}

// Each connection kind owns its own two-bit slot in the shared counter.

func (c *countFirstRespEOF) incNormal(inc bool) (at3 bool)  { return c.incShift(inc, 0) }
func (c *countFirstRespEOF) incProduce(inc bool) (at3 bool) { return c.incShift(inc, 2) }
func (c *countFirstRespEOF) incFetch(inc bool) (at3 bool)   { return c.incShift(inc, 4) }
func (c *countFirstRespEOF) incGroup(inc bool) (at3 bool)   { return c.incShift(inc, 6) }
func (c *countFirstRespEOF) incSlow(inc bool) (at3 bool)    { return c.incShift(inc, 8) }
12 changes: 9 additions & 3 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@
// we can retry that. Same for ErrClosed.
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
// If the FIRST read is EOF, that is usually not a good sign,
// often it's from bad SASL. We err on the side of pessimism
// and do not retry.
// often it's from bad SASL. If we have failed with EOF 3x in a row,
// we err on the side of pessimism and do not retry.
if ee := (*ErrFirstReadEOF)(nil); errors.As(err, &ee) {
return false
if ee.fail {
fmt.Println("NOT RETRYABLE, AT 3")
return false
} else {

Check failure on line 73 in pkg/kgo/errors.go

View workflow job for this annotation

GitHub Actions / golangci-lint-root on amd64

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)

Check failure on line 73 in pkg/kgo/errors.go

View workflow job for this annotation

GitHub Actions / golangci-lint-root on amd64

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
fmt.Println("retryable, NOT AT 3")
}
}
return true
}
Expand Down Expand Up @@ -240,6 +245,7 @@
type ErrFirstReadEOF struct {
	// kind records at which stage the EOF was seen; the call sites visible
	// in broker.go set it to firstReadDial, firstReadTLS, or firstReadSASL.
	kind uint8
	// err is the underlying error being wrapped (io.EOF, or an error
	// wrapping it, at the visible call sites).
	err error
	// fail marks the error as non-retryable: when it is true, the retry
	// check in errors.go returns false. The dial and api-versions EOF paths
	// set it unconditionally; the first-response EOF path sets it from the
	// result of the broker's consecutive-EOF counter.
	fail bool
}

type errProducerIDLoadFail struct {
Expand Down
Loading