Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,8 @@ func (b *Bootstrap) Run(
// mark ready
ready()

go b.trackOperatorBalance(ctx)

return nil
}

Expand All @@ -707,6 +709,27 @@ func (b *Bootstrap) Stop() {
b.StopDB()
}

func (b *Bootstrap) trackOperatorBalance(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
accBalance, err := b.client.GetAccountBalanceAtLatestBlock(ctx, b.config.COAAddress)
if err != nil {
b.logger.Warn().Err(err).Msg(
"failed to collect operator's balance metric",
)
continue
}
b.collector.OperatorBalance(accBalance)
}
}
}

// Run will run complete bootstrap of the EVM gateway with all the engines.
// Run is a blocking call, but it does signal readiness of the service
// through a channel provided as an argument.
Expand Down
7 changes: 3 additions & 4 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"time"

"github.com/onflow/flow-go-sdk"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -113,7 +112,7 @@ type Collector interface {
EVMTransactionIndexed(count int)
EVMAccountInteraction(address string)
MeasureRequestDuration(start time.Time, method string)
OperatorBalance(account *flow.Account)
OperatorBalance(balance uint64)
AvailableSigningKeys(count int)
GasEstimationIterations(count int)
BlockIngestionTime(blockCreation time.Time)
Expand Down Expand Up @@ -208,8 +207,8 @@ func (c *DefaultCollector) EVMAccountInteraction(address string) {
c.evmAccountCallCounters.With(prometheus.Labels{"address": address}).Inc()
}

func (c *DefaultCollector) OperatorBalance(account *flow.Account) {
c.operatorBalance.Set(float64(account.Balance))
func (c *DefaultCollector) OperatorBalance(balance uint64) {
c.operatorBalance.Set(float64(balance))
}

func (c *DefaultCollector) MeasureRequestDuration(start time.Time, method string) {
Expand Down
4 changes: 1 addition & 3 deletions metrics/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package metrics

import (
"time"

"github.com/onflow/flow-go-sdk"
)

type nopCollector struct{}
Expand All @@ -19,7 +17,7 @@ func (c *nopCollector) EVMHeightIndexed(uint64) {}
func (c *nopCollector) EVMTransactionIndexed(int) {}
func (c *nopCollector) EVMAccountInteraction(string) {}
func (c *nopCollector) MeasureRequestDuration(time.Time, string) {}
func (c *nopCollector) OperatorBalance(*flow.Account) {}
func (c *nopCollector) OperatorBalance(balance uint64) {}
func (c *nopCollector) AvailableSigningKeys(count int) {}
func (c *nopCollector) GasEstimationIterations(count int) {}
func (c *nopCollector) BlockIngestionTime(blockCreation time.Time) {}
Expand Down
12 changes: 5 additions & 7 deletions services/requester/batch_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
latestBlock, account, err := t.fetchFlowLatestBlockAndCOA(ctx)
latestBlock, err := t.client.GetLatestBlock(ctx, true)
if err != nil {
t.logger.Error().Err(err).Msg(
"failed to get COA / latest Flow block on batch tx submission",
"failed to get latest Flow block on batch tx submission",
)
continue
}
Expand All @@ -181,7 +181,6 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
err := t.batchSubmitTransactionsForSameAddress(
ctx,
latestBlock,
account,
pooledTxs,
)
if err != nil {
Expand All @@ -199,7 +198,6 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
func (t *BatchTxPool) batchSubmitTransactionsForSameAddress(
ctx context.Context,
latestBlock *flow.Block,
account *flow.Account,
pooledTxs []pooledEvmTx,
) error {
// Sort the transactions based on their nonce, to make sure
Expand All @@ -220,8 +218,8 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress(

script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
flowTx, err := t.buildTransaction(
ctx,
latestBlock,
account,
script,
cadence.NewArray(hexEncodedTxs),
coinbaseAddress,
Expand All @@ -244,7 +242,7 @@ func (t *BatchTxPool) submitSingleTransaction(
ctx context.Context,
hexEncodedTx cadence.String,
) error {
latestBlock, account, err := t.fetchFlowLatestBlockAndCOA(ctx)
latestBlock, err := t.client.GetLatestBlock(ctx, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could throttle the calls to GetLatestBlock, given the latest block only changes about once per second for mainnet, and we are only using the block ID as reference block ID, it's ok to have a little bit delay. This could further reduce the load to AN.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, there should be a component that is responsible for querying (or being subscribed to) the last block, which we then use here.

I think that might be best to put in a separate PR though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already opened a separate PR for that: #896

if err != nil {
return err
}
Expand All @@ -256,8 +254,8 @@ func (t *BatchTxPool) submitSingleTransaction(

script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
flowTx, err := t.buildTransaction(
ctx,
latestBlock,
account,
script,
cadence.NewArray([]cadence.Value{hexEncodedTx}),
coinbaseAddress,
Expand Down
23 changes: 14 additions & 9 deletions services/requester/keystore/account_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,31 @@ func (k *AccountKey) SetLockMetadata(txID flowsdk.Identifier, referenceBlockHeig
// SetProposerPayerAndSign sets the proposer, payer, and signs the transaction with the key.
func (k *AccountKey) SetProposerPayerAndSign(
tx *flowsdk.Transaction,
account *flowsdk.Account,
address flowsdk.Address,
acckey *flowsdk.AccountKey,
) error {
if k.Address != account.Address {
if acckey == nil {
return fmt.Errorf("nil account key provided for address %s (index %d)", address, k.Index)
}

if k.Address != address {
return fmt.Errorf(
"expected address: %v, got address: %v",
"expected address: %s, got address: %s",
k.Address,
account.Address,
address,
)
}
if k.Index >= uint32(len(account.Keys)) {

if k.Index != acckey.Index {
return fmt.Errorf(
"key index: %d exceeds keys length: %d",
"expected account key with index: %d, got key with index: %d",
k.Index,
len(account.Keys),
acckey.Index,
)
}
seqNumber := account.Keys[k.Index].SequenceNumber

return tx.
SetProposalKey(k.Address, k.Index, seqNumber).
SetProposalKey(k.Address, k.Index, acckey.SequenceNumber).
SetPayer(k.Address).
SignEnvelope(k.Address, k.Index, k.Signer)
}
Expand Down
2 changes: 1 addition & 1 deletion services/requester/keystore/key_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestKeySigning(t *testing.T) {
}
tx := sdk.NewTransaction()

err = key.SetProposerPayerAndSign(tx, account)
err = key.SetProposerPayerAndSign(tx, address, accountKey)
require.NoError(t, err)

assert.Equal(t, account.Address, tx.ProposalKey.Address)
Expand Down
8 changes: 4 additions & 4 deletions services/requester/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewEVM(

if !config.IndexOnly {
address := config.COAAddress
acc, err := client.GetAccount(context.Background(), address)
accBalance, err := client.GetAccountBalanceAtLatestBlock(context.Background(), address)
if err != nil {
return nil, fmt.Errorf(
"could not fetch the configured COA account: %s make sure it exists: %w",
Expand All @@ -127,13 +127,13 @@ func NewEVM(
)
}
// initialize the operator balance metric since it is only updated when sending a tx
collector.OperatorBalance(acc)
collector.OperatorBalance(accBalance)

if acc.Balance < minFlowBalance {
if accBalance < minFlowBalance {
return nil, fmt.Errorf(
"COA account must be funded with at least %d Flow, but has balance of: %d",
minFlowBalance,
acc.Balance,
accBalance,
)
}
}
Expand Down
55 changes: 18 additions & 37 deletions services/requester/single_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
flowGo "github.com/onflow/flow-go/model/flow"
"github.com/rs/zerolog"
"github.com/sethvargo/go-retry"
"golang.org/x/sync/errgroup"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
Expand Down Expand Up @@ -93,15 +92,15 @@ func (t *SingleTxPool) Add(
return err
}

latestBlock, account, err := t.fetchFlowLatestBlockAndCOA(ctx)
latestBlock, err := t.client.GetLatestBlock(ctx, true)
if err != nil {
return err
}

script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
flowTx, err := t.buildTransaction(
ctx,
latestBlock,
account,
script,
cadence.NewArray([]cadence.Value{hexEncodedTx}),
coinbaseAddress,
Expand Down Expand Up @@ -157,8 +156,8 @@ func (t *SingleTxPool) Add(
// buildTransaction creates a Cadence transaction from the provided script,
// with the given arguments and signs it with the configured COA account.
func (t *SingleTxPool) buildTransaction(
ctx context.Context,
latestBlock *flow.Block,
account *flow.Account,
script []byte,
args ...cadence.Value,
) (*flow.Transaction, error) {
Expand All @@ -177,53 +176,35 @@ func (t *SingleTxPool) buildTransaction(
}
}

// building and signing transactions should be blocking,
// so we don't have keys conflict
t.mux.Lock()
defer t.mux.Unlock()
accKey, err := t.fetchSigningAccountKey()
if err != nil {
return nil, err
}

accKey, err := t.keystore.Take()
coaAddress := t.config.COAAddress
accountKey, err := t.client.GetAccountKeyAtLatestBlock(ctx, coaAddress, accKey.Index)
if err != nil {
accKey.Done()
return nil, err
}

if err := accKey.SetProposerPayerAndSign(flowTx, account); err != nil {
if err := accKey.SetProposerPayerAndSign(flowTx, coaAddress, accountKey); err != nil {
accKey.Done()
return nil, err
}

// now that the transaction is prepared, store the transaction's metadata
accKey.SetLockMetadata(flowTx.ID(), latestBlock.Height)

t.collector.OperatorBalance(account)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the OperatorBalance metric, because now we use GetAccountKeyAtLatestBlock, and we don't have access to the COA balance.
I am planning to add this back, on this PR: #896. Generally, we don't have to emit this metric on every tx submission, we can do it at a less frequent rate.


return flowTx, nil
}

func (t *SingleTxPool) fetchFlowLatestBlockAndCOA(ctx context.Context) (
*flow.Block,
*flow.Account,
error,
) {
var (
g = errgroup.Group{}
err1, err2 error
latestBlock *flow.Block
account *flow.Account
)

// execute concurrently so we can speed up all the information we need for tx
g.Go(func() error {
latestBlock, err1 = t.client.GetLatestBlock(ctx, true)
return err1
})
g.Go(func() error {
account, err2 = t.client.GetAccount(ctx, t.config.COAAddress)
return err2
})
if err := g.Wait(); err != nil {
return nil, nil, err
}
func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) {
// getting an account key from the `KeyStore` for signing transactions,
// should be lock-protected, so that we don't sign any two Flow
// transactions with the same account key
t.mux.Lock()
defer t.mux.Unlock()

return latestBlock, account, nil
return t.keystore.Take()
}