kgo: broadcast batch finishes in one big blast
Previously, we would broadcast after every individual record finished in
a batch. If we produced 5000 records concurrently but only allowed 1000
in flight, we would first send a few large batches, and then once they
started finishing, we would:
* Finish a record
* Wake up a waiting record
* That record would race into producing
* Producing wouldn't linger, even if we wanted it to, because other
  records are blocked
* That record would be produced alone in one batch immediately
* Goto start

This forced unnecessary synchronization and caused things to produce
slowly.

By doing one broadcast at the end of a batch, we actually give more
"space" for the client to buffer before waking up everything waiting.
The first goroutine awoken will still produce a small batch, _but_ we
will reach a point where the client is buffering larger batches.

Overall, this speeds things up for a niche case and is not detrimental
in any way to the non-niche case.
twmb committed Dec 24, 2024
1 parent 3d0d08c commit ead18d3
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions pkg/kgo/producer.go
@@ -549,6 +549,12 @@ func (p *producer) promiseRecordBeforeBuf(pr promisedRec, err error) {
 func (p *producer) finishPromises(b batchPromise) {
 	cl := p.cl
 	var more bool
+	var broadcast bool
+	defer func() {
+		if broadcast {
+			p.c.Broadcast()
+		}
+	}()
 start:
 	p.promisesMu.Lock()
 	for i, pr := range b.recs {
@@ -558,7 +564,8 @@ start:
 		pr.ProducerID = b.pid
 		pr.ProducerEpoch = b.epoch
 		pr.Attrs = b.attrs
-		cl.finishRecordPromise(pr, b.err, b.beforeBuf)
+		recBroadcast := cl.finishRecordPromise(pr, b.err, b.beforeBuf)
+		broadcast = broadcast || recBroadcast
 		b.recs[i] = promisedRec{}
 	}
 	p.promisesMu.Unlock()
@@ -572,7 +579,7 @@ start:
 	}
 }

-func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) {
+func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) (broadcast bool) {
 	p := &cl.producer

 	if p.hooks != nil && len(p.hooks.unbuffered) > 0 {
@@ -599,12 +606,10 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering
 	p.mu.Lock()
 	p.bufferedBytes -= userSize
 	p.bufferedRecords--
-	broadcast := p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
+	broadcast = p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
 	p.mu.Unlock()

-	if broadcast {
-		p.c.Broadcast()
-	}
+	return broadcast
 }

 // partitionRecord loads the partitions for a topic and produce to them. If
