diff --git a/Tiltfile b/Tiltfile
index 2e3198580..994a30489 100644
--- a/Tiltfile
+++ b/Tiltfile
@@ -236,48 +236,56 @@ for x in range(localnet_config["relayminers"]["count"]):
         "--set=image.repository=pocketd",
     ]
 
-    #############
-    # NOTE: To provide a proper configuration for the relayminer, we dynamically
-    # define the supplier configuration overrides for the relayminer helm chart
-    # so that every service enabled in the localnet configuration (ollama, rest)
-    # file are also declared in the relayminer.config.suppliers list.
-    #############
+    ############################################################################
+    # Dynamic supplier configuration overrides.
+    # To provide a proper configuration for the relayminer, we dynamically define
+    # the supplier configuration overrides for the relayminer helm chart.
+    # This ensures that every service enabled in the localnet configuration
+    # (e.g. ollama, rest) is also declared in the relayminer.config.suppliers list.
+    ############################################################################
 
-    supplier_number = 0
+    eagerValidation = str(localnet_config["relayminers"].get("eagerValidation", False))
+
+    # Service: Anvil
+    supplier_number = 0
     flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=anvil")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://anvil:8547/")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].rpc_type_service_configs.json_rpc.backend_url=http://anvil:8547/")
-    flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+str(localnet_config["relayminers"].get("eagerValidation", False)))
-    supplier_number = supplier_number + 1
+    flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+eagerValidation)
+
+    # Service: Anvil WS
+    supplier_number = supplier_number + 1
     flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=anvilws")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://anvil:8547/")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].rpc_type_service_configs.websocket.backend_url=ws://anvil:8547/")
-    flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+str(localnet_config["relayminers"].get("eagerValidation", False)))
-    supplier_number = supplier_number + 1
+    flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+eagerValidation)
+
+    # Service: Static
+    supplier_number = supplier_number + 1
     flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=static")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545")
     flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://nginx-chainid/")
-    flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+str(localnet_config["relayminers"].get("eagerValidation", False)))
-    supplier_number = supplier_number + 1
+    flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+eagerValidation)
 
+    # Service: REST
localnet_config["rest"]["enabled"]: - flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=rest") - flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") - flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://rest:10000/") - flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+str(localnet_config["relayminers"].get("eagerValidation", False))) - supplier_number = supplier_number + 1 + supplier_number = supplier_number + 1 + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=rest") + flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://rest:10000/") + flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+eagerValidation) + # Service: Ollama if localnet_config["ollama"]["enabled"]: - flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=ollama") - flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") - flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://ollama:11434/") - flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+str(localnet_config["relayminers"].get("eagerValidation", False))) - supplier_number = supplier_number + 1 + supplier_number = supplier_number + 1 + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_id=ollama") + flags.append("--set=config.suppliers["+str(supplier_number)+"].listen_url=http://0.0.0.0:8545") + flags.append("--set=config.suppliers["+str(supplier_number)+"].service_config.backend_url=http://ollama:11434/") + flags.append("--set=config.suppliers["+str(supplier_number)+"].enable_eager_relay_request_validation="+eagerValidation) helm_resource( "relayminer" + str(actor_number), diff --git a/go.mod b/go.mod index 9b37d6c25..3f15717dc 100644 --- a/go.mod +++ b/go.mod @@ -16,26 +16,38 @@ module github.com/pokt-network/poktroll // 6. go mod tidy // 7. make ignite_build # and/or (re)start/build localnet -//module github.com/pokt-network/pocket -//replace github.com/pokt-network/poktroll => . - go 1.24.3 +tool ( + github.com/bufbuild/buf/cmd/buf + github.com/cosmos/cosmos-proto/cmd/protoc-gen-go-pulsar + github.com/cosmos/gogoproto/protoc-gen-gocosmos + github.com/cosmos/gogoproto/protoc-gen-gogo + github.com/golangci/golangci-lint/cmd/golangci-lint + github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway + github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger + github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2 + golang.org/x/tools/cmd/goimports + google.golang.org/grpc/cmd/protoc-gen-go-grpc + google.golang.org/protobuf/cmd/protoc-gen-go +) + // DEVELOPER_TIP: Uncomment to use local copies of various libraries // replace github.com/pokt-network/shannon-sdk => ../shannon-sdk // replace github.com/pokt-network/smt => ../smt // replace github.com/pokt-network/smt/kvstore/badger => ../smt/kvstore/badger // replace github.com/pokt-network/smt/kvstore/pebble => ../smt/kvstore/pebble -// TODO: Investigate why we need to replace this? 
-replace nhooyr.io/websocket => github.com/coder/websocket v1.8.6
+replace (
+	// TODO_HACK(@olshansk): Replace CometBFT with Pocket's fork to avoid blocking RPC queries on heavy EndBlockers.
+	// Ref: https://github.com/pokt-network/cometbft/issues/3
+	github.com/cometbft/cometbft => github.com/pokt-network/cometbft v0.38.17-0.20250808222235-91d271231811
 
-// replace broken goleveldb
-replace github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
-
-// TODO_HACK(@olshansk): Replace CometBFT with Pocket's fork to avoid blocking RPC queries on heavy EndBlockers.
-// Ref: https://github.com/pokt-network/cometbft/issues/3
-replace github.com/cometbft/cometbft => github.com/pokt-network/cometbft v0.38.17-0.20250808222235-91d271231811
+	// replace broken goleveldb
+	github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
 
+	// TODO_TECHDEBT(@olshansk): Investigate why we need to replace this?
+	nhooyr.io/websocket => github.com/coder/websocket v1.8.6
+)
 
 require (
 	cosmossdk.io/x/tx v0.14.0
@@ -59,60 +71,6 @@
 	golang.org/x/term v0.32.0
 )
 
-require (
-	cosmossdk.io/api v0.9.2
-	cosmossdk.io/client/v2 v2.0.0-beta.8
-	cosmossdk.io/core v0.11.3
-	cosmossdk.io/depinject v1.2.0
-	cosmossdk.io/errors v1.0.2
-	cosmossdk.io/log v1.5.1
-	cosmossdk.io/math v1.5.3
-	cosmossdk.io/store v1.1.2
-	cosmossdk.io/tools/confix v0.1.2
-	cosmossdk.io/x/circuit v0.1.1
-	cosmossdk.io/x/evidence v0.1.1
-	cosmossdk.io/x/feegrant v0.1.1
-	cosmossdk.io/x/upgrade v0.1.4
-	github.com/bufbuild/buf v1.54.0 // indirect
-	github.com/cometbft/cometbft v0.38.17
-	github.com/cosmos/cosmos-db v1.1.1
-	github.com/cosmos/cosmos-proto v1.0.0-beta.5
-	github.com/cosmos/cosmos-sdk v0.53.0
-	github.com/cosmos/gogoproto v1.7.0
-	github.com/cosmos/ibc-go/modules/capability v1.0.1
-	github.com/cosmos/ibc-go/v8 v8.7.0
-	github.com/go-kit/kit v0.13.0
-	github.com/gogo/status v1.1.0
-	github.com/golang/protobuf v1.5.4
-	github.com/gorilla/mux v1.8.1
-	github.com/gorilla/websocket v1.5.3
-	github.com/grpc-ecosystem/grpc-gateway v1.16.0
-	github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
-	github.com/hashicorp/go-metrics v0.5.4
-	github.com/pokt-network/smt v0.14.1
-	github.com/pokt-network/smt/kvstore/pebble v0.0.0-20240822175047-21ea8639c188
-	github.com/prometheus/client_golang v1.22.0
-	github.com/regen-network/gocuke v1.1.0
-	github.com/rs/zerolog v1.34.0
-	github.com/spf13/cobra v1.9.1
-	github.com/spf13/pflag v1.0.6
-	github.com/spf13/viper v1.20.1
-	github.com/stretchr/testify v1.10.0
-	go.uber.org/multierr v1.11.0
-	golang.org/x/crypto v0.38.0
-	golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect
-	golang.org/x/sync v0.14.0
-	golang.org/x/text v0.25.0
-	golang.org/x/tools v0.33.0 // indirect
-	google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2
-	google.golang.org/grpc v1.72.0
-	google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 // indirect
-	google.golang.org/protobuf v1.36.6
-	gopkg.in/yaml.v2 v2.4.0
-)
-
-require github.com/puzpuzpuz/xsync/v4 v4.2.0
-
 require (
 	4d63.com/gocheckcompilerdirectives v1.3.0 // indirect
 	4d63.com/gochecknoglobals v0.2.2 // indirect
@@ -138,8 +96,21 @@
 	cloud.google.com/go/storage v1.49.0 // indirect
 	connectrpc.com/connect v1.18.1 // indirect
 	connectrpc.com/otelconnect v0.7.2 // indirect
+	cosmossdk.io/api v0.9.2
+	cosmossdk.io/client/v2 v2.0.0-beta.8
 	cosmossdk.io/collections v1.2.0 // indirect
+	cosmossdk.io/core v0.11.3
+	cosmossdk.io/depinject v1.2.0
+	cosmossdk.io/errors v1.0.2
+	cosmossdk.io/log v1.5.1
+	cosmossdk.io/math v1.5.3
 	cosmossdk.io/schema v1.1.0 // indirect
+	cosmossdk.io/store v1.1.2
+	cosmossdk.io/tools/confix v0.1.2
+	cosmossdk.io/x/circuit v0.1.1
+	cosmossdk.io/x/evidence v0.1.1
+	cosmossdk.io/x/feegrant v0.1.1
+	cosmossdk.io/x/upgrade v0.1.4
 	filippo.io/edwards25519 v1.1.0 // indirect
 	github.com/4meepo/tagalign v1.4.2 // indirect
 	github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
@@ -180,6 +151,7 @@
 	github.com/bombsimon/wsl/v4 v4.5.0 // indirect
 	github.com/breml/bidichk v0.3.2 // indirect
 	github.com/breml/errchkjson v0.4.0 // indirect
+	github.com/bufbuild/buf v1.54.0 // indirect
 	github.com/bufbuild/protocompile v0.14.1 // indirect
 	github.com/bufbuild/protoplugin v0.0.0-20250218205857-750e09ce93e1 // indirect
 	github.com/butuzov/ireturn v0.3.1 // indirect
@@ -204,13 +176,20 @@
 	github.com/cockroachdb/pebble v1.1.5 // indirect
 	github.com/cockroachdb/redact v1.1.6 // indirect
 	github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
+	github.com/cometbft/cometbft v0.38.17
 	github.com/cometbft/cometbft-db v0.14.1 // indirect
 	github.com/containerd/log v0.1.0 // indirect
 	github.com/containerd/stargz-snapshotter/estargz v0.16.3 // indirect
 	github.com/cosmos/btcutil v1.0.5 // indirect
+	github.com/cosmos/cosmos-db v1.1.1
+	github.com/cosmos/cosmos-proto v1.0.0-beta.5
+	github.com/cosmos/cosmos-sdk v0.53.0
 	github.com/cosmos/go-bip39 v1.0.0 // indirect
 	github.com/cosmos/gogogateway v1.2.0 // indirect
+	github.com/cosmos/gogoproto v1.7.0
 	github.com/cosmos/iavl v1.2.2 // indirect
+	github.com/cosmos/ibc-go/modules/capability v1.0.1
+	github.com/cosmos/ibc-go/v8 v8.7.0
 	github.com/cosmos/ics23/go v0.11.0 // indirect
 	github.com/cosmos/ledger-cosmos-go v0.14.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
@@ -253,6 +232,7 @@
 	github.com/ghostiam/protogetter v0.3.9 // indirect
 	github.com/go-critic/go-critic v0.12.0 // indirect
 	github.com/go-jose/go-jose/v4 v4.0.4 // indirect
+	github.com/go-kit/kit v0.13.0
 	github.com/go-kit/log v0.2.1 // indirect
 	github.com/go-logfmt/logfmt v0.6.0 // indirect
 	github.com/go-logr/logr v1.4.2 // indirect
@@ -274,8 +254,10 @@
 	github.com/gofrs/uuid v4.4.0+incompatible // indirect
 	github.com/gogo/googleapis v1.4.1 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
+	github.com/gogo/status v1.1.0
 	github.com/golang/glog v1.2.4 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
+	github.com/golang/protobuf v1.5.4
 	github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
 	github.com/golangci/dupl v0.0.0-20250308024227-f665c8d69b32 // indirect
 	github.com/golangci/go-printf-func-name v0.1.0 // indirect
@@ -298,17 +280,22 @@
 	github.com/googleapis/gax-go/v2 v2.14.1 // indirect
 	github.com/gordonklaus/ineffassign v0.1.0 // indirect
 	github.com/gorilla/handlers v1.5.2 // indirect
+	github.com/gorilla/mux v1.8.1
+	github.com/gorilla/websocket v1.5.3
 	github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
 	github.com/gostaticanalysis/comment v1.5.0 // indirect
 	github.com/gostaticanalysis/forcetypeassert v0.2.0 // indirect
 	github.com/gostaticanalysis/nilerr v0.1.1 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
+	github.com/grpc-ecosystem/grpc-gateway v1.16.0
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
 	github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
 	github.com/hashicorp/go-getter v1.7.5 // indirect
 	github.com/hashicorp/go-hclog v1.6.3 // indirect
 	github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
 	github.com/hashicorp/go-immutable-radix/v2 v2.1.0 // indirect
+	github.com/hashicorp/go-metrics v0.5.4
 	github.com/hashicorp/go-plugin v1.6.3 // indirect
 	github.com/hashicorp/go-safetemp v1.0.0 // indirect
 	github.com/hashicorp/golang-lru v1.0.2 // indirect
@@ -392,10 +379,14 @@
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
+	github.com/pokt-network/smt v0.14.1
+	github.com/pokt-network/smt/kvstore/pebble v0.0.0-20240822175047-21ea8639c188
 	github.com/polyfloyd/go-errorlint v1.7.1 // indirect
+	github.com/prometheus/client_golang v1.22.0
 	github.com/prometheus/client_model v0.6.1 // indirect
 	github.com/prometheus/common v0.63.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
+	github.com/puzpuzpuz/xsync/v4 v4.2.0
 	github.com/quasilyte/go-ruleguard v0.4.3-0.20240823090925-0fe6f58b47b1 // indirect
 	github.com/quasilyte/go-ruleguard/dsl v0.3.22 // indirect
 	github.com/quasilyte/gogrep v0.5.0 // indirect
@@ -405,9 +396,11 @@
 	github.com/quic-go/quic-go v0.51.0 // indirect
 	github.com/raeperd/recvcheck v0.2.0 // indirect
 	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
+	github.com/regen-network/gocuke v1.1.0
 	github.com/rivo/uniseg v0.4.7 // indirect
 	github.com/rogpeppe/go-internal v1.14.1 // indirect
 	github.com/rs/cors v1.11.1 // indirect
+	github.com/rs/zerolog v1.34.0
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/ryancurrah/gomodguard v1.3.5 // indirect
 	github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect
@@ -428,11 +421,15 @@
 	github.com/sourcegraph/go-diff v0.7.0 // indirect
 	github.com/spf13/afero v1.12.0 // indirect
 	github.com/spf13/cast v1.7.1 // indirect
+	github.com/spf13/cobra v1.9.1
+	github.com/spf13/pflag v1.0.6
+	github.com/spf13/viper v1.20.1
 	github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
 	github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
 	github.com/stbenjam/no-sprintf-host-port v0.2.0 // indirect
 	github.com/stoewer/go-strcase v1.3.0 // indirect
 	github.com/stretchr/objx v0.5.2 // indirect
+	github.com/stretchr/testify v1.10.0
 	github.com/subosito/gotenv v1.6.0 // indirect
 	github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
 	github.com/tdakkota/asciicheck v0.4.1 // indirect
@@ -477,18 +474,29 @@
 	go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
 	go.opentelemetry.io/otel/trace v1.35.0 // indirect
 	go.uber.org/automaxprocs v1.6.0 // indirect
+	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.27.0 // indirect
 	go.uber.org/zap/exp v0.3.0 // indirect
 	golang.org/x/arch v0.15.0 // indirect
+	golang.org/x/crypto v0.38.0
+	golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect
 	golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect
 	golang.org/x/mod v0.24.0 // indirect
 	golang.org/x/net v0.40.0 // indirect
 	golang.org/x/oauth2 v0.27.0 // indirect
+	golang.org/x/sync v0.14.0
 	golang.org/x/sys v0.33.0 // indirect
+	golang.org/x/text v0.25.0
 	golang.org/x/time v0.10.0 // indirect
+	golang.org/x/tools v0.33.0 // indirect
 	google.golang.org/api v0.223.0 // indirect
 	google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
+	google.golang.org/grpc v1.72.0
+	google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 // indirect
+	google.golang.org/protobuf v1.36.6
+	gopkg.in/yaml.v2 v2.4.0
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	gotest.tools/v3 v3.5.2 // indirect
 	honnef.co/go/tools v0.6.1 // indirect
@@ -500,17 +508,3 @@
 	rsc.io/qr v0.2.0 // indirect
 	sigs.k8s.io/yaml v1.4.0 // indirect
 )
-
-tool (
-	github.com/bufbuild/buf/cmd/buf
-	github.com/cosmos/cosmos-proto/cmd/protoc-gen-go-pulsar
-	github.com/cosmos/gogoproto/protoc-gen-gocosmos
-	github.com/cosmos/gogoproto/protoc-gen-gogo
-	github.com/golangci/golangci-lint/cmd/golangci-lint
-	github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
-	github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
-	github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2
-	golang.org/x/tools/cmd/goimports
-	google.golang.org/grpc/cmd/protoc-gen-go-grpc
-	google.golang.org/protobuf/cmd/protoc-gen-go
-)
diff --git a/localnet/grafana-dashboards/rm_benchmark.json b/localnet/grafana-dashboards/relayminer_benchmark.json
similarity index 100%
rename from localnet/grafana-dashboards/rm_benchmark.json
rename to localnet/grafana-dashboards/relayminer_benchmark.json
diff --git a/pkg/cache/memory/kvcache.go b/pkg/cache/memory/kvcache.go
index da6d47e51..50f5788b2 100644
--- a/pkg/cache/memory/kvcache.go
+++ b/pkg/cache/memory/kvcache.go
@@ -4,15 +4,18 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/pokt-network/poktroll/pkg/cache"
 	"github.com/puzpuzpuz/xsync/v4"
+
+	"github.com/pokt-network/poktroll/pkg/cache"
 )
 
 var _ cache.KeyValueCache[any] = (*keyValueCache[any])(nil)
 
 // keyValueCache provides a concurrency-safe in-memory key/value cache implementation.
 type keyValueCache[T any] struct {
+	// config holds the configuration for the cache.
 	config keyValueCacheConfig
+	// values holds the cached values.
 	values *xsync.Map[string, cacheValue[T]]
 }
 
@@ -48,21 +51,25 @@ func NewKeyValueCache[T any](opts ...KeyValueCacheOptionFn) (*keyValueCache[T], 
 // Get retrieves the value from the cache with the given key.
 func (c *keyValueCache[T]) Get(key string) (T, bool) {
 	var zero T
-	v, ok := c.values.Load(key)
-	if !ok {
+	cachedValue, exists := c.values.Load(key)
+	if !exists {
 		return zero, false
 	}
-	if time.Since(v.cachedAt) > c.config.ttl {
-		// Opportunistic prune (no need for atomic compute here).
+	isCacheValueExpired := time.Since(cachedValue.cachedAt) > c.config.ttl
+	if isCacheValueExpired {
+		// Opportunistically prune: we already know this cached value has expired.
 		c.values.Delete(key)
 		return zero, false
 	}
-	return v.value, true
+	return cachedValue.value, true
 }
 
 // Set adds or updates the value in the cache for the given key.
 func (c *keyValueCache[T]) Set(key string, value T) {
-	c.values.Store(key, cacheValue[T]{value: value, cachedAt: time.Now()})
+	c.values.Store(key, cacheValue[T]{
+		value:    value,
+		cachedAt: time.Now(),
+	})
 
 	if c.config.maxKeys > 0 && int64(c.values.Size()) > c.config.maxKeys {
 		c.evictKey()
@@ -79,24 +86,27 @@ func (c *keyValueCache[T]) Clear() {
 	c.values = xsync.NewMap[string, cacheValue[T]]()
 }
 
-// evictKey removes one key/value pair from the cache, to make space for a new one,
-// according to the configured eviction policy.
+// evictKey removes one key/value pair from the cache to make space for a new one.
+// It evicts keys based on the following policy:
+//  1. Remove any expired entry (i.e. an entry that has exceeded the configured TTL).
+//  2. If no expired entry is found, fall back to the configured eviction policy to determine which key to remove.
 func (c *keyValueCache[T]) evictKey() {
+	// No eviction needed: the cache is still within the configured maxKeys (or no limit is set).
 	if c.config.maxKeys <= 0 || int64(c.values.Size()) <= c.config.maxKeys {
 		return
 	}
 
-	now := time.Now()
-	// 1) Prefer to evict any TTL-expired entry (cheap scan, remove one).
 	var expiredKey string
-	c.values.Range(func(k string, v cacheValue[T]) bool {
-		if now.Sub(v.cachedAt) > c.config.ttl {
-			expiredKey = k
-			return false // found one, stop
-		}
-		return true
-	})
+	now := time.Now()
+	c.values.Range(
+		func(k string, v cacheValue[T]) bool {
+			if now.Sub(v.cachedAt) > c.config.ttl {
+				expiredKey = k
+				return false // found one expired entry, stop
+			}
+			return true // continue
+		})
 	if expiredKey != "" {
 		c.values.Delete(expiredKey)
 		return
 	}
@@ -104,28 +114,33 @@
 
 	// 2) Fall back to configured policy.
 	switch c.config.evictionPolicy {
+
+	// FIFO ≈ remove the oldest by cachedAt.
 	case FirstInFirstOut:
-		// FIFO ≈ remove the oldest by cachedAt.
 		var (
 			oldestKey string
 			oldestAt  time.Time
 			found     bool
 		)
-		c.values.Range(func(k string, v cacheValue[T]) bool {
-			if !found || v.cachedAt.Before(oldestAt) {
-				oldestKey, oldestAt, found = k, v.cachedAt, true
-			}
-			return true
-		})
+		c.values.Range(
+			func(k string, v cacheValue[T]) bool {
+				if !found || v.cachedAt.Before(oldestAt) {
+					oldestKey, oldestAt, found = k, v.cachedAt, true
+				}
+				return true
+			})
 		if found {
 			c.values.Delete(oldestKey)
 		}
+
+	// Not implemented in original; keep behavior.
 	case LeastRecentlyUsed:
-		// Not implemented in original; keep behavior.
 		panic("LRU eviction not implemented")
+
+	// Not implemented in original; keep behavior.
 	case LeastFrequentlyUsed:
-		// Not implemented in original; keep behavior.
panic("LFU eviction not implemented") + default: panic(fmt.Sprintf("unsupported eviction policy: %d", c.config.evictionPolicy)) } diff --git a/pkg/client/query/sessionquerier.go b/pkg/client/query/sessionquerier.go index 015d89ed5..e70fdc5dd 100644 --- a/pkg/client/query/sessionquerier.go +++ b/pkg/client/query/sessionquerier.go @@ -3,6 +3,7 @@ package query import ( "context" "fmt" + //"sync" "cosmossdk.io/depinject" diff --git a/pkg/client/query/sharedquerier.go b/pkg/client/query/sharedquerier.go index 50522b541..178b0e47d 100644 --- a/pkg/client/query/sharedquerier.go +++ b/pkg/client/query/sharedquerier.go @@ -3,6 +3,7 @@ package query import ( "context" "strconv" + //"sync" "cosmossdk.io/depinject" @@ -30,7 +31,7 @@ type sharedQuerier struct { // blockHashCache caches blockQuerier.Block requests blockHashCache cache.KeyValueCache[BlockHash] // blockHashMutex to protect cache access patterns for block hashes - //blockHashMutex sync.Mutex + // blockHashMutex sync.Mutex // paramsCache caches sharedQueryClient.Params requests paramsCache client.ParamsCache[sharedtypes.Params] diff --git a/pkg/relayer/config/mining_supervisor_hydrator.go b/pkg/relayer/config/mining_supervisor_hydrator.go index 91ef77763..9c8393a81 100644 --- a/pkg/relayer/config/mining_supervisor_hydrator.go +++ b/pkg/relayer/config/mining_supervisor_hydrator.go @@ -2,46 +2,60 @@ package config import "time" +const ( + MSDropPolicyNew = "drop-new" + MSDropPolicyOldest = "drop-oldest" +) + func (relayMinerConfig *RelayMinerConfig) HydrateMiningSupervisor( yamlMiningSupervisorConfig *YAMLMiningSupervisorConfig, ) error { - relayMinerConfig.MiningSupervisorConfig = &MiningSupervisorConfig{} + config := &MiningSupervisorConfig{} + // Relay Queue Size if yamlMiningSupervisorConfig.QueueSize == 0 { - relayMinerConfig.MiningSupervisorConfig.QueueSize = DefaultMSQueueSize + config.QueueSize = DefaultMSQueueSize } else { - relayMinerConfig.MiningSupervisorConfig.QueueSize = yamlMiningSupervisorConfig.QueueSize + config.QueueSize = yamlMiningSupervisorConfig.QueueSize } + // Relay Workers if yamlMiningSupervisorConfig.Workers == 0 { - relayMinerConfig.MiningSupervisorConfig.Workers = DefaultMSWorkers + config.Workers = DefaultMSWorkers } else { - relayMinerConfig.MiningSupervisorConfig.Workers = yamlMiningSupervisorConfig.Workers + config.Workers = yamlMiningSupervisorConfig.Workers } + // Enqueue Timeout if yamlMiningSupervisorConfig.EnqueueTimeoutMs == 0 { - relayMinerConfig.MiningSupervisorConfig.EnqueueTimeout = time.Duration(DefaultMSEnqueueTimeout) * time.Millisecond + config.EnqueueTimeout = time.Duration(DefaultMSEnqueueTimeout) * time.Millisecond } else { - relayMinerConfig.MiningSupervisorConfig.EnqueueTimeout = time.Duration(yamlMiningSupervisorConfig.EnqueueTimeoutMs) * time.Millisecond + config.EnqueueTimeout = time.Duration(yamlMiningSupervisorConfig.EnqueueTimeoutMs) * time.Millisecond } + // Gauge Sample Interval if yamlMiningSupervisorConfig.GaugeSampleIntervalMs == 0 { - relayMinerConfig.MiningSupervisorConfig.GaugeSampleInterval = time.Duration(DefaultMSGaugeSampleInterval) * time.Millisecond + config.GaugeSampleInterval = time.Duration(DefaultMSGaugeSampleInterval) * time.Millisecond } else { - relayMinerConfig.MiningSupervisorConfig.GaugeSampleInterval = time.Duration(yamlMiningSupervisorConfig.GaugeSampleIntervalMs) * time.Millisecond + config.GaugeSampleInterval = time.Duration(yamlMiningSupervisorConfig.GaugeSampleIntervalMs) * time.Millisecond } + // Drop Log Interval if 
 	if yamlMiningSupervisorConfig.DropLogIntervalMs == 0 {
-		relayMinerConfig.MiningSupervisorConfig.DropLogInterval = time.Duration(DefaultMSDropLogInterval) * time.Millisecond
+		config.DropLogInterval = time.Duration(DefaultMSDropLogInterval) * time.Millisecond
 	} else {
-		relayMinerConfig.MiningSupervisorConfig.DropLogInterval = time.Duration(yamlMiningSupervisorConfig.DropLogIntervalMs) * time.Millisecond
+		config.DropLogInterval = time.Duration(yamlMiningSupervisorConfig.DropLogIntervalMs) * time.Millisecond
 	}
 
-	if yamlMiningSupervisorConfig.DropPolicy == "" || (yamlMiningSupervisorConfig.DropPolicy != "drop-new" && yamlMiningSupervisorConfig.DropPolicy != "drop-oldest") {
-		relayMinerConfig.MiningSupervisorConfig.DropPolicy = DefaultMSDropPolicy
+	// Drop Policy
+	isDropPolicyInvalid := (yamlMiningSupervisorConfig.DropPolicy == "") ||
+		(yamlMiningSupervisorConfig.DropPolicy != MSDropPolicyNew && yamlMiningSupervisorConfig.DropPolicy != MSDropPolicyOldest)
+	if isDropPolicyInvalid {
+		config.DropPolicy = DefaultMSDropPolicy
 	} else {
-		relayMinerConfig.MiningSupervisorConfig.DropPolicy = yamlMiningSupervisorConfig.DropPolicy
+		config.DropPolicy = yamlMiningSupervisorConfig.DropPolicy
 	}
 
+	relayMinerConfig.MiningSupervisorConfig = config
 	return nil
 }
diff --git a/pkg/relayer/config/relayminer_configs_reader.go b/pkg/relayer/config/relayminer_configs_reader.go
index e02da952f..1995d20a9 100644
--- a/pkg/relayer/config/relayminer_configs_reader.go
+++ b/pkg/relayer/config/relayminer_configs_reader.go
@@ -23,12 +23,12 @@ const DefaultEagerRelayRequestValidation = false
 
 // --- Defaults of Mining Supervisor ---
 
-const DefaultMSQueueSize = uint64(10000)
+const DefaultMSQueueSize = uint64(10_000)
 const DefaultMSWorkers = uint8(0)
 const DefaultMSEnqueueTimeout = uint8(0)
-const DefaultMSDropPolicy = "drop-new"
 const DefaultMSGaugeSampleInterval = uint64(200)
 const DefaultMSDropLogInterval = uint64(2)
+const DefaultMSDropPolicy = MSDropPolicyNew
 
 // ParseRelayMinerConfigs parses the relay miner config file into a RelayMinerConfig
 func ParseRelayMinerConfigs(logger polylog.Logger, configContent []byte) (*RelayMinerConfig, error) {
diff --git a/pkg/relayer/config/types.go b/pkg/relayer/config/types.go
index 3a728f630..b0a2b4b1f 100644
--- a/pkg/relayer/config/types.go
+++ b/pkg/relayer/config/types.go
@@ -131,11 +131,8 @@ type RelayMinerConfig struct {
 	Servers      map[string]*RelayMinerServerConfig
 	SmtStorePath string
 	Ping         *RelayMinerPingConfig
-	// TECH_DEBT(@jorgecuesta): should this be moved into a per-service validation config? since could easily follow
-	// the same pattern of eager validation, where the needs of a service does not apply to all services.
-	EnableOverServicing bool
-
-	MiningSupervisorConfig *MiningSupervisorConfig
+	EnableOverServicing    bool // TODO_IMPROVE(@jorgecuesta): Move this to a per-service validation config because different services may have different needs.
+	MiningSupervisorConfig *MiningSupervisorConfig
 }
 
 // TODO_TECHDEBT(@red-0ne): Remove this structure altogether. See the discussion here for ref:
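Side note on the hydrator above — a minimal sketch of how the zero-value fallbacks compose. It assumes only the exported names visible in this diff (YAMLMiningSupervisorConfig, HydrateMiningSupervisor, the MSDropPolicy* and Default* constants); the chosen field values are hypothetical and exact field types are inferred from the defaults shown here:

package main

import (
	"fmt"

	"github.com/pokt-network/poktroll/pkg/relayer/config"
)

func main() {
	relayMinerConfig := &config.RelayMinerConfig{}

	// Only QueueSize and DropPolicy are set explicitly; every zero-valued
	// field falls back to its Default* constant during hydration.
	yamlCfg := &config.YAMLMiningSupervisorConfig{
		QueueSize:  25_000,                    // hypothetical override
		DropPolicy: config.MSDropPolicyOldest, // one of the two valid policies
	}

	if err := relayMinerConfig.HydrateMiningSupervisor(yamlCfg); err != nil {
		panic(err)
	}

	supervisorCfg := relayMinerConfig.MiningSupervisorConfig
	fmt.Println(supervisorCfg.QueueSize)  // 25000
	fmt.Println(supervisorCfg.Workers)    // DefaultMSWorkers (0)
	fmt.Println(supervisorCfg.DropPolicy) // "drop-oldest"
}

An unrecognized or empty DropPolicy would likewise fall back to DefaultMSDropPolicy ("drop-new").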
See the discussion here for ref: diff --git a/pkg/relayer/metrics.go b/pkg/relayer/metrics.go index 458b664d3..71046e80c 100644 --- a/pkg/relayer/metrics.go +++ b/pkg/relayer/metrics.go @@ -182,7 +182,7 @@ var ( Subsystem: relayMinerProcess, Name: responseSizeBytes, Help: "Histogram of response sizes in bytes for performance analysis.", - Buckets: []float64{100, 500, 1000, 5000, 10000, 50000}, + Buckets: []float64{100, 500, 1_000, 5_000, 10_000, 50_000}, }, []string{"service_id"}) // RelayRequestSizeBytes is a histogram metric for observing request size distribution. @@ -194,7 +194,7 @@ var ( Subsystem: relayMinerProcess, Name: requestSizeBytes, Help: "Histogram of request sizes in bytes for performance analysis.", - Buckets: []float64{100, 500, 1000, 5000, 10000, 50000}, + Buckets: []float64{100, 500, 1_000, 5_000, 10_000, 50_000}, }, []string{"service_id"}) // DelayedRelayRequestValidationTotal is a Counter metric for tracking delayed validation occurrences. diff --git a/pkg/relayer/performance_tracker.go b/pkg/relayer/performance_tracker.go index 35f0bba9b..a4ba8921d 100644 --- a/pkg/relayer/performance_tracker.go +++ b/pkg/relayer/performance_tracker.go @@ -52,6 +52,9 @@ import ( // and measures seconds. // Instruction labels for metrics. +// +// When adding new instructions, maintain alphabetical order within each section +// and update the metrics documentation accordingly. const ( // --- General proxy sync flow --- @@ -104,7 +107,7 @@ type SpanRecord struct { // - false => Finish() enqueues to a lock-free buffer; caller must Flush() // // BufferCap: -// - Capacity for the MPMC queue when RecordOnFinish=false (default 4096). +// - Capacity for the MPMC queue when RecordOnFinish=false (default 4_096). // // ConstantLabels: // - Constant labels attached to each observation; a "service_id" key is @@ -146,7 +149,7 @@ type PerfTracker struct { // Derived contexts (WithTimeout/WithDeadline) inherit the value. func WithPerf(parent context.Context, opts PerfOptions) context.Context { if opts.BufferCap <= 0 { - opts.BufferCap = 4096 + opts.BufferCap = 4_096 } // Ensure a service_id label exists. 
 	if opts.ConstantLabels == nil {
@@ -228,7 +231,7 @@ func EnsurePerf(
 func EnsurePerfBuffered(ctx context.Context, serviceID string) (context.Context, *PerfTracker, func()) {
 	return EnsurePerf(ctx, serviceID, PerfOptions{
 		RecordOnFinish: false,
-		BufferCap:      2048,
+		BufferCap:      2_048,
 	})
 }
diff --git a/pkg/relayer/performance_tracker_test.go b/pkg/relayer/performance_tracker_test.go
index 3ceeee93b..dbee3ceb1 100644
--- a/pkg/relayer/performance_tracker_test.go
+++ b/pkg/relayer/performance_tracker_test.go
@@ -287,7 +287,7 @@ func TestEnsurePerfBuffered_ReusesExisting_NoOpFlush(t *testing.T) {
 func BenchmarkPerf_Instance_Buffered(b *testing.B) {
 	ctx := WithPerfForService(context.Background(), "svc", PerfOptions{
 		RecordOnFinish: false,
-		BufferCap:      2048,
+		BufferCap:      2_048,
 	})
 	tr, _ := FromCtx(ctx)
 
@@ -303,7 +303,7 @@ func BenchmarkPerf_Span_Buffered(b *testing.B) {
 	ctx := WithPerfForService(context.Background(), "svc", PerfOptions{
 		RecordOnFinish: false,
-		BufferCap:      2048,
+		BufferCap:      2_048,
 	})
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
@@ -316,7 +316,7 @@ func BenchmarkPerf_Pkg_Buffered(b *testing.B) {
 	ctx := WithPerfForService(context.Background(), "svc", PerfOptions{
 		RecordOnFinish: false,
-		BufferCap:      2048,
+		BufferCap:      2_048,
 	})
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
diff --git a/pkg/relayer/proxy/http_server.go b/pkg/relayer/proxy/http_server.go
index e80d9eae8..2779652be 100644
--- a/pkg/relayer/proxy/http_server.go
+++ b/pkg/relayer/proxy/http_server.go
@@ -99,7 +99,7 @@ func NewHTTPServer(
 	blockClient client.BlockClient,
 	sharedQueryClient client.SharedQueryClient,
 	sessionQueryClient client.SessionQueryClient,
-	miningSup *RelayMiningSupervisor,
+	relayMiningSupervisor *RelayMiningSupervisor,
 ) relayer.RelayServer {
 	// Create the HTTP server with comprehensive limits for security and stability.
 	httpServer := &http.Server{
@@ -134,7 +134,7 @@ func NewHTTPServer(
 		sharedQueryClient:  sharedQueryClient,
 		sessionQueryClient: sessionQueryClient,
 		httpClient:         httpClient,
-		miningSupervisor:   miningSup,
+		miningSupervisor:   relayMiningSupervisor,
 	}
 }
diff --git a/pkg/relayer/proxy/mining_supervisor.go b/pkg/relayer/proxy/mining_supervisor.go
index a6f26ed79..241ddff7e 100644
--- a/pkg/relayer/proxy/mining_supervisor.go
+++ b/pkg/relayer/proxy/mining_supervisor.go
@@ -9,52 +9,63 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/puzpuzpuz/xsync/v4"
+
 	"github.com/pokt-network/poktroll/pkg/client"
 	"github.com/pokt-network/poktroll/pkg/polylog"
 	"github.com/pokt-network/poktroll/pkg/relayer"
 	"github.com/pokt-network/poktroll/pkg/relayer/config"
 	"github.com/pokt-network/poktroll/pkg/relayer/relay_authenticator"
 	"github.com/pokt-network/poktroll/x/service/types"
-	"github.com/puzpuzpuz/xsync/v4"
 )
 
-// SessionCache stores fast-path per-session state.
-// Rewardable starts true and is only ever downgraded to false.
-type SessionCache struct {
-	EndHeight  int64
-	Rewardable atomic.Bool
+// sessionCache stores fast-path per-session state.
+type sessionCache struct {
+	// The end height of the session.
+	sessionEndHeight int64
+
+	// isRewardable starts as true and is only ever downgraded to false.
+	isRewardable atomic.Bool
 }
 
-// RelayMiningSupervisor runs delayed validation/rewardability and forwards rewardable
-// relays to the downstream "observable"/miner. It uses one bounded queue, a worker pool,
-// and an xsync-backed session cache.
+// RelayMiningSupervisor runs delayed relay validation and rewardability checks.
+// It forwards rewardable relays to the downstream "observable" (i.e. miner).
+// It uses one bounded queue, a worker pool, and an xsync-backed session cache.
 type RelayMiningSupervisor struct {
-	logger     polylog.Logger
-	downstream chan<- *types.Relay // we don't own/close this
+	// downstreamMiner is the channel that feeds validated relays into the relay
+	// mining process. The supervisor does NOT own this channel and never closes it.
+	// Once the supervisor has completed its delayed validation & rewardability
+	// checks, it forwards the relay to the downstream miner.
+	downstreamMiner chan<- *types.Relay
+
+	// logger is the logger for the supervisor.
+	logger polylog.Logger
 
-	queue chan *types.Relay // internal bounded queue
+	// Internal bounded queue
+	queue chan *types.Relay
 	wg    sync.WaitGroup
 
+	// ctx is the context for the supervisor.
 	ctx     context.Context
 	cancel  context.CancelFunc
 	stopped atomic.Bool
 
-	// Options
+	// Options passed to the supervisor.
 	dropOldest          bool
 	enqueueTimeout      time.Duration
 	gaugeSampleInterval time.Duration
 	dropLogInterval     time.Duration
 
 	// Fast state
-	downstreamClosed atomic.Bool
-	lastDropLogNs    atomic.Int64 // rate-limit downstream drop logs
+	downstreamMinerClosed atomic.Bool
+	lastDropLogNs         atomic.Int64 // rate-limit downstream drop logs
 
 	// Dependencies
 	relayMeter         relayer.RelayMeter
 	relayAuthenticator relayer.RelayAuthenticator
 
-	// Sessions: sessionID -> *SessionCache
-	knownSessions *xsync.Map[string, *SessionCache]
+	// Sessions: sessionID -> *sessionCache
+	knownSessions *xsync.Map[string, *sessionCache]
 }
 
 // NewRelayMiningSupervisor creates a new instance and starts workers + gauge sampler.
@@ -67,7 +78,8 @@ func NewRelayMiningSupervisor(
 	relayAuthenticator relayer.RelayAuthenticator,
 ) *RelayMiningSupervisor {
 	if downstream == nil {
-		// TECH_DEBT: should it panic?
+		// Panic is appropriate here since this is a programmer error (invalid constructor arg).
+		// TODO_FUTURE: Consider returning (nil, error) for better testability.
panic("RelayMiningSupervisor: downstream channel must not be nil") } if cfg.QueueSize <= 0 { @@ -93,7 +105,7 @@ func NewRelayMiningSupervisor( s := &RelayMiningSupervisor{ logger: logger.With("component", "mining_supervisor"), - downstream: downstream, + downstreamMiner: downstream, queue: make(chan *types.Relay, cfg.QueueSize), ctx: ctx, cancel: cancel, @@ -101,7 +113,7 @@ func NewRelayMiningSupervisor( enqueueTimeout: cfg.EnqueueTimeout, gaugeSampleInterval: cfg.GaugeSampleInterval, dropLogInterval: cfg.DropLogInterval, - knownSessions: xsync.NewMap[string, *SessionCache](), + knownSessions: xsync.NewMap[string, *sessionCache](), relayMeter: relayMeter, relayAuthenticator: relayAuthenticator, } @@ -161,7 +173,7 @@ func (s *RelayMiningSupervisor) Publish(ctx context.Context, r *types.Relay) boo return false } - if s.stopped.Load() || s.ctx.Err() != nil || s.downstreamClosed.Load() { + if s.stopped.Load() || s.ctx.Err() != nil || s.downstreamMinerClosed.Load() { relayer.CaptureMiningQueueDropped(serviceIDOf(r), "stopped_or_closed") return false } @@ -337,8 +349,8 @@ func (s *RelayMiningSupervisor) processRelay(r *types.Relay) { if !ok { relayer.CaptureMiningQueueDropped(serviceID, "downstream_closed_or_full") - s.maybeLogDrop("downstream_full", serviceID, len(s.downstream), cap(s.downstream)) - s.downstreamClosed.Store(true) + s.maybeLogDrop("downstream_full", serviceID, len(s.downstreamMiner), cap(s.downstreamMiner)) + s.downstreamMinerClosed.Store(true) rewardable = ok // rollback to non-rewardable since we were not able to send downstream } } @@ -359,7 +371,7 @@ func (s *RelayMiningSupervisor) safeSendDownstream(r *types.Relay) (ok bool) { } }() select { - case s.downstream <- r: + case s.downstreamMiner <- r: return true default: return false // buffer full / no reader @@ -369,38 +381,48 @@ func (s *RelayMiningSupervisor) safeSendDownstream(r *types.Relay) (ok bool) { // --- Session cache (handler fast-path helpers) --- // GetSessionEntry returns cached state if present. -func (s *RelayMiningSupervisor) GetSessionEntry(sessionID string) (*SessionCache, bool) { +func (s *RelayMiningSupervisor) GetSessionEntry(sessionID string) (*sessionCache, bool) { return s.knownSessions.Load(sessionID) } // MarkSessionAsKnown inserts/updates a session as known; rewardable stays true unless previously downgraded. -func (s *RelayMiningSupervisor) MarkSessionAsKnown(sessionID string, endHeight int64) *SessionCache { +func (s *RelayMiningSupervisor) MarkSessionAsKnown(sessionID string, endHeight int64) *sessionCache { return s.upsertSessionState(sessionID, endHeight, true) } // MarkSessionAsNonRewardable permanently downgrades rewardability for a session. -func (s *RelayMiningSupervisor) MarkSessionAsNonRewardable(sessionID string) (*SessionCache, error) { +func (s *RelayMiningSupervisor) MarkSessionAsNonRewardable(sessionID string) (*sessionCache, error) { st, ok := s.knownSessions.Load(sessionID) if !ok { return nil, fmt.Errorf("session %q not found", sessionID) } - st.Rewardable.Store(false) + st.isRewardable.Store(false) return st, nil } // upsertSessionState inserts or updates session state and only downgrades rewardability. -func (s *RelayMiningSupervisor) upsertSessionState(sessionID string, endHeight int64, rewardable bool) *SessionCache { +// +// Note: This is intentionally optimistic. 
+// Note: This is intentionally optimistic. A TOCTOU race exists where:
+//   - Thread A loads a session with isRewardable=true
+//   - Thread B downgrades it to isRewardable=false
+//   - Thread A acts on the stale isRewardable=true
+//
+// This is acceptable because:
+//   - Worst case: one extra relay is temporarily counted as rewardable
+//   - The mining supervisor will roll back via SetNonApplicableRelayReward
+//   - The atomic bool ensures eventual consistency
+func (s *RelayMiningSupervisor) upsertSessionState(sessionID string, endHeight int64, rewardable bool) *sessionCache {
 	if st, ok := s.knownSessions.Load(sessionID); ok {
-		if endHeight > st.EndHeight {
-			st.EndHeight = endHeight
+		if endHeight > st.sessionEndHeight {
+			st.sessionEndHeight = endHeight
 		}
 		if !rewardable {
-			st.Rewardable.Store(false)
+			st.isRewardable.Store(false)
 		}
 		return st
 	}
-	st := &SessionCache{EndHeight: endHeight}
-	st.Rewardable.Store(rewardable)
+	st := &sessionCache{sessionEndHeight: endHeight}
+	st.isRewardable.Store(rewardable)
 	s.knownSessions.Store(sessionID, st)
 	return st
 }
@@ -408,8 +430,8 @@ func (s *RelayMiningSupervisor) upsertSessionState(sessionID string, endHeight i
 
-// PruneOutdatedKnownSessions removes sessions whose EndHeight is before the current height (with +1 guard).
+// PruneOutdatedKnownSessions removes sessions whose sessionEndHeight is before the current height (with +1 guard).
 // Call periodically (e.g., on new block events). Consider adding a grace window if late requests are common.
 func (s *RelayMiningSupervisor) PruneOutdatedKnownSessions(_ context.Context, block client.Block) {
-	s.knownSessions.Range(func(sessionID string, st *SessionCache) bool {
-		if st.EndHeight+1 < block.Height() {
+	s.knownSessions.Range(func(sessionID string, st *sessionCache) bool {
+		if st.sessionEndHeight+1 < block.Height() {
 			s.knownSessions.Delete(sessionID)
 		}
 		return true
diff --git a/pkg/relayer/proxy/relay_meter.go b/pkg/relayer/proxy/relay_meter.go
index 0447766d1..fe661834d 100644
--- a/pkg/relayer/proxy/relay_meter.go
+++ b/pkg/relayer/proxy/relay_meter.go
@@ -71,7 +71,8 @@ type ProxyRelayMeter struct {
 
 	logger polylog.Logger
 
-	// Per-session relay cost memoization:
+	// Per-session memoization of the cost of a single relay.
+	// The cost is cached per (sessionID, serviceID) pair.
 	// sessionID -> (serviceID -> coin)
 	relayCostBySession *xsync.Map[string, *xsync.Map[string, cosmostypes.Coin]]
 }
@@ -102,55 +103,11 @@ func NewRelayMeter(deps depinject.Config, enableOverServicing bool) (relayer.Rel
 func (rmtr *ProxyRelayMeter) Start(ctx context.Context) error {
 	// Listen to new blocks and reset the relay meter application stakes every new session.
 	committedBlocksSequence := rmtr.blockQuerier.CommittedBlocksSequence(ctx)
-	channel.ForEach(ctx, committedBlocksSequence, rmtr.forEachNewBlockFn)
+	channel.ForEach(ctx, committedBlocksSequence, rmtr.clearOnEachNewBlockFn)
 
 	return nil
 }
 
-// relayCostFor returns the per-relay cost for (sessionID, serviceID), caching
-// the result for the lifetime of the session.
-func (rmtr *ProxyRelayMeter) relayCostFor(
-	ctx context.Context,
-	sessionID string,
-	serviceID string,
-) (cosmostypes.Coin, error) {
-	// Fast path: find or create the per-session submap
-	svcMap, ok := rmtr.relayCostBySession.Load(sessionID)
-	if !ok {
-		// Create a new submap; a benign race here is fine
-		newMap := xsync.NewMap[string, cosmostypes.Coin]()
-		if actual, loaded := rmtr.relayCostBySession.LoadOrStore(sessionID, newMap); loaded {
-			svcMap = actual
-		} else {
-			svcMap = newMap
-		}
-	}
-
-	// Fast path: per-service coin
-	if coin, ok := svcMap.Load(serviceID); ok {
-		return coin, nil
-	}
-
-	// Slow path: compute once, then store
-	sharedParams, err := rmtr.sharedQuerier.GetParams(ctx)
-	if err != nil {
-		return cosmostypes.Coin{}, err
-	}
-
-	service, err := rmtr.serviceQuerier.GetService(ctx, serviceID)
-	if err != nil {
-		return cosmostypes.Coin{}, err
-	}
-
-	coin, err := getSingleRelayCostCoin(sharedParams, &service)
-	if err != nil {
-		return cosmostypes.Coin{}, err
-	}
-
-	svcMap.Store(serviceID, coin)
-	return coin, nil
-}
-
 // IsOverServicing returns whether the relay would result in over-servicing the application.
 //
 // It returns true if serving this relay would exceed the application's allocated stake
@@ -178,7 +135,7 @@ func (rmtr *ProxyRelayMeter) IsOverServicing(
 		return false
 	}
 
-	relayCostCoin, err := rmtr.relayCostFor(
+	relayCostCoin, err := rmtr.getCachedRelayCost(
 		ctx,
 		reqMeta.GetSessionHeader().GetSessionId(),
 		reqMeta.SessionHeader.ServiceId,
@@ -238,7 +195,7 @@ func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, re
 		return
 	}
 
-	relayCost, err := rmtr.relayCostFor(
+	relayCost, err := rmtr.getCachedRelayCost(
 		ctx,
 		reqMeta.GetSessionHeader().GetSessionId(),
 		reqMeta.SessionHeader.ServiceId,
@@ -285,32 +242,33 @@ func (rmtr *ProxyRelayMeter) AllowOverServicing() bool {
 	return rmtr.overServicingEnabled
 }
 
-// forEachNewBlockFn is a callback function that is called every time a new block is committed.
-// It resets the relay meter's application stakes every new session so that new
-// application stakes can be metered.
-func (rmtr *ProxyRelayMeter) forEachNewBlockFn(ctx context.Context, block client.Block) {
+// clearOnEachNewBlockFn is a callback function that is called every time a new block is committed.
+// It resets cached data in the relay meter.
+func (rmtr *ProxyRelayMeter) clearOnEachNewBlockFn(ctx context.Context, block client.Block) {
 	// Fast path: nothing to prune.
 	if rmtr.sessionToRelayMeterMap.Size() == 0 && rmtr.relayCostBySession.Size() == 0 {
 		return
 	}
 
+	// Fetch shared params once.
 	sharedParams, err := rmtr.sharedQuerier.GetParams(ctx)
 	if err != nil {
 		return
 	}
 
-	rmtr.sessionToRelayMeterMap.Range(func(sessionID string, meter *sessionRelayMeter) bool {
-		claimOpen := sharedtypes.GetClaimWindowOpenHeight(
-			sharedParams,
-			meter.sessionHeader.GetSessionEndBlockHeight(),
-		)
-		if block.Height() >= claimOpen {
-			// Drop both: meter state + per-session relay-cost memoization.
-			rmtr.sessionToRelayMeterMap.Delete(sessionID)
-			rmtr.relayCostBySession.Delete(sessionID)
-		}
-		return true
-	})
+	rmtr.sessionToRelayMeterMap.Range(
+		func(sessionID string, meter *sessionRelayMeter) bool {
+			sessionClaimOpenHeight := sharedtypes.GetClaimWindowOpenHeight(
+				sharedParams,
+				meter.sessionHeader.GetSessionEndBlockHeight(),
+			)
+			if block.Height() >= sessionClaimOpenHeight {
+				// Drop both: meter state + per-session relay-cost memoization.
+				rmtr.sessionToRelayMeterMap.Delete(sessionID)
+				rmtr.relayCostBySession.Delete(sessionID)
+			}
+			return true // continue
+		})
 }
 
 // ensureRequestSessionRelayMeter ensures that the relay miner has a relay meter
@@ -343,31 +301,31 @@ func (rmtr *ProxyRelayMeter) ensureRequestSessionRelayMeter(ctx context.Context,
 		)
 	}
 
+	// Calculate the max amount of stake the application can consume in the current session.
 	sharedParams, err := rmtr.sharedQuerier.GetParams(ctx)
 	if err != nil {
 		return nil, err
 	}
-
 	sessionParams, err := rmtr.sessionQuerier.GetParams(ctx)
 	if err != nil {
 		return nil, err
 	}
-
-	// calculate the max amount of stake the application can consume in the current session.
-	supplierAppStake := getAppStakePortionPayableToSessionSupplier(
+	maxAppPerSupplierUsageCoin := getAppStakePortionPayableToSessionSupplier(
 		app.GetStake(),
 		sharedParams,
 		sessionParams.GetNumSuppliersPerSession(),
 	)
 
+	// Prepare a new relay meter for the particular app for this session.
 	relayMeter := &sessionRelayMeter{
 		app:           app,
 		consumedCoin:  cosmostypes.NewInt64Coin(pocket.DenomuPOKT, 0),
-		maxCoin:       supplierAppStake,
+		maxCoin:       maxAppPerSupplierUsageCoin,
 		sessionHeader: reqMeta.SessionHeader,
 	}
 
-	// Try to publish; if someone beat us, reuse theirs.
+	// Try to store the new relay meter.
+	// If another thread has already done it, reuse theirs.
 	if existing, loaded := rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter); loaded {
 		return existing, nil
 	}
@@ -375,6 +333,48 @@ func (rmtr *ProxyRelayMeter) ensureRequestSessionRelayMeter(ctx context.Context,
 	return relayMeter, nil
 }
 
+// getCachedRelayCost returns the cost of a single relay for the given (sessionID, serviceID).
+// It caches the result for the lifetime of the session inside the relay meter.
+func (rmtr *ProxyRelayMeter) getCachedRelayCost(
+	ctx context.Context,
+	sessionID string,
+	serviceID string,
+) (cosmostypes.Coin, error) {
+	// Find or create the per-session submap.
+	svcToRelayCostMap, ok := rmtr.relayCostBySession.Load(sessionID)
+	if !ok {
+		// Create a new submap; a benign race here is fine.
+		newMap := xsync.NewMap[string, cosmostypes.Coin]()
+		if cachedMap, loaded := rmtr.relayCostBySession.LoadOrStore(sessionID, newMap); loaded {
+			svcToRelayCostMap = cachedMap
+		} else {
+			svcToRelayCostMap = newMap
+		}
+	}
+
+	// Fast path: the per-service relayCost is cached and can be returned immediately.
+	if relayCost, ok := svcToRelayCostMap.Load(serviceID); ok {
+		return relayCost, nil
+	}
+
+	// Slow path: the per-service relayCost is not cached, so compute it once.
+	sharedParams, err := rmtr.sharedQuerier.GetParams(ctx)
+	if err != nil {
+		return cosmostypes.Coin{}, err
+	}
+	service, err := rmtr.serviceQuerier.GetService(ctx, serviceID)
+	if err != nil {
+		return cosmostypes.Coin{}, err
+	}
+	relayCost, err := getSingleRelayCostCoin(sharedParams, &service)
+	if err != nil {
+		return cosmostypes.Coin{}, err
+	}
+
+	svcToRelayCostMap.Store(serviceID, relayCost)
+	return relayCost, nil
+}
+
 // getSingleRelayCostCoin returns the cost of a relay based on the shared parameters and the service.
 //
 // relayCost =
diff --git a/pkg/relayer/proxy/server_builder.go b/pkg/relayer/proxy/server_builder.go
index 38178696b..4a692dd32 100644
--- a/pkg/relayer/proxy/server_builder.go
+++ b/pkg/relayer/proxy/server_builder.go
@@ -42,8 +42,9 @@ func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error {
 
 // initializeProxyServers initializes the proxy servers for each server config.
 func (rp *relayerProxy) initializeProxyServers() (proxyServerMap map[string]relayer.RelayServer, err error) {
-	// build miner supervisor which should be the same for all servers
-	miningSup := NewRelayMiningSupervisor(
+	// Build the miner supervisor.
+	// The same miner supervisor should be used for all servers.
+	relayMiningSupervisor := NewRelayMiningSupervisor(
 		rp.logger,
 		rp.servedRelaysPublishCh,
 		rp.miningSupervisorConfig,
@@ -51,8 +52,6 @@ func (rp *relayerProxy) initializeProxyServers() (proxyServerMap map[string]rela
 		rp.relayAuthenticator,
 	)
 
-	// Build a map of serviceId -> service for the supplier's advertised services
-
 	// Build a map of listenAddress -> RelayServer for each server defined in the config file
 	servers := make(map[string]relayer.RelayServer)
 
@@ -78,7 +77,7 @@ func (rp *relayerProxy) initializeProxyServers() (proxyServerMap map[string]rela
 				rp.blockClient,
 				rp.sharedQuerier,
 				rp.sessionQuerier,
-				miningSup,
+				relayMiningSupervisor,
 			)
 		default:
 			return nil, ErrRelayerProxyUnsupportedTransportType
diff --git a/pkg/relayer/proxy/sync.go b/pkg/relayer/proxy/sync.go
index 2ca0d7a5f..edee6eb43 100644
--- a/pkg/relayer/proxy/sync.go
+++ b/pkg/relayer/proxy/sync.go
@@ -10,7 +10,6 @@ import (
 	"strings"
 	"time"
 
-	//"github.com/pokt-network/poktroll/pkg/client"
 	"github.com/pokt-network/poktroll/pkg/client/query"
 	"github.com/pokt-network/poktroll/pkg/polylog"
 	"github.com/pokt-network/poktroll/pkg/relayer"
@@ -117,7 +116,7 @@ func (server *relayMinerHTTPServer) serveSyncRequest(
 	}
 
 	tr.Start(relayer.InstructionProxySyncCheckSessionIsRewardable)
-	sessionIsRewardable := sessionCacheEntry.Rewardable.Load()
+	sessionIsRewardable := sessionCacheEntry.isRewardable.Load()
 	tr.Finish(relayer.InstructionProxySyncCheckSessionIsRewardable)
 	if !sessionIsRewardable {
 		return relayReq, ErrRelayerProxyRateLimited
@@ -292,7 +291,7 @@ func (server *relayMinerHTTPServer) serveSyncRequest(
 			logger.Warn().Msg("mining channel full - dropping (protect tail)")
 		}
 	}
-	tr.Start(relayer.InstructionProxySyncEagerCheckRewardApplicability)
+	tr.Finish(relayer.InstructionProxySyncEagerCheckRewardApplicability)
 
 	return relayReq, nil
 }
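Appendix — the two-level LoadOrStore memoization introduced in getCachedRelayCost above, reduced to a standalone sketch. It assumes only the xsync/v4 calls already used in this diff (NewMap, Load, LoadOrStore, Store); the cost computation is stubbed with a hypothetical compute callback rather than the real shared-params/service queries:

package main

import (
	"fmt"

	"github.com/puzpuzpuz/xsync/v4"
)

// costBySession mirrors relayCostBySession: sessionID -> (serviceID -> cost).
var costBySession = xsync.NewMap[string, *xsync.Map[string, int64]]()

func cachedCost(sessionID, serviceID string, compute func() int64) int64 {
	// Find or create the per-session submap.
	svcMap, ok := costBySession.Load(sessionID)
	if !ok {
		newMap := xsync.NewMap[string, int64]()
		// Benign race: if another goroutine stored a submap first, reuse it.
		if actual, loaded := costBySession.LoadOrStore(sessionID, newMap); loaded {
			svcMap = actual
		} else {
			svcMap = newMap
		}
	}

	// Fast path: the cost is already memoized for this (session, service) pair.
	if cost, ok := svcMap.Load(serviceID); ok {
		return cost
	}

	// Slow path: compute once, then store for the lifetime of the session.
	cost := compute()
	svcMap.Store(serviceID, cost)
	return cost
}

func main() {
	c := cachedCost("session-1", "anvil", func() int64 { return 42 })
	fmt.Println(c) // 42; subsequent lookups for the same pair hit the cache
}

The same shape applies to the pruning side: deleting the sessionID key from the outer map (as clearOnEachNewBlockFn does once the claim window opens) drops the whole per-service submap at once.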