Skip to content

Commit

Permalink
Removed panics in replay_client
Browse files Browse the repository at this point in the history
  • Loading branch information
ezeike committed Mar 27, 2024
1 parent 21b98ae commit f57bd58
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
38 changes: 22 additions & 16 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package events

import (
"context"
"fmt"
"time"

"cosmossdk.io/depinject"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/polylog"
"github.com/pokt-network/poktroll/pkg/retry"
)

Expand All @@ -22,7 +22,7 @@ const (
// re-establish the events query bytes subscription when the events bytes
// observable returns an error or closes.
// TODO to make this a customisable parameter in the appgateserver and relayminer config files.
eventsBytesRetryLimit = 0
eventsBytesRetryLimit = 2
eventsBytesRetryResetTimeout = 10 * time.Second
// replayObsCacheBufferSize is the replay buffer size of the
// replayObsCache replay observable which is used to cache the replay
Expand Down Expand Up @@ -118,7 +118,16 @@ func NewEventsReplayClient[T any](
}

// Concurrently publish events to the observable emitted by replayObsCache.
go rClient.goPublishEvents(ctx)
go func() {
for {
select {
case <-ctx.Done():
return
default:
rClient.publishEvents(ctx)
}
}
}()

return rClient, nil
}
Expand Down Expand Up @@ -176,11 +185,12 @@ func (rClient *replayClient[T]) Close() {
close(rClient.replayObsCachePublishCh)
}

// goPublishEvents runs the work function returned by retryPublishEventsFactory,
// publishEvents runs the work function returned by retryPublishEventsFactory,
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
func (rClient *replayClient[T]) publishEvents(ctx context.Context) error {
logger := polylog.Ctx(ctx)

// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
Expand All @@ -194,11 +204,8 @@ func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
)

// If we get here, the retry limit was reached and the retry loop exited.
// Since this function runs in a goroutine, we can't return the error to the
// caller. Instead, we panic.
if publishError != nil {
panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError))
}
logger.Error().Err(publishError).Msgf("EventsReplayClient[%T].goPublishEvents error")
return publishError
}

// retryPublishEventsFactory returns a function which is intended to be passed
Expand Down Expand Up @@ -264,9 +271,10 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
either.Bytes,
) (T, bool) {
return func(
_ context.Context,
ctx context.Context,
eitherEventBz either.Bytes,
) (_ T, skip bool) {
logger := polylog.Ctx(ctx)
eventBz, err := eitherEventBz.ValueOrError()
if err != nil {
errCh <- err
Expand All @@ -286,10 +294,8 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
return *new(T), true
}

panic(fmt.Sprintf(
"unexpected error deserialising event: %v; eventBz: %s",
err, string(eventBz),
))
logger.Error().Err(err).Msg("Error in event decoder")
return *new(T), true
}
return event, false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func OnError(
return nil
}

if retryLimit > 0 && retryCount >= retryLimit {
if retryCount >= retryLimit {
return err
}

Expand Down

0 comments on commit f57bd58

Please sign in to comment.