diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index f5748c2e..05514f1b 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -22,6 +22,8 @@ import ( "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/api" @@ -41,13 +43,13 @@ const ( // DefaultMaxMessageSize is the default maximum message size for gRPC responses DefaultMaxMessageSize = 1024 * 1024 * 1024 - // DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns - // a ResourceExhausted error. - DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond + // DefaultRetryDelay is the default delay between retries when a gRPC request + // to one of the Access Nodes has errored out. + DefaultRetryDelay = 100 * time.Millisecond - // DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server - // ResourceExhausted errors. - DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second + // DefaultMaxRetryDelay is the default max request duration when retrying failed + // gRPC requests to one of the Access Nodes. + DefaultMaxRetryDelay = 30 * time.Second ) type Storages struct { @@ -493,16 +495,59 @@ func StartEngine( // setupCrossSporkClient sets up a cross-spork AN client. func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*requester.CrossSporkClient, error) { // create access client with cross-spork capabilities - currentSporkClient, err := grpc.NewClient( - config.AccessNodeHost, - grpc.WithGRPCDialOptions( - grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), - grpcOpts.WithUnaryInterceptor(retryInterceptor( - DefaultResourceExhaustedMaxRetryDelay, - DefaultResourceExhaustedRetryDelay, - )), - ), - ) + var currentSporkClient *grpc.Client + var err error + + if len(config.AccessNodeBackupHosts) > 0 { + mr := manual.NewBuilderWithScheme("dns") + defer mr.Close() + + // `pick_first` tries to connect to the first address, uses it for all RPCs + // if it connects, or try the next address if it fails + // (and keep doing that until one connection is successful). + // Because of this, all the RPCs will be sent to the same backend. See more on: + // https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing#pick_first + json := `{"loadBalancingConfig": [{"pick_first":{}}]}` + endpoints := []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: config.AccessNodeHost}}}, + } + + for _, accessNodeBackupAddr := range config.AccessNodeBackupHosts { + endpoints = append(endpoints, resolver.Endpoint{ + Addresses: []resolver.Address{{Addr: accessNodeBackupAddr}}, + }) + } + + mr.InitialState(resolver.State{ + Endpoints: endpoints, + }) + + targetHost := fmt.Sprintf("%s:///%s", mr.Scheme(), "flow-access") + currentSporkClient, err = grpc.NewClient( + targetHost, + grpc.WithGRPCDialOptions( + grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), + grpcOpts.WithResolvers(mr), + grpcOpts.WithDefaultServiceConfig(json), + grpcOpts.WithUnaryInterceptor(retryInterceptor( + DefaultMaxRetryDelay, + DefaultRetryDelay, + )), + ), + ) + } else { + currentSporkClient, err = grpc.NewClient( + config.AccessNodeHost, + grpc.WithGRPCDialOptions( + grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), + grpcOpts.WithUnaryInterceptor(retryInterceptor( + DefaultMaxRetryDelay, + DefaultRetryDelay, + )), + ), + ) + } + if err != nil { return nil, fmt.Errorf( "failed to create client connection for host: %s, with error: %w", @@ -555,7 +600,16 @@ func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryCl return nil } - if status.Code(err) != codes.ResourceExhausted { + switch status.Code(err) { + case codes.Canceled, codes.DeadlineExceeded: + // these kind of errors are guaranteed to fail all requests, + // if the source was a local context + return err + case codes.ResourceExhausted, codes.OutOfRange, codes.NotFound: + // when we receive these errors, we pause briefly, so that + // the next request on the same AN, has a higher chance + // of success. + default: return err } diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 00274cc7..b93086de 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -201,6 +201,16 @@ func parseConfigFromFlags() error { } cfg.FilterExpiry = exp + if accessNodeBackupHosts != "" { + rawHosts := strings.Split(accessNodeBackupHosts, ",") + cfg.AccessNodeBackupHosts = make([]string, 0, len(rawHosts)) + for _, host := range rawHosts { + if trimmed := strings.TrimSpace(host); trimmed != "" { + cfg.AccessNodeBackupHosts = append(cfg.AccessNodeBackupHosts, trimmed) + } + } + } + if accessSporkHosts != "" { heightHosts := strings.Split(accessSporkHosts, ",") cfg.AccessNodePreviousSporkHosts = append(cfg.AccessNodePreviousSporkHosts, heightHosts...) @@ -242,6 +252,7 @@ var ( logWriter, filterExpiry, accessSporkHosts, + accessNodeBackupHosts, cloudKMSKey, cloudKMSProjectID, cloudKMSLocationID, @@ -259,6 +270,7 @@ func init() { Cmd.Flags().IntVar(&cfg.RPCPort, "rpc-port", 8545, "Port for the RPC API server") Cmd.Flags().BoolVar(&cfg.WSEnabled, "ws-enabled", false, "Enable websocket connections") Cmd.Flags().StringVar(&cfg.AccessNodeHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API") + Cmd.Flags().StringVar(&accessNodeBackupHosts, "access-node-backup-hosts", "", `Backup AN hosts to use in case of connectivity issues, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`) Cmd.Flags().StringVar(&accessSporkHosts, "access-node-spork-hosts", "", `Previous spork AN hosts, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`) Cmd.Flags().StringVar(&flowNetwork, "flow-network-id", "flow-emulator", "Flow network ID (flow-emulator, flow-previewnet, flow-testnet, flow-mainnet)") Cmd.Flags().StringVar(&coinbase, "coinbase", "", "Coinbase address to use for fee collection") diff --git a/config/config.go b/config/config.go index 42529a1e..c094296f 100644 --- a/config/config.go +++ b/config/config.go @@ -46,6 +46,9 @@ type Config struct { DatabaseDir string // AccessNodeHost defines the current spork Flow network AN host. AccessNodeHost string + // AccessNodeBackupHosts contains a list of ANs hosts to use as backup, in + // case of connectivity issues with `AccessNodeHost`. + AccessNodeBackupHosts []string // AccessNodePreviousSporkHosts contains a list of the ANs hosts for each spork AccessNodePreviousSporkHosts []string // GRPCPort for the RPC API server diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index ed86cd27..cf85afbe 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -224,7 +224,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha // we can get not found when reconnecting after a disconnect/restart before the // next block is finalized. just wait briefly and try again time.Sleep(200 * time.Millisecond) - case codes.DeadlineExceeded, codes.Internal: + case codes.DeadlineExceeded, codes.Internal, codes.Unavailable: // these are sometimes returned when the stream is disconnected by a middleware or the server default: // skip reconnect on all other errors @@ -232,15 +232,30 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return } - if err := connect(lastReceivedHeight + 1); err != nil { - eventsChan <- models.NewBlockEventsError( - fmt.Errorf( - "failed to resubscribe for events on height: %d, with: %w", - lastReceivedHeight+1, - err, - ), - ) - return + start := time.Now() + attempts := 0 + pauseDuration, maxDuration := 200*time.Millisecond, 30*time.Second + // Allow reconnect retries for up to 30 seconds, with retry + // attempts every 200 ms. + for { + err := connect(lastReceivedHeight) + if err == nil { + break + } + + attempts++ + duration := time.Since(start) + if duration >= maxDuration { + eventsChan <- models.NewBlockEventsError( + fmt.Errorf( + "failed to resubscribe for events on height: %d, with: %w", + lastReceivedHeight, + err, + ), + ) + return + } + time.Sleep(pauseDuration) } } } diff --git a/tests/helpers.go b/tests/helpers.go index 017b8578..f9eb8dab 100644 --- a/tests/helpers.go +++ b/tests/helpers.go @@ -67,23 +67,18 @@ func testLogWriter() io.Writer { return zerolog.NewConsoleWriter() } -func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) { +func defaultServerConfig() *server.Config { pkey, err := crypto.DecodePrivateKeyHex(sigAlgo, servicePrivateKey) if err != nil { - return nil, err + panic(err) } genesisToken, err := cadence.NewUFix64("10000.0") if err != nil { - return nil, err + panic(err) } - log := logger.With().Timestamp().Str("component", "emulator").Logger().Level(zerolog.DebugLevel) - if logOutput == "false" { - log = zerolog.Nop() - } - - srv := server.NewEmulatorServer(&log, &server.Config{ + return &server.Config{ ServicePrivateKey: pkey, ServiceKeySigAlgo: sigAlgo, ServiceKeyHashAlgo: hashAlgo, @@ -94,7 +89,19 @@ func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) { TransactionMaxGasLimit: flow.DefaultMaxTransactionGasLimit, SetupEVMEnabled: true, SetupVMBridgeEnabled: true, - }) + } +} + +func startEmulator(createTestAccounts bool, conf *server.Config) ( + *server.EmulatorServer, + error, +) { + log := logger.With().Timestamp().Str("component", "emulator").Logger().Level(zerolog.DebugLevel) + if logOutput == "false" { + log = zerolog.Nop() + } + + srv := server.NewEmulatorServer(&log, conf) go func() { srv.Start() @@ -133,7 +140,7 @@ func runWeb3TestWithSetup( // servicesSetup starts up an emulator and the gateway // engines required for operation of the evm gateway. func servicesSetup(t *testing.T) (emulator.Emulator, func()) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/tests/integration_test.go b/tests/integration_test.go index 79508ed4..7142eaa6 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -30,7 +30,7 @@ import ( // This is using the TxSealValidation mechanism for validating submitted // transactions, which blocks until it receives the Flow transaction result. func Test_ConcurrentTransactionSubmissionWithTxSeal(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -141,7 +141,7 @@ func Test_ConcurrentTransactionSubmissionWithTxSeal(t *testing.T) { // This is using the LocalIndexValidation mechanism for validating submitted // transactions, which is non-blocking and validates based on the local state. func Test_ConcurrentTransactionSubmissionWithLocalIndex(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -246,7 +246,7 @@ func Test_ConcurrentTransactionSubmissionWithLocalIndex(t *testing.T) { } func Test_EthClientTest(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -325,7 +325,7 @@ func Test_CloudKMSConcurrentTransactionSubmission(t *testing.T) { t.Skip() } - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -449,7 +449,7 @@ func Test_CloudKMSConcurrentTransactionSubmission(t *testing.T) { // 2. No transactions are lost // 3. The state remains consistent func Test_ForceStartHeightIdempotency(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -592,3 +592,120 @@ func Test_ForceStartHeightIdempotency(t *testing.T) { return true }, time.Second*15, time.Second*1, "all transactions were not executed") } + +// Test_AccessNodeBackupFunctionality verifies that the specified AccessNode +// backup hosts, are used when the primary `AccessNodeHost` is unavailable +// for whatever reason. +func Test_AccessNodeBackupFunctionality(t *testing.T) { + srv, err := startEmulator(true, defaultServerConfig()) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + srv.Stop() + }() + + backupConfg := defaultServerConfig() + backupConfg.GRPCPort = 3599 + backupConfg.RESTPort = 9999 + backupConfg.AdminPort = 9090 + backupConfg.DebuggerPort = 3456 + backupSrv, err := startEmulator(true, backupConfg) + require.NoError(t, err) + + backupCtx, backupCancel := context.WithCancel(context.Background()) + defer func() { + backupCancel() + backupSrv.Stop() + }() + + grpcHost := "localhost:3569" + emu := srv.Emulator() + service := emu.ServiceKey() + + client, err := grpc.NewClient(grpcHost) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) // some time to startup + + // create new account with keys used for key-rotation + keyCount := 5 + createdAddr, privateKey, err := bootstrap.CreateMultiKeyAccount( + client, + keyCount, + service.Address, + sc.FungibleToken.Address.HexWithPrefix(), + sc.FlowToken.Address.HexWithPrefix(), + service.PrivateKey, + ) + require.NoError(t, err) + + backupAccessNodeHost := fmt.Sprintf("localhost:%d", backupConfg.GRPCPort) + + cfg := config.Config{ + DatabaseDir: t.TempDir(), + AccessNodeHost: grpcHost, + AccessNodeBackupHosts: []string{backupAccessNodeHost}, + RPCPort: 8545, + RPCHost: "127.0.0.1", + FlowNetworkID: "flow-emulator", + EVMNetworkID: types.FlowEVMPreviewNetChainID, + Coinbase: eoaTestAccount, + COAAddress: *createdAddr, + COAKey: privateKey, + GasPrice: new(big.Int).SetUint64(0), + EnforceGasPrice: true, + LogLevel: zerolog.DebugLevel, + LogWriter: testLogWriter(), + TxStateValidation: config.LocalIndexValidation, + } + + boot, err := bootstrap.New(cfg) + require.NoError(t, err) + defer func() { + // Stop the EVM GW service + boot.Stop() + }() + + ready := make(chan struct{}) + go func() { + err = boot.Run(ctx, cfg, func() { + close(ready) + }) + require.NoError(t, err) + }() + + <-ready + + time.Sleep(3 * time.Second) // some time to startup + + ethClient, err := ethclient.Dial("http://127.0.0.1:8545") + require.NoError(t, err) + + // This endpoint (`eth_syncing`), will make the following gRPC call, + // `ExecuteScriptAtLatestBlock`. This gRPC call is served by the + // first Emulator process, that is configured as the `AccessNodeHost`. + _, err = ethClient.SyncProgress(ctx) + require.NoError(t, err) + + // Shutdown the first Emulator process, that is configured as the + // `AccessNodeHost` + cancel() + srv.Stop() + + // This endpoint (`eth_syncing`), will make the following gRPC call, + // `ExecuteScriptAtLatestBlock`. This gRPC call is served by the + // first-available AccessNode specified in `AccessNodeBackupHosts`. + // In this E2E test, that would be the second Emulator process. + assert.Eventually( + t, + func() bool { + _, err := ethClient.SyncProgress(backupCtx) + return err == nil + }, + time.Second*5, + time.Millisecond*500, + "backup AN should serve requests after primary shutdown", + ) +} diff --git a/tests/key_store_release_test.go b/tests/key_store_release_test.go index dcc77f4d..15fd5938 100644 --- a/tests/key_store_release_test.go +++ b/tests/key_store_release_test.go @@ -20,7 +20,7 @@ import ( ) func Test_KeyStoreSigningKeysRelease(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index d3d9f87d..a753da0c 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -515,7 +515,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { } func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background())