From a437f45d56e815f2ac12d81c2933e451ce4c212c Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Wed, 12 Feb 2025 13:16:57 +0530 Subject: [PATCH 01/10] Enhance value column configuration with width constraints in config list --- cmd/cli/config/list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/cli/config/list.go b/cmd/cli/config/list.go index 905f443811..720ec5d508 100644 --- a/cmd/cli/config/list.go +++ b/cmd/cli/config/list.go @@ -99,7 +99,7 @@ var listColumns = []output.TableColumn[configListEntry]{ }, }, { - ColumnConfig: table.ColumnConfig{Name: "Value"}, + ColumnConfig: table.ColumnConfig{Name: "Value", WidthMax: 80, WidthMaxEnforcer: text.WrapSoft}, Value: func(s configListEntry) string { return fmt.Sprintf("%v", s.Value) }, From 49d44cd228ac0c4a4889e480412675c5a6787584 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 13 Feb 2025 20:59:46 +0530 Subject: [PATCH 02/10] Add error handling and metrics for execution state updates and bidding processes --- pkg/compute/bidder.go | 24 +++++++++++++-- pkg/compute/metrics.go | 16 ++++++++++ pkg/compute/watchers/executor_watcher.go | 38 ++++++++++++++++++++++-- 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index 588ba65a63..e8a4b29f3b 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -2,6 +2,7 @@ package compute import ( "context" + "errors" "fmt" "reflect" "strings" @@ -217,10 +218,27 @@ func (b Bidder) handleBidResult( ExpectedStates: []models.ExecutionStateType{models.ExecutionStateNew}, }, }) - // TODO: handle error by either gracefully skipping if the execution is no longer in the created state - // or by failing the execution + if err != nil { - log.Ctx(ctx).Error().Err(err).Msg("failed to update execution state") + var invalidStateErr store.ErrInvalidExecutionState + if errors.As(err, &invalidStateErr) { + log.Ctx(ctx).Debug(). + Err(err). + Str("executionID", execution.ID). + Str("expectedState", models.ExecutionStateNew.String()). + Str("actualState", invalidStateErr.Actual.String()). + Msg("skipping execution state update - execution no longer in expected state") + return + } + + failErr := b.handleError(ctx, execution, fmt.Errorf("failed to update execution state: %w", err)) + if failErr != nil { + log.Ctx(ctx).Error(). + Err(failErr). + Str("executionID", execution.ID). + Str("originalError", err.Error()). + Msg("failed to update execution to failed state after update error") + } return } } diff --git a/pkg/compute/metrics.go b/pkg/compute/metrics.go index f29ef2a16b..1bbcc49d0c 100644 --- a/pkg/compute/metrics.go +++ b/pkg/compute/metrics.go @@ -29,4 +29,20 @@ var ( metric.WithDescription("Duration of a job on the compute node in milliseconds."), metric.WithUnit("ms"), )) + + // Execution error metrics + ExecutionBiddingErrors = lo.Must(meter.Int64Counter( + "execution_bidding_errors", + metric.WithDescription("Number of errors encountered during execution bidding."), + )) + + ExecutionRunErrors = lo.Must(meter.Int64Counter( + "execution_run_errors", + metric.WithDescription("Number of errors encountered during execution running."), + )) + + ExecutionCancelErrors = lo.Must(meter.Int64Counter( + "execution_cancel_errors", + metric.WithDescription("Number of errors encountered during execution cancellation."), + )) ) diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index 5fc2129235..da64462231 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -7,6 +7,7 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/compute" "github.com/bacalhau-project/bacalhau/pkg/lib/watcher" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/rs/zerolog/log" ) type ExecutionUpsertHandler struct { @@ -29,14 +30,45 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. } execution := upsert.Current + logger := log.Ctx(ctx).With(). + Str("executionID", execution.ID). + Str("state", execution.ComputeState.StateType.String()). + Logger() + + var err error switch execution.ComputeState.StateType { case models.ExecutionStateNew: - return h.bidder.RunBidding(ctx, execution) + err = h.bidder.RunBidding(ctx, execution) + if err != nil { + compute.ExecutionBiddingErrors.Add(ctx, 1) + logger.Error(). + Err(err). + Msg("failed to run bidding") + } case models.ExecutionStateBidAccepted: - return h.executor.Run(ctx, execution) + err = h.executor.Run(ctx, execution) + if err != nil { + compute.ExecutionRunErrors.Add(ctx, 1) + logger.Error(). + Err(err). + Msg("failed to run execution") + } case models.ExecutionStateCancelled: - return h.executor.Cancel(ctx, execution) + err = h.executor.Cancel(ctx, execution) + if err != nil { + compute.ExecutionCancelErrors.Add(ctx, 1) + logger.Error(). + Err(err). + Msg("failed to cancel execution") + } default: + // No action needed for other states + return nil + } + + if err != nil { + return fmt.Errorf("failed to handle execution state %s: %w", + execution.ComputeState.StateType, err) } return nil From 3232529bbc846360c7dcacb23cc313b87a8cd938 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 13 Feb 2025 21:09:43 +0530 Subject: [PATCH 03/10] Update pkg/compute/metrics.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- pkg/compute/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/compute/metrics.go b/pkg/compute/metrics.go index 1bbcc49d0c..0f0786179a 100644 --- a/pkg/compute/metrics.go +++ b/pkg/compute/metrics.go @@ -34,6 +34,7 @@ var ( ExecutionBiddingErrors = lo.Must(meter.Int64Counter( "execution_bidding_errors", metric.WithDescription("Number of errors encountered during execution bidding."), + metric.WithUnit("1"), )) ExecutionRunErrors = lo.Must(meter.Int64Counter( From ae682ba7ee9cb15192c2df597062ef610fd3610b Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 13 Feb 2025 21:09:53 +0530 Subject: [PATCH 04/10] Update pkg/compute/bidder.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- pkg/compute/bidder.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index e8a4b29f3b..a47e6c8fb2 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -231,8 +231,7 @@ func (b Bidder) handleBidResult( return } - failErr := b.handleError(ctx, execution, fmt.Errorf("failed to update execution state: %w", err)) - if failErr != nil { + if failErr := b.handleError(ctx, execution, fmt.Errorf("failed to update execution state: %w", err)); failErr != nil { log.Ctx(ctx).Error(). Err(failErr). Str("executionID", execution.ID). From 2d0e8378f02c786a0e0627bf640e821763889d3a Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Thu, 13 Feb 2025 21:19:09 +0530 Subject: [PATCH 05/10] Refactor bidding error handling in execution state updates --- pkg/compute/watchers/executor_watcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index da64462231..f5bbcdcb04 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -38,13 +38,15 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. var err error switch execution.ComputeState.StateType { case models.ExecutionStateNew: - err = h.bidder.RunBidding(ctx, execution) - if err != nil { + if err := h.bidder.RunBidding(ctx, execution); err != nil { compute.ExecutionBiddingErrors.Add(ctx, 1) logger.Error(). Err(err). Msg("failed to run bidding") + return fmt.Errorf("failed to handle execution state %s: %w", + execution.ComputeState.StateType, err) } + return nil case models.ExecutionStateBidAccepted: err = h.executor.Run(ctx, execution) if err != nil { From 232c7b603cd67efa57e6e13b87c6f69db5de25c7 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Mon, 17 Feb 2025 21:31:41 +0530 Subject: [PATCH 06/10] Improve error handling in bidder and executor watcher for execution state updates --- pkg/compute/bidder.go | 12 +++++------- pkg/compute/watchers/executor_watcher.go | 20 +++++--------------- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index a47e6c8fb2..6d06f71539 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -231,13 +231,11 @@ func (b Bidder) handleBidResult( return } - if failErr := b.handleError(ctx, execution, fmt.Errorf("failed to update execution state: %w", err)); failErr != nil { - log.Ctx(ctx).Error(). - Err(failErr). - Str("executionID", execution.ID). - Str("originalError", err.Error()). - Msg("failed to update execution to failed state after update error") - } + // Propagate the error to be handled by the execution watcher + log.Ctx(ctx).Debug(). + Err(err). + Str("executionID", execution.ID). + Msg("failed to update execution state") return } } diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index f5bbcdcb04..b01c9a89a4 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/compute" "github.com/bacalhau-project/bacalhau/pkg/lib/watcher" "github.com/bacalhau-project/bacalhau/pkg/models" @@ -40,11 +41,7 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. case models.ExecutionStateNew: if err := h.bidder.RunBidding(ctx, execution); err != nil { compute.ExecutionBiddingErrors.Add(ctx, 1) - logger.Error(). - Err(err). - Msg("failed to run bidding") - return fmt.Errorf("failed to handle execution state %s: %w", - execution.ComputeState.StateType, err) + return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) } return nil case models.ExecutionStateBidAccepted: @@ -55,23 +52,16 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. Err(err). Msg("failed to run execution") } + return err case models.ExecutionStateCancelled: err = h.executor.Cancel(ctx, execution) if err != nil { compute.ExecutionCancelErrors.Add(ctx, 1) - logger.Error(). - Err(err). - Msg("failed to cancel execution") + return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) } + return nil default: // No action needed for other states return nil } - - if err != nil { - return fmt.Errorf("failed to handle execution state %s: %w", - execution.ComputeState.StateType, err) - } - - return nil } From 907aa4677f3c5cc8938de2b470797b4220253834 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Mon, 17 Feb 2025 21:45:15 +0530 Subject: [PATCH 07/10] Update pkg/compute/watchers/executor_watcher.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- pkg/compute/watchers/executor_watcher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index b01c9a89a4..873aad3094 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -41,6 +41,9 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. case models.ExecutionStateNew: if err := h.bidder.RunBidding(ctx, execution); err != nil { compute.ExecutionBiddingErrors.Add(ctx, 1) + logger.Error(). + Err(err). + Msg("failed to run bidding") return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) } return nil From fe293bcb8b60839fdbdc47b08ea5d1b45eafd209 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Mon, 17 Feb 2025 21:45:42 +0530 Subject: [PATCH 08/10] Update pkg/compute/metrics.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- pkg/compute/metrics.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/compute/metrics.go b/pkg/compute/metrics.go index 0f0786179a..efefead05c 100644 --- a/pkg/compute/metrics.go +++ b/pkg/compute/metrics.go @@ -40,10 +40,12 @@ var ( ExecutionRunErrors = lo.Must(meter.Int64Counter( "execution_run_errors", metric.WithDescription("Number of errors encountered during execution running."), + metric.WithUnit("1"), )) ExecutionCancelErrors = lo.Must(meter.Int64Counter( "execution_cancel_errors", metric.WithDescription("Number of errors encountered during execution cancellation."), + metric.WithUnit("1"), )) ) From c3308548b74bb5fa085c88d2f540960fb80ccb31 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Tue, 18 Feb 2025 15:52:06 +0530 Subject: [PATCH 09/10] Refactor error handling in bidder and executor watcher for improved clarity and propagation --- pkg/compute/bidder.go | 12 +++++------- pkg/compute/watchers/executor_watcher.go | 12 ++++++------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/pkg/compute/bidder.go b/pkg/compute/bidder.go index 6d06f71539..8fbf27cf72 100644 --- a/pkg/compute/bidder.go +++ b/pkg/compute/bidder.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog/log" + "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/compute/capacity" "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/models" @@ -186,7 +187,7 @@ func (b Bidder) handleBidResult( ctx context.Context, execution *models.Execution, result *bidStrategyResponse, -) { +) error { var newExecutionValues models.Execution var newExecutionState models.ExecutionStateType var events []*models.Event @@ -228,16 +229,13 @@ func (b Bidder) handleBidResult( Str("expectedState", models.ExecutionStateNew.String()). Str("actualState", invalidStateErr.Actual.String()). Msg("skipping execution state update - execution no longer in expected state") - return + return nil } // Propagate the error to be handled by the execution watcher - log.Ctx(ctx).Debug(). - Err(err). - Str("executionID", execution.ID). - Msg("failed to update execution state") - return + return bacerrors.Wrap(err, "failed to update execution state for execution %s", execution.ID) } + return nil } // handleError is a helper function to handle errors in the bidder. diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index 873aad3094..b61adc477b 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -39,14 +39,12 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. var err error switch execution.ComputeState.StateType { case models.ExecutionStateNew: - if err := h.bidder.RunBidding(ctx, execution); err != nil { + if err = h.bidder.RunBidding(ctx, execution); err != nil { compute.ExecutionBiddingErrors.Add(ctx, 1) logger.Error(). Err(err). Msg("failed to run bidding") - return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) } - return nil case models.ExecutionStateBidAccepted: err = h.executor.Run(ctx, execution) if err != nil { @@ -55,16 +53,18 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. Err(err). Msg("failed to run execution") } - return err case models.ExecutionStateCancelled: err = h.executor.Cancel(ctx, execution) if err != nil { compute.ExecutionCancelErrors.Add(ctx, 1) - return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) } - return nil default: // No action needed for other states return nil } + + if err != nil { + return bacerrors.Wrap(err, "failed to handle execution state %s", execution.ComputeState.StateType) + } + return nil } From 1d8ddab456d35217bd5abae3a966f2688209ac19 Mon Sep 17 00:00:00 2001 From: Viraj Bhartiya Date: Tue, 18 Feb 2025 16:00:45 +0530 Subject: [PATCH 10/10] Update pkg/compute/watchers/executor_watcher.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- pkg/compute/watchers/executor_watcher.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/compute/watchers/executor_watcher.go b/pkg/compute/watchers/executor_watcher.go index b61adc477b..e1ac60dcf1 100644 --- a/pkg/compute/watchers/executor_watcher.go +++ b/pkg/compute/watchers/executor_watcher.go @@ -54,10 +54,12 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher. Msg("failed to run execution") } case models.ExecutionStateCancelled: - err = h.executor.Cancel(ctx, execution) - if err != nil { - compute.ExecutionCancelErrors.Add(ctx, 1) - } + case models.ExecutionStateCancelled: + err = h.executor.Cancel(ctx, execution) + if err != nil { ++ logger.Error().Err(err).Msg("failed to cancel execution") + compute.ExecutionCancelErrors.Add(ctx, 1) + } default: // No action needed for other states return nil