Skip to content

Guidance with Consumer polling during rolling broker restarts #1214

@AquilesGomez

Description

@AquilesGomez

We utilize SASL authentication with MSK.
Every month we have a security patch that causes our brokers to restart in a rolling fashion.
Previously we'd get 'stuck' on polling since it seems that the client was unable to reconnect to the brokers.
To remedy this, we created a Polling deadline within our fetch loop like so:

for {
  ctx, cancel = context.WithTimeout(ctx, deadline)
  fetches = consumer.kclient.PollRecords(ctx, consumer.cfg.recordsPerPoll)
  defer cancel()
}

We then check fetch errors for timeouts / retryable errors

func checkIfRetryable(err error, logger *zap.Logger) bool {
	var groupSessionErr *kgo.ErrGroupSession

	if errors.Is(err, context.DeadlineExceeded) {
		return true
	} else if errors.As(err, &groupSessionErr) {
		return true
	} else if kgo.IsRetryableBrokerErr(err) {
		return true
	}
	return false
}

Is this the best way to recover from these errors?
I was assuming that the client would retry brokers that it couldn't connect to before, but our logs indicate that isn't the case.

The context never times out so we're stuck in an infinite blocking poll option.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions