Commit 9494d87

kgo: allow record ctx cancelation to propagate a bit more

If a record's context is canceled, we now allow it to be failed in two
more locations:

* while the producer ID is loading -- we can actually now cancel the
  producer ID loading request (which may also benefit people using
  transactions that want to force quit the client)
* while a sink is backing off due to request failures

For people using transactions, canceling a context now allows you to
force quit in more areas, but the same caveat applies: your client will
likely end up in an invalid transactional state and be unable to
continue.

For #769.

1 parent 9ff20d2 commit 9494d87
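
A minimal sketch of the behavior this enables (assuming franz-go's kgo
API; cl is a *kgo.Client whose seed broker is unreachable, so the
producer ID load would otherwise block; topic and value are
placeholders):

	ctx, cancel := context.WithCancel(context.Background())
	rec := &kgo.Record{Topic: "foo", Value: []byte("v"), Context: ctx}

	done := make(chan struct{})
	cl.Produce(context.Background(), rec, func(_ *kgo.Record, err error) {
		// err surfaces the cancelation (wrapped in errProducerIDLoadFail)
		// rather than the produce hanging until the load succeeds.
		fmt.Println("record failed:", err)
		close(done)
	})
	cancel() // force-fails the record while the producer ID is loading
	<-done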

File tree

7 files changed: +192 -30 lines changed

pkg/kgo/errors.go
Lines changed: 14 additions & 3 deletions

@@ -53,7 +53,7 @@ func isRetryableBrokerErr(err error) bool {
 	}
 	// We could have a retryable producer ID failure, which then bubbled up
 	// as errProducerIDLoadFail so as to be retried later.
-	if errors.Is(err, errProducerIDLoadFail) {
+	if pe := (*errProducerIDLoadFail)(nil); errors.As(err, &pe) {
 		return true
 	}
 	// We could have chosen a broker, and then a concurrent metadata update
@@ -139,8 +139,6 @@ var (
 	// restart a new connection ourselves.
 	errSaslReauthLoop = errors.New("the broker is repeatedly giving us sasl lifetimes that are too short to write a request")
 
-	errProducerIDLoadFail = errors.New("unable to initialize a producer ID due to request failures")
-
 	// A temporary error returned when Kafka replies with a different
 	// correlation ID than we were expecting for the request the client
 	// issued.
@@ -224,6 +222,19 @@ type ErrFirstReadEOF struct {
 	err error
 }
 
+type errProducerIDLoadFail struct {
+	err error
+}
+
+func (e *errProducerIDLoadFail) Error() string {
+	if e.err == nil {
+		return "unable to initialize a producer ID due to request failures"
+	}
+	return fmt.Sprintf("unable to initialize a producer ID due to request failures: %v", e.err)
+}
+
+func (e *errProducerIDLoadFail) Unwrap() error { return e.err }
+
 const (
 	firstReadSASL uint8 = iota
 	firstReadTLS
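
Because the new type implements Unwrap, the underlying cause stays
reachable through the standard errors helpers; a small sketch of the
matching this enables:

	var pe *errProducerIDLoadFail
	if errors.As(err, &pe) && errors.Is(pe, context.Canceled) {
		// the load failed specifically because a context was canceled,
		// not merely because of retryable request failures
	}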

pkg/kgo/helpers_test.go
Lines changed: 26 additions & 0 deletions

@@ -55,6 +55,32 @@ var (
 	npartitionsAt int64
 )
 
+type slowConn struct {
+	net.Conn
+}
+
+func (s *slowConn) Write(p []byte) (int, error) {
+	time.Sleep(100 * time.Millisecond)
+	return s.Conn.Write(p)
+}
+
+func (s *slowConn) Read(p []byte) (int, error) {
+	time.Sleep(100 * time.Millisecond)
+	return s.Conn.Read(p)
+}
+
+type slowDialer struct {
+	d net.Dialer
+}
+
+func (s *slowDialer) DialContext(ctx context.Context, network, host string) (net.Conn, error) {
+	c, err := s.d.DialContext(ctx, network, host)
+	if err != nil {
+		return nil, err
+	}
+	return &slowConn{c}, nil
+}
+
 func init() {
 	var err error
 	if n, _ := strconv.Atoi(os.Getenv("KGO_TEST_RF")); n > 0 {
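
A sketch of how a test might wire this in (presumably via the
kgo.Dialer client option, which accepts this DialContext shape; the
seed address is a placeholder):

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.Dialer(new(slowDialer).DialContext), // every read/write now idles 100ms first
	)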

pkg/kgo/produce_request_test.go
Lines changed: 2 additions & 1 deletion

@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"hash/crc32"
+	"strings"
 	"testing"
 	"time"
 
@@ -90,7 +91,7 @@ func TestIssue769(t *testing.T) {
 	case <-timer.C:
 		t.Fatal("expected record to fail within 3s")
 	}
-	if pe := (*errProducerIDLoadFail)(nil); !errors.As(rerr, &pe) || !errors.Is(pe.err, context.Canceled) {
+	if pe := (*errProducerIDLoadFail)(nil); !errors.As(rerr, &pe) || !(errors.Is(pe.err, context.Canceled) || strings.Contains(pe.err.Error(), "canceled")) {
 		t.Errorf("got %v != exp errProducerIDLoadFail{context.Canceled}", rerr)
 	}
 }

pkg/kgo/producer.go
Lines changed: 17 additions & 9 deletions

@@ -358,7 +358,11 @@ func (cl *Client) TryProduce(
 // retries. If any of these conditions are hit and it is currently safe to fail
 // records, all buffered records for the relevant partition are failed. Only
 // the first record's context in a batch is considered when determining whether
-// the batch should be canceled.
+// the batch should be canceled. A record is not safe to fail if the client
+// is idempotently producing and a request has been sent; in this case, the
+// client cannot know if the broker actually processed the request (if so, then
+// removing the records from the client will create errors the next time you
+// produce).
 //
 // If the client is transactional and a transaction has not been begun, the
 // promise is immediately called with an error corresponding to not being in a
@@ -626,7 +630,7 @@ func (cl *Client) ProducerID(ctx context.Context) (int64, int16, error) {
 
 	go func() {
 		defer close(done)
-		id, epoch, err = cl.producerID()
+		id, epoch, err = cl.producerID(ctx2fn(ctx))
 	}()
 
 	select {
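
ctx2fn is referenced here but not part of this diff; presumably it
adapts a ready context into the lazy func() context.Context shape that
producerID now takes, along the lines of:

	// hypothetical shape inferred from the call site above
	func ctx2fn(ctx context.Context) func() context.Context {
		return func() context.Context { return ctx }
	}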
@@ -648,7 +652,7 @@ var errReloadProducerID = errors.New("producer id needs reloading")
 // initProducerID initializes the client's producer ID for idempotent
 // producing only (no transactions, which are more special). After the first
 // load, this clears all buffered unknown topics.
-func (cl *Client) producerID() (int64, int16, error) {
+func (cl *Client) producerID(ctxFn func() context.Context) (int64, int16, error) {
 	p := &cl.producer
 
 	id := p.id.Load().(*producerID)
@@ -677,7 +681,7 @@
 		}
 		p.id.Store(id)
 	} else {
-		newID, keep := cl.doInitProducerID(id.id, id.epoch)
+		newID, keep := cl.doInitProducerID(ctxFn, id.id, id.epoch)
 		if keep {
 			id = newID
 			// Whenever we have a new producer ID, we need
@@ -695,7 +699,7 @@
 			id = &producerID{
 				id:    id.id,
 				epoch: id.epoch,
-				err:   errProducerIDLoadFail,
+				err:   &errProducerIDLoadFail{newID.err},
 			}
 		}
 	}
@@ -772,7 +776,7 @@ func (cl *Client) failProducerID(id int64, epoch int16, err error) {
 
 // doInitProducerID inits the idempotent ID and potentially the transactional
 // producer epoch, returning whether to keep the result.
-func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID, bool) {
+func (cl *Client) doInitProducerID(ctxFn func() context.Context, lastID int64, lastEpoch int16) (*producerID, bool) {
 	cl.cfg.logger.Log(LogLevelInfo, "initializing producer id")
 	req := kmsg.NewPtrInitProducerIDRequest()
 	req.TransactionalID = cl.cfg.txnID
@@ -782,7 +786,8 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID,
 		req.TransactionTimeoutMillis = int32(cl.cfg.txnTimeout.Milliseconds())
 	}
 
-	resp, err := req.RequestWith(cl.ctx, cl)
+	ctx := ctxFn()
+	resp, err := req.RequestWith(ctx, cl)
 	if err != nil {
 		if errors.Is(err, errUnknownRequestKey) || errors.Is(err, errBrokerTooOld) {
 			cl.cfg.logger.Log(LogLevelInfo, "unable to initialize a producer id because the broker is too old or the client is pinned to an old version, continuing without a producer id")
@@ -887,13 +892,14 @@ func (cl *Client) addUnknownTopicRecord(pr promisedRec) {
 	}
 	unknown.buffered = append(unknown.buffered, pr)
 	if len(unknown.buffered) == 1 {
-		go cl.waitUnknownTopic(pr.ctx, pr.Topic, unknown)
+		go cl.waitUnknownTopic(pr.ctx, pr.Record.Context, pr.Topic, unknown)
 	}
 }
 
 // waitUnknownTopic waits for a notification
 func (cl *Client) waitUnknownTopic(
-	rctx context.Context,
+	pctx context.Context, // context passed to Produce
+	rctx context.Context, // context on the record itself
 	topic string,
 	unknown *unknownTopicProduces,
 ) {
@@ -921,6 +927,8 @@
 
 	for err == nil {
 		select {
+		case <-pctx.Done():
+			err = pctx.Err()
 		case <-rctx.Done():
 			err = rctx.Err()
 		case <-cl.ctx.Done():

pkg/kgo/sink.go
Lines changed: 116 additions & 5 deletions

@@ -208,6 +208,7 @@ func (s *sink) maybeBackoff() {
 	select {
 	case <-after.C:
 	case <-s.cl.ctx.Done():
+	case <-s.anyCtx().Done():
 	}
 }
 
@@ -247,6 +248,34 @@ func (s *sink) drain() {
 	}
 }
 
+// Returns the first context encountered ranging across all records.
+// This does not use defers to make it clear at the return that all
+// unlocks are called in proper order. Ideally, do not call this func
+// due to lock intensity.
+func (s *sink) anyCtx() context.Context {
+	s.recBufsMu.Lock()
+	for _, recBuf := range s.recBufs {
+		recBuf.mu.Lock()
+		if len(recBuf.batches) > 0 {
+			batch0 := recBuf.batches[0]
+			batch0.mu.Lock()
+			if batch0.canFailFromLoadErrs && len(batch0.records) > 0 {
+				r0 := batch0.records[0]
+				if rctx := r0.cancelingCtx(); rctx != nil {
+					batch0.mu.Unlock()
+					recBuf.mu.Unlock()
+					s.recBufsMu.Unlock()
+					return rctx
+				}
+			}
+			batch0.mu.Unlock()
+		}
+		recBuf.mu.Unlock()
+	}
+	s.recBufsMu.Unlock()
+	return context.Background()
+}
+
 func (s *sink) produce(sem <-chan struct{}) bool {
 	var produced bool
 	defer func() {
@@ -267,6 +296,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 	// - auth failure
 	// - transactional: a produce failure that failed the producer ID
 	// - AddPartitionsToTxn failure (see just below)
+	// - some head-of-line context failure
 	//
 	// All but the first error is fatal. Recovery may be possible with
 	// EndTransaction in specific cases, but regardless, all buffered
@@ -275,10 +305,71 @@
 	// NOTE: we init the producer ID before creating a request to ensure we
 	// are always using the latest id/epoch with the proper sequence
 	// numbers. (i.e., resetAllSequenceNumbers && producerID logic combo).
-	id, epoch, err := s.cl.producerID()
+	//
+	// For the first-discovered-record-head-of-line context, we want to
+	// avoid looking it up if possible (which is why producerID takes a
+	// ctxFn). If we do use one, we want to be sure that the
+	// context.Canceled error is from *that* context rather than the client
+	// context or something else. So, we go through some special care to
+	// track setting the ctx / looking up if it is canceled.
+	var holCtxMu sync.Mutex
+	var holCtx context.Context
+	ctxFn := func() context.Context {
+		holCtxMu.Lock()
+		defer holCtxMu.Unlock()
+		holCtx = s.anyCtx()
+		return holCtx
+	}
+	isHolCtxDone := func() bool {
+		holCtxMu.Lock()
+		defer holCtxMu.Unlock()
+		if holCtx == nil {
+			return false
+		}
+		select {
+		case <-holCtx.Done():
+			return true
+		default:
+		}
+		return false
+	}
+
+	id, epoch, err := s.cl.producerID(ctxFn)
 	if err != nil {
+		var pe *errProducerIDLoadFail
 		switch {
-		case errors.Is(err, errProducerIDLoadFail):
+		case errors.As(err, &pe):
+			if errors.Is(pe.err, context.Canceled) && isHolCtxDone() {
+				// Some head-of-line record in a partition had a context cancelation.
+				// We look for any partition with HOL cancelations and fail them all.
+				s.cl.cfg.logger.Log(LogLevelInfo, "the first record in some partition(s) had a context cancelation; failing all relevant partitions", "broker", logID(s.nodeID))
+				s.recBufsMu.Lock()
+				defer s.recBufsMu.Unlock()
+				for _, recBuf := range s.recBufs {
+					recBuf.mu.Lock()
+					var failAll bool
+					if len(recBuf.batches) > 0 {
+						batch0 := recBuf.batches[0]
+						batch0.mu.Lock()
+						if batch0.canFailFromLoadErrs && len(batch0.records) > 0 {
+							r0 := batch0.records[0]
+							if rctx := r0.cancelingCtx(); rctx != nil {
+								select {
+								case <-rctx.Done():
+									failAll = true // we must not call failAllRecords here, because failAllRecords locks batches!
+								default:
+								}
+							}
+						}
+						batch0.mu.Unlock()
+					}
+					if failAll {
+						recBuf.failAllRecords(err)
+					}
+					recBuf.mu.Unlock()
+				}
+				return true
+			}
 			s.cl.bumpRepeatedLoadErr(err)
 			s.cl.cfg.logger.Log(LogLevelWarn, "unable to load producer ID, bumping client's buffered record load errors by 1 and retrying")
 			return true // whatever caused our produce, we did nothing, so keep going
@@ -385,6 +476,9 @@ func (s *sink) doSequenced(
 		promise: promise,
 	}
 
+	// We can NOT use any record context. If we do, we force the request to
+	// fail while also forcing the batch to be unfailable (due to no
+	// response).
 	br, err := s.cl.brokerOrErr(s.cl.ctx, s.nodeID, errUnknownBroker)
 	if err != nil {
 		wait.err = err
@@ -432,6 +526,11 @@ func (s *sink) doTxnReq(
 			req.batches.eachOwnerLocked(seqRecBatch.removeFromTxn)
 		}
 	}()
+	// We do NOT let record context cancelations fail this request: doing
+	// so would put the transactional ID in an unknown state. This is
+	// similar to the warning we give in the txn.go file, but the
+	// difference there is the user knows explicitly at the function call
+	// that canceling the context will opt them into invalid state.
 	err = s.cl.doWithConcurrentTransactions(s.cl.ctx, "AddPartitionsToTxn", func() error {
 		stripped, err = s.issueTxnReq(req, txnReq)
 		return err
@@ -1393,6 +1492,16 @@ type promisedRec struct {
 	*Record
 }
 
+func (pr promisedRec) cancelingCtx() context.Context {
+	if pr.ctx.Done() != nil {
+		return pr.ctx
+	}
+	if pr.Context.Done() != nil {
+		return pr.Context
+	}
+	return nil
+}
+
 // recBatch is the type used for buffering records before they are written.
 type recBatch struct {
 	owner *recBuf // who owns us
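
cancelingCtx leans on a standard library guarantee: Done() returns a
nil channel for contexts that can never be canceled, so only genuinely
cancelable contexts are returned here. A standalone illustration:

	package main

	import (
		"context"
		"fmt"
	)

	func main() {
		fmt.Println(context.Background().Done() == nil) // true: never cancelable
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		fmt.Println(ctx.Done() == nil) // false: cancelation is possible
	}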
@@ -1421,10 +1530,12 @@ type recBatch struct {
 // Returns an error if the batch should fail.
 func (b *recBatch) maybeFailErr(cfg *cfg) error {
 	if len(b.records) > 0 {
-		ctx := b.records[0].ctx
+		r0 := &b.records[0]
 		select {
-		case <-ctx.Done():
-			return ctx.Err()
+		case <-r0.ctx.Done():
+			return r0.ctx.Err()
+		case <-r0.Context.Done():
+			return r0.Context.Err()
 		default:
 		}
 	}

pkg/kgo/source.go
Lines changed: 3 additions & 0 deletions

@@ -894,6 +894,9 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 	// reload offsets *always* triggers a metadata update.
 	if updateWhy != nil {
 		why := updateWhy.reason(fmt.Sprintf("fetch had inner topic errors from broker %d", s.nodeID))
+		// loadWithSessionNow triggers a metadata update IF there are
+		// offsets to reload. If there are no offsets to reload, we
+		// trigger one here.
 		if !reloadOffsets.loadWithSessionNow(consumerSession, why) {
 			if updateWhy.isOnly(kerr.UnknownTopicOrPartition) || updateWhy.isOnly(kerr.UnknownTopicID) {
 				s.cl.triggerUpdateMetadata(false, why)
