diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go index 5175a78e0..fa736c8ad 100644 --- a/pkg/client/events/replay_client.go +++ b/pkg/client/events/replay_client.go @@ -2,7 +2,6 @@ package events import ( "context" - "fmt" "time" "cosmossdk.io/depinject" @@ -11,6 +10,7 @@ import ( "github.com/pokt-network/poktroll/pkg/either" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" + "github.com/pokt-network/poktroll/pkg/polylog" "github.com/pokt-network/poktroll/pkg/retry" ) @@ -22,7 +22,7 @@ const ( // re-establish the events query bytes subscription when the events bytes // observable returns an error or closes. // TODO to make this a customisable parameter in the appgateserver and relayminer config files. - eventsBytesRetryLimit = 0 + eventsBytesRetryLimit = 2 eventsBytesRetryResetTimeout = 10 * time.Second // replayObsCacheBufferSize is the replay buffer size of the // replayObsCache replay observable which is used to cache the replay @@ -118,7 +118,16 @@ func NewEventsReplayClient[T any]( } // Concurrently publish events to the observable emitted by replayObsCache. - go rClient.goPublishEvents(ctx) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + rClient.publishEvents(ctx) + } + } + }() return rClient, nil } @@ -176,11 +185,12 @@ func (rClient *replayClient[T]) Close() { close(rClient.replayObsCachePublishCh) } -// goPublishEvents runs the work function returned by retryPublishEventsFactory, +// publishEvents runs the work function returned by retryPublishEventsFactory, // re-invoking it according to the arguments to retry.OnError when the events bytes // observable returns an asynchronous error. -// This function is intended to be called in a goroutine. -func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) { +func (rClient *replayClient[T]) publishEvents(ctx context.Context) error { + logger := polylog.Ctx(ctx) + // React to errors by getting a new events bytes observable, re-mapping it, // and send it to replayObsCachePublishCh such that // replayObsCache.Last(ctx, 1) will return it. @@ -194,11 +204,8 @@ func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) { ) // If we get here, the retry limit was reached and the retry loop exited. - // Since this function runs in a goroutine, we can't return the error to the - // caller. Instead, we panic. - if publishError != nil { - panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError)) - } + logger.Error().Err(publishError).Msgf("EventsReplayClient[%T].goPublishEvents error") + return publishError } // retryPublishEventsFactory returns a function which is intended to be passed @@ -264,9 +271,10 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func( either.Bytes, ) (T, bool) { return func( - _ context.Context, + ctx context.Context, eitherEventBz either.Bytes, ) (_ T, skip bool) { + logger := polylog.Ctx(ctx) eventBz, err := eitherEventBz.ValueOrError() if err != nil { errCh <- err @@ -286,10 +294,8 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func( return *new(T), true } - panic(fmt.Sprintf( - "unexpected error deserialising event: %v; eventBz: %s", - err, string(eventBz), - )) + logger.Error().Err(err).Msg("Error in event decoder") + return *new(T), true } return event, false } diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index b73af9f3e..bd06af941 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -58,7 +58,7 @@ func OnError( return nil } - if retryLimit > 0 && retryCount >= retryLimit { + if retryCount >= retryLimit { return err }