Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failed task signature on error handler #615

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions example/machinery.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func worker() error {

// Here we inject some custom code for error handling,
// start and end of task hooks, useful for metrics for example.
errorhandler := func(err error) {
log.ERROR.Println("I am an error handler:", err)
errorhandler := func(signature *tasks.Signature, err error) {
log.ERROR.Println("I am an error handler:", signature.UUID, err)
}

pretaskhandler := func(signature *tasks.Signature) {
Expand Down
14 changes: 5 additions & 9 deletions v2/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/opentracing/opentracing-go"

"github.com/RichardKnop/machinery/v1/backends/amqp"
"github.com/RichardKnop/machinery/v1/brokers/errs"
"github.com/RichardKnop/machinery/v1/log"
Expand All @@ -26,7 +26,7 @@ type Worker struct {
ConsumerTag string
Concurrency int
Queue string
errorHandler func(err error)
errorHandler func(task *tasks.Signature, err error)
preTaskHandler func(*tasks.Signature)
postTaskHandler func(*tasks.Signature)
preConsumeHandler func(*Worker) bool
Expand Down Expand Up @@ -78,11 +78,7 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)

if retry {
if worker.errorHandler != nil {
worker.errorHandler(err)
} else {
log.WARNING.Printf("Broker failed with error: %s", err)
}
Comment on lines -81 to -85
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation about Error Handling (https://github.com/RichardKnop/machinery#error-handling) suggest that it is used for handling task related error, so I think for broker.StartConsuming error, better not use the same error handler, so here I just log it instead.

log.WARNING.Printf("Broker failed with error: %s", err)
} else {
signalWG.Wait()
errorsChan <- err // stop the goroutine
Expand Down Expand Up @@ -365,7 +361,7 @@ func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) erro
}

if worker.errorHandler != nil {
worker.errorHandler(taskErr)
worker.errorHandler(signature, taskErr)
} else {
log.ERROR.Printf("Failed processing task %s. Error = %v", signature.UUID, taskErr)
}
Expand Down Expand Up @@ -396,7 +392,7 @@ func (worker *Worker) hasAMQPBackend() bool {

// SetErrorHandler sets a custom error handler for task errors
// A default behavior is just to log the error after all the retry attempts fail
func (worker *Worker) SetErrorHandler(handler func(err error)) {
func (worker *Worker) SetErrorHandler(handler func(signature *tasks.Signature, err error)) {
worker.errorHandler = handler
}

Expand Down