From 0d255135acb04a13deae007b2b494784c2668872 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 3 Apr 2024 10:20:56 +0200 Subject: [PATCH] fix: default conn retry limit & simplify config usage --- e2e/tests/session_steps_test.go | 1 - pkg/client/block/client.go | 4 +--- pkg/client/delegation/client.go | 4 +--- pkg/client/events/options.go | 7 ++++++- pkg/client/events/replay_client.go | 21 ++++++++++++------- pkg/client/tx/client.go | 1 - pkg/sdk/deps_builder.go | 13 +++++------- pkg/sdk/sdk.go | 2 +- testutil/testclient/testeventsquery/client.go | 2 +- 9 files changed, 28 insertions(+), 27 deletions(-) diff --git a/e2e/tests/session_steps_test.go b/e2e/tests/session_steps_test.go index 73cae45e0..1039981b5 100644 --- a/e2e/tests/session_steps_test.go +++ b/e2e/tests/session_steps_test.go @@ -111,7 +111,6 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication msgSenderQuery, tx.UnmarshalTxResult, testEventsReplayClientBufferSize, - events.DefaultConnRetryLimit, ) require.NoError(s, err) diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 244c54a61..2ba432ef0 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -37,9 +37,7 @@ func NewBlockClient( deps depinject.Config, opts ...client.BlockClientOption, ) (_ client.BlockClient, err error) { - bClient := &blockClient{ - connRetryLimit: events.DefaultConnRetryLimit, - } + bClient := &blockClient{} for _, opt := range opts { opt(bClient) diff --git a/pkg/client/delegation/client.go b/pkg/client/delegation/client.go index 20173c620..00f45e2fa 100644 --- a/pkg/client/delegation/client.go +++ b/pkg/client/delegation/client.go @@ -48,9 +48,7 @@ func NewDelegationClient( deps depinject.Config, opts ...client.DelegationClientOption, ) (_ client.DelegationClient, err error) { - dClient := &delegationClient{ - connRetryLimit: events.DefaultConnRetryLimit, - } + dClient := &delegationClient{} for _, opt := range opts { opt(dClient) diff --git a/pkg/client/events/options.go b/pkg/client/events/options.go index caf0a8841..b3a959fb0 100644 --- a/pkg/client/events/options.go +++ b/pkg/client/events/options.go @@ -16,6 +16,11 @@ func WithDialer(dialer client.Dialer) client.EventsQueryClientOption { // If connRetryLimit is < 0, it will retry indefinitely. func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T] { return func(client client.EventsReplayClient[T]) { - client.(*replayClient[T]).connRetryLimit = limit + // Ignore the zero value because limit may be provided via a partially + // configured config struct (i.e. no retry limit set). + // The default will be used instead. + if limit != 0 { + client.(*replayClient[T]).connRetryLimit = limit + } } } diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go index 7df1a0997..14c246a7f 100644 --- a/pkg/client/events/replay_client.go +++ b/pkg/client/events/replay_client.go @@ -114,6 +114,7 @@ func NewEventsReplayClient[T any]( queryString: queryString, eventDecoder: newEventFn, replayObsBufferSize: replayObsBufferSize, + connRetryLimit: DefaultConnRetryLimit, } for _, opt := range opts { @@ -137,7 +138,7 @@ func NewEventsReplayClient[T any]( } // Concurrently publish events to the observable emitted by replayObsCache. - go rClient.goPublishEvents(ctx, rClient.connRetryLimit) + go rClient.goPublishEvents(ctx) return rClient, nil } @@ -198,13 +199,13 @@ func (rClient *replayClient[T]) Close() { // goPublishEvents runs the work function returned by retryPublishEventsFactory, // re-invoking it according to the arguments to retry.OnError when the events bytes // observable returns an asynchronous error. -func (rClient *replayClient[T]) goPublishEvents(ctx context.Context, retryLimit int) { +func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) { // React to errors by getting a new events bytes observable, re-mapping it, // and send it to replayObsCachePublishCh such that // replayObsCache.Last(ctx, 1) will return it. publishErr := retry.OnError( ctx, - retryLimit, + rClient.connRetryLimit, eventsBytesRetryDelay, eventsBytesRetryResetTimeout, "goPublishEvents", @@ -282,10 +283,10 @@ func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) f // If deserialisation failed because the event bytes were for a different event // type, this value is also skipped. If deserialisation failed for some other // reason, this function panics. -func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error, cancel context.CancelFunc) func( - context.Context, - either.Bytes, -) (T, bool) { +func (rClient *replayClient[T]) newMapEventsBytesToTFn( + errCh chan<- error, + cancelEventsBzObs context.CancelFunc, +) func(context.Context, either.Bytes) (T, bool) { return func( ctx context.Context, eitherEventBz either.Bytes, @@ -311,8 +312,12 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error, cance // Don't publish (skip) if there was some other kind of error, // and send that error on the errCh. - cancel() errCh <- err + + // The source observable may not necessarily close automatically in this case, + // cancel its context to ensure its closure and prevent a memory/goroutine leak. + cancelEventsBzObs() + return *new(T), true } return event, false diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index cdc0e7f2a..47f60fb58 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -144,7 +144,6 @@ func NewTxClient( commitTimeoutHeightOffset: DefaultCommitTimeoutHeightOffset, txErrorChans: make(txErrorChansByHash), txTimeoutPool: make(txTimeoutPool), - connRetryLimit: events.DefaultConnRetryLimit, } if err = depinject.Inject( diff --git a/pkg/sdk/deps_builder.go b/pkg/sdk/deps_builder.go index 41b13ac71..16e71c90c 100644 --- a/pkg/sdk/deps_builder.go +++ b/pkg/sdk/deps_builder.go @@ -19,11 +19,8 @@ import ( // buildDeps builds the dependencies for the POKTRollSDK if they are not provided // in the config. This is useful for the SDK consumers that do not want or // cannot provide the dependencies through depinject. -func (sdk *poktrollSDK) buildDeps( - ctx context.Context, - config *POKTRollSDKConfig, -) (depinject.Config, error) { - pocketNodeWebsocketURL := HostToWebsocketURL(config.QueryNodeUrl.Host) +func (sdk *poktrollSDK) buildDeps(ctx context.Context) (depinject.Config, error) { + pocketNodeWebsocketURL := HostToWebsocketURL(sdk.config.QueryNodeUrl.Host) // Have a new depinject config deps := depinject.Configs() @@ -35,7 +32,7 @@ func (sdk *poktrollSDK) buildDeps( eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) - blockClientConnRetryLimitOpt := block.WithConnRetryLimit(config.ConnRetryLimit) + blockClientConnRetryLimitOpt := block.WithConnRetryLimit(sdk.config.ConnRetryLimit) // Create and supply the block client that depends on the events query client blockClient, err := block.NewBlockClient(ctx, deps, blockClientConnRetryLimitOpt) @@ -48,7 +45,7 @@ func (sdk *poktrollSDK) buildDeps( // TODO_TECHDEBT: Configure the grpc client options from the config. var grpcClient grpctypes.ClientConn grpcClient, err = grpc.Dial( - config.QueryNodeGRPCUrl.Host, + sdk.config.QueryNodeGRPCUrl.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { @@ -77,7 +74,7 @@ func (sdk *poktrollSDK) buildDeps( } deps = depinject.Configs(deps, depinject.Supply(sessionQuerier)) - delegationClientConnRetryLimitOpt := delegation.WithConnRetryLimit(config.ConnRetryLimit) + delegationClientConnRetryLimitOpt := delegation.WithConnRetryLimit(sdk.config.ConnRetryLimit) // Create and supply the delegation client delegationClient, err := delegation.NewDelegationClient(ctx, deps, delegationClientConnRetryLimitOpt) diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index 634c632a4..0ab944624 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -85,7 +85,7 @@ func NewPOKTRollSDK(ctx context.Context, config *POKTRollSDKConfig) (POKTRollSDK // Build the dependencies if they are not provided in the config. if config.Deps != nil { deps = config.Deps - } else if deps, err = sdk.buildDeps(ctx, config); err != nil { + } else if deps, err = sdk.buildDeps(ctx); err != nil { return nil, err } diff --git a/testutil/testclient/testeventsquery/client.go b/testutil/testclient/testeventsquery/client.go index b16461ab6..462764af7 100644 --- a/testutil/testclient/testeventsquery/client.go +++ b/testutil/testclient/testeventsquery/client.go @@ -45,7 +45,7 @@ func NewOneTimeEventsQuery( ctrl := gomock.NewController(t) eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl) - eventsQueryClient.EXPECT().EventsBytes(gomock.Eq(ctx), gomock.Eq(query)). + eventsQueryClient.EXPECT().EventsBytes(gomock.Any(), gomock.Eq(query)). DoAndReturn(func( ctx context.Context, query string,