Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero retryLimit Support in ReplayClient #442

Merged
merged 32 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d7621fc
Support infinite connection attempt retries
ezeike Feb 21, 2024
4cb108c
Added TODO
ezeike Mar 20, 2024
efca1f2
Removed panics in replay_client
ezeike Mar 27, 2024
bc721da
refactor: factor out connection retry limit
bryanchriswhite Apr 2, 2024
6f8153b
fix: default conn retry limit & simplify config usage
bryanchriswhite Apr 3, 2024
5c8f542
Fix in OnError and test update
ezeike Apr 12, 2024
3b3b7cb
retry test test fixes
ezeike Apr 15, 2024
983f58d
Removed panics in replay_client
ezeike Mar 27, 2024
def63d9
refactor: factor out connection retry limit
bryanchriswhite Apr 2, 2024
c397674
fix: default conn retry limit & simplify config usage
bryanchriswhite Apr 3, 2024
9fe81eb
Fix in OnError and test update
ezeike Apr 12, 2024
0efe669
Update pkg/client/events/replay_client.go
Olshansk Apr 12, 2024
5b2b59b
Variable rename
ezeike Apr 15, 2024
094f8bf
Empty commit
ezeike Apr 19, 2024
579c344
Empty commit
ezeike Apr 19, 2024
e382fe5
Update pkg/client/interface.go
ezeike Apr 23, 2024
c37e36c
Update pkg/retry/retry_test.go
ezeike Apr 23, 2024
f91721c
Update pkg/retry/retry_test.go
ezeike Apr 24, 2024
108303a
[Config] feat: Simplify relay miner config (#477)
red-0ne Apr 23, 2024
6680149
[Tooling] Add claim/proof/settlement dashboard & link (#479)
bryanchriswhite Apr 24, 2024
4ccf0ca
Merge remote-tracking branch 'pokt/main' into zero-retrylimit-replayc…
bryanchriswhite Apr 26, 2024
d991a18
lint fix
ezeike Apr 26, 2024
09f5fd7
Type coercion update
ezeike Apr 26, 2024
89076c0
fix: tests
bryanchriswhite Apr 26, 2024
008842e
fix: tests
bryanchriswhite Apr 26, 2024
9a68de3
Merge branch 'main' into zero-retrylimit-replayclient
Olshansk Apr 29, 2024
4bf968a
Godoc comment update
ezeike May 2, 2024
6209183
Merge branch 'main' into zero-retrylimit-replayclient
bryanchriswhite May 3, 2024
0d607ae
Empty commit
bryanchriswhite May 3, 2024
47eeadb
Merge branch 'main' into zero-retrylimit-replayclient
Olshansk May 3, 2024
1921dd8
Empty commit
bryanchriswhite May 6, 2024
0693ca1
Empty commit
Olshansk May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,46 @@ const (
func NewBlockClient(
ctx context.Context,
deps depinject.Config,
) (client.BlockClient, error) {
ctx, close := context.WithCancel(ctx)
opts ...client.BlockClientOption,
) (_ client.BlockClient, err error) {
ctx, cancel := context.WithCancel(ctx)

// latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a
// new block, whether it comes from a direct query or an event subscription query.
latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10)
bClient := &blockReplayClient{
latestBlockReplayObs: latestBlockReplayObs,
close: cancel,
}

for _, opt := range opts {
opt(bClient)
}

eventsReplayClient, err := events.NewEventsReplayClient[client.Block](
bClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Block](
ctx,
deps,
committedBlocksQuery,
UnmarshalNewBlock,
defaultBlocksReplayLimit,
events.WithConnRetryLimit[client.Block](bClient.connRetryLimit),
)
if err != nil {
close()
cancel()
return nil, err
}

// latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a
// new block, whether it comes from a direct query or an event subscription query.
latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10)
blockReplayClient := &blockReplayClient{
eventsReplayClient: eventsReplayClient,
latestBlockReplayObs: latestBlockReplayObs,
close: close,
}

if err := depinject.Inject(deps, &blockReplayClient.onStartQueryClient); err != nil {
if err := depinject.Inject(deps, &bClient.onStartQueryClient); err != nil {
return nil, err
}

blockReplayClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh)
bClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh)

if err := blockReplayClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil {
if err := bClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil {
return nil, err
}

return blockReplayClient, nil
return bClient, nil
}

// blockReplayClient is BlockClient implementation that combines a CometRPC client
Expand All @@ -99,6 +104,11 @@ type blockReplayClient struct {

// close is a function that cancels the context of the blockReplayClient.
close context.CancelFunc

// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

// CommittedBlocksSequence returns a replay observable of new block events.
Expand Down
3 changes: 2 additions & 1 deletion pkg/client/block/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func TestBlockClient(t *testing.T) {
Hash: expectedHash,
},
}, nil
})
}).
AnyTimes()

deps := depinject.Supply(eventsQueryClient, cometClientMock)

Expand Down
13 changes: 13 additions & 0 deletions pkg/client/block/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package block

import "github.com/pokt-network/poktroll/pkg/client"

// WithConnRetryLimit returns an option function which sets the number
// of times the underlying replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit(limit int) client.BlockClientOption {
return func(client client.BlockClient) {
client.(*blockReplayClient).connRetryLimit = limit
}
}
20 changes: 17 additions & 3 deletions pkg/client/delegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,27 @@ const (
func NewDelegationClient(
ctx context.Context,
deps depinject.Config,
) (client.DelegationClient, error) {
client, err := events.NewEventsReplayClient[client.Redelegation](
opts ...client.DelegationClientOption,
) (_ client.DelegationClient, err error) {
dClient := &delegationClient{}

for _, opt := range opts {
opt(dClient)
}

dClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Redelegation](
ctx,
deps,
delegationEventQuery,
newRedelegationEventFactoryFn(),
defaultRedelegationsReplayLimit,
events.WithConnRetryLimit[client.Redelegation](dClient.connRetryLimit),
)
if err != nil {
return nil, err
}
return &delegationClient{eventsReplayClient: client}, nil

return dClient, nil
}

// delegationClient is a wrapper around an EventsReplayClient that implements
Expand All @@ -69,6 +78,11 @@ type delegationClient struct {
// These enable the EventsReplayClient to correctly map the raw event bytes
// to Redelegation objects and to correctly return a RedelegationReplayObservable
eventsReplayClient client.EventsReplayClient[client.Redelegation]

// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

// RedelegationsSequence returns a replay observable of Redelgation events
Expand Down
13 changes: 13 additions & 0 deletions pkg/client/delegation/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package delegation

import "github.com/pokt-network/poktroll/pkg/client"

// WithConnRetryLimit returns an option function which sets the number
// of times the underlying replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit(limit int) client.DelegationClientOption {
return func(client client.DelegationClient) {
client.(*delegationClient).connRetryLimit = limit
}
}
15 changes: 15 additions & 0 deletions pkg/client/events/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ func WithDialer(dialer client.Dialer) client.EventsQueryClientOption {
evtClient.(*eventsQueryClient).dialer = dialer
}
}

// WithConnRetryLimit returns an option function which sets the number
// of times the replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T] {
return func(client client.EventsReplayClient[T]) {
// Ignore the zero value because limit may be provided via a partially
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryanchriswhite When you work on updating one governance parameter at a time, taking this pattern into consideration will be critical!

// configured config struct (i.e. no retry limit set).
// The default will be used instead.
if limit != 0 {
client.(*replayClient[T]).connRetryLimit = limit
}
}
}
65 changes: 47 additions & 18 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ import (
)

const (
// DefaultConnRetryLimit is used to indicate how many times the
// underlying replay client should attempt to retry if it encounters an error
// or its connection is interrupted.
//
// TODO_IMPROVE: this should be configurable but can be overridden at compile-time:
// go build -ldflags "-X github.com/pokt-network/poktroll/DefaultConnRetryLimit=value".
DefaultConnRetryLimit = 10

// eventsBytesRetryDelay is the delay between retry attempts when the events
// bytes observable returns an error.
eventsBytesRetryDelay = time.Second
// eventsBytesRetryLimit is the maximum number of times to attempt to
// re-establish the events query bytes subscription when the events bytes
// observable returns an error or closes.
// TODO_TECHDEBT: to make this a customizable parameter in the appgateserver and relayminer config files.
eventsBytesRetryLimit = 10
eventsBytesRetryResetTimeout = 10 * time.Second
// replayObsCacheBufferSize is the replay buffer size of the
Expand Down Expand Up @@ -81,6 +90,10 @@ type replayClient[T any] struct {
// replayClientCancelCtx is the function to cancel the context of the replay client.
// It is called when the replay client is closed.
replayClientCancelCtx func()
// connRetryLimit is the number of times the replay client should retry
// in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

// NewEventsReplayClient creates a new EventsReplayClient from the given
Expand All @@ -98,6 +111,7 @@ func NewEventsReplayClient[T any](
queryString string,
newEventFn NewEventsFn[T],
replayObsBufferSize int,
opts ...client.EventsReplayClientOption[T],
) (client.EventsReplayClient[T], error) {
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -107,7 +121,13 @@ func NewEventsReplayClient[T any](
eventDecoder: newEventFn,
replayObsBufferSize: replayObsBufferSize,
replayClientCancelCtx: cancel,
connRetryLimit: DefaultConnRetryLimit,
}

for _, opt := range opts {
opt(rClient)
}

// TODO_REFACTOR(@h5law): Look into making this a regular observable as
// we may no longer depend on it being replayable.
replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[observable.ReplayObservable[T]](
Expand Down Expand Up @@ -189,26 +209,26 @@ func (rClient *replayClient[T]) Close() {
// goPublishEvents runs the work function returned by retryPublishEventsFactory,
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
publishError := retry.OnError(
publishErr := retry.OnError(
ctx,
eventsBytesRetryLimit,
rClient.connRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishEvents",
rClient.retryPublishEventsFactory(ctx),
)

// If we get here, the retry limit was reached and the retry loop exited.
// Since this function runs in a goroutine, we can't return the error to the
// caller. Instead, we panic.
if publishError != nil {
panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError))
if publishErr != nil {
panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishErr))
}

return
}

// retryPublishEventsFactory returns a function which is intended to be passed
Expand All @@ -217,20 +237,24 @@ func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// replayObsCache replay observable.
func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) func() chan error {
return func() chan error {
eventsBzCtx, cancelEventsBzObs := context.WithCancel(ctx)
errCh := make(chan error, 1)
eventsBytesObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString)

eventsBytesObs, err := rClient.eventsClient.EventsBytes(eventsBzCtx, rClient.queryString)
if err != nil {
// No need to cancel eventsBytesObs in the case of a synchronous error.
errCh <- err
return errCh
}

// NB: must cast back to generic observable type to use with Map.
eventsBzObs := observable.Observable[either.Either[[]byte]](eventsBytesObs)

typedObs := channel.MapReplay(
ctx,
eventsBzCtx,
replayObsCacheBufferSize,
eventsBzObs,
rClient.newMapEventsBytesToTFn(errCh),
rClient.newMapEventsBytesToTFn(errCh, cancelEventsBzObs),
)

// Subscribe to the eventBzObs and block until the channel closes.
Expand Down Expand Up @@ -269,12 +293,12 @@ func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) f
// If deserialisation failed because the event bytes were for a different event
// type, this value is also skipped. If deserialisation failed for some other
// reason, this function panics.
func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
context.Context,
either.Bytes,
) (T, bool) {
func (rClient *replayClient[T]) newMapEventsBytesToTFn(
errCh chan<- error,
cancelEventsBzObs context.CancelFunc,
) func(context.Context, either.Bytes) (T, bool) {
return func(
_ context.Context,
ctx context.Context,
eitherEventBz either.Bytes,
) (_ T, skip bool) {
eventBz, err := eitherEventBz.ValueOrError()
Expand All @@ -296,10 +320,15 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func(
return *new(T), true
}

panic(fmt.Sprintf(
"unexpected error deserialising event: %v; eventBz: %s",
err, string(eventBz),
))
// Don't publish (skip) if there was some other kind of error,
// and send that error on the errCh.
errCh <- err

// The source observable may not necessarily close automatically in this case,
// cancel its context to ensure its closure and prevent a memory/goroutine leak.
cancelEventsBzObs()

return *new(T), true
}
return event, false
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ type TxClientOption func(TxClient)
// SupplierClientOption defines a function type that modifies the SupplierClient.
type SupplierClientOption func(SupplierClient)

// DelegationClientOption defines a function type that modifies the DelegationClient.
type DelegationClientOption func(DelegationClient)

// BlockClientOption defines a function type that modifies the BlockClient.
type BlockClientOption func(BlockClient)

// EventsReplayClientOption defines a function type that modifies the ReplayClient.
type EventsReplayClientOption[T any] func(EventsReplayClient[T])

// AccountQueryClient defines an interface that enables the querying of the
// on-chain account information
type AccountQueryClient interface {
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/tx/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ type txClient struct {
// is used to ensure that transactions error channels receive and close in the event
// that they have not already by the given timeout height.
txTimeoutPool txTimeoutPool

// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
connRetryLimit int
}

type (
Expand Down Expand Up @@ -167,6 +172,7 @@ func NewTxClient(
eventQuery,
UnmarshalTxResult,
defaultTxReplayLimit,
events.WithConnRetryLimit[*abci.TxResult](txnClient.connRetryLimit),
)
if err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions pkg/client/tx/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ func WithSigningKeyName(keyName string) client.TxClientOption {
client.(*txClient).signingKeyName = keyName
}
}

// WithConnRetryLimit returns an option function which sets the number
// of times the underlying replay client should retry in the event that it encounters
// an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit(limit int) client.TxClientOption {
return func(client client.TxClient) {
client.(*txClient).connRetryLimit = limit
}
}
Loading
Loading