Skip to content

Commit

Permalink
goPublishEvents & OnError updated
Browse files Browse the repository at this point in the history
  • Loading branch information
ezeike committed Mar 20, 2024
1 parent d408c7f commit 126353d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
43 changes: 27 additions & 16 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/polylog"
"github.com/pokt-network/poktroll/pkg/retry"
)

Expand Down Expand Up @@ -181,24 +182,34 @@ func (rClient *replayClient[T]) Close() {
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
publishError := retry.OnError(
ctx,
eventsBytesRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishEvents",
rClient.retryPublishEventsFactory(ctx),
)
logger := polylog.Ctx(ctx)

for {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
publishError := retry.OnError(
ctx,
eventsBytesRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishEvents",
rClient.retryPublishEventsFactory(ctx),
)

// OnError returned due to a context close
if publishError == nil {
return
}

// If we get here, the retry limit was reached and the retry loop exited.
// Since this function runs in a goroutine, we can't return the error to the
// caller. Instead, we panic.
if publishError != nil {
panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError))
// OnError returned due to an error being received on errCh or errCh being closed
// In either case we emit a warning and call start over
logger.Warn().
Str("work_name", "goPublishEvents").
Err(publishError).
Msgf("EventsRelayClient[%T].goPublishEvents OnError returned")
}

}

// retryPublishEventsFactory returns a function which is intended to be passed
Expand Down
3 changes: 2 additions & 1 deletion pkg/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package retry

import (
"context"
"errors"
"time"

"github.com/pokt-network/poktroll/pkg/polylog"
Expand Down Expand Up @@ -55,7 +56,7 @@ func OnError(
logger.Warn().
Str("work_name", workName).
Msg("error channel closed, will no longer retry on error")
return nil
return errors.New("error channel closed")
}

if retryLimit > 0 && retryCount >= retryLimit {
Expand Down

0 comments on commit 126353d

Please sign in to comment.