Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve error handling and logging in the compute package #4850

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions pkg/compute/bidder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compute

import (
"context"
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -217,10 +218,27 @@ func (b Bidder) handleBidResult(
ExpectedStates: []models.ExecutionStateType{models.ExecutionStateNew},
},
})
// TODO: handle error by either gracefully skipping if the execution is no longer in the created state
// or by failing the execution

if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to update execution state")
var invalidStateErr store.ErrInvalidExecutionState
if errors.As(err, &invalidStateErr) {
log.Ctx(ctx).Debug().
Err(err).
Str("executionID", execution.ID).
Str("expectedState", models.ExecutionStateNew.String()).
Str("actualState", invalidStateErr.Actual.String()).
Msg("skipping execution state update - execution no longer in expected state")
return
}

failErr := b.handleError(ctx, execution, fmt.Errorf("failed to update execution state: %w", err))
if failErr != nil {
log.Ctx(ctx).Error().
Err(failErr).
Str("executionID", execution.ID).
Str("originalError", err.Error()).
Msg("failed to update execution to failed state after update error")
}
virajbhartiya marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/compute/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,20 @@ var (
metric.WithDescription("Duration of a job on the compute node in milliseconds."),
metric.WithUnit("ms"),
))

// Execution error metrics
ExecutionBiddingErrors = lo.Must(meter.Int64Counter(
"execution_bidding_errors",
metric.WithDescription("Number of errors encountered during execution bidding."),
))

ExecutionRunErrors = lo.Must(meter.Int64Counter(
"execution_run_errors",
metric.WithDescription("Number of errors encountered during execution running."),
))

ExecutionCancelErrors = lo.Must(meter.Int64Counter(
"execution_cancel_errors",
metric.WithDescription("Number of errors encountered during execution cancellation."),
))
virajbhartiya marked this conversation as resolved.
Show resolved Hide resolved
)
38 changes: 35 additions & 3 deletions pkg/compute/watchers/executor_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute"
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/rs/zerolog/log"
)

type ExecutionUpsertHandler struct {
Expand All @@ -29,14 +30,45 @@ func (h *ExecutionUpsertHandler) HandleEvent(ctx context.Context, event watcher.
}

execution := upsert.Current
logger := log.Ctx(ctx).With().
Str("executionID", execution.ID).
Str("state", execution.ComputeState.StateType.String()).
Logger()
Comment on lines +34 to +37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


var err error
switch execution.ComputeState.StateType {
case models.ExecutionStateNew:
return h.bidder.RunBidding(ctx, execution)
err = h.bidder.RunBidding(ctx, execution)
if err != nil {
compute.ExecutionBiddingErrors.Add(ctx, 1)
logger.Error().
Err(err).
Msg("failed to run bidding")
}
case models.ExecutionStateBidAccepted:
return h.executor.Run(ctx, execution)
err = h.executor.Run(ctx, execution)
if err != nil {
compute.ExecutionRunErrors.Add(ctx, 1)
logger.Error().
Err(err).
Msg("failed to run execution")
}
case models.ExecutionStateCancelled:
return h.executor.Cancel(ctx, execution)
err = h.executor.Cancel(ctx, execution)
if err != nil {
compute.ExecutionCancelErrors.Add(ctx, 1)
logger.Error().
Err(err).
Msg("failed to cancel execution")
}
default:
// No action needed for other states
return nil
}
virajbhartiya marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return fmt.Errorf("failed to handle execution state %s: %w",
execution.ComputeState.StateType, err)
}
virajbhartiya marked this conversation as resolved.
Show resolved Hide resolved

return nil
Expand Down