From e02fe9ecdf6647fcbdf1b8053e4beccc64a76bf3 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 26 Sep 2025 16:12:57 +0300 Subject: [PATCH 01/12] Allow adding back-up AN hosts to be used for client-side request load balancing --- bootstrap/bootstrap.go | 59 +++++++++++++++++++++++++++++++++++------- cmd/run/cmd.go | 6 +++++ config/config.go | 3 +++ 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index f5748c2e..5ee60026 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -22,6 +22,8 @@ import ( "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/api" @@ -493,16 +495,53 @@ func StartEngine( // setupCrossSporkClient sets up a cross-spork AN client. func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*requester.CrossSporkClient, error) { // create access client with cross-spork capabilities - currentSporkClient, err := grpc.NewClient( - config.AccessNodeHost, - grpc.WithGRPCDialOptions( - grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), - grpcOpts.WithUnaryInterceptor(retryInterceptor( - DefaultResourceExhaustedMaxRetryDelay, - DefaultResourceExhaustedRetryDelay, - )), - ), - ) + var currentSporkClient *grpc.Client + var err error + + if len(config.AccessNodeBackupHosts) > 0 { + mr := manual.NewBuilderWithScheme("dns") + defer mr.Close() + + json := `{"loadBalancingConfig": [{"pick_first":{}}]}` + endpoints := []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: config.AccessNodeHost}}}, + } + + for _, accessNodeBackupAddr := range config.AccessNodeBackupHosts { + endpoints = append(endpoints, resolver.Endpoint{ + Addresses: []resolver.Address{{Addr: accessNodeBackupAddr}}, + }) + } + + mr.InitialState(resolver.State{ + Endpoints: endpoints, + }) + + currentSporkClient, err = grpc.NewClient( + mr.Scheme(), + grpc.WithGRPCDialOptions( + grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), + grpcOpts.WithResolvers(mr), + grpcOpts.WithDefaultServiceConfig(json), + grpcOpts.WithUnaryInterceptor(retryInterceptor( + DefaultResourceExhaustedMaxRetryDelay, + DefaultResourceExhaustedRetryDelay, + )), + ), + ) + } else { + currentSporkClient, err = grpc.NewClient( + config.AccessNodeHost, + grpc.WithGRPCDialOptions( + grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), + grpcOpts.WithUnaryInterceptor(retryInterceptor( + DefaultResourceExhaustedMaxRetryDelay, + DefaultResourceExhaustedRetryDelay, + )), + ), + ) + } + if err != nil { return nil, fmt.Errorf( "failed to create client connection for host: %s, with error: %w", diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 00274cc7..298eeaa7 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -201,6 +201,10 @@ func parseConfigFromFlags() error { } cfg.FilterExpiry = exp + if accessNodeBackupHosts != "" { + cfg.AccessNodeBackupHosts = strings.Split(accessNodeBackupHosts, ",") + } + if accessSporkHosts != "" { heightHosts := strings.Split(accessSporkHosts, ",") cfg.AccessNodePreviousSporkHosts = append(cfg.AccessNodePreviousSporkHosts, heightHosts...) @@ -242,6 +246,7 @@ var ( logWriter, filterExpiry, accessSporkHosts, + accessNodeBackupHosts, cloudKMSKey, cloudKMSProjectID, cloudKMSLocationID, @@ -259,6 +264,7 @@ func init() { Cmd.Flags().IntVar(&cfg.RPCPort, "rpc-port", 8545, "Port for the RPC API server") Cmd.Flags().BoolVar(&cfg.WSEnabled, "ws-enabled", false, "Enable websocket connections") Cmd.Flags().StringVar(&cfg.AccessNodeHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API") + Cmd.Flags().StringVar(&accessNodeBackupHosts, "access-node-backup-hosts", "", `Backup AN hosts to use in case of connectivity issues, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`) Cmd.Flags().StringVar(&accessSporkHosts, "access-node-spork-hosts", "", `Previous spork AN hosts, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`) Cmd.Flags().StringVar(&flowNetwork, "flow-network-id", "flow-emulator", "Flow network ID (flow-emulator, flow-previewnet, flow-testnet, flow-mainnet)") Cmd.Flags().StringVar(&coinbase, "coinbase", "", "Coinbase address to use for fee collection") diff --git a/config/config.go b/config/config.go index 42529a1e..c094296f 100644 --- a/config/config.go +++ b/config/config.go @@ -46,6 +46,9 @@ type Config struct { DatabaseDir string // AccessNodeHost defines the current spork Flow network AN host. AccessNodeHost string + // AccessNodeBackupHosts contains a list of ANs hosts to use as backup, in + // case of connectivity issues with `AccessNodeHost`. + AccessNodeBackupHosts []string // AccessNodePreviousSporkHosts contains a list of the ANs hosts for each spork AccessNodePreviousSporkHosts []string // GRPCPort for the RPC API server From 21dde48b7f9603eb2735ec585fa56e156db06694 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 26 Sep 2025 16:36:53 +0300 Subject: [PATCH 02/12] The retryInterceptor should retry requests on any kind of error --- bootstrap/bootstrap.go | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 5ee60026..8e811fed 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -21,10 +21,8 @@ import ( "github.com/rs/zerolog" "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/api" "github.com/onflow/flow-evm-gateway/config" @@ -43,13 +41,13 @@ const ( // DefaultMaxMessageSize is the default maximum message size for gRPC responses DefaultMaxMessageSize = 1024 * 1024 * 1024 - // DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns - // a ResourceExhausted error. - DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond + // DefaultRetryDelay is the default delay between retries when a gRPC request + // to one of the Access Nodes has errored out. + DefaultRetryDelay = 100 * time.Millisecond - // DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server - // ResourceExhausted errors. - DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second + // DefaultMaxRetryDelay is the default max request duration when retrying failed + // gRPC requests to one of the Access Nodes. + DefaultMaxRetryDelay = 30 * time.Second ) type Storages struct { @@ -524,8 +522,8 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques grpcOpts.WithResolvers(mr), grpcOpts.WithDefaultServiceConfig(json), grpcOpts.WithUnaryInterceptor(retryInterceptor( - DefaultResourceExhaustedMaxRetryDelay, - DefaultResourceExhaustedRetryDelay, + DefaultMaxRetryDelay, + DefaultRetryDelay, )), ), ) @@ -535,8 +533,8 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques grpc.WithGRPCDialOptions( grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), grpcOpts.WithUnaryInterceptor(retryInterceptor( - DefaultResourceExhaustedMaxRetryDelay, - DefaultResourceExhaustedRetryDelay, + DefaultMaxRetryDelay, + DefaultRetryDelay, )), ), ) @@ -594,10 +592,6 @@ func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryCl return nil } - if status.Code(err) != codes.ResourceExhausted { - return err - } - attempts++ duration := time.Since(start) if duration >= maxDuration { From bf53e015d6f02655c083969b253f25ef46faa461 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 26 Sep 2025 16:51:45 +0300 Subject: [PATCH 03/12] Allow EventSubscriber to re-connect on codes.Unavailable errors --- services/ingestion/event_subscriber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index ed86cd27..163bce73 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -224,7 +224,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha // we can get not found when reconnecting after a disconnect/restart before the // next block is finalized. just wait briefly and try again time.Sleep(200 * time.Millisecond) - case codes.DeadlineExceeded, codes.Internal: + case codes.DeadlineExceeded, codes.Internal, codes.Unavailable: // these are sometimes returned when the stream is disconnected by a middleware or the server default: // skip reconnect on all other errors From 390e1b56c880eeecc502a9d677b5d73bd896a351 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 29 Sep 2025 12:21:42 +0300 Subject: [PATCH 04/12] Add E2E tests for AN backup hosts switch functionality --- tests/helpers.go | 29 +++++--- tests/integration_test.go | 119 ++++++++++++++++++++++++++++++-- tests/key_store_release_test.go | 2 +- tests/tx_batching_test.go | 2 +- 4 files changed, 134 insertions(+), 18 deletions(-) diff --git a/tests/helpers.go b/tests/helpers.go index 017b8578..f9eb8dab 100644 --- a/tests/helpers.go +++ b/tests/helpers.go @@ -67,23 +67,18 @@ func testLogWriter() io.Writer { return zerolog.NewConsoleWriter() } -func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) { +func defaultServerConfig() *server.Config { pkey, err := crypto.DecodePrivateKeyHex(sigAlgo, servicePrivateKey) if err != nil { - return nil, err + panic(err) } genesisToken, err := cadence.NewUFix64("10000.0") if err != nil { - return nil, err + panic(err) } - log := logger.With().Timestamp().Str("component", "emulator").Logger().Level(zerolog.DebugLevel) - if logOutput == "false" { - log = zerolog.Nop() - } - - srv := server.NewEmulatorServer(&log, &server.Config{ + return &server.Config{ ServicePrivateKey: pkey, ServiceKeySigAlgo: sigAlgo, ServiceKeyHashAlgo: hashAlgo, @@ -94,7 +89,19 @@ func startEmulator(createTestAccounts bool) (*server.EmulatorServer, error) { TransactionMaxGasLimit: flow.DefaultMaxTransactionGasLimit, SetupEVMEnabled: true, SetupVMBridgeEnabled: true, - }) + } +} + +func startEmulator(createTestAccounts bool, conf *server.Config) ( + *server.EmulatorServer, + error, +) { + log := logger.With().Timestamp().Str("component", "emulator").Logger().Level(zerolog.DebugLevel) + if logOutput == "false" { + log = zerolog.Nop() + } + + srv := server.NewEmulatorServer(&log, conf) go func() { srv.Start() @@ -133,7 +140,7 @@ func runWeb3TestWithSetup( // servicesSetup starts up an emulator and the gateway // engines required for operation of the evm gateway. func servicesSetup(t *testing.T) (emulator.Emulator, func()) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/tests/integration_test.go b/tests/integration_test.go index 79508ed4..fe9d0d51 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -30,7 +30,7 @@ import ( // This is using the TxSealValidation mechanism for validating submitted // transactions, which blocks until it receives the Flow transaction result. func Test_ConcurrentTransactionSubmissionWithTxSeal(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -141,7 +141,7 @@ func Test_ConcurrentTransactionSubmissionWithTxSeal(t *testing.T) { // This is using the LocalIndexValidation mechanism for validating submitted // transactions, which is non-blocking and validates based on the local state. func Test_ConcurrentTransactionSubmissionWithLocalIndex(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -246,7 +246,7 @@ func Test_ConcurrentTransactionSubmissionWithLocalIndex(t *testing.T) { } func Test_EthClientTest(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -325,7 +325,7 @@ func Test_CloudKMSConcurrentTransactionSubmission(t *testing.T) { t.Skip() } - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -449,7 +449,7 @@ func Test_CloudKMSConcurrentTransactionSubmission(t *testing.T) { // 2. No transactions are lost // 3. The state remains consistent func Test_ForceStartHeightIdempotency(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -592,3 +592,112 @@ func Test_ForceStartHeightIdempotency(t *testing.T) { return true }, time.Second*15, time.Second*1, "all transactions were not executed") } + +// Test_AccessNodeBackupFunctionality verifies that the specified AccessNode +// backup hosts, are used when the primary `AccessNodeHost` is unavailable +// or whatever reason. +func Test_AccessNodeBackupFunctionality(t *testing.T) { + srv, err := startEmulator(true, defaultServerConfig()) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + srv.Stop() + }() + + backupConfg := defaultServerConfig() + backupConfg.GRPCPort = 3599 + backupConfg.RESTPort = 9999 + backupConfg.AdminPort = 9090 + backupConfg.DebuggerPort = 3456 + backupSrv, err := startEmulator(true, backupConfg) + require.NoError(t, err) + + _, backupCancel := context.WithCancel(context.Background()) + defer func() { + backupCancel() + backupSrv.Stop() + }() + + grpcHost := "localhost:3569" + emu := srv.Emulator() + service := emu.ServiceKey() + + client, err := grpc.NewClient(grpcHost) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) // some time to startup + + // create new account with keys used for key-rotation + keyCount := 5 + createdAddr, privateKey, err := bootstrap.CreateMultiKeyAccount( + client, + keyCount, + service.Address, + sc.FungibleToken.Address.HexWithPrefix(), + sc.FlowToken.Address.HexWithPrefix(), + service.PrivateKey, + ) + require.NoError(t, err) + + backupAccessNodeHost := fmt.Sprintf("localhost:%d", backupConfg.GRPCPort) + + cfg := config.Config{ + DatabaseDir: t.TempDir(), + AccessNodeHost: grpcHost, + AccessNodeBackupHosts: []string{backupAccessNodeHost}, + RPCPort: 8545, + RPCHost: "127.0.0.1", + FlowNetworkID: "flow-emulator", + EVMNetworkID: types.FlowEVMPreviewNetChainID, + Coinbase: eoaTestAccount, + COAAddress: *createdAddr, + COAKey: privateKey, + GasPrice: new(big.Int).SetUint64(0), + EnforceGasPrice: true, + LogLevel: zerolog.DebugLevel, + LogWriter: testLogWriter(), + TxStateValidation: config.LocalIndexValidation, + } + + boot, err := bootstrap.New(cfg) + require.NoError(t, err) + defer func() { + // Stop the EVM GW service + boot.Stop() + }() + + ready := make(chan struct{}) + go func() { + err = boot.Run(ctx, cfg, func() { + close(ready) + }) + require.NoError(t, err) + }() + + <-ready + + time.Sleep(3 * time.Second) // some time to startup + + ethClient, err := ethclient.Dial("http://127.0.0.1:8545") + require.NoError(t, err) + + // This endpoint (`eth_syncing`), will make the following gRPC call, + // `ExecuteScriptAtLatestBlock`. This gRPC call is served by the + // first Emulator process, that is configured as the `AccessNodeHost`. + _, err = ethClient.SyncProgress(context.Background()) + require.NoError(t, err) + + // Shutdown the first Emulator process, that is configured as the + // `AccessNodeHost` + cancel() + srv.Stop() + + // This endpoint (`eth_syncing`), will make the following gRPC call, + // `ExecuteScriptAtLatestBlock`. This gRPC call is served by the + // first-available AccessNode specified in `AccessNodeBackupHosts`. + // In this E2E test, that would be the second Emulator process. + _, err = ethClient.SyncProgress(context.Background()) + require.NoError(t, err) +} diff --git a/tests/key_store_release_test.go b/tests/key_store_release_test.go index dcc77f4d..15fd5938 100644 --- a/tests/key_store_release_test.go +++ b/tests/key_store_release_test.go @@ -20,7 +20,7 @@ import ( ) func Test_KeyStoreSigningKeysRelease(t *testing.T) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index d3d9f87d..a753da0c 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -515,7 +515,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { } func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { - srv, err := startEmulator(true) + srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) From efebeef73a6176f860e8b462f46024f2d8bdc2cd Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 29 Sep 2025 14:32:12 +0300 Subject: [PATCH 05/12] Improve target host creation for load-balancing --- bootstrap/bootstrap.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 8e811fed..23a5e5c4 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -515,8 +515,9 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques Endpoints: endpoints, }) + targetHost := fmt.Sprintf("%s:///%s", mr.Scheme(), "flow-access") currentSporkClient, err = grpc.NewClient( - mr.Scheme(), + targetHost, grpc.WithGRPCDialOptions( grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)), grpcOpts.WithResolvers(mr), From 93d4c6d4c3a7e181c9628b9af4b924bcba0dab8f Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 6 Oct 2025 10:47:57 +0300 Subject: [PATCH 06/12] Describe behavior of the 'pick_first' strategy used on 'loadBalancingConfig' --- bootstrap/bootstrap.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 23a5e5c4..39e4bc5d 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -500,6 +500,11 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques mr := manual.NewBuilderWithScheme("dns") defer mr.Close() + // `pick_first` tries to connect to the first address, uses it for all RPCs + // if it connects, or try the next address if it fails + // (and keep doing that until one connection is successful). + // Because of this, all the RPCs will be sent to the same backend. See more on: + // https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing#pick_first json := `{"loadBalancingConfig": [{"pick_first":{}}]}` endpoints := []resolver.Endpoint{ {Addresses: []resolver.Address{{Addr: config.AccessNodeHost}}}, From 3a952f2ee9bdf232aeb7199e946f4abad23f699a Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 6 Oct 2025 11:02:22 +0300 Subject: [PATCH 07/12] Improve E2E load-balancing test to remove flakiness --- tests/integration_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/integration_test.go b/tests/integration_test.go index fe9d0d51..a3dec4b4 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -698,6 +698,14 @@ func Test_AccessNodeBackupFunctionality(t *testing.T) { // `ExecuteScriptAtLatestBlock`. This gRPC call is served by the // first-available AccessNode specified in `AccessNodeBackupHosts`. // In this E2E test, that would be the second Emulator process. - _, err = ethClient.SyncProgress(context.Background()) - require.NoError(t, err) + assert.Eventually( + t, + func() bool { + _, err := ethClient.SyncProgress(context.Background()) + return err == nil + }, + time.Second*5, + time.Millisecond*500, + "backup AN should serve requests after primary shutdown", + ) } From 7097da4867db1463f27fb53eea207125eba954ec Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 6 Oct 2025 12:45:46 +0300 Subject: [PATCH 08/12] Use proper ctx during E2E tests on backup AN functionality --- tests/integration_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration_test.go b/tests/integration_test.go index a3dec4b4..7142eaa6 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -595,7 +595,7 @@ func Test_ForceStartHeightIdempotency(t *testing.T) { // Test_AccessNodeBackupFunctionality verifies that the specified AccessNode // backup hosts, are used when the primary `AccessNodeHost` is unavailable -// or whatever reason. +// for whatever reason. func Test_AccessNodeBackupFunctionality(t *testing.T) { srv, err := startEmulator(true, defaultServerConfig()) require.NoError(t, err) @@ -614,7 +614,7 @@ func Test_AccessNodeBackupFunctionality(t *testing.T) { backupSrv, err := startEmulator(true, backupConfg) require.NoError(t, err) - _, backupCancel := context.WithCancel(context.Background()) + backupCtx, backupCancel := context.WithCancel(context.Background()) defer func() { backupCancel() backupSrv.Stop() @@ -686,7 +686,7 @@ func Test_AccessNodeBackupFunctionality(t *testing.T) { // This endpoint (`eth_syncing`), will make the following gRPC call, // `ExecuteScriptAtLatestBlock`. This gRPC call is served by the // first Emulator process, that is configured as the `AccessNodeHost`. - _, err = ethClient.SyncProgress(context.Background()) + _, err = ethClient.SyncProgress(ctx) require.NoError(t, err) // Shutdown the first Emulator process, that is configured as the @@ -701,7 +701,7 @@ func Test_AccessNodeBackupFunctionality(t *testing.T) { assert.Eventually( t, func() bool { - _, err := ethClient.SyncProgress(context.Background()) + _, err := ethClient.SyncProgress(backupCtx) return err == nil }, time.Second*5, From 418fe700e2ac5cf5324d12904113b614deda8a59 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 6 Oct 2025 13:00:46 +0300 Subject: [PATCH 09/12] EVM event subscription should reconnect on the last received height --- services/ingestion/event_subscriber.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 163bce73..3040c558 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -232,11 +232,11 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return } - if err := connect(lastReceivedHeight + 1); err != nil { + if err := connect(lastReceivedHeight); err != nil { eventsChan <- models.NewBlockEventsError( fmt.Errorf( "failed to resubscribe for events on height: %d, with: %w", - lastReceivedHeight+1, + lastReceivedHeight, err, ), ) From b98cdcec85489194a070673093954b8b4cb3762c Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 6 Oct 2025 15:12:05 +0300 Subject: [PATCH 10/12] Improve error handling on retryInterceptor --- bootstrap/bootstrap.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 39e4bc5d..05514f1b 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -21,8 +21,10 @@ import ( "github.com/rs/zerolog" "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/api" "github.com/onflow/flow-evm-gateway/config" @@ -598,6 +600,19 @@ func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryCl return nil } + switch status.Code(err) { + case codes.Canceled, codes.DeadlineExceeded: + // these kind of errors are guaranteed to fail all requests, + // if the source was a local context + return err + case codes.ResourceExhausted, codes.OutOfRange, codes.NotFound: + // when we receive these errors, we pause briefly, so that + // the next request on the same AN, has a higher chance + // of success. + default: + return err + } + attempts++ duration := time.Since(start) if duration >= maxDuration { From 352e59a021656dc370b5c0654bc3f092d9937177 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 9 Oct 2025 16:32:23 +0300 Subject: [PATCH 11/12] Retry reconnects on EVM event subscriber for up to 30 seconds --- services/ingestion/event_subscriber.go | 33 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 3040c558..cf85afbe 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -232,15 +232,30 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return } - if err := connect(lastReceivedHeight); err != nil { - eventsChan <- models.NewBlockEventsError( - fmt.Errorf( - "failed to resubscribe for events on height: %d, with: %w", - lastReceivedHeight, - err, - ), - ) - return + start := time.Now() + attempts := 0 + pauseDuration, maxDuration := 200*time.Millisecond, 30*time.Second + // Allow reconnect retries for up to 30 seconds, with retry + // attempts every 200 ms. + for { + err := connect(lastReceivedHeight) + if err == nil { + break + } + + attempts++ + duration := time.Since(start) + if duration >= maxDuration { + eventsChan <- models.NewBlockEventsError( + fmt.Errorf( + "failed to resubscribe for events on height: %d, with: %w", + lastReceivedHeight, + err, + ), + ) + return + } + time.Sleep(pauseDuration) } } } From 33d24452256788283347b6d663e3cc2a33ff3c9a Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Wed, 12 Nov 2025 14:24:44 +0200 Subject: [PATCH 12/12] Trim input value for AccessNodeBackupHosts config flag --- cmd/run/cmd.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 298eeaa7..b93086de 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -202,7 +202,13 @@ func parseConfigFromFlags() error { cfg.FilterExpiry = exp if accessNodeBackupHosts != "" { - cfg.AccessNodeBackupHosts = strings.Split(accessNodeBackupHosts, ",") + rawHosts := strings.Split(accessNodeBackupHosts, ",") + cfg.AccessNodeBackupHosts = make([]string, 0, len(rawHosts)) + for _, host := range rawHosts { + if trimmed := strings.TrimSpace(host); trimmed != "" { + cfg.AccessNodeBackupHosts = append(cfg.AccessNodeBackupHosts, trimmed) + } + } } if accessSporkHosts != "" {