Skip to content

Commit

Permalink
Merge remote-tracking branch 'pokt/main' into issues/126/test/miner
Browse files Browse the repository at this point in the history
* pokt/main:
  [RelayMiner, Testing, Off-chain] test: `relayer` pkg (#193)
  [Relayminer] chore: cleanup after 177 (#190)
  • Loading branch information
bryanchriswhite committed Nov 17, 2023
2 parents a4c2fea + dd8f35a commit 7fdd625
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 63 deletions.
13 changes: 6 additions & 7 deletions pkg/deps/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func SupplyConfig(
return deps, nil
}

// NewSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns
// a new depinject.Config which is supplied with the given deps and the new
// EventsQueryClient.
// NewSupplyEventsQueryClientFn returns a new function which constructs an
// EventsQueryClient instance and returns a new depinject.Config which is supplied
// with the given deps and the new EventsQueryClient.
func NewSupplyEventsQueryClientFn(
pocketNodeWebsocketUrl string,
) SupplierFn {
Expand All @@ -53,10 +53,9 @@ func NewSupplyEventsQueryClientFn(
}
}

// NewSupplyBlockClientFn returns a function with constructs a BlockClient instance
// with the given nodeURL and returns a new
// depinject.Config which is supplied with the given deps and the new
// BlockClient.
// NewSupplyBlockClientFn returns a function which constructs a BlockClient instance
// with the given nodeURL and returns a new depinject.Config which is supplied with
// the given deps and the new BlockClient.
func NewSupplyBlockClientFn(pocketNodeWebsocketUrl string) SupplierFn {
return func(
ctx context.Context,
Expand Down
38 changes: 0 additions & 38 deletions pkg/relayer/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/spf13/cobra"

"github.com/pokt-network/poktroll/cmd/signals"
"github.com/pokt-network/poktroll/pkg/client/block"
eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query"
"github.com/pokt-network/poktroll/pkg/client/supplier"
"github.com/pokt-network/poktroll/pkg/client/tx"
"github.com/pokt-network/poktroll/pkg/deps/config"
Expand Down Expand Up @@ -148,42 +146,6 @@ func getPocketNodeWebsocketUrl() (string, error) {
return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil
}

// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns
// a new depinject.Config which is supplied with the given deps and the new
// EventsQueryClient.
func newSupplyEventsQueryClientFn(
pocketNodeWebsocketUrl string,
) config.SupplierFn {
return func(
_ context.Context,
deps depinject.Config,
_ *cobra.Command,
) (depinject.Config, error) {
eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl)

return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil
}
}

// newSupplyBlockClientFn returns a function with constructs a BlockClient instance
// with the given nodeURL and returns a new
// depinject.Config which is supplied with the given deps and the new
// BlockClient.
func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) config.SupplierFn {
return func(
ctx context.Context,
deps depinject.Config,
_ *cobra.Command,
) (depinject.Config, error) {
blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl)
if err != nil {
return nil, err
}

return depinject.Configs(deps, depinject.Supply(blockClient)), nil
}
}

// supplyMiner constructs a Miner instance and returns a new depinject.Config
// which is supplied with the given deps and the new Miner.
func supplyMiner(
Expand Down
32 changes: 25 additions & 7 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//go:generate mockgen -destination=../../testutil/mockrelayer/relayer_proxy_mock.go -package=mockrelayer . RelayerProxy
//go:generate mockgen -destination=../../testutil/mockrelayer/miner_mock.go -package=mockrelayer . Miner
//go:generate mockgen -destination=../../testutil/mockrelayer/relayer_sessions_manager_mock.go -package=mockrelayer . RelayerSessionsManager

package relayer

import (
Expand All @@ -7,7 +11,7 @@ import (
"github.com/pokt-network/smt"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)
Expand All @@ -24,14 +28,28 @@ type TxClientContext client.Context
// to the dependency injector
type QueryClientContext client.Context

// RelaysObservable is an observable which is notified with Relay values.
//
// TODO_HACK: The purpose of this type is to work around gomock's lack of
// support for generic types. For the same reason, this type cannot be an
// alias (i.e. RelaysObservable = observable.Observable[*servicetypes.Relay]).
type RelaysObservable observable.Observable[*servicetypes.Relay]

// MinedRelaysObservable is an observable which is notified with MinedRelay values.
//
// TODO_HACK: The purpose of this type is to work around gomock's lack of
// support for generic types. For the same reason, this type cannot be an
// alias (i.e. MinedRelaysObservable = observable.Observable[*MinedRelay]).
type MinedRelaysObservable observable.Observable[*MinedRelay]

// Miner is responsible for observing servedRelayObs, hashing and checking the
// difficulty of each, finally publishing those with sufficient difficulty to
// minedRelayObs as they are applicable for relay volume.
type Miner interface {
MinedRelays(
ctx context.Context,
servedRelayObs observable.Observable[*types.Relay],
) (minedRelaysObs observable.Observable[*MinedRelay])
servedRelayObs RelaysObservable,
) (minedRelaysObs MinedRelaysObservable)
}

type MinerOption func(Miner)
Expand All @@ -51,23 +69,23 @@ type RelayerProxy interface {
// ServedRelays returns an observable that notifies the miner about the relays that have been served.
// A served relay is one whose RelayRequest's signature and session have been verified,
// and its RelayResponse has been signed and successfully sent to the client.
ServedRelays() observable.Observable[*types.Relay]
ServedRelays() RelaysObservable

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for verifying relay requests.
VerifyRelayRequest(
ctx context.Context,
relayRequest *types.RelayRequest,
relayRequest *servicetypes.RelayRequest,
service *sharedtypes.Service,
) error

// SignRelayResponse is a shared method used by RelayServers to sign
// and append the signature to the RelayResponse.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for signing relay responses.
SignRelayResponse(relayResponse *types.RelayResponse) error
SignRelayResponse(relayResponse *servicetypes.RelayResponse) error
}

type RelayerProxyOption func(RelayerProxy)
Expand Down Expand Up @@ -95,7 +113,7 @@ type RelayServer interface {
type RelayerSessionsManager interface {
// InsertRelays receives an observable of relays that should be included
// in their respective session's SMST (tree).
InsertRelays(minedRelaysObs observable.Observable[*MinedRelay])
InsertRelays(minedRelaysObs MinedRelaysObservable)

// Start iterates over the session trees at the end of each, respective, session.
// The session trees are piped through a series of map operations which progress
Expand Down
11 changes: 8 additions & 3 deletions pkg/relayer/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,17 @@ func NewMiner(
// It DOES NOT BLOCK as map operations run in their own goroutines.
func (mnr *miner) MinedRelays(
ctx context.Context,
servedRelaysObs observable.Observable[*servicetypes.Relay],
) observable.Observable[*relayer.MinedRelay] {
servedRelaysObs relayer.RelaysObservable,
) relayer.MinedRelaysObservable {
// NB: must cast back to generic observable type to use with Map.
// relayer.RelaysObervable cannot be an alias due to gomock's lack of
// support for generic types.
relaysObs := observable.Observable[*servicetypes.Relay](servedRelaysObs)

// Map servedRelaysObs to a new observable of an either type, populated with
// the minedRelay or an error. It is notified after the relay has been mined
// or an error has been encountered, respectively.
eitherMinedRelaysObs := channel.Map(ctx, servedRelaysObs, mnr.mapMineRelay)
eitherMinedRelaysObs := channel.Map(ctx, relaysObs, mnr.mapMineRelay)
logging.LogErrors(ctx, filter.EitherError(ctx, eitherMinedRelaysObs))

return filter.EitherSuccess(ctx, eitherMinedRelaysObs)
Expand Down
5 changes: 2 additions & 3 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"golang.org/x/sync/errgroup"

blocktypes "github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
apptypes "github.com/pokt-network/poktroll/x/application/types"
Expand Down Expand Up @@ -70,7 +69,7 @@ type relayerProxy struct {
proxiedServicesEndpoints servicesEndpointsMap

// servedRelays is an observable that notifies the miner about the relays that have been served.
servedRelays observable.Observable[*types.Relay]
servedRelays relayer.RelaysObservable

// servedRelaysPublishCh is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
Expand Down Expand Up @@ -179,7 +178,7 @@ func (rp *relayerProxy) Stop(ctx context.Context) error {
// ServedRelays returns an observable that notifies the miner about the relays that have been served.
// A served relay is one whose RelayRequest's signature and session have been verified,
// and its RelayResponse has been signed and successfully sent to the client.
func (rp *relayerProxy) ServedRelays() observable.Observable[*types.Relay] {
func (rp *relayerProxy) ServedRelays() relayer.RelaysObservable {
return rp.servedRelays
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/relayer/relayminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type relayMiner struct {

// NewRelayMiner creates a new Relayer instance with the given dependencies.
// It injects the dependencies into the Relayer instance and returns it.
//
// Required dependencies:
// - RelayerProxy
// - Miner
// - RelayerSessionsManager
func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) {
rel := &relayMiner{}

Expand Down Expand Up @@ -61,5 +66,6 @@ func (rel *relayMiner) Start(ctx context.Context) error {
// Stop stops the relayer proxy which in turn stops all advertised relay servers
// and unsubscribes the miner from the served relays observable.
func (rel *relayMiner) Stop(ctx context.Context) error {
rel.relayerSessionsManager.Stop()
return rel.relayerProxy.Stop(ctx)
}
58 changes: 58 additions & 0 deletions pkg/relayer/relayminer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package relayer_test

import (
"context"
"testing"
"time"

"cosmossdk.io/depinject"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/testrelayer"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
)

func TestRelayMiner_StartAndStop(t *testing.T) {
srObs, _ := channel.NewObservable[*servicetypes.Relay]()
servedRelaysObs := relayer.RelaysObservable(srObs)

mrObs, _ := channel.NewObservable[*relayer.MinedRelay]()
minedRelaysObs := relayer.MinedRelaysObservable(mrObs)

ctx := context.Background()
relayerProxyMock := testrelayer.NewMockOneTimeRelayerProxy(
ctx, t,
servedRelaysObs,
)

minerMock := testrelayer.NewMockOneTimeMiner(
ctx, t,
servedRelaysObs,
minedRelaysObs,
)

relayerSessionsManagerMock := testrelayer.NewMockOneTimeRelayerSessionsManager(
ctx, t,
minedRelaysObs,
)

deps := depinject.Supply(
relayerProxyMock,
minerMock,
relayerSessionsManagerMock,
)

relayminer, err := relayer.NewRelayMiner(ctx, deps)
require.NoError(t, err)
require.NotNil(t, relayminer)

err = relayminer.Start(ctx)
require.NoError(t, err)

time.Sleep(time.Millisecond)

err = relayminer.Stop(ctx)
require.NoError(t, err)
}
15 changes: 10 additions & 5 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type sessionsTreesMap = map[int64]map[string]relayer.SessionTree
// relayerSessionsManager is an implementation of the RelayerSessions interface.
// TODO_TEST: Add tests to the relayerSessionsManager.
type relayerSessionsManager struct {
relayObs observable.Observable[*relayer.MinedRelay]
relayObs relayer.MinedRelaysObservable

// sessionsToClaimObs notifies about sessions that are ready to be claimed.
sessionsToClaimObs observable.Observable[relayer.SessionTree]
Expand Down Expand Up @@ -93,10 +93,15 @@ func NewRelayerSessions(
// network as necessary.
// It IS NOT BLOCKING as map operations run in their own goroutines.
func (rs *relayerSessionsManager) Start(ctx context.Context) {
// NB: must cast back to generic observable type to use with Map.
// relayer.MinedRelaysObservable cannot be an alias due to gomock's lack of
// support for generic types.
relayObs := observable.Observable[*relayer.MinedRelay](rs.relayObs)

// Map eitherMinedRelays to a new observable of an error type which is
// notified if an error was encountered while attempting to add the relay to
// the session tree.
miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree)
miningErrorsObs := channel.Map(ctx, relayObs, rs.mapAddMinedRelayToSessionTree)
logging.LogErrors(ctx, miningErrorsObs)

// Start claim/proof pipeline.
Expand All @@ -115,7 +120,7 @@ func (rs *relayerSessionsManager) Stop() {
}

// SessionsToClaim returns an observable that notifies when sessions are ready to be claimed.
func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*relayer.MinedRelay]) {
func (rs *relayerSessionsManager) InsertRelays(relays relayer.MinedRelaysObservable) {
rs.relayObs = relays
}

Expand Down Expand Up @@ -222,10 +227,10 @@ func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, height int64
return nil
}

// mapAddRelayToSessionTree is intended to be used as a MapFn. It adds the relay
// mapAddMinedRelayToSessionTree is intended to be used as a MapFn. It adds the relay
// to the session tree. If it encounters an error, it returns the error. Otherwise,
// it skips output (only outputs errors).
func (rs *relayerSessionsManager) mapAddRelayToSessionTree(
func (rs *relayerSessionsManager) mapAddMinedRelayToSessionTree(
_ context.Context,
relay *relayer.MinedRelay,
) (_ error, skip bool) {
Expand Down
11 changes: 11 additions & 0 deletions testutil/mockrelayer/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package mockrelayer

// This file is in place to declare the package for dynamically generated structs.
//
// Note that this does not follow the Cosmos SDK pattern of committing Mocks to main.
// For example, they commit auto-generate code to main: https://github.com/cosmos/cosmos-sdk/blob/main/x/gov/testutil/expected_keepers_mocks.go
// Documentation on how Cosmos uses mockgen can be found here: https://docs.cosmos.network/main/build/building-modules/testing#unit-tests
//
// IMPORTANT: We have attempted to use `.gitkeep` files instead, but it causes a circular dependency issue with protobuf and mock generation
// since we are leveraging `ignite` to compile `.proto` files which runs `go mod tidy` before generating, requiring the entire dependency tree
// to be valid before mock implementations have been generated.
34 changes: 34 additions & 0 deletions testutil/testrelayer/miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package testrelayer

import (
"context"
"testing"

"github.com/golang/mock/gomock"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/mockrelayer"
)

// NewMockOneTimeMiner creates a new mock Miner. This mock Miner will expect a
// call to MinedRelays with the given context and expectedRelayObs args. When
// that call is made, returnedMinedRelaysObs is returned.
func NewMockOneTimeMiner(
ctx context.Context,
t *testing.T,
expectedRelaysObs relayer.RelaysObservable,
returnedMinedRelaysObs relayer.MinedRelaysObservable,
) *mockrelayer.MockMiner {
t.Helper()

ctrl := gomock.NewController(t)
minerMock := mockrelayer.NewMockMiner(ctrl)
minerMock.EXPECT().
MinedRelays(
gomock.Eq(ctx),
gomock.Eq(expectedRelaysObs),
).
Return(returnedMinedRelaysObs).
Times(1)
return minerMock
}
Loading

0 comments on commit 7fdd625

Please sign in to comment.