diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index 588ba65a63..8fbf27cf72 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -2,12 +2,14 @@ package compute import ( "context" + "errors" "fmt" "reflect" "strings" "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/compute/capacity" "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/models" @@ -185,7 +187,7 @@ func (b Bidder) handleBidResult( ctx context.Context, execution *models.Execution, result *bidStrategyResponse, -) { +) error { var newExecutionValues models.Execution var newExecutionState models.ExecutionStateType var events []*models.Event @@ -217,12 +219,23 @@ func (b Bidder) handleBidResult( ExpectedStates: []models.ExecutionStateType{models.ExecutionStateNew}, }, }) - // TODO: handle error by either gracefully skipping if the execution is no longer in the created state - // or by failing the execution + if err != nil { - log.Ctx(ctx).Error().Err(err).Msg("failed to update execution state") - return + var invalidStateErr store.ErrInvalidExecutionState + if errors.As(err, &invalidStateErr) { + log.Ctx(ctx).Debug(). + Err(err). + Str("executionID", execution.ID). + Str("expectedState", models.ExecutionStateNew.String()). + Str("actualState", invalidStateErr.Actual.String()). + Msg("skipping execution state update - execution no longer in expected state") + return nil + } + + // Propagate the error to be handled by the execution watcher + return bacerrors.Wrap(err, "failed to update execution state for execution %s", execution.ID) } + return nil } // handleError is a helper function to handle errors in the bidder. diff --git a/pkg/compute/metrics.go b/pkg/compute/metrics.go index f29ef2a16b..efefead05c 100644 --- a/pkg/compute/metrics.go +++ b/pkg/compute/metrics.go @@ -29,4 +29,23 @@ var ( metric.WithDescription("Duration of a job on the compute node in milliseconds."), metric.WithUnit("ms"), )) + + // Execution error metrics + ExecutionBiddingErrors = lo.Must(meter.Int64Counter( + "execution_bidding_errors", + metric.WithDescription("Number of errors encountered during execution bidding."), + metric.WithUnit("1"), + )) + + ExecutionRunErrors = lo.Must(meter.Int64Counter( + "execution_run_errors", + metric.WithDescription("Number of errors encountered during execution running."), + metric.WithUnit("1"), + )) + + ExecutionCancelErrors = lo.Must(meter.Int64Counter( + "execution_cancel_errors", + metric.WithDescription("Number of errors encountered during execution cancellation."), + metric.WithUnit("1"), + )) ) diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index 5fc2129235..e1ac60dcf1 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -4,9 +4,11 @@ import ( "context" "fmt" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/compute" "github.com/bacalhau-project/bacalhau/pkg/lib/watcher" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/rs/zerolog/log" ) type ExecutionUpsertHandler struct { @@ -29,15 +31,42 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. } execution := upsert.Current + logger := log.Ctx(ctx).With(). + Str("executionID", execution.ID). + Str("state", execution.ComputeState.StateType.String()). + Logger() + + var err error switch execution.ComputeState.StateType { case models.ExecutionStateNew: - return h.bidder.RunBidding(ctx, execution) + if err = h.bidder.RunBidding(ctx, execution); err != nil { + compute.ExecutionBiddingErrors.Add(ctx, 1) + logger.Error(). + Err(err). + Msg("failed to run bidding") + } case models.ExecutionStateBidAccepted: - return h.executor.Run(ctx, execution) + err = h.executor.Run(ctx, execution) + if err != nil { + compute.ExecutionRunErrors.Add(ctx, 1) + logger.Error(). + Err(err). + Msg("failed to run execution") + } case models.ExecutionStateCancelled: - return h.executor.Cancel(ctx, execution) + case models.ExecutionStateCancelled: + err = h.executor.Cancel(ctx, execution) + if err != nil { ++ logger.Error().Err(err).Msg("failed to cancel execution") + compute.ExecutionCancelErrors.Add(ctx, 1) + } default: + // No action needed for other states + return nil } + if err != nil { + return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) + } return nil }