Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var (

// Transaction errors

ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")

// Storage errors

Expand Down
9 changes: 8 additions & 1 deletion services/requester/batch_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package requester
import (
"context"
"encoding/hex"
"slices"
"sort"
"sync"
"time"
Expand All @@ -17,13 +18,15 @@ import (
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
)

const eoaActivityCacheSize = 10_000

type pooledEvmTx struct {
txPayload cadence.String
txHash gethCommon.Hash
nonce uint64
}

Expand Down Expand Up @@ -147,7 +150,11 @@ func (t *BatchTxPool) Add(
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else {
// Case 3. EOA activity found AND it was less than [X] seconds ago:
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()}
userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()}
// Prevent submission of duplicate transactions, based on their tx hash
if slices.Contains(t.pooledTxs[from], userTx) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of putting the txHash to the pooledEvmTx, what about maintaining a separate index for the txHash?

So that this existence check can be done in O(1), instead of O(n), n being the number of txs in the pool for a single address. Otherwise, if some bot sends lots of txs, this call would become slow.

For instance:

type BatchTxPool struct {
	*SingleTxPool
	pooledTxs   map[gethCommon.Address][]pooledEvmTx
    pooledTxHashs map[txHash]struct{}
	txMux       sync.Mutex
	eoaActivity *expirable.LRU[gethCommon.Address, time.Time]
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pooledTxs get submitted to the network around every ~2 seconds, so we are talking about a very small number of transactions for a single address. The largest I have observed is 12 transactions for a single address, which is quite small. Do you think it's worth the complexity of maintaining a separate index for de-duplicating? We will also have to remove entries from that separate index, when transactions do get submitted, which means it will require some sync mechanism.

return errs.ErrDuplicateTransaction
}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
}

Expand Down
58 changes: 58 additions & 0 deletions tests/tx_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,64 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
)
}

func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
_, cfg, stop := setupGatewayNode(t)
defer stop()

rpcTester := &rpcTest{
url: fmt.Sprintf("%s:%d", cfg.RPCHost, cfg.RPCPort),
}

eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey)
require.NoError(t, err)

testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194")
nonce := uint64(0)
hashes := make([]common.Hash, 0)

signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

txHash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
hashes = append(hashes, txHash)

// Increment nonce for the duplicate test transactions that follow
nonce += 1
// Submit 5 identical transactions to test duplicate detection:
// the first should succeed, the rest should be rejected as duplicates
for i := range 5 {
// All these transactions are duplicates, since we don't change any
// of the payload data. These will end up having the same tx hash
// as well.
signed, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

// only the first transaction is valid, the rest 4 are duplicates
// of the 1st one.
if i == 0 {
txHash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
hashes = append(hashes, txHash)
} else {
_, err := rpcTester.sendRawTx(signed)
require.Error(t, err)
require.ErrorContains(t, err, "invalid: transaction already in pool")
}
}

assert.Eventually(t, func() bool {
for _, h := range hashes {
rcp, err := rpcTester.getReceipt(h.String())
if err != nil || rcp == nil || rcp.Status != 1 {
return false
}
}

return true
}, time.Second*15, time.Second*1, "all transactions were not executed")
}

func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
srv, err := startEmulator(true)
require.NoError(t, err)
Expand Down