diff --git a/Makefile b/Makefile index 8accba245..025c397e1 100644 --- a/Makefile +++ b/Makefile @@ -274,19 +274,19 @@ gateway_list: ## List all the staked gateways .PHONY: gateway_stake gateway_stake: ## Stake tokens for the gateway specified (must specify the gateway env var) - poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE) + poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway --config $(POKTROLLD_HOME)/config/$(STAKE) --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE) .PHONY: gateway1_stake gateway1_stake: ## Stake gateway1 - GATEWAY=gateway1 make gateway_stake + GATEWAY=gateway1 STAKE=gateway1_stake_config.yaml make gateway_stake .PHONY: gateway2_stake gateway2_stake: ## Stake gateway2 - GATEWAY=gateway2 make gateway_stake + GATEWAY=gateway2 STAKE=gateway2_stake_config.yaml make gateway_stake .PHONY: gateway3_stake gateway3_stake: ## Stake gateway3 - GATEWAY=gateway3 make gateway_stake + GATEWAY=gateway3 STAKE=gateway3_stake_config.yaml make gateway_stake .PHONY: gateway_unstake gateway_unstake: ## Unstake an gateway (must specify the GATEWAY env var) diff --git a/e2e/tests/init_test.go b/e2e/tests/init_test.go index 7317156b0..c97017c2c 100644 --- a/e2e/tests/init_test.go +++ b/e2e/tests/init_test.go @@ -5,6 +5,7 @@ package e2e import ( "flag" "fmt" + "io/ioutil" "log" "os" "regexp" @@ -166,17 +167,36 @@ func (s *suite) TheUserShouldWaitForSeconds(dur int64) { } func (s *suite) TheUserStakesAWithUpoktFromTheAccount(actorType string, amount int64, accName string) { + // Create a temporary config file + configPathPattern := fmt.Sprintf("%s_stake_config_*.yaml", accName) + configContent := fmt.Sprintf(`stake_amount: %d upokt`, amount) + configFile, err := ioutil.TempFile("", configPathPattern) + if err != nil { + s.Fatalf("error creating config file: %q", err) + } + if _, err = configFile.Write([]byte(configContent)); err != nil { + s.Fatalf("error writing config file: %q", err) + } + args := []string{ "tx", actorType, fmt.Sprintf("stake-%s", actorType), - fmt.Sprintf("%dupokt", amount), + "--config", + configFile.Name(), "--from", accName, keyRingFlag, "-y", } res, err := s.pocketd.RunCommandOnHost("", args...) + + // Remove the temporary config file + err = os.Remove(configFile.Name()) + if err != nil { + s.Fatalf("error removing config file: %q", err) + } + if err != nil { s.Fatalf("error staking %s: %s", actorType, err) } diff --git a/go.mod b/go.mod index d5c8ccc43..9be97f1d6 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/athanorlabs/go-dleq v0.1.0 github.com/cometbft/cometbft v0.37.2 github.com/cometbft/cometbft-db v0.8.0 - github.com/cosmos/cosmos-proto v1.0.0-beta.2 github.com/cosmos/cosmos-sdk v0.47.3 github.com/cosmos/gogoproto v1.4.11 github.com/cosmos/ibc-go/v7 v7.1.0 @@ -36,7 +35,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f - github.com/pokt-network/smt v0.8.2 + github.com/pokt-network/smt v0.9.2 github.com/pokt-network/smt/kvstore/badger v0.0.0-20240104123447-abb5c71c14ce github.com/regen-network/gocuke v0.6.2 github.com/rs/zerolog v1.29.1 @@ -48,7 +47,6 @@ require ( golang.org/x/crypto v0.15.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.5.0 - google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb google.golang.org/grpc v1.59.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -92,6 +90,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cosmos/btcutil v1.0.5 // indirect + github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect github.com/cosmos/iavl v0.20.0 // indirect @@ -286,6 +285,7 @@ require ( google.golang.org/api v0.143.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 95bf478d8..ec5e16d20 100644 --- a/go.sum +++ b/go.sum @@ -1598,8 +1598,8 @@ github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/pokt-network/smt v0.8.2 h1:pCuIuSrEiSZEc6fhjjUXM9395Jqduzx6fZI8MheSJLA= -github.com/pokt-network/smt v0.8.2/go.mod h1:S4Ho4OPkK2v2vUCHNtA49XDjqUC/OFYpBbynRVYmxvA= +github.com/pokt-network/smt v0.9.2 h1:h/GnFm1F6mNBbF1hopr+9+y7nr173SU55NX7NxTVU0Y= +github.com/pokt-network/smt v0.9.2/go.mod h1:S4Ho4OPkK2v2vUCHNtA49XDjqUC/OFYpBbynRVYmxvA= github.com/pokt-network/smt/kvstore/badger v0.0.0-20240104123447-abb5c71c14ce h1:JCpmaD35pHq9VWbePNuysIVOX5ybmVzThNY2BcurjVE= github.com/pokt-network/smt/kvstore/badger v0.0.0-20240104123447-abb5c71c14ce/go.mod h1:VG/uGRf6wmiDnSxee8rmVtmtLXrVM2SK4/xXtrBFeZ0= github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= diff --git a/localnet/poktrolld/config/gateway1_stake_config.yaml b/localnet/poktrolld/config/gateway1_stake_config.yaml new file mode 100644 index 000000000..2f085c5ef --- /dev/null +++ b/localnet/poktrolld/config/gateway1_stake_config.yaml @@ -0,0 +1 @@ +stake_amount: 1000upokt \ No newline at end of file diff --git a/localnet/poktrolld/config/gateway2_stake_config.yaml b/localnet/poktrolld/config/gateway2_stake_config.yaml new file mode 100644 index 000000000..2f085c5ef --- /dev/null +++ b/localnet/poktrolld/config/gateway2_stake_config.yaml @@ -0,0 +1 @@ +stake_amount: 1000upokt \ No newline at end of file diff --git a/localnet/poktrolld/config/gateway3_stake_config.yaml b/localnet/poktrolld/config/gateway3_stake_config.yaml new file mode 100644 index 000000000..2f085c5ef --- /dev/null +++ b/localnet/poktrolld/config/gateway3_stake_config.yaml @@ -0,0 +1 @@ +stake_amount: 1000upokt \ No newline at end of file diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index fa747abf0..9488f82dd 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -180,10 +180,12 @@ func setupAppGateServerDependencies( config.NewSupplyEventsQueryClientFn(queryNodeRPCURL), // leaf config.NewSupplyBlockClientFn(), // leaf config.NewSupplyQueryClientContextFn(queryNodeGRPCURL), // leaf + config.NewSupplyDelegationClientFn(), // leaf config.NewSupplyAccountQuerierFn(), // leaf config.NewSupplyApplicationQuerierFn(), // leaf config.NewSupplySessionQuerierFn(), // leaf config.NewSupplyRingCacheFn(), + config.NewSupplyPOKTRollSDKFn(appGateConfig.SigningKey), } diff --git a/pkg/client/block/client_integration_test.go b/pkg/client/block/client_integration_test.go index 45035ece2..375142f36 100644 --- a/pkg/client/block/client_integration_test.go +++ b/pkg/client/block/client_integration_test.go @@ -14,6 +14,8 @@ import ( "github.com/pokt-network/poktroll/testutil/testclient/testblock" ) +// TODO(#255): Refactor this integration test to use an in-memory simulated network + const blockIntegrationSubTimeout = 5 * time.Second func TestBlockClient_LastNBlocks(t *testing.T) { diff --git a/pkg/client/delegation/client.go b/pkg/client/delegation/client.go index e4baae743..9646ca785 100644 --- a/pkg/client/delegation/client.go +++ b/pkg/client/delegation/client.go @@ -11,15 +11,25 @@ import ( const ( // delegationEventQuery is the query used by the EventsQueryClient to subscribe - // to new delegation events from the the application module on chain. + // to all application module events in order to filter for redelegation events. // See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events // And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events - delegationEventQuery = "message.action='pocket.application.EventRedelegation'" + // TODO_HACK(#280): Instead of listening to all events and doing a verbose + // filter, we should subscribe to both MsgDelegateToGateway and MsgUndelegateFromGateway + // messages directly, and filter those for the EventRedelegation event types. + // This would save the delegation client from listening to a lot of unnecessary + // events, that it filters out. + // NB: This is not currently possible because the observer pattern does not + // support multiplexing multiple observables into a single observable, that + // can supply the EventsReplayClient with both the MsgDelegateToGateway and + // MsgUndelegateFromGateway events. + delegationEventQuery = "tm.event='Tx' AND message.module='application'" // defaultRedelegationsReplayLimit is the number of redelegations that the // replay observable returned by LastNRedelegations() will be able to replay. - // TODO_TECHDEBT/TODO_FUTURE: add a `redelegationsReplayLimit` field to the `delegationClient` - // struct that defaults to this but can be overridden via an option. + // TODO_TECHDEBT/TODO_FUTURE: add a `redelegationsReplayLimit` field to the + // delegation client struct that defaults to this but can be overridden via + // an option in future work. defaultRedelegationsReplayLimit = 100 ) diff --git a/pkg/client/delegation/client_test.go b/pkg/client/delegation/client_test.go index ccb6beacd..f9b82ffb5 100644 --- a/pkg/client/delegation/client_test.go +++ b/pkg/client/delegation/client_test.go @@ -2,7 +2,6 @@ package delegation_test import ( "context" - "encoding/json" "testing" "time" @@ -12,29 +11,25 @@ import ( "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/delegation" "github.com/pokt-network/poktroll/testutil/sample" + "github.com/pokt-network/poktroll/testutil/testclient/testdelegation" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" - apptypes "github.com/pokt-network/poktroll/x/application/types" ) const ( testTimeoutDuration = 100 * time.Millisecond // duplicates pkg/client/delegation/client.go's delegationEventQuery for testing purposes. - delegationEventQuery = "message.action='pocket.application.EventRedelegation'" + delegationEventQuery = "tm.event='Tx' AND message.module='application'" ) func TestDelegationClient(t *testing.T) { var ( - expectedAddress = sample.AccAddress() - expectedDelegationEvent = apptypes.EventRedelegation{ - AppAddress: expectedAddress, - GatewayAddress: sample.AccAddress(), - } - ctx = context.Background() + expectedAppAddress = sample.AccAddress() + expectedGatewayAddress = sample.AccAddress() + ctx = context.Background() ) - expectedEventBz, err := json.Marshal(expectedDelegationEvent) - require.NoError(t, err) + expectedEventBz := testdelegation.NewRedelegationEventBytes(t, expectedAppAddress, expectedGatewayAddress) eventsQueryClient := testeventsquery.NewAnyTimesEventsBytesEventsQueryClient( ctx, t, @@ -58,6 +53,8 @@ func TestDelegationClient(t *testing.T) { name: "LastNRedelegations successfully returns latest redelegation", fn: func() client.Redelegation { lastRedelegation := delegationClient.LastNRedelegations(ctx, 1)[0] + require.Equal(t, expectedAppAddress, lastRedelegation.GetAppAddress()) + require.Equal(t, expectedGatewayAddress, lastRedelegation.GetGatewayAddress()) return lastRedelegation }, }, @@ -69,7 +66,8 @@ func TestDelegationClient(t *testing.T) { // Ensure that the observable is replayable via Last. lastRedelegation := redelegationObs.Last(ctx, 1)[0] - require.Equal(t, expectedAddress, lastRedelegation.GetAppAddress()) + require.Equal(t, expectedAppAddress, lastRedelegation.GetAppAddress()) + require.Equal(t, expectedGatewayAddress, lastRedelegation.GetGatewayAddress()) return lastRedelegation }, @@ -90,7 +88,8 @@ func TestDelegationClient(t *testing.T) { select { case actualRedelegation := <-actualRedelegationCh: - require.Equal(t, expectedAddress, actualRedelegation.GetAppAddress()) + require.Equal(t, expectedAppAddress, actualRedelegation.GetAppAddress()) + require.Equal(t, expectedGatewayAddress, actualRedelegation.GetGatewayAddress()) case <-time.After(testTimeoutDuration): t.Fatal("timed out waiting for redelegation event") } diff --git a/pkg/client/delegation/redelegation.go b/pkg/client/delegation/redelegation.go index a9fecae6e..86af6906f 100644 --- a/pkg/client/delegation/redelegation.go +++ b/pkg/client/delegation/redelegation.go @@ -1,16 +1,32 @@ package delegation +// TODO_TECHDEBT(#280): Refactor to use merged observables and subscribe to +// MsgDelegateToGateway and MsgUndelegateFromGateway messages directly, instead +// of listening to all events and doing a verbose filter. + import ( "encoding/json" + "strconv" + + "cosmossdk.io/api/tendermint/abci" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/events" ) +// redelegationEventType is the type of the EventRedelegation event emitted by +// both the MsgDelegateToGateway and MsgUndelegateFromGateway messages. +const redelegationEventType = "pocket.application.EventRedelegation" + var _ client.Redelegation = (*redelegation)(nil) +// TxEvent is an alias for the CometBFT TxResult type used to decode the +// response bytes from the EventsQueryClient's subscription +type TxEvent = abci.TxResult + // redelegation wraps the EventRedelegation event emitted by the application -// module, for use in the observable +// module, for use in the observable, it is one of the log entries embedded +// within the log field of the response struct from the app module's query. type redelegation struct { AppAddress string `json:"app_address"` GatewayAddress string `json:"gateway_address"` @@ -27,22 +43,57 @@ func (d redelegation) GetGatewayAddress() string { } // newRedelegationEventFactoryFn is a factory function that returns a -// function that attempts to deserialise the given bytes into a redelegation +// function that attempts to deserialize the given bytes into a redelegation // struct. If the delegate struct has an empty app address then an // ErrUnmarshalRedelegation error is returned. Otherwise if deserialisation // fails then the error is returned. func newRedelegationEventFactoryFn() events.NewEventsFn[client.Redelegation] { - return func(redelegationEventBz []byte) (client.Redelegation, error) { - redelegationEvent := new(redelegation) - if err := json.Unmarshal(redelegationEventBz, redelegationEvent); err != nil { + return func(eventBz []byte) (client.Redelegation, error) { + txEvent := new(TxEvent) + // Try to deserialize the provided bytes into a TxEvent. + if err := json.Unmarshal(eventBz, txEvent); err != nil { return nil, err } - - if redelegationEvent.AppAddress == "" || redelegationEvent.GatewayAddress == "" { - return nil, events.ErrEventsUnmarshalEvent. - Wrapf("with redelegation: %s", string(redelegationEventBz)) + // Check if the TxEvent has empty transaction bytes, which indicates + // the message is probably not a valid transaction event. + if len(txEvent.Tx) == 0 { + return nil, events.ErrEventsUnmarshalEvent.Wrap("empty transaction bytes") } - - return redelegationEvent, nil + // Iterate through the log entries to find EventRedelegation + for _, event := range txEvent.Result.Events { + if event.GetType_() != redelegationEventType { + continue + } + var redelegationEvent redelegation + for _, attr := range event.Attributes { + switch attr.Key { + case "app_address": + appAddr, err := unescape(attr.Value) + if err != nil { + return nil, events.ErrEventsUnmarshalEvent.Wrapf("cannot retrieve app address: %v", err) + } + redelegationEvent.AppAddress = appAddr + case "gateway_address": + gatewayAddr, err := unescape(attr.Value) + if err != nil { + return nil, events.ErrEventsUnmarshalEvent.Wrapf("cannot retrieve gateway address: %v", err) + } + redelegationEvent.GatewayAddress = gatewayAddr + default: + return nil, events.ErrEventsUnmarshalEvent.Wrapf("unknown attribute key: %s", attr.Key) + } + } + // Handle the redelegation event + if redelegationEvent.AppAddress == "" || redelegationEvent.GatewayAddress == "" { + return nil, events.ErrEventsUnmarshalEvent. + Wrapf("empty redelegation event: %s", string(eventBz)) + } + return redelegationEvent, nil + } + return nil, events.ErrEventsUnmarshalEvent.Wrap("no redelegation event found") } } + +func unescape(s string) (string, error) { + return strconv.Unquote(s) +} diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go index c196f9473..1836dd3ca 100644 --- a/pkg/client/events/replay_client.go +++ b/pkg/client/events/replay_client.go @@ -154,10 +154,10 @@ func (rClient *replayClient[T, R]) goRemapEventsSequence(ctx context.Context, pu func(ctx context.Context, eventTypeObs R) { if prevEventTypeObs != nil { // Just in case the assumption that all transport errors are - // persistent does not hold, unsubscribe from the previous - // event type observable in order to prevent unexpected and/or - // duplicate notifications on the obsersvable returned by this - // function. + // persistent (i.e. they occur once and do not repeat) does not + // hold, unsubscribe from the previous event type observable in + // order to prevent unexpected and/or duplicate notifications + // on the observable returned by this function. prevEventTypeObs.UnsubscribeAll() } else { prevEventTypeObs = eventTypeObs diff --git a/pkg/client/events/replay_client_integration_test.go b/pkg/client/events/replay_client_integration_test.go index a54a0b61e..f41e2f7f3 100644 --- a/pkg/client/events/replay_client_integration_test.go +++ b/pkg/client/events/replay_client_integration_test.go @@ -89,6 +89,9 @@ func TestReplayClient_Remapping(t *testing.T) { eventNum := readEventCounter.Add(1) - 1 event := newMessageEventBz(eventNum) + // After an arbitrary number of events (2 in this case), simulate + // the connection closing so that the replay client can remap the + // events it receives without the caller having to resubscribe. if eventNum == 2 { // Simulate the connection closing connMock.Close() diff --git a/pkg/client/query/accquerier.go b/pkg/client/query/accquerier.go index 73ed42b51..c61ec9760 100644 --- a/pkg/client/query/accquerier.go +++ b/pkg/client/query/accquerier.go @@ -52,7 +52,7 @@ func (aq *accQuerier) GetAccount( } var acc accounttypes.AccountI if err = queryCodec.UnpackAny(res.Account, &acc); err != nil { - return nil, ErrQueryUnableToDeserialiseAccount.Wrapf("address: %s [%v]", address, err) + return nil, ErrQueryUnableToDeserializeAccount.Wrapf("address: %s [%v]", address, err) } return acc, nil } diff --git a/pkg/client/query/errors.go b/pkg/client/query/errors.go index 04cb7fc3b..f395a9997 100644 --- a/pkg/client/query/errors.go +++ b/pkg/client/query/errors.go @@ -7,6 +7,6 @@ import ( var ( codespace = "query" ErrQueryAccountNotFound = sdkerrors.Register(codespace, 1, "account not found") - ErrQueryUnableToDeserialiseAccount = sdkerrors.Register(codespace, 2, "unable to deserialise account") + ErrQueryUnableToDeserializeAccount = sdkerrors.Register(codespace, 2, "unable to deserialize account") ErrQueryRetrieveSession = sdkerrors.Register(codespace, 3, "error while trying to retrieve a session") ) diff --git a/pkg/crypto/interface.go b/pkg/crypto/interface.go index d93978740..4298a9b6a 100644 --- a/pkg/crypto/interface.go +++ b/pkg/crypto/interface.go @@ -12,5 +12,19 @@ import ( // the addresses of the gateways the application is delegated to, and converting // them into their corresponding public key points on the secp256k1 curve. type RingCache interface { + // Start starts the ring cache, it takes a cancellable context and, in a + // separate goroutine, listens for on-chain delegation events and invalidates + // the cache if the redelegation event's AppAddress is stored in the cache. + Start(ctx context.Context) + // GetCachedAddresses returns the addresses of the applications that are + // currently cached in the ring cache. + GetCachedAddresses() []string + // GetRingForAddress returns the ring for the given application address if + // it exists. If it does not exist in the cache, it follows a lazy approach + // of querying the on-chain state and creating it just-in-time, caching for + // future retrievals. GetRingForAddress(ctx context.Context, appAddress string) (*ring.Ring, error) + // Stop stops the ring cache by unsubscribing from on-chain delegation events. + // And clears the cache, so that it no longer contains any rings, + Stop() } diff --git a/pkg/crypto/rings/cache.go b/pkg/crypto/rings/cache.go index 7aeccdddb..e1a21ed50 100644 --- a/pkg/crypto/rings/cache.go +++ b/pkg/crypto/rings/cache.go @@ -13,19 +13,26 @@ import ( "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/crypto" + "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/polylog" - _ "github.com/pokt-network/poktroll/pkg/polylog/polyzero" ) var _ crypto.RingCache = (*ringCache)(nil) type ringCache struct { + // logger is the logger for the ring cache. + logger polylog.Logger + // ringPointsCache maintains a map of application addresses to the points // on the secp256k1 curve that correspond to the public keys of the gateways // the application is delegated to. These are used to build the app's ring. ringPointsCache map[string][]ringtypes.Point ringPointsMu *sync.RWMutex + // delegationClient is used to listen for on-chain delegation events and + // invalidate cache entries for rings that have been updated on chain. + delegationClient client.DelegationClient + // applicationQuerier is the querier for the application module, and is // used to get the addresses of the gateways an application is delegated to. applicationQuerier client.ApplicationQueryClient @@ -37,6 +44,12 @@ type ringCache struct { // NewRingCache returns a new RingCache instance. It requires a depinject.Config // to be passed in, which is used to inject the dependencies of the RingCache. +// +// Required dependencies: +// - polylog.Logger +// - client.DelegationClient +// - client.ApplicationQueryClient +// - client.AccountQueryClient func NewRingCache(deps depinject.Config) (crypto.RingCache, error) { rc := &ringCache{ ringPointsCache: make(map[string][]ringtypes.Point), @@ -46,6 +59,8 @@ func NewRingCache(deps depinject.Config) (crypto.RingCache, error) { // Supply the account and application queriers to the RingCache. if err := depinject.Inject( deps, + &rc.logger, + &rc.delegationClient, &rc.applicationQuerier, &rc.accountQuerier, ); err != nil { @@ -55,6 +70,68 @@ func NewRingCache(deps depinject.Config) (crypto.RingCache, error) { return rc, nil } +// Start starts the ring cache by subscribing to on-chain redelegation events. +func (rc *ringCache) Start(ctx context.Context) { + rc.logger.Info().Msg("starting ring cache") + // Listen for redelegation events and invalidate the cache if the + // redelegation event's address is stored in the cache. + go func() { + select { + case <-ctx.Done(): + // Stop the ring cache if the context is cancelled. + rc.Stop() + } + }() + go rc.goInvalidateCache(ctx) +} + +// goInvalidateCache listens for redelegation events and invalidates the +// cache if the app address in the redelegation event is stored in the cache. +// This function is intended to be run in a goroutine. +func (rc *ringCache) goInvalidateCache(ctx context.Context) { + // Get the latest redelegation replay observable. + redelegationObs := rc.delegationClient.RedelegationsSequence(ctx) + // For each redelegation event, check if the redelegation events' + // app address is in the cache. If it is, invalidate the cache entry. + channel.ForEach[client.Redelegation]( + ctx, redelegationObs, + func(ctx context.Context, redelegation client.Redelegation) { + // Lock the cache for writing. + rc.ringPointsMu.Lock() + defer rc.ringPointsMu.Unlock() + // Check if the redelegation event's app address is in the cache. + if _, ok := rc.ringPointsCache[redelegation.GetAppAddress()]; ok { + rc.logger.Debug(). + Str("app_address", redelegation.GetAppAddress()). + Msg("redelegation event received; invalidating cache entry") + // Invalidate the cache entry. + delete(rc.ringPointsCache, redelegation.GetAppAddress()) + } + }) +} + +// Stop stops the ring cache by unsubscribing from on-chain redelegation events. +func (rc *ringCache) Stop() { + // Clear the cache. + rc.ringPointsMu.Lock() + rc.ringPointsCache = make(map[string][]ringtypes.Point) + rc.ringPointsMu.Unlock() + // Close the delegation client. + rc.delegationClient.Close() +} + +// GetCachedAddresses returns the addresses of the applications that are +// currently cached in the ring cache. +func (rc *ringCache) GetCachedAddresses() []string { + rc.ringPointsMu.RLock() + defer rc.ringPointsMu.RUnlock() + keys := make([]string, 0, len(rc.ringPointsCache)) + for k := range rc.ringPointsCache { + keys = append(keys, k) + } + return keys +} + // GetRingForAddress returns the ring for the address provided. If it does not // exist in the cache, it will be created by querying the application module. // and converting the addresses into their corresponding public key points on @@ -64,27 +141,26 @@ func (rc *ringCache) GetRingForAddress( appAddress string, ) (*ring.Ring, error) { var ( - ring *ring.Ring - err error - logger = polylog.Ctx(ctx) + ring *ring.Ring + err error ) // Lock the cache for reading. rc.ringPointsMu.RLock() // Check if the ring is in the cache. points, ok := rc.ringPointsCache[appAddress] - // Unlock the cache incase it was not cached. + // Unlock the cache in case it was not cached. rc.ringPointsMu.RUnlock() if !ok { // If the ring is not in the cache, get it from the application module. - logger.Debug(). + rc.logger.Debug(). Str("app_address", appAddress). Msg("ring cache miss; fetching from application module") ring, err = rc.getRingForAppAddress(ctx, appAddress) } else { // If the ring is in the cache, create it from the points. - logger.Debug(). + rc.logger.Debug(). Str("app_address", appAddress). Msg("ring cache hit; creating from points") ring, err = newRingFromPoints(points) @@ -108,6 +184,13 @@ func (rc *ringCache) getRingForAppAddress( if err != nil { return nil, err } + // Cache the ring's points for future use + rc.logger.Debug(). + Str("app_address", appAddress). + Msg("updating ring cache for app") + rc.ringPointsMu.Lock() + defer rc.ringPointsMu.Unlock() + rc.ringPointsCache[appAddress] = points return newRingFromPoints(points) } @@ -124,8 +207,6 @@ func (rc *ringCache) getDelegatedPubKeysForAddress( ctx context.Context, appAddress string, ) ([]ringtypes.Point, error) { - logger := polylog.Ctx(ctx) - rc.ringPointsMu.Lock() defer rc.ringPointsMu.Unlock() @@ -157,12 +238,6 @@ func (rc *ringCache) getDelegatedPubKeysForAddress( return nil, err } - // Update the cache overwriting the previous value. - logger.Debug(). - Str("app_address", appAddress). - Msg("updating ring cache for app") - rc.ringPointsCache[appAddress] = points - // Return the public key points on the secp256k1 curve. return points, nil } @@ -174,11 +249,9 @@ func (rc *ringCache) addressesToPoints( ctx context.Context, addresses []string, ) ([]ringtypes.Point, error) { - logger := polylog.Ctx(ctx) - curve := ring_secp256k1.NewCurve() points := make([]ringtypes.Point, len(addresses)) - logger.Debug(). + rc.logger.Debug(). // TODO_TECHDEBT: implement and use `polylog.Event#Strs([]string)` instead of formatting here. Str("addresses", fmt.Sprintf("%v", addresses)). Msg("converting addresses to points") diff --git a/pkg/crypto/rings/cache_test.go b/pkg/crypto/rings/cache_test.go index cc66dba94..8505fa3dd 100644 --- a/pkg/crypto/rings/cache_test.go +++ b/pkg/crypto/rings/cache_test.go @@ -1,18 +1,22 @@ -package rings +package rings_test import ( "context" "errors" "testing" + "time" - "cosmossdk.io/depinject" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/crypto" + "github.com/pokt-network/poktroll/pkg/crypto/rings" + "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/sample" + "github.com/pokt-network/poktroll/testutil/testclient/testdelegation" "github.com/pokt-network/poktroll/testutil/testclient/testqueryclients" + testrings "github.com/pokt-network/poktroll/testutil/testcrypto/rings" apptypes "github.com/pokt-network/poktroll/x/application/types" ) @@ -41,9 +45,16 @@ func newAccount(curve string) account { } func TestRingCache_BuildRing_Uncached(t *testing.T) { - rc := createRingCache(t) + // Create and start the ring cache + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + rc, _ := createRingCache(ctx, t, "") + rc.Start(ctx) + t.Cleanup(rc.Stop) + tests := []struct { desc string + appAddrIndex int appAccount account delegateeAccounts []account expectedRingSize int @@ -51,6 +62,7 @@ func TestRingCache_BuildRing_Uncached(t *testing.T) { }{ { desc: "success: un-cached application without delegated gateways", + appAddrIndex: 1, appAccount: newAccount("secp256k1"), delegateeAccounts: []account{}, expectedRingSize: noDelegateesRingSize, @@ -58,6 +70,7 @@ func TestRingCache_BuildRing_Uncached(t *testing.T) { }, { desc: "success: un-cached application with delegated gateways", + appAddrIndex: 2, appAccount: newAccount("secp256k1"), delegateeAccounts: []account{newAccount("secp256k1"), newAccount("secp256k1")}, expectedRingSize: 3, @@ -68,14 +81,14 @@ func TestRingCache_BuildRing_Uncached(t *testing.T) { appAccount: newAccount("ed25519"), delegateeAccounts: []account{newAccount("secp256k1"), newAccount("secp256k1")}, expectedRingSize: 0, - expectedErr: ErrRingsNotSecp256k1Curve, + expectedErr: rings.ErrRingsNotSecp256k1Curve, }, { desc: "failure: gateway pubkey uses wrong curve", appAccount: newAccount("secp256k1"), delegateeAccounts: []account{newAccount("ed25519"), newAccount("ed25519")}, expectedRingSize: 0, - expectedErr: ErrRingsNotSecp256k1Curve, + expectedErr: rings.ErrRingsNotSecp256k1Curve, }, { desc: "failure: application not found", @@ -85,7 +98,6 @@ func TestRingCache_BuildRing_Uncached(t *testing.T) { expectedErr: apptypes.ErrAppNotFound, }, } - ctx := context.TODO() for _, test := range tests { t.Run(test.desc, func(t *testing.T) { // If we expect the application to exist then add it to the test @@ -109,12 +121,12 @@ func TestRingCache_BuildRing_Uncached(t *testing.T) { require.NoError(t, err) // Ensure the ring is the correct size. require.Equal(t, test.expectedRingSize, ring.Size()) + require.Equal(t, test.appAddrIndex, len(rc.GetCachedAddresses())) }) } } func TestRingCache_BuildRing_Cached(t *testing.T) { - rc := createRingCache(t) tests := []struct { desc string appAccount account @@ -134,12 +146,32 @@ func TestRingCache_BuildRing_Cached(t *testing.T) { expectedErr: nil, }, } - ctx := context.TODO() + for _, test := range tests { t.Run(test.desc, func(t *testing.T) { + // Create and start the ring cache + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + rc, pubCh := createRingCache(ctx, t, test.appAccount.address) + rc.Start(ctx) + t.Cleanup(rc.Stop) + + // Check that the ring cache is empty + require.Equal(t, 0, len(rc.GetCachedAddresses())) + + // add the application's account with no delegated gateways to the + // testing state + testqueryclients.AddAddressToApplicationMap(t, test.appAccount.address, test.appAccount.pubKey, nil) + + // Attempt to retrieve the ring for the address and cache it + ring1, err := rc.GetRingForAddress(ctx, test.appAccount.address) + require.NoError(t, err) + require.Equal(t, noDelegateesRingSize, ring1.Size()) + require.Equal(t, 1, len(rc.GetCachedAddresses())) + accMap := make(map[string]cryptotypes.PubKey) // if the test expects a ring > 2 we have delegated gateways - if test.expectedRingSize > 2 { + if test.expectedRingSize != noDelegateesRingSize { // create accounts for all the expected delegated gateways // and add them to the map for i := 0; i < test.expectedRingSize-1; i++ { @@ -147,34 +179,129 @@ func TestRingCache_BuildRing_Cached(t *testing.T) { accMap[gatewayAcc.address] = gatewayAcc.pubKey } } + // add the application's account and the accounts of all its // delegated gateways to the testing state simulating a change testqueryclients.AddAddressToApplicationMap(t, test.appAccount.address, test.appAccount.pubKey, accMap) + for k := range accMap { + t.Log(accMap) + // publish a redelegation event + pubCh <- testdelegation.NewAnyTimesRedelegation(t, test.appAccount.address, k) + } - // Attempt to retrieve the ring for the address and cache it - ring1, err := rc.GetRingForAddress(ctx, test.appAccount.address) + // Wait a tick to allow the ring cache to process asynchronously. + // It should have invalidated the cache for the ring, if changed. + time.Sleep(15 * time.Millisecond) + + // Attempt to retrieve the ring for the address and cache it if + // the ring was updated + ring2, err := rc.GetRingForAddress(ctx, test.appAccount.address) require.NoError(t, err) - require.Equal(t, test.expectedRingSize, ring1.Size()) + // If the ring was updated then the rings should not be equal + if test.expectedRingSize != noDelegateesRingSize { + require.False(t, ring1.Equals(ring2)) + } else { + require.True(t, ring1.Equals(ring2)) + } + require.Equal(t, test.expectedRingSize, ring2.Size()) + require.Equal(t, 1, len(rc.GetCachedAddresses())) // Attempt to retrieve the ring for the address after its been cached - ring2, err := rc.GetRingForAddress(ctx, test.appAccount.address) + ring3, err := rc.GetRingForAddress(ctx, test.appAccount.address) require.NoError(t, err) + require.Equal(t, 1, len(rc.GetCachedAddresses())) // Ensure the rings are the same and have the same size - require.True(t, ring1.Equals(ring2)) - require.Equal(t, test.expectedRingSize, ring2.Size()) + require.True(t, ring2.Equals(ring3)) + require.Equal(t, test.expectedRingSize, ring3.Size()) + require.Equal(t, 1, len(rc.GetCachedAddresses())) }) } } +func TestRingCache_Stop(t *testing.T) { + // Create and start the ring cache + ctx, cancelCtx := context.WithCancel(context.Background()) + t.Cleanup(cancelCtx) + rc, _ := createRingCache(ctx, t, "") + rc.Start(ctx) + + // Insert an application into the testing state + appAccount := newAccount("secp256k1") + gatewayAccount := newAccount("secp256k1") + testqueryclients.AddAddressToApplicationMap( + t, appAccount.address, + appAccount.pubKey, + map[string]cryptotypes.PubKey{ + gatewayAccount.address: gatewayAccount.pubKey, + }) + + // Attempt to retrieve the ring for the address and cache it + ring1, err := rc.GetRingForAddress(ctx, appAccount.address) + require.NoError(t, err) + require.Equal(t, 2, ring1.Size()) + require.Equal(t, 1, len(rc.GetCachedAddresses())) + + // Retrieve the cached ring + ring2, err := rc.GetRingForAddress(ctx, appAccount.address) + require.NoError(t, err) + require.True(t, ring1.Equals(ring2)) + require.Equal(t, 1, len(rc.GetCachedAddresses())) + + // Stop the ring cache + rc.Stop() + + // Retrieve the ring again + require.Equal(t, 0, len(rc.GetCachedAddresses())) +} + +func TestRingCache_CancelContext(t *testing.T) { + // Create and start the ring cache + ctx, cancelCtx := context.WithCancel(context.Background()) + rc, _ := createRingCache(ctx, t, "") + rc.Start(ctx) + + // Insert an application into the testing state + appAccount := newAccount("secp256k1") + gatewayAccount := newAccount("secp256k1") + testqueryclients.AddAddressToApplicationMap( + t, + appAccount.address, appAccount.pubKey, + map[string]cryptotypes.PubKey{ + gatewayAccount.address: gatewayAccount.pubKey, + }) + + // Attempt to retrieve the ring for the address and cache it + ring1, err := rc.GetRingForAddress(ctx, appAccount.address) + require.NoError(t, err) + require.Equal(t, 2, ring1.Size()) + require.Equal(t, 1, len(rc.GetCachedAddresses())) + + // Retrieve the cached ring + ring2, err := rc.GetRingForAddress(ctx, appAccount.address) + require.NoError(t, err) + require.True(t, ring1.Equals(ring2)) + require.Equal(t, 1, len(rc.GetCachedAddresses())) + + // Cancel the context + cancelCtx() + + // Wait a tick to allow the ring cache to process asynchronously. + time.Sleep(15 * time.Millisecond) + + // Retrieve the ring again + require.Equal(t, 0, len(rc.GetCachedAddresses())) +} + // createRingCache creates the RingCache using mocked AccountQueryClient and -// ApplicatioQueryClient instances -func createRingCache(t *testing.T) crypto.RingCache { +// ApplicatioQueryClient instances and returns the RingCache and the delegatee +// change replay observable. +func createRingCache(ctx context.Context, t *testing.T, appAddress string) (crypto.RingCache, chan<- client.Redelegation) { t.Helper() + redelegationObs, redelegationPublishCh := channel.NewReplayObservable[client.Redelegation](ctx, 1) + delegationClient := testdelegation.NewAnyTimesRedelegationsSequence(ctx, t, appAddress, redelegationObs) accQuerier := testqueryclients.NewTestAccountQueryClient(t) appQuerier := testqueryclients.NewTestApplicationQueryClient(t) - deps := depinject.Supply(client.AccountQueryClient(accQuerier), client.ApplicationQueryClient(appQuerier)) - rc, err := NewRingCache(deps) - require.NoError(t, err) - return rc + rc := testrings.NewRingCacheWithMockDependencies(ctx, t, accQuerier, appQuerier, delegationClient) + return rc, redelegationPublishCh } diff --git a/pkg/deps/config/suppliers.go b/pkg/deps/config/suppliers.go index 4a9d536e9..7c9f4dd21 100644 --- a/pkg/deps/config/suppliers.go +++ b/pkg/deps/config/suppliers.go @@ -12,6 +12,7 @@ import ( "github.com/spf13/cobra" "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/delegation" "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" querytypes "github.com/pokt-network/poktroll/pkg/client/query/types" @@ -59,8 +60,8 @@ func NewSupplyLoggerFromCtx(ctx context.Context) SupplierFn { } } -// NewSupplyEventsQueryClientFn supplies a depinject config with an eventsQueryClient -// from the given queryNodeRPCURL. +// NewSupplyEventsQueryClientFn supplies a depinject config with an +// EventsQueryClient from the given queryNodeRPCURL. func NewSupplyEventsQueryClientFn(queryNodeRPCURL *url.URL) SupplierFn { return func( _ context.Context, @@ -92,6 +93,25 @@ func NewSupplyBlockClientFn() SupplierFn { } } +// NewSupplyDelegationClientFn returns a function which constructs a +// DelegationClient instance and returns a new depinject.Config which is +// supplied with the given deps and the new DelegationClient. +func NewSupplyDelegationClientFn() SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + // Requires a query client to be supplied to the deps + delegationClient, err := delegation.NewDelegationClient(ctx, deps) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(delegationClient)), nil + } +} + // NewSupplyQueryClientContextFn supplies a depinject config with a query // // ClientContext, a GRPC client connection, and a keyring from the given queryNodeGRPCURL. diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 1dbc61b76..3e80bbfa3 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -178,6 +178,7 @@ func setupRelayerDependencies( config.NewSupplyQueryClientContextFn(queryNodeGRPCUrl), // leaf supplyMiner, // leaf config.NewSupplyTxClientContextFn(queryNodeGRPCUrl, txNodeRPCUrl), // leaf + config.NewSupplyDelegationClientFn(), // leaf config.NewSupplyAccountQuerierFn(), config.NewSupplyApplicationQuerierFn(), config.NewSupplySupplierQuerierFn(), diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 3b5c69c12..ec9f5a1c4 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -123,6 +123,9 @@ func (rp *relayerProxy) Start(ctx context.Context) error { return err } + // Start the ring cache. + rp.ringCache.Start(ctx) + startGroup, ctx := errgroup.WithContext(ctx) for _, relayServer := range rp.proxyServers { diff --git a/pkg/sdk/deps_builder.go b/pkg/sdk/deps_builder.go index d9113e197..cfd0ff4b3 100644 --- a/pkg/sdk/deps_builder.go +++ b/pkg/sdk/deps_builder.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/credentials/insecure" block "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/delegation" eventsquery "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" "github.com/pokt-network/poktroll/pkg/crypto/rings" @@ -74,7 +75,15 @@ func (sdk *poktrollSDK) buildDeps( } deps = depinject.Configs(deps, depinject.Supply(sessionQuerier)) - // Create and supply the ring cache that depends on application and account queriers + // Create and supply the delegation client + delegationClient, err := delegation.NewDelegationClient(ctx, deps) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply(delegationClient)) + + // Create and supply the ring cache that depends on: + // the logger, application and account querier and the delegation client ringCache, err := rings.NewRingCache(deps) if err != nil { return nil, err diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index d3c97d867..673d111d3 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -66,6 +66,7 @@ type poktrollSDK struct { supplierAccountCache map[string]cryptotypes.PubKey } +// NewPOKTRollSDK creates a new POKTRollSDK instance with the given configuration. func NewPOKTRollSDK(ctx context.Context, config *POKTRollSDKConfig) (POKTRollSDK, error) { sdk := &poktrollSDK{ config: config, @@ -101,5 +102,9 @@ func NewPOKTRollSDK(ctx context.Context, config *POKTRollSDKConfig) (POKTRollSDK return nil, fmt.Errorf("failed to decode private key: %w", err) } + // Start the ring cache, when the context is canceled, the ring cache + // will stop. And clear any cached rings. + sdk.ringCache.Start(ctx) + return sdk, nil } diff --git a/proto/pocket/application/event.proto b/proto/pocket/application/event.proto index 624b119d9..619507cab 100644 --- a/proto/pocket/application/event.proto +++ b/proto/pocket/application/event.proto @@ -1,14 +1,15 @@ syntax = "proto3"; package pocket.application; -option go_package = "github.com/pokt-network/poktroll/x/application/types"; - import "cosmos_proto/cosmos.proto"; +import "gogoproto/gogo.proto"; + +option go_package = "github.com/pokt-network/poktroll/x/application/types"; // EventRedelegation is an event emitted whenever an application changes its // delegatee gateways on chain. This is in response to both a DelegateToGateway // and UndelegateFromGateway message. message EventRedelegation { - string app_address = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"]; // The Bech32 address of the application, using cosmos' ScalarDescriptor to ensure deterministic encoding - string gateway_address = 2 [(cosmos_proto.scalar) = "cosmos.AddressString"]; // The Bech32 address of the gateway the application has changed their delegation of, using cosmos' ScalarDescriptor to ensure deterministic encoding + string app_address = 1 [(cosmos_proto.scalar) = "cosmos.AddressString", (gogoproto.jsontag) = "app_address"]; // The Bech32 address of the application, using cosmos' ScalarDescriptor to ensure deterministic encoding + string gateway_address = 2 [(cosmos_proto.scalar) = "cosmos.AddressString", (gogoproto.jsontag) = "gateway_address"]; // The Bech32 address of the gateway the application has changed their delegation of, using cosmos' ScalarDescriptor to ensure deterministic encoding } diff --git a/testutil/network/basenet/accounts.go b/testutil/network/basenet/accounts.go index 663069304..53f9b0193 100644 --- a/testutil/network/basenet/accounts.go +++ b/testutil/network/basenet/accounts.go @@ -10,6 +10,7 @@ import ( "github.com/cosmos/cosmos-sdk/crypto/keyring" testcli "github.com/cosmos/cosmos-sdk/testutil/cli" "github.com/cosmos/cosmos-sdk/types" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/testutil/network" @@ -54,7 +55,7 @@ func (memnet *BaseInMemoryNetwork) CreateKeyringAccounts(t *testing.T) { func (memnet *BaseInMemoryNetwork) FundOnChainAccounts(t *testing.T) { t.Helper() - // NB: while it may initially seem like the memnet#InitAccountsWithSequence() methods + // NB: while it may initially seem like the memnet#FundAccounts() methods // can be refactored into a generic function, this is not possible so long as the genesis // state lists are passed directly & remain a slice of concrete types (as opposed to pointers). // Under these conditions, a generic function would not be able to unmarshal the genesis state @@ -106,24 +107,21 @@ func (memnet *BaseInMemoryNetwork) FundAddress( t.Helper() signerAccountNumber := 0 - // Validator's client context MUST be used for this CLI command because its keyring includes the validator's key - clientCtx := memnet.Network.Validators[0].ClientCtx - // MUST NOT use memnet.GetClientCtx(t) as its keyring does not include the validator's account - // TODO_UPNEXT(@bryanchriswhite): Ensure validator key is always available via the in-memory network's keyring. + clientCtx := memnet.GetClientCtx(t) net := memnet.GetNetwork(t) val := net.Validators[0] args := []string{ fmt.Sprintf("--%s=true", flags.FlagOffline), fmt.Sprintf("--%s=%d", flags.FlagAccountNumber, signerAccountNumber), - fmt.Sprintf("--%s=%d", flags.FlagSequence, memnet.NextAccountSequenceNumber(t)), + fmt.Sprintf("--%s=%d", flags.FlagSequence, memnet.NextValidatorTxSequenceNumber(t)), fmt.Sprintf("--%s=%s", flags.FlagFrom, val.Address.String()), fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), - fmt.Sprintf("--%s=%s", flags.FlagFees, types.NewCoins(types.NewCoin(net.Config.BondDenom, math.NewInt(10))).String()), + fmt.Sprintf("--%s=%s", flags.FlagFees, memnet.NewBondDenomCoins(t, 10).String()), } - amount := types.NewCoins(types.NewCoin("stake", math.NewInt(200))) + amount := memnet.NewBondDenomCoins(t, 200) responseRaw, err := testcli.MsgSendExec(clientCtx, val.Address, addr, amount, args...) require.NoError(t, err) var responseJSON map[string]interface{} @@ -213,3 +211,9 @@ func (memnet *BaseInMemoryNetwork) CreateNewOnChainAccount(t *testing.T) *testke return preGeneratedAcct } + +func (memnet *BaseInMemoryNetwork) NewBondDenomCoins(t *testing.T, numCoins int64) sdk.Coins { + t.Helper() + + return sdk.NewCoins(sdk.NewCoin(memnet.GetNetwork(t).Config.BondDenom, math.NewInt(numCoins))) +} diff --git a/testutil/network/basenet/network.go b/testutil/network/basenet/network.go index 327f8e1ed..fb9fc6918 100644 --- a/testutil/network/basenet/network.go +++ b/testutil/network/basenet/network.go @@ -16,16 +16,18 @@ import ( var _ network.InMemoryNetwork = (*BaseInMemoryNetwork)(nil) // BaseInMemoryNetwork is an "abstract" (i.e. partial) implementation, intended -// to be embedded by other ("concrete") InMemoryNetwork implementations. +// to consolidate shared behavior between multiple ("concrete") InMemoryNetwork +// implementations. These shared behaviors (methods) are accessible to any concrete +// implementation which embeds BaseInMemoryNetwork. type BaseInMemoryNetwork struct { Config network.InMemoryNetworkConfig PreGeneratedAccountIterator *testkeyring.PreGeneratedAccountIterator - Network *sdknetwork.Network + CosmosNetwork *sdknetwork.Network - // lastAccountSeqNumber stores the last (most recently generated) account sequence number. + // lastValidatorSeqNumber stores the last (most recently generated) account sequence number. // NB: explicitly NOT using atomic.Int32 as it's usage doesn't compose well with anonymous // literal declarations. - lastAccountSeqNumber int32 + lastValidatorSeqNumber int32 } // NewBaseInMemoryNetwork creates a new BaseInMemoryNetwork with the given @@ -43,23 +45,23 @@ func NewBaseInMemoryNetwork( PreGeneratedAccountIterator: preGeneratedAccounts, // First functional account sequence number is 1. Starting at 0 so that - // callers can always use NextAccountSequenceNumber() (no boundary condition). - lastAccountSeqNumber: int32(0), + // callers can always use NextValidatorTxSequenceNumber() (no boundary condition). + lastValidatorSeqNumber: int32(0), } } // InitializeDefaults sets the underlying cosmos-sdk testutil network config to // a reasonable default in case one was not provided with the InMemoryNetworkConfig. func (memnet *BaseInMemoryNetwork) InitializeDefaults(t *testing.T) { - if memnet.Config.CosmosCfg == nil { - t.Log("Cosmos config not initialized, using default config") - - // Initialize a network config. - cfg := network.DefaultConfig() - memnet.Config.CosmosCfg = &cfg - } else { + if memnet.Config.CosmosCfg != nil { t.Log("Cosmos config already initialized, using existing config") + return } + + t.Log("Cosmos config not initialized, using default config") + // Initialize a network config. + cfg := network.DefaultConfig() + memnet.Config.CosmosCfg = &cfg } // GetClientCtx returns the underlying cosmos-sdk testutil network's client context. @@ -99,24 +101,24 @@ func (memnet *BaseInMemoryNetwork) GetCosmosNetworkConfig(t *testing.T) *sdknetw func (memnet *BaseInMemoryNetwork) GetNetwork(t *testing.T) *sdknetwork.Network { t.Helper() - require.NotEmptyf(t, memnet.Network, "in-memory cosmos network not set") - return memnet.Network + require.NotEmptyf(t, memnet.CosmosNetwork, "in-memory cosmos network not set") + return memnet.CosmosNetwork } -// GetLastAccountSequenceNumber returns the last (most recently generated) account sequence number. +// GetLastValidatorTxSequenceNumber returns the last (most recently generated) account sequence number. // It is safe for concurrent use. -func (memnet *BaseInMemoryNetwork) GetLastAccountSequenceNumber(t *testing.T) int { +func (memnet *BaseInMemoryNetwork) GetLastValidatorTxSequenceNumber(t *testing.T) int { t.Helper() - return int(atomic.LoadInt32(&memnet.lastAccountSeqNumber)) + return int(atomic.LoadInt32(&memnet.lastValidatorSeqNumber)) } -// NextAccountSequenceNumber increments the account sequence number and returns the new value. +// NextValidatorTxSequenceNumber increments the account sequence number and returns the new value. // It is safe for concurrent use. -func (memnet *BaseInMemoryNetwork) NextAccountSequenceNumber(t *testing.T) int { +func (memnet *BaseInMemoryNetwork) NextValidatorTxSequenceNumber(t *testing.T) int { t.Helper() - return int(atomic.AddInt32(&memnet.lastAccountSeqNumber, 1)) + return int(atomic.AddInt32(&memnet.lastValidatorSeqNumber, 1)) } // Start is a stub which is expected to be implemented by "concrete" InMemoryNetwork @@ -124,5 +126,5 @@ func (memnet *BaseInMemoryNetwork) NextAccountSequenceNumber(t *testing.T) int { // it is too general to define this behavior, leaving it up to embedders. As a result, // this function panics if it is called. func (memnet *BaseInMemoryNetwork) Start(_ context.Context, t *testing.T) { - panic("not implemented") + panic("must be implemented by struct embedders") } diff --git a/testutil/network/config.go b/testutil/network/config.go index f3446e928..e1333a278 100644 --- a/testutil/network/config.go +++ b/testutil/network/config.go @@ -42,7 +42,8 @@ func (cfg *InMemoryNetworkConfig) GetNumKeyringAccounts(t *testing.T) int { } // DefaultConfig will initialize config for the network with custom application, -// genesis and single validator. All other parameters are inherited from cosmos-sdk/testutil/network.DefaultConfig +// genesis and single validator. +// All other parameters are inherited from cosmos-sdk/testutil/network.DefaultConfig. func DefaultConfig() network.Config { var ( encoding = app.MakeEncodingConfig() diff --git a/testutil/network/gatewaynet/network.go b/testutil/network/gatewaynet/network.go index 80ad703ac..8a09e2f58 100644 --- a/testutil/network/gatewaynet/network.go +++ b/testutil/network/gatewaynet/network.go @@ -5,7 +5,6 @@ import ( "fmt" "testing" - "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/client/flags" testcli "github.com/cosmos/cosmos-sdk/testutil/cli" sdk "github.com/cosmos/cosmos-sdk/types" @@ -24,12 +23,11 @@ var _ network.InMemoryNetwork = (*inMemoryNetworkWithGateways)(nil) // inMemoryNetworkWithGateways is an implementation of the InMemoryNetwork interface. type inMemoryNetworkWithGateways struct { - //baseInMemoryNetwork basenet.BaseInMemoryNetwork basenet.BaseInMemoryNetwork } // DefaultInMemoryNetworkConfig returns the default in-memory network configuration. -// This configuration should sufficient populate on-chain objects to support reasonable +// This configuration should sufficiently populate on-chain objects to support reasonable // coverage around most session-oriented scenarios. func DefaultInMemoryNetworkConfig(t *testing.T) *network.InMemoryNetworkConfig { t.Helper() @@ -55,17 +53,17 @@ func NewInMemoryNetworkWithGateways(t *testing.T, cfg *network.InMemoryNetworkCo // DelegateAppToGateway delegates the application by address to the gateway by address. func (memnet *inMemoryNetworkWithGateways) DelegateAppToGateway( t *testing.T, - appBech32 string, - gatewayBech32 string, + appAddrBech32 string, + gatewayAddrBech32 string, ) { t.Helper() args := []string{ - gatewayBech32, - fmt.Sprintf("--%s=%s", flags.FlagFrom, appBech32), + gatewayAddrBech32, + fmt.Sprintf("--%s=%s", flags.FlagFrom, appAddrBech32), fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), - fmt.Sprintf("--%s=%s", flags.FlagFees, sdk.NewCoins(sdk.NewCoin(memnet.GetNetwork(t).Config.BondDenom, math.NewInt(10))).String()), + fmt.Sprintf("--%s=%s", flags.FlagFees, memnet.NewBondDenomCoins(t, 10).String()), } responseRaw, err := testcli.ExecTestCLICmd(memnet.GetClientCtx(t), cli.CmdDelegateToGateway(), args) require.NoError(t, err) @@ -88,7 +86,7 @@ func (memnet *inMemoryNetworkWithGateways) UndelegateAppFromGateway( fmt.Sprintf("--%s=%s", flags.FlagFrom, appBech32), fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), - fmt.Sprintf("--%s=%s", flags.FlagFees, sdk.NewCoins(sdk.NewCoin(memnet.GetCosmosNetworkConfig(t).BondDenom, math.NewInt(10))).String()), + fmt.Sprintf("--%s=%s", flags.FlagFees, memnet.NewBondDenomCoins(t, 10).String()), } responseRaw, err := testcli.ExecTestCLICmd(memnet.GetClientCtx(t), cli.CmdUndelegateFromGateway(), args) require.NoError(t, err) @@ -112,8 +110,8 @@ func (memnet *inMemoryNetworkWithGateways) UndelegateAppFromGateway( func (memnet *inMemoryNetworkWithGateways) Start(_ context.Context, t *testing.T) { t.Helper() - // Application module genesis state fixture data generation is independent - // of that of the supplier module. + // The number of applications is NOT a function of the number of suppliers. + // `AppSupplierPairingRatio` SHOULD NOT be used by this network implementation. if memnet.Config.AppSupplierPairingRatio > 0 { panic("AppSupplierPairingRatio must be 0 for inMemoryNetworkWithGateways, use NumApplications instead") } @@ -125,16 +123,14 @@ func (memnet *inMemoryNetworkWithGateways) Start(_ context.Context, t *testing.T memnet.configureGatewayModuleGenesisState(t) memnet.configureAppModuleGenesisState(t) - memnet.Network = network.New(t, *memnet.GetCosmosNetworkConfig(t)) + memnet.CosmosNetwork = network.New(t, *memnet.GetCosmosNetworkConfig(t)) memnet.FundOnChainAccounts(t) } // configureGatewayModuleGenesisState generates and populates the in-memory network's -// -// application module's GenesisState object with the number of applications specified -// by the InMemoryConfig, each of which is staked for a unique service. It returns -// the genesis state object. +// gateway module's GenesisState object with the number of gateways specified by the +// InMemoryConfig. func (memnet *inMemoryNetworkWithGateways) configureGatewayModuleGenesisState(t *testing.T) { t.Helper() @@ -143,7 +139,6 @@ func (memnet *inMemoryNetworkWithGateways) configureGatewayModuleGenesisState(t stake := sdk.NewCoin("upokt", sdk.NewInt(10000)) preGeneratedAcct, ok := memnet.PreGeneratedAccountIterator.Next() require.Truef(t, ok, "pre-generated accounts iterator exhausted") - require.Truef(t, ok, "pre-generated accounts iterator exhausted") gateway := gatewaytypes.Gateway{ Address: preGeneratedAcct.Address.String(), @@ -156,10 +151,14 @@ func (memnet *inMemoryNetworkWithGateways) configureGatewayModuleGenesisState(t gatewayGenesisBuffer, err := memnet.GetCosmosNetworkConfig(t).Codec.MarshalJSON(gatewayGenesisState) require.NoError(t, err) - // Add supplier module genesis supplierGenesisState to the network config. + // Add gateway module genesis state to the network config. memnet.GetCosmosNetworkConfig(t).GenesisState[gatewaytypes.ModuleName] = gatewayGenesisBuffer } +// configureAppModuleGenesisState generates and populates the in-memory network's +// application module's GenesisState object with the number of applications specified +// by the InMemoryConfig, each of which is staked for a unique service. It returns +// the genesis state object. func (memnet *inMemoryNetworkWithGateways) configureAppModuleGenesisState(t *testing.T) { t.Helper() @@ -185,6 +184,6 @@ func (memnet *inMemoryNetworkWithGateways) configureAppModuleGenesisState(t *tes appGenesisBuffer, err := memnet.Config.CosmosCfg.Codec.MarshalJSON(appGenesisState) require.NoError(t, err) - // Add supplier and application module genesis appGenesisState to the network memnetConfig. + // Add application module genesis state to the network memnet cosmos config. memnet.GetCosmosNetworkConfig(t).GenesisState[apptypes.ModuleName] = appGenesisBuffer } diff --git a/testutil/network/genesis.go b/testutil/network/genesis.go index 2c9053012..d6ec94d74 100644 --- a/testutil/network/genesis.go +++ b/testutil/network/genesis.go @@ -17,7 +17,7 @@ func GetGenesisState[T proto.Message](t *testing.T, moduleName string, memnet In var genesisState T // NB: As this function is generic, it MUST use reflect in order to unmarshal - // the genesis state as the codec requries a reference to a concrete type pointer. + // the genesis state as the codec requires a reference to a concrete type pointer. genesisStateType := reflect.TypeOf(genesisState) genesisStateValue := reflect.New(genesisStateType.Elem()) genesisStatePtr := genesisStateValue.Interface().(proto.Message) diff --git a/testutil/network/interface.go b/testutil/network/interface.go index 85c1f8124..fa2c90eef 100644 --- a/testutil/network/interface.go +++ b/testutil/network/interface.go @@ -9,7 +9,7 @@ import ( ) // InMemoryNetwork encapsulates the cosmos-sdk testutil network instance and the -// esponsibility of initializing it, along with (optional) additional/ setup, in +// responsibility of initializing it, along with (optional) additional/ setup, in // #Start(). It also provides access to additional cosmos-sdk testutil network // internals via corresponding methods. type InMemoryNetwork interface { diff --git a/testutil/network/sessionnet/claims.go b/testutil/network/sessionnet/claims.go index d5531ce77..a7e7a0c68 100644 --- a/testutil/network/sessionnet/claims.go +++ b/testutil/network/sessionnet/claims.go @@ -5,10 +5,8 @@ import ( "fmt" "testing" - "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/client/flags" testcli "github.com/cosmos/cosmos-sdk/testutil/cli" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/relayer" @@ -65,9 +63,6 @@ func (memnet *inMemoryNetworkWithSessions) CreateClaim( ) (*suppliertypes.Claim, relayer.SessionTree) { t.Helper() - clientCtx := memnet.GetClientCtx(t) - net := memnet.GetNetwork(t) - // Create a new session tree with NumRelaysPerSession number of relay nodes inserted. // Base64-encode it's root hash for use with the CLI command. sessionTree := newSessionTreeRoot(t, memnet.Config.NumRelaysPerSession, sessionHeader) @@ -81,9 +76,10 @@ func (memnet *inMemoryNetworkWithSessions) CreateClaim( fmt.Sprintf("--%s=%s", flags.FlagFrom, supplierAddr), fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), - fmt.Sprintf("--%s=%s", flags.FlagFees, sdk.NewCoins(sdk.NewCoin(net.Config.BondDenom, math.NewInt(10))).String()), + fmt.Sprintf("--%s=%s", flags.FlagFees, memnet.NewBondDenomCoins(t, 10).String()), } + clientCtx := memnet.GetClientCtx(t) responseRaw, err := testcli.ExecTestCLICmd(clientCtx, cli.CmdCreateClaim(), args) require.NoError(t, err) var responseJson map[string]interface{} diff --git a/testutil/network/sessionnet/genesis.go b/testutil/network/sessionnet/genesis.go index 616cb4bea..f387a2a56 100644 --- a/testutil/network/sessionnet/genesis.go +++ b/testutil/network/sessionnet/genesis.go @@ -51,18 +51,17 @@ func (memnet *inMemoryNetworkWithSessions) configureSupplierModuleGenesisState(t supplierGenesisBuffer, err := memnet.GetCosmosNetworkConfig(t).Codec.MarshalJSON(supplierGenesisState) require.NoError(t, err) - // Add supplier module genesis supplierGenesisState to the network config. + // Add supplier module genesis state to the network config. memnet.GetCosmosNetworkConfig(t).GenesisState[suppliertypes.ModuleName] = supplierGenesisBuffer return supplierGenesisState } // configureApplicationModuleGenesisState generates and populates the in-memory network's -// applicaion module's GenesisState object with a given number of applications, -// each of which is staked for a service such that -// memnet.Config.AppSupplierPairingRatio*NumSuppliers number of applications are staked -// for each genesis supplier's service (assumes that each supplier is staked for a unique -// service with no overlap). +// application module's GenesisState object with a given number of applications, each of +// which is staked for a service such that memnet.Config.AppSupplierPairingRatio*NumSuppliers +// number of applications are staked for each genesis supplier's service (assumes that each +// supplier is staked for a unique service with no overlap). func (memnet *inMemoryNetworkWithSessions) configureAppModuleGenesisState(t *testing.T) *apptypes.GenesisState { t.Helper() @@ -99,7 +98,7 @@ func (memnet *inMemoryNetworkWithSessions) configureAppModuleGenesisState(t *tes appGenesisBuffer, err := memnet.Config.CosmosCfg.Codec.MarshalJSON(appGenesisState) require.NoError(t, err) - // Add supplier and application module genesis appGenesisState to the network memnetConfig. + // Add supplier and application module genesis appGenesisState to the network memnet cosmos config. memnet.GetCosmosNetworkConfig(t).GenesisState[apptypes.ModuleName] = appGenesisBuffer return appGenesisState diff --git a/testutil/network/sessionnet/network.go b/testutil/network/sessionnet/network.go index 997f5d65f..e661016a8 100644 --- a/testutil/network/sessionnet/network.go +++ b/testutil/network/sessionnet/network.go @@ -77,8 +77,8 @@ func (memnet *inMemoryNetworkWithSessions) Start(_ context.Context, t *testing.T memnet.configureAppModuleGenesisState(t) memnet.configureSupplierModuleGenesisState(t) - memnet.Network = network.New(t, *memnet.GetCosmosNetworkConfig(t)) - err := memnet.Network.WaitForNextBlock() + memnet.CosmosNetwork = network.New(t, *memnet.GetCosmosNetworkConfig(t)) + err := memnet.CosmosNetwork.WaitForNextBlock() require.NoError(t, err) memnet.FundOnChainAccounts(t) diff --git a/testutil/network/sessionnet/proofs.go b/testutil/network/sessionnet/proofs.go index dbaf55e33..83d7419e9 100644 --- a/testutil/network/sessionnet/proofs.go +++ b/testutil/network/sessionnet/proofs.go @@ -6,10 +6,8 @@ import ( "fmt" "testing" - "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/client/flags" testcli "github.com/cosmos/cosmos-sdk/testutil/cli" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/relayer" @@ -58,15 +56,13 @@ func (memnet *inMemoryNetworkWithSessions) SubmitProof( sessionHeaderEncoded := cliEncodeSessionHeader(t, claim.GetSessionHeader()) closestMerkleProofEncoded := base64.StdEncoding.EncodeToString(closestMerkleProofBz) - - bondDenom := memnet.GetNetwork(t).Config.BondDenom args := []string{ sessionHeaderEncoded, closestMerkleProofEncoded, fmt.Sprintf("--%s=%s", flags.FlagFrom, claim.GetSupplierAddress()), fmt.Sprintf("--%s=true", flags.FlagSkipConfirmation), fmt.Sprintf("--%s=%s", flags.FlagBroadcastMode, flags.BroadcastSync), - fmt.Sprintf("--%s=%s", flags.FlagFees, sdk.NewCoins(sdk.NewCoin(bondDenom, math.NewInt(10))).String()), + fmt.Sprintf("--%s=%s", flags.FlagFees, memnet.NewBondDenomCoins(t, 10).String()), } ctx := memnet.GetClientCtx(t) diff --git a/testutil/network/types.go b/testutil/network/types.go index 77115384b..368f3c485 100644 --- a/testutil/network/types.go +++ b/testutil/network/types.go @@ -37,12 +37,20 @@ type InMemoryNetworkConfig struct { // NumApplications is the number of applications that should be created at genesis. // Usage is mutually exclusive with AppSupplierPairingRatio. This is enforced by // InMemoryNetwork implementations. + // + // NOTE: Use #GetNumApplications() to compute the correct value, regardless of + // which field is used; in-memory network implementations SHOULD NOT modify config + // fields. #GetNumApplications() is intended to be the single source of truth. NumApplications int // AppSupplierPairingRatio is the number of applications, per supplier, that // share a serviceId (i.e. will be in the same session). // Usage is mutually exclusive with NumApplications. This is enforced by // InMemoryNetwork implementations. + // + // NOTE: Use #GetNumApplications() to compute the correct value, regardless of + // which field is used; in-memory network implementations SHOULD NOT modify config + // fields. #GetNumApplications() is intended to be the single source of truth. AppSupplierPairingRatio int // CosmosCfg is the configuration for the underlying cosmos-sdk testutil network. diff --git a/testutil/testclient/testdelegation/client.go b/testutil/testclient/testdelegation/client.go index a055579df..4d490e376 100644 --- a/testutil/testclient/testdelegation/client.go +++ b/testutil/testclient/testdelegation/client.go @@ -2,6 +2,7 @@ package testdelegation import ( "context" + "fmt" "testing" "cosmossdk.io/depinject" @@ -35,22 +36,22 @@ func NewLocalnetClient(ctx context.Context, t *testing.T) client.DelegationClien // This mock DelegationClient will expect any number of calls to RedelegationsSequence, // and when that call is made, it returns the given EventsObservable[Redelegation]. func NewAnyTimesRedelegationsSequence( + ctx context.Context, t *testing.T, + appAddress string, redelegationObs observable.Observable[client.Redelegation], ) *mockclient.MockDelegationClient { t.Helper() // Create a mock for the delegation client which expects the // LastNRedelegations method to be called any number of times. - delegationClientMock := NewAnyTimeLastNRedelegationsClient(t, "") + delegationClientMock := NewAnyTimeLastNRedelegationsClient(t, appAddress) // Set up the mock expectation for the RedelegationsSequence method. When // the method is called, it returns a new replay observable that publishes // redelegation events sent on the given redelegationObs. delegationClientMock.EXPECT(). - RedelegationsSequence( - gomock.AssignableToTypeOf(context.Background()), - ). + RedelegationsSequence(ctx). Return(redelegationObs). AnyTimes() @@ -65,6 +66,7 @@ func NewAnyTimesRedelegationsSequence( // redelegationPublishCh is the channel the caller can use to publish // Redelegation events to the observable. func NewOneTimeRedelegationsSequenceDelegationClient( + ctx context.Context, t *testing.T, redelegationPublishCh chan client.Redelegation, ) *mockclient.MockDelegationClient { @@ -77,18 +79,19 @@ func NewOneTimeRedelegationsSequenceDelegationClient( // Set up the mock expectation for the RedelegationsSequence method. When // the method is called, it returns a new replay observable that publishes // delegation changes sent on the given redelegationPublishCh. - delegationClientMock.EXPECT().RedelegationsSequence( - gomock.AssignableToTypeOf(context.Background()), - ).DoAndReturn(func(ctx context.Context) client.RedelegationReplayObservable { - // Create a new replay observable with a replay buffer size of 1. - // Redelegation events are published to this observable via the - // provided redelegationPublishCh. - withPublisherOpt := channel.WithPublisher(redelegationPublishCh) - obs, _ := channel.NewReplayObservable[client.Redelegation]( - ctx, 1, withPublisherOpt, - ) - return obs - }) + delegationClientMock.EXPECT().RedelegationsSequence(ctx). + DoAndReturn(func(ctx context.Context) client.RedelegationReplayObservable { + // Create a new replay observable with a replay buffer size of 1. + // Redelegation events are published to this observable via the + // provided redelegationPublishCh. + withPublisherOpt := channel.WithPublisher(redelegationPublishCh) + obs, _ := channel.NewReplayObservable[client.Redelegation]( + ctx, 1, withPublisherOpt, + ) + return obs + }).Times(1) + + delegationClientMock.EXPECT().Close().AnyTimes() return delegationClientMock } @@ -105,13 +108,14 @@ func NewAnyTimeLastNRedelegationsClient( ctrl := gomock.NewController(t) // Create a mock redelegation that returns the provided appAddress - redelegation := NewAnyTimesRedelegation(t, appAddress) + redelegation := NewAnyTimesRedelegation(t, appAddress, "") // Create a mock delegation client that expects calls to // LastNRedelegations method and returns the mock redelegation. delegationClientMock := mockclient.NewMockDelegationClient(ctrl) delegationClientMock.EXPECT(). LastNRedelegations(gomock.Any(), gomock.Any()). Return([]client.Redelegation{redelegation}).AnyTimes() + delegationClientMock.EXPECT().Close().AnyTimes() return delegationClientMock } @@ -122,6 +126,7 @@ func NewAnyTimeLastNRedelegationsClient( func NewAnyTimesRedelegation( t *testing.T, appAddress string, + gatewayAddress string, ) *mockclient.MockRedelegation { t.Helper() ctrl := gomock.NewController(t) @@ -129,6 +134,21 @@ func NewAnyTimesRedelegation( // Create a mock redelegation that returns the provided address AnyTimes. redelegation := mockclient.NewMockRedelegation(ctrl) redelegation.EXPECT().GetAppAddress().Return(appAddress).AnyTimes() + redelegation.EXPECT().GetGatewayAddress().Return(gatewayAddress).AnyTimes() return redelegation } + +// NewRedelegationEventBytes returns a byte slice containing a JSON string +// that mocks the event bytes returned from the events query client for a +// Redelegation event. +func NewRedelegationEventBytes( + t *testing.T, + appAddress string, + gatewayAddress string, +) []byte { + t.Helper() + jsonTemplate := `{"tx":"SGVsbG8sIHdvcmxkIQ==","result":{"events":[{"type":"message","attributes":[{"key":"action","value":"/pocket.application.MsgDelegateToGateway"},{"key":"sender","value":"pokt1exampleaddress"},{"key":"module","value":"application"}]},{"type":"pocket.application.EventRedelegation","attributes":[{"key":"app_address","value":"\"%s\""},{"key":"gateway_address","value":"\"%s\""}]}]}}` + json := fmt.Sprintf(jsonTemplate, appAddress, gatewayAddress) + return []byte(json) +} diff --git a/testutil/testcrypto/rings/cache.go b/testutil/testcrypto/rings/cache.go index f0b9a665c..ef25eb21b 100644 --- a/testutil/testcrypto/rings/cache.go +++ b/testutil/testcrypto/rings/cache.go @@ -1,6 +1,7 @@ package testrings import ( + "context" "testing" "cosmossdk.io/depinject" @@ -8,11 +9,14 @@ import ( "github.com/pokt-network/poktroll/pkg/crypto" "github.com/pokt-network/poktroll/pkg/crypto/rings" + "github.com/pokt-network/poktroll/pkg/polylog" "github.com/pokt-network/poktroll/testutil/mockclient" ) -// NewRingCacheWithMockQueriers creates a new "real" RingCache with the given -// mock Account and Application queriers supplied as dependencies. +// NewRingCacheWithMockDependencies creates a new "real" RingCache with the given +// mock Account and Application queriers supplied as dependencies. A Delegation +// client is required as a dependency and depending on how it is used will +// require a different function to generate the delegations client. // The queriers are expected to maintain their respective mocked states: // - Account querier: the account addresses and public keys // - Application querier: the application addresses delegatee gateway addresses @@ -21,17 +25,21 @@ import ( // // testutil/testclient/testqueryclients/accquerier.go // testutil/testclient/testqueryclients/appquerier.go +// testutil/testclient/testdelegation/client.go // // for methods to create these queriers and maintain their states. -func NewRingCacheWithMockQueriers( +func NewRingCacheWithMockDependencies( + ctx context.Context, t *testing.T, accQuerier *mockclient.MockAccountQueryClient, appQuerier *mockclient.MockApplicationQueryClient, + delegationClient *mockclient.MockDelegationClient, ) crypto.RingCache { t.Helper() // Create the dependency injector with the mock queriers - deps := depinject.Supply(accQuerier, appQuerier) + logger := polylog.Ctx(ctx) + deps := depinject.Supply(logger, accQuerier, appQuerier, delegationClient) ringCache, err := rings.NewRingCache(deps) require.NoError(t, err) diff --git a/testutil/testproxy/relayerproxy.go b/testutil/testproxy/relayerproxy.go index 96768aabb..80cd141fd 100644 --- a/testutil/testproxy/relayerproxy.go +++ b/testutil/testproxy/relayerproxy.go @@ -19,13 +19,16 @@ import ( "github.com/noot/ring-go" "github.com/stretchr/testify/require" - "github.com/pokt-network/poktroll/pkg/crypto/rings" + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/polylog" "github.com/pokt-network/poktroll/pkg/relayer/config" "github.com/pokt-network/poktroll/pkg/signer" "github.com/pokt-network/poktroll/testutil/testclient/testblock" + "github.com/pokt-network/poktroll/testutil/testclient/testdelegation" testkeyring "github.com/pokt-network/poktroll/testutil/testclient/testkeyring" "github.com/pokt-network/poktroll/testutil/testclient/testqueryclients" + testrings "github.com/pokt-network/poktroll/testutil/testcrypto/rings" servicetypes "github.com/pokt-network/poktroll/x/service/types" sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper" sessiontypes "github.com/pokt-network/poktroll/x/session/types" @@ -86,18 +89,19 @@ func WithRelayerProxyDependencies(keyName string) func(*TestBehavior) { blockClient := testblock.NewAnyTimeLastNBlocksBlockClient(test.t, []byte{}, 1) keyring, _ := testkeyring.NewTestKeyringWithKey(test.t, keyName) - ringDeps := depinject.Supply(accountQueryClient, applicationQueryClient) - ringCache, err := rings.NewRingCache(ringDeps) - require.NoError(test.t, err) + redelegationObs, _ := channel.NewReplayObservable[client.Redelegation](test.ctx, 1) + delegationClient := testdelegation.NewAnyTimesRedelegationsSequence(test.ctx, test.t, "", redelegationObs) + ringCache := testrings.NewRingCacheWithMockDependencies(test.ctx, test.t, accountQueryClient, applicationQueryClient, delegationClient) - deps := depinject.Configs(ringDeps, depinject.Supply( + deps := depinject.Supply( logger, + accountQueryClient, ringCache, blockClient, sessionQueryClient, supplierQueryClient, keyring, - )) + ) test.Deps = deps } diff --git a/x/application/keeper/msg_server_delegate_to_gateway.go b/x/application/keeper/msg_server_delegate_to_gateway.go index c2ecd82a0..11dea867f 100644 --- a/x/application/keeper/msg_server_delegate_to_gateway.go +++ b/x/application/keeper/msg_server_delegate_to_gateway.go @@ -58,8 +58,10 @@ func (k msgServer) DelegateToGateway(goCtx context.Context, msg *types.MsgDelega k.SetApplication(ctx, app) logger.Info(fmt.Sprintf("Successfully delegated application to gateway for app: %+v", app)) - // Emit the application redelegation change event - if err := ctx.EventManager().EmitTypedEvent(msg.NewRedelegationEvent()); err != nil { + // Emit the application redelegation event + event := msg.NewRedelegationEvent() + logger.Info(fmt.Sprintf("Emitting application redelegation event %v", event)) + if err := ctx.EventManager().EmitTypedEvent(event); err != nil { logger.Error(fmt.Sprintf("Failed to emit application redelegation event: %v", err)) return nil, err } diff --git a/x/application/keeper/msg_server_undelegate_from_gateway.go b/x/application/keeper/msg_server_undelegate_from_gateway.go index 2ebc30416..5c755e648 100644 --- a/x/application/keeper/msg_server_undelegate_from_gateway.go +++ b/x/application/keeper/msg_server_undelegate_from_gateway.go @@ -52,7 +52,9 @@ func (k msgServer) UndelegateFromGateway( logger.Info(fmt.Sprintf("Successfully undelegated application from gateway for app: %+v", app)) // Emit the application redelegation event - if err := ctx.EventManager().EmitTypedEvent(msg.NewRedelegationEvent()); err != nil { + event := msg.NewRedelegationEvent() + logger.Info(fmt.Sprintf("Emitting application redelegation event %v", event)) + if err := ctx.EventManager().EmitTypedEvent(event); err != nil { logger.Error(fmt.Sprintf("Failed to emit application redelegation event: %v", err)) return nil, err } diff --git a/x/gateway/client/cli/tx_stake_gateway.go b/x/gateway/client/cli/tx_stake_gateway.go index 3c4d29a30..3369f3885 100644 --- a/x/gateway/client/cli/tx_stake_gateway.go +++ b/x/gateway/client/cli/tx_stake_gateway.go @@ -1,41 +1,51 @@ package cli import ( + "os" "strconv" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/client/tx" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/spf13/cobra" + "github.com/pokt-network/poktroll/x/gateway/client/config" "github.com/pokt-network/poktroll/x/gateway/types" ) -var _ = strconv.Itoa(0) +var ( + flagStakeConfig string + _ = strconv.Itoa(0) +) func CmdStakeGateway() *cobra.Command { cmd := &cobra.Command{ - Use: "stake-gateway ", + Use: "stake-gateway --config ", Short: "Stake a gateway", Long: `Stake a gateway with the provided parameters. This is a broadcast operation that will stake the tokens and associate them with the gateway specified by the 'from' address. Example: -$ poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)`, - Args: cobra.ExactArgs(1), - RunE: func(cmd *cobra.Command, args []string) (err error) { - clientCtx, err := client.GetClientTxContext(cmd) +$ poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway --config stake_config.yaml --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)`, + Args: cobra.ExactArgs(0), + RunE: func(cmd *cobra.Command, _ []string) (err error) { + configContent, err := os.ReadFile(flagStakeConfig) if err != nil { return err } - stakeString := args[0] - stake, err := sdk.ParseCoinNormalized(stakeString) + + gatewayStakeConfig, err := config.ParseGatewayConfig(configContent) if err != nil { return err } + + clientCtx, err := client.GetClientTxContext(cmd) + if err != nil { + return err + } + msg := types.NewMsgStakeGateway( clientCtx.GetFromAddress().String(), - stake, + gatewayStakeConfig.StakeAmount, ) if err := msg.ValidateBasic(); err != nil { return err @@ -44,6 +54,7 @@ $ poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway 1000upokt --keyrin }, } + cmd.Flags().StringVar(&flagStakeConfig, "config", "", "Path to the stake config file") flags.AddTxFlagsToCmd(cmd) return cmd diff --git a/x/gateway/client/cli/tx_stake_gateway_test.go b/x/gateway/client/cli/tx_stake_gateway_test.go index c3aa766d4..f4d5b2c28 100644 --- a/x/gateway/client/cli/tx_stake_gateway_test.go +++ b/x/gateway/client/cli/tx_stake_gateway_test.go @@ -3,11 +3,13 @@ package cli_test import ( "context" "fmt" + "os" "testing" sdkerrors "cosmossdk.io/errors" sdkmath "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/cosmos/cosmos-sdk/testutil" clitestutil "github.com/cosmos/cosmos-sdk/testutil/cli" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" @@ -15,6 +17,7 @@ import ( "github.com/pokt-network/poktroll/testutil/network" "github.com/pokt-network/poktroll/testutil/network/gatewaynet" + "github.com/pokt-network/poktroll/testutil/yaml" "github.com/pokt-network/poktroll/x/gateway/client/cli" gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" ) @@ -44,57 +47,72 @@ func TestCLI_StakeGateway(t *testing.T) { } tests := []struct { - desc string - address string - stake string - err *sdkerrors.Error + desc string + address string + inputConfig string + expectedError *sdkerrors.Error }{ { desc: "stake gateway: invalid address", address: "invalid", - stake: "1000upokt", - err: gatewaytypes.ErrGatewayInvalidAddress, + inputConfig: ` + stake_amount: 1000upokt + `, + expectedError: gatewaytypes.ErrGatewayInvalidAddress, }, { desc: "stake gateway: missing address", // address: intentionally omitted, - stake: "1000upokt", - err: gatewaytypes.ErrGatewayInvalidAddress, + inputConfig: ` + stake_amount: 1000upokt + `, + expectedError: gatewaytypes.ErrGatewayInvalidAddress, }, { desc: "stake gateway: invalid stake amount (zero)", address: gatewayAccount.GetAddress(), - stake: "0upokt", - err: gatewaytypes.ErrGatewayInvalidStake, + inputConfig: ` + stake_amount: 0upokt + `, + expectedError: gatewaytypes.ErrGatewayInvalidStake, }, { desc: "stake gateway: invalid stake amount (negative)", address: gatewayAccount.GetAddress(), - stake: "-1000upokt", - err: gatewaytypes.ErrGatewayInvalidStake, + inputConfig: ` + stake_amount: -1000upokt + `, + expectedError: gatewaytypes.ErrGatewayInvalidStake, }, { desc: "stake gateway: invalid stake denom", address: gatewayAccount.GetAddress(), - stake: "1000invalid", - err: gatewaytypes.ErrGatewayInvalidStake, + inputConfig: ` + stake_amount: 1000invalid + `, + expectedError: gatewaytypes.ErrGatewayInvalidStake, }, { desc: "stake gateway: invalid stake missing denom", address: gatewayAccount.GetAddress(), - stake: "1000", - err: gatewaytypes.ErrGatewayInvalidStake, + inputConfig: ` + stake_amount: 1000 + `, + expectedError: gatewaytypes.ErrGatewayInvalidStake, }, { - desc: "stake gateway: invalid stake missing stake", - address: gatewayAccount.GetAddress(), - // stake: intentionally omitted, - err: gatewaytypes.ErrGatewayInvalidStake, + desc: "stake gateway: invalid stake missing stake", + address: gatewayAccount.GetAddress(), + // inputConfig intentionally empty, + inputConfig: ``, + expectedError: gatewaytypes.ErrGatewayInvalidStake, }, { desc: "stake gateway: valid", address: gatewayAccount.GetAddress(), - stake: "1000upokt", + inputConfig: ` + stake_amount: 1000upokt + `, }, } @@ -104,19 +122,23 @@ func TestCLI_StakeGateway(t *testing.T) { // Wait for a new block to be committed require.NoError(t, net.WaitForNextBlock()) + // write the stake config to a file + configPath := testutil.WriteToNewTempFile(t, yaml.NormalizeYAMLIndentation(tt.inputConfig)).Name() + t.Cleanup(func() { os.Remove(configPath) }) + // Prepare the arguments for the CLI command args := []string{ - tt.stake, + fmt.Sprintf("--config=%s", configPath), fmt.Sprintf("--%s=%s", flags.FlagFrom, tt.address), } args = append(args, commonArgs...) // Execute the command outStake, err := clitestutil.ExecTestCLICmd(clientCtx, cli.CmdStakeGateway(), args) - if tt.err != nil { - stat, ok := status.FromError(tt.err) + if tt.expectedError != nil { + stat, ok := status.FromError(tt.expectedError) require.True(t, ok) - require.Contains(t, stat.Message(), tt.err.Error()) + require.Contains(t, stat.Message(), tt.expectedError.Error()) return } require.NoError(t, err) diff --git a/x/gateway/client/config/errors.go b/x/gateway/client/config/errors.go new file mode 100644 index 000000000..aa01b3a46 --- /dev/null +++ b/x/gateway/client/config/errors.go @@ -0,0 +1,10 @@ +package config + +import sdkerrors "cosmossdk.io/errors" + +var ( + codespace = "gatewayconfig" + ErrGatewayConfigEmptyContent = sdkerrors.Register(codespace, 1, "empty gateway staking config content") + ErrGatewayConfigUnmarshalYAML = sdkerrors.Register(codespace, 2, "config reader cannot unmarshal yaml content") + ErrGatewayConfigInvalidStake = sdkerrors.Register(codespace, 3, "invalid stake in gateway stake config") +) diff --git a/x/gateway/client/config/gateway_config_reader.go b/x/gateway/client/config/gateway_config_reader.go new file mode 100644 index 000000000..6bf7aa3e3 --- /dev/null +++ b/x/gateway/client/config/gateway_config_reader.go @@ -0,0 +1,62 @@ +package config + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + "gopkg.in/yaml.v2" +) + +// YAMLStakeGateway is the structure describing the gateway stake config file +type YAMLStakeGateway struct { + StakeAmount string `yaml:"stake_amount"` +} + +// GatewayStakeConfig is the structure describing the gateway stake config +type GatewayStakeConfig struct { + StakeAmount sdk.Coin +} + +// ParseGatewayConfig parses the gateway stake yaml config file into a StakeGatewayConfig struct +func ParseGatewayConfig(configContent []byte) (*GatewayStakeConfig, error) { + var stakeConfig *YAMLStakeGateway + + if len(configContent) == 0 { + return nil, ErrGatewayConfigEmptyContent + } + + // Unmarshal the stake config file into a stakeConfig + if err := yaml.Unmarshal(configContent, &stakeConfig); err != nil { + return nil, ErrGatewayConfigUnmarshalYAML.Wrap(err.Error()) + } + + // Validate the stake config + if len(stakeConfig.StakeAmount) == 0 { + return nil, ErrGatewayConfigInvalidStake + } + + // Parse the stake amount to a coin struct + stakeAmount, err := sdk.ParseCoinNormalized(stakeConfig.StakeAmount) + if err != nil { + return nil, ErrGatewayConfigInvalidStake.Wrap(err.Error()) + } + + // Basic validation of the stake amount + if err := stakeAmount.Validate(); err != nil { + return nil, ErrGatewayConfigInvalidStake.Wrap(err.Error()) + } + + if stakeAmount.IsZero() { + return nil, ErrGatewayConfigInvalidStake.Wrap("stake amount cannot be zero") + } + + // Only allow upokt coins staking + if stakeAmount.Denom != "upokt" { + return nil, ErrGatewayConfigInvalidStake.Wrapf( + "invalid stake denom, expecting: upokt, got: %s", + stakeAmount.Denom, + ) + } + + return &GatewayStakeConfig{ + StakeAmount: stakeAmount, + }, nil +} diff --git a/x/gateway/client/config/gateway_config_reader_test.go b/x/gateway/client/config/gateway_config_reader_test.go new file mode 100644 index 000000000..b9ec91f2e --- /dev/null +++ b/x/gateway/client/config/gateway_config_reader_test.go @@ -0,0 +1,83 @@ +package config_test + +import ( + "testing" + + sdkerrors "cosmossdk.io/errors" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/gogo/status" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/testutil/yaml" + "github.com/pokt-network/poktroll/x/gateway/client/config" +) + +func Test_ParseGatewayStakeConfig(t *testing.T) { + tests := []struct { + desc string + expectedError *sdkerrors.Error + expectedConfig *config.GatewayStakeConfig + inputConfig string + }{ + // Valid Configs + { + desc: "valid gateway stake config", + inputConfig: ` + stake_amount: 1000upokt + `, + expectedError: nil, + expectedConfig: &config.GatewayStakeConfig{ + StakeAmount: sdk.NewCoin("upokt", sdk.NewInt(1000)), + }, + }, + // Invalid Configs + { + desc: "services_test: invalid service config with empty content", + expectedError: config.ErrGatewayConfigEmptyContent, + inputConfig: ``, + }, + { + desc: "invalid stake denom", + inputConfig: ` + stake_amount: 1000invalid + `, + expectedError: config.ErrGatewayConfigInvalidStake, + }, + { + desc: "negative stake amount", + inputConfig: ` + stake_amount: -1000upokt + `, + expectedError: config.ErrGatewayConfigInvalidStake, + }, + { + desc: "zero stake amount", + inputConfig: ` + stake_amount: 0upokt + `, + expectedError: config.ErrGatewayConfigInvalidStake, + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + normalizedConfig := yaml.NormalizeYAMLIndentation(tt.inputConfig) + supplierServiceConfig, err := config.ParseGatewayConfig([]byte(normalizedConfig)) + + if tt.expectedError != nil { + require.Error(t, err) + require.ErrorIs(t, err, tt.expectedError) + stat, ok := status.FromError(tt.expectedError) + require.True(t, ok) + require.Contains(t, stat.Message(), tt.expectedError.Error()) + require.Nil(t, supplierServiceConfig) + return + } + + require.NoError(t, err) + + require.Equal(t, tt.expectedConfig.StakeAmount, supplierServiceConfig.StakeAmount) + require.Equal(t, tt.expectedConfig.StakeAmount.Denom, supplierServiceConfig.StakeAmount.Denom) + }) + } +} diff --git a/x/supplier/client/cli/query_claim.go b/x/supplier/client/cli/query_claim.go index b879f15c5..6d110c0fa 100644 --- a/x/supplier/client/cli/query_claim.go +++ b/x/supplier/client/cli/query_claim.go @@ -84,9 +84,11 @@ func CmdShowClaim() *cobra.Command { Short: "shows a specific claim", Long: `List a specific claim that the node being queried has access to (if it still exists). -A unique claim can be defined via a session_id that the given supplier participated in. -Claims are pruned, according to protocol parameters, some time after their respective proof has been submitted and any dispute window has elapsed. -This is done to minimize the rate state accumulation by effectively eliminating claims as a long-term factor to persistence requirements. +A unique claim can be defined via a ` + "`session_id`" + ` that the given ` + "`supplier`" + ` participated in. + +` + "`Claims`" + ` are pruned, according to protocol parameters, some time after their respective ` + "`proof`" + ` has been submitted and any dispute window has elapsed. + +This is done to minimize the rate at which state accumulates by eliminating claims as a long-term factor to persistence requirements. Example: $ poktrolld --home=$(POKTROLLD_HOME) q claim show-claims --node $(POCKET_NODE)`,