Skip to content

Commit

Permalink
fix: default conn retry limit & simplify config usage
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Apr 3, 2024
1 parent 6fc34fa commit 0d25513
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 27 deletions.
1 change: 0 additions & 1 deletion e2e/tests/session_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication
msgSenderQuery,
tx.UnmarshalTxResult,
testEventsReplayClientBufferSize,
events.DefaultConnRetryLimit,
)
require.NoError(s, err)

Expand Down
4 changes: 1 addition & 3 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func NewBlockClient(
deps depinject.Config,
opts ...client.BlockClientOption,
) (_ client.BlockClient, err error) {
bClient := &blockClient{
connRetryLimit: events.DefaultConnRetryLimit,
}
bClient := &blockClient{}

for _, opt := range opts {
opt(bClient)
Expand Down
4 changes: 1 addition & 3 deletions pkg/client/delegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ func NewDelegationClient(
deps depinject.Config,
opts ...client.DelegationClientOption,
) (_ client.DelegationClient, err error) {
dClient := &delegationClient{
connRetryLimit: events.DefaultConnRetryLimit,
}
dClient := &delegationClient{}

for _, opt := range opts {
opt(dClient)
Expand Down
7 changes: 6 additions & 1 deletion pkg/client/events/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ func WithDialer(dialer client.Dialer) client.EventsQueryClientOption {
// If connRetryLimit is < 0, it will retry indefinitely.
func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T] {
return func(client client.EventsReplayClient[T]) {
client.(*replayClient[T]).connRetryLimit = limit
// Ignore the zero value because limit may be provided via a partially
// configured config struct (i.e. no retry limit set).
// The default will be used instead.
if limit != 0 {
client.(*replayClient[T]).connRetryLimit = limit
}
}
}
21 changes: 13 additions & 8 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewEventsReplayClient[T any](
queryString: queryString,
eventDecoder: newEventFn,
replayObsBufferSize: replayObsBufferSize,
connRetryLimit: DefaultConnRetryLimit,
}

for _, opt := range opts {
Expand All @@ -137,7 +138,7 @@ func NewEventsReplayClient[T any](
}

// Concurrently publish events to the observable emitted by replayObsCache.
go rClient.goPublishEvents(ctx, rClient.connRetryLimit)
go rClient.goPublishEvents(ctx)

return rClient, nil
}
Expand Down Expand Up @@ -198,13 +199,13 @@ func (rClient *replayClient[T]) Close() {
// goPublishEvents runs the work function returned by retryPublishEventsFactory,
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context, retryLimit int) {
func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to replayObsCachePublishCh such that
// replayObsCache.Last(ctx, 1) will return it.
publishErr := retry.OnError(
ctx,
retryLimit,
rClient.connRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishEvents",
Expand Down Expand Up @@ -282,10 +283,10 @@ func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) f
// If deserialisation failed because the event bytes were for a different event
// type, this value is also skipped. If deserialisation failed for some other
// reason, this function panics.
func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error, cancel context.CancelFunc) func(
context.Context,
either.Bytes,
) (T, bool) {
func (rClient *replayClient[T]) newMapEventsBytesToTFn(
errCh chan<- error,
cancelEventsBzObs context.CancelFunc,
) func(context.Context, either.Bytes) (T, bool) {
return func(
ctx context.Context,
eitherEventBz either.Bytes,
Expand All @@ -311,8 +312,12 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error, cance

// Don't publish (skip) if there was some other kind of error,
// and send that error on the errCh.
cancel()
errCh <- err

// The source observable may not necessarily close automatically in this case,
// cancel its context to ensure its closure and prevent a memory/goroutine leak.
cancelEventsBzObs()

return *new(T), true
}
return event, false
Expand Down
1 change: 0 additions & 1 deletion pkg/client/tx/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func NewTxClient(
commitTimeoutHeightOffset: DefaultCommitTimeoutHeightOffset,
txErrorChans: make(txErrorChansByHash),
txTimeoutPool: make(txTimeoutPool),
connRetryLimit: events.DefaultConnRetryLimit,
}

if err = depinject.Inject(
Expand Down
13 changes: 5 additions & 8 deletions pkg/sdk/deps_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ import (
// buildDeps builds the dependencies for the POKTRollSDK if they are not provided
// in the config. This is useful for the SDK consumers that do not want or
// cannot provide the dependencies through depinject.
func (sdk *poktrollSDK) buildDeps(
ctx context.Context,
config *POKTRollSDKConfig,
) (depinject.Config, error) {
pocketNodeWebsocketURL := HostToWebsocketURL(config.QueryNodeUrl.Host)
func (sdk *poktrollSDK) buildDeps(ctx context.Context) (depinject.Config, error) {
pocketNodeWebsocketURL := HostToWebsocketURL(sdk.config.QueryNodeUrl.Host)

// Have a new depinject config
deps := depinject.Configs()
Expand All @@ -35,7 +32,7 @@ func (sdk *poktrollSDK) buildDeps(
eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL)
deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient))

blockClientConnRetryLimitOpt := block.WithConnRetryLimit(config.ConnRetryLimit)
blockClientConnRetryLimitOpt := block.WithConnRetryLimit(sdk.config.ConnRetryLimit)

// Create and supply the block client that depends on the events query client
blockClient, err := block.NewBlockClient(ctx, deps, blockClientConnRetryLimitOpt)
Expand All @@ -48,7 +45,7 @@ func (sdk *poktrollSDK) buildDeps(
// TODO_TECHDEBT: Configure the grpc client options from the config.
var grpcClient grpctypes.ClientConn
grpcClient, err = grpc.Dial(
config.QueryNodeGRPCUrl.Host,
sdk.config.QueryNodeGRPCUrl.Host,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
Expand Down Expand Up @@ -77,7 +74,7 @@ func (sdk *poktrollSDK) buildDeps(
}
deps = depinject.Configs(deps, depinject.Supply(sessionQuerier))

delegationClientConnRetryLimitOpt := delegation.WithConnRetryLimit(config.ConnRetryLimit)
delegationClientConnRetryLimitOpt := delegation.WithConnRetryLimit(sdk.config.ConnRetryLimit)

// Create and supply the delegation client
delegationClient, err := delegation.NewDelegationClient(ctx, deps, delegationClientConnRetryLimitOpt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewPOKTRollSDK(ctx context.Context, config *POKTRollSDKConfig) (POKTRollSDK
// Build the dependencies if they are not provided in the config.
if config.Deps != nil {
deps = config.Deps
} else if deps, err = sdk.buildDeps(ctx, config); err != nil {
} else if deps, err = sdk.buildDeps(ctx); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion testutil/testclient/testeventsquery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewOneTimeEventsQuery(
ctrl := gomock.NewController(t)

eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl)
eventsQueryClient.EXPECT().EventsBytes(gomock.Eq(ctx), gomock.Eq(query)).
eventsQueryClient.EXPECT().EventsBytes(gomock.Any(), gomock.Eq(query)).
DoAndReturn(func(
ctx context.Context,
query string,
Expand Down

0 comments on commit 0d25513

Please sign in to comment.