We use SASL authentication with MSK.
Every month a security patch causes our brokers to restart in a rolling fashion.
Previously we would get stuck polling, because the client seemed unable to reconnect to the brokers.
To remedy this, we added a polling deadline to our fetch loop:
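```go
for {
	// Derive each poll's deadline from the long-lived ctx; reassigning ctx
	// here would nest every new deadline inside the previous, expired one.
	pollCtx, cancel := context.WithTimeout(ctx, deadline)
	fetches = consumer.kclient.PollRecords(pollCtx, consumer.cfg.recordsPerPoll)
	// Cancel now: a deferred cancel inside a loop only runs when the
	// enclosing function returns, so contexts would pile up.
	cancel()
}
```

We then check the fetch errors for timeouts and other retryable errors: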
```go
// checkIfRetryable reports whether a fetch error is transient: the poll
// deadline expired, the group session was lost, or the broker error is retryable.
func checkIfRetryable(err error, logger *zap.Logger) bool {
	var groupSessionErr *kgo.ErrGroupSession
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return true
	case errors.As(err, &groupSessionErr):
		return true
	case kgo.IsRetryableBrokerErr(err):
		return true
	}
	return false
}
```

Is this the best way to recover from these errors?
I assumed the client would keep retrying brokers it had previously failed to connect to, but our logs indicate that isn't happening.
The context never times out, so we're stuck in a poll call that blocks forever.