Skip to content

Commit

Permalink
bugfix: graceful consumer shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-chao authored Jul 2, 2024
1 parent ef6a380 commit a4e904e
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,26 @@ func NewConsumer(
// Run starts consuming with automatic reconnection handling. Do not reuse the
// consumer for anything other than to close it.
func (consumer *Consumer) Run(handler Handler) error {
err := consumer.startGoroutines(
handler,
consumer.options,
)
if err != nil {
return err
}

handler = func(d Delivery) (action Action) {
handlerWrapper := func(d Delivery) (action Action) {
if !consumer.handlerMu.TryRLock() {
return NackRequeue
}
defer consumer.handlerMu.RUnlock()
return handler(d)
}

err := consumer.startGoroutines(
handlerWrapper,
consumer.options,
)
if err != nil {
return err
}

for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
handlerWrapper,
consumer.options,
)
if err != nil {
Expand Down

0 comments on commit a4e904e

Please sign in to comment.