Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve error handling and logging in the compute package #4850

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package compute

import (
"context"
"errors"
"fmt"
"reflect"
"strings"

"github.com/rs/zerolog/log"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/models"
Expand Down Expand Up @@ -185,7 +187,7 @@ func (b Bidder) handleBidResult(
ctx context.Context,
execution *models.Execution,
result *bidStrategyResponse,
) {
) error {
var newExecutionValues models.Execution
var newExecutionState models.ExecutionStateType
var events []*models.Event
Expand Down Expand Up @@ -217,12 +219,23 @@ func (b Bidder) handleBidResult(
ExpectedStates: []models.ExecutionStateType{models.ExecutionStateNew},
},
})
// TODO: handle error by either gracefully skipping if the execution is no longer in the created state
// or by failing the execution

if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to update execution state")
return
var invalidStateErr store.ErrInvalidExecutionState
if errors.As(err, &invalidStateErr) {
log.Ctx(ctx).Debug().
Err(err).
Str("executionID", execution.ID).
Str("expectedState", models.ExecutionStateNew.String()).
Str("actualState", invalidStateErr.Actual.String()).
Msg("skipping execution state update - execution no longer in expected state")
return nil
}
Comment on lines +222 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Skips update if ErrInvalidExecutionState.
Returning nil here is appropriate if you intentionally ignore an outdated or conflicting state. To enhance clarity, add a comment explaining the decision and how it aligns with the desired concurrency or state-management logic.

 if errors.As(err, &invalidStateErr) {
+	// The execution has likely advanced to a newer state. Skipping update is intentional.
 	log.Ctx(ctx).Debug().
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to update execution state")
return
var invalidStateErr store.ErrInvalidExecutionState
if errors.As(err, &invalidStateErr) {
log.Ctx(ctx).Debug().
Err(err).
Str("executionID", execution.ID).
Str("expectedState", models.ExecutionStateNew.String()).
Str("actualState", invalidStateErr.Actual.String()).
Msg("skipping execution state update - execution no longer in expected state")
return nil
}
if err != nil {
var invalidStateErr store.ErrInvalidExecutionState
if errors.As(err, &invalidStateErr) {
// The execution has likely advanced to a newer state. Skipping update is intentional.
log.Ctx(ctx).Debug().
Err(err).
Str("executionID", execution.ID).
Str("expectedState", models.ExecutionStateNew.String()).
Str("actualState", invalidStateErr.Actual.String()).
Msg("skipping execution state update - execution no longer in expected state")
return nil
}


// Propagate the error to be handled by the execution watcher
return bacerrors.Wrap(err, "failed to update execution state for execution %s", execution.ID)
}
return nil
}

// handleError is a helper function to handle errors in the bidder.
Expand Down
19 changes: 19 additions & 0 deletions pkg/compute/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,23 @@ var (
metric.WithDescription("Duration of a job on the compute node in milliseconds."),
metric.WithUnit("ms"),
))

// Execution error metrics
ExecutionBiddingErrors = lo.Must(meter.Int64Counter(
"execution_bidding_errors",
metric.WithDescription("Number of errors encountered during execution bidding."),
metric.WithUnit("1"),
))

ExecutionRunErrors = lo.Must(meter.Int64Counter(
"execution_run_errors",
metric.WithDescription("Number of errors encountered during execution running."),
metric.WithUnit("1"),
))

ExecutionCancelErrors = lo.Must(meter.Int64Counter(
"execution_cancel_errors",
metric.WithDescription("Number of errors encountered during execution cancellation."),
metric.WithUnit("1"),
))
)
35 changes: 32 additions & 3 deletions pkg/compute/watchers/executor_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
"context"
"fmt"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/compute"
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/rs/zerolog/log"
)

type ExecutionUpsertHandler struct {
Expand All @@ -29,15 +31,42 @@
}

execution := upsert.Current
logger := log.Ctx(ctx).With().
Str("executionID", execution.ID).
Str("state", execution.ComputeState.StateType.String()).
Logger()
Comment on lines +34 to +37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


var err error
switch execution.ComputeState.StateType {
case models.ExecutionStateNew:
return h.bidder.RunBidding(ctx, execution)
if err = h.bidder.RunBidding(ctx, execution); err != nil {
compute.ExecutionBiddingErrors.Add(ctx, 1)
logger.Error().
Err(err).
Msg("failed to run bidding")
}
case models.ExecutionStateBidAccepted:
return h.executor.Run(ctx, execution)
err = h.executor.Run(ctx, execution)
if err != nil {
compute.ExecutionRunErrors.Add(ctx, 1)
logger.Error().
Err(err).
Msg("failed to run execution")
}
case models.ExecutionStateCancelled:

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, amd64)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, arm64)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, armv7)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, armv6)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (darwin, amd64)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (darwin, arm64)

previous case

Check failure on line 56 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (windows, amd64)

previous case
return h.executor.Cancel(ctx, execution)
case models.ExecutionStateCancelled:

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, amd64)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, arm64)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, armv7)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, armv6)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (darwin, amd64)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (darwin, arm64)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch

Check failure on line 57 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (windows, amd64)

duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch
err = h.executor.Cancel(ctx, execution)
if err != nil {
+ logger.Error().Err(err).Msg("failed to cancel execution")

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value (typecheck)

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / lint / go-lint (ubuntu-latest)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value) (typecheck)

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, amd64)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, arm64)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, armv7)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (linux, armv6)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (darwin, amd64)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (darwin, arm64)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value

Check failure on line 60 in pkg/compute/watchers/executor_watcher.go

View workflow job for this annotation

GitHub Actions / build / Build Binary (windows, amd64)

logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value
compute.ExecutionCancelErrors.Add(ctx, 1)
}
Comment on lines 56 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix duplicate case statement and logger syntax.

There are two critical issues in the cancelled state handling:

  1. Duplicate case statement for ExecutionStateCancelled
  2. Syntax error in the logger statement

Apply this diff to fix the issues:

-	case models.ExecutionStateCancelled:
-case models.ExecutionStateCancelled:
+	case models.ExecutionStateCancelled:
   err = h.executor.Cancel(ctx, execution)
   if err != nil {
-    logger.Error().Err(err).Msg("failed to cancel execution")
+    logger.Error().
+      Err(err).
+      Msg("failed to cancel execution")
     compute.ExecutionCancelErrors.Add(ctx, 1)
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case models.ExecutionStateCancelled:
return h.executor.Cancel(ctx, execution)
case models.ExecutionStateCancelled:
err = h.executor.Cancel(ctx, execution)
if err != nil {
+ logger.Error().Err(err).Msg("failed to cancel execution")
compute.ExecutionCancelErrors.Add(ctx, 1)
}
case models.ExecutionStateCancelled:
err = h.executor.Cancel(ctx, execution)
if err != nil {
logger.Error().
Err(err).
Msg("failed to cancel execution")
compute.ExecutionCancelErrors.Add(ctx, 1)
}
🧰 Tools
🪛 GitHub Check: lint / go-lint (ubuntu-latest)

[failure] 57-57:
duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch


[failure] 56-56:
previous case


[failure] 60-60:
logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value (typecheck)


[failure] 57-57:
duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch


[failure] 56-56:
previous case


[failure] 60-60:
logger.Error().Err(err).Msg("failed to cancel execution") (no value) used as value) (typecheck)


[failure] 57-57:
duplicate case models.ExecutionStateCancelled (constant 11 of type "github.com/bacalhau-project/bacalhau/pkg/models".ExecutionStateType) in expression switch


[failure] 56-56:
previous case

default:
// No action needed for other states
return nil
}

if err != nil {
return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType)
}
return nil
}
Loading