diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 1ea2a29f..f1f06b7a 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -549,6 +549,12 @@ func (p *producer) promiseRecordBeforeBuf(pr promisedRec, err error) { func (p *producer) finishPromises(b batchPromise) { cl := p.cl var more bool + var broadcast bool + defer func() { + if broadcast { + p.c.Broadcast() + } + }() start: p.promisesMu.Lock() for i, pr := range b.recs { @@ -558,7 +564,8 @@ start: pr.ProducerID = b.pid pr.ProducerEpoch = b.epoch pr.Attrs = b.attrs - cl.finishRecordPromise(pr, b.err, b.beforeBuf) + recBroadcast := cl.finishRecordPromise(pr, b.err, b.beforeBuf) + broadcast = broadcast || recBroadcast b.recs[i] = promisedRec{} } p.promisesMu.Unlock() @@ -572,7 +579,7 @@ start: } } -func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) { +func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) (broadcast bool) { p := &cl.producer if p.hooks != nil && len(p.hooks.unbuffered) > 0 { @@ -599,12 +606,10 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering p.mu.Lock() p.bufferedBytes -= userSize p.bufferedRecords-- - broadcast := p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0 + broadcast = p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0 p.mu.Unlock() - if broadcast { - p.c.Broadcast() - } + return broadcast } // partitionRecord loads the partitions for a topic and produce to them. If