61 changes: 61 additions & 0 deletions api/stream.go
@@ -3,7 +3,9 @@ package api
import (
"context"
"fmt"
"slices"

gethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
@@ -19,6 +21,9 @@ import (
"github.com/onflow/flow-evm-gateway/storage"
)

// The maximum number of transaction hash criteria allowed in a single subscription
const maxTxHashes = 200

type StreamAPI struct {
logger zerolog.Logger
config config.Config
@@ -27,6 +32,7 @@ type StreamAPI struct {
receipts storage.ReceiptIndexer
blocksPublisher *models.Publisher[*models.Block]
transactionsPublisher *models.Publisher[*gethTypes.Transaction]
receiptsPublisher *models.Publisher[[]*models.Receipt]
logsPublisher *models.Publisher[[]*gethTypes.Log]
}

@@ -38,6 +44,7 @@ func NewStreamAPI(
receipts storage.ReceiptIndexer,
blocksPublisher *models.Publisher[*models.Block],
transactionsPublisher *models.Publisher[*gethTypes.Transaction],
receiptsPublisher *models.Publisher[[]*models.Receipt],
logsPublisher *models.Publisher[[]*gethTypes.Log],
) *StreamAPI {
return &StreamAPI{
@@ -48,6 +55,7 @@ func NewStreamAPI(
receipts: receipts,
blocksPublisher: blocksPublisher,
transactionsPublisher: transactionsPublisher,
receiptsPublisher: receiptsPublisher,
logsPublisher: logsPublisher,
}
}
@@ -121,6 +129,59 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
)
}

// TransactionReceipts creates a subscription that fires transaction
// receipts when transactions are included in blocks.
func (s *StreamAPI) TransactionReceipts(
ctx context.Context,
filter *filters.TransactionReceiptsQuery,
) (*rpc.Subscription, error) {
// Validate transaction hashes limit
if filter != nil && len(filter.TransactionHashes) > maxTxHashes {
return nil, errs.ErrExceedMaxTxHashes
}

var txHashes []gethCommon.Hash

if filter != nil {
txHashes = filter.TransactionHashes
}

return newSubscription(
ctx,
s.logger,
s.receiptsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*models.Receipt) error {
return func(receipts []*models.Receipt) error {
// Convert to the same format as `eth_getTransactionReceipt`
marshaledReceipts := make([]map[string]any, 0)

for _, receipt := range receipts {
// Check if the subscription is only interested in a given
// set of tx receipts.
if len(txHashes) > 0 && !slices.Contains(txHashes, receipt.TxHash) {
continue
}

tx, err := s.transactions.Get(receipt.TxHash)
if err != nil {
return err
}

txReceipt, err := ethTypes.MarshalReceipt(receipt, tx)
if err != nil {
return err
}

marshaledReceipts = append(marshaledReceipts, txReceipt)
}

// Send a batch of tx receipts in one notification
return notifier.Notify(sub.ID, marshaledReceipts)
}
},
)
}

func (s *StreamAPI) prepareBlockHeader(
block *models.Block,
) (*ethTypes.BlockHeader, error) {
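For reference, the `transactionReceipts` subscription added in api/stream.go above is consumed over the gateway's WebSocket JSON-RPC endpoint, much like the web3js test at the end of this diff does. A minimal client sketch, assuming the endpoint used by the tests (ws://127.0.0.1:8545) and the `ws` npm package:

const WebSocket = require('ws')

// Minimal sketch: connect to the gateway's WebSocket endpoint
// (address assumed from the integration tests in this PR).
const socket = new WebSocket('ws://127.0.0.1:8545')

socket.on('open', () => {
    // Subscribe to receipts; an empty transactionHashes list means
    // every new receipt is delivered.
    socket.send(JSON.stringify({
        jsonrpc: '2.0',
        id: 1,
        method: 'eth_subscribe',
        params: ['transactionReceipts', { transactionHashes: [] }]
    }))
})

socket.on('message', (data) => {
    const response = JSON.parse(data)
    // Each notification carries the batch of receipts for one block,
    // marshaled in the same format as eth_getTransactionReceipt.
    if (response.method === 'eth_subscription') {
        console.log(response.params.result)
    }
})

An empty transactionHashes filter streams every receipt; a filter with more than 200 hashes (maxTxHashes) is rejected with ErrExceedMaxTxHashes.
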
4 changes: 4 additions & 0 deletions bootstrap/bootstrap.go
@@ -62,6 +62,7 @@ type Storages struct {
type Publishers struct {
Block *models.Publisher[*models.Block]
Transaction *models.Publisher[*gethTypes.Transaction]
Receipts *models.Publisher[[]*models.Receipt]
Logs *models.Publisher[[]*gethTypes.Log]
}

@@ -105,6 +106,7 @@ func New(config config.Config) (*Bootstrap, error) {
publishers: &Publishers{
Block: models.NewPublisher[*models.Block](),
Transaction: models.NewPublisher[*gethTypes.Transaction](),
Receipts: models.NewPublisher[[]*models.Receipt](),
Logs: models.NewPublisher[[]*gethTypes.Log](),
},
db: db,
@@ -199,6 +201,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
b.storages.Traces,
b.publishers.Block,
b.publishers.Logs,
b.publishers.Receipts,
b.logger,
b.collector,
replayerConfig,
@@ -338,6 +341,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.storages.Receipts,
b.publishers.Block,
b.publishers.Transaction,
b.publishers.Receipts,
b.publishers.Logs,
)

1 change: 1 addition & 0 deletions models/errors/errors.go
@@ -17,6 +17,7 @@ var (
ErrRateLimit = errors.New("limit of requests per second reached")
ErrIndexOnlyMode = errors.New("transaction submission not allowed in index-only mode")
ErrExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position")
ErrExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription")

// General errors

1 change: 1 addition & 0 deletions models/events.go
@@ -79,6 +79,7 @@ func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
for _, l := range rcp.Logs {
l.BlockNumber = rcp.BlockNumber.Uint64()
l.BlockHash = rcp.BlockHash
l.BlockTimestamp = e.block.Timestamp
l.TxHash = rcp.TxHash
l.TxIndex = rcp.TransactionIndex
l.Index = logIndex
58 changes: 31 additions & 27 deletions services/ingestion/engine.go
@@ -40,20 +40,21 @@ var _ models.Engine = &Engine{}
type Engine struct {
*models.EngineStatus

subscriber EventSubscriber
blocksProvider *replayer.BlocksProvider
store *pebble.Storage
registerStore *pebble.RegisterStorage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
traces storage.TraceIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
blocksPublisher *models.Publisher[*models.Block]
logsPublisher *models.Publisher[[]*gethTypes.Log]
collector metrics.Collector
replayerConfig replayer.Config
subscriber EventSubscriber
blocksProvider *replayer.BlocksProvider
store *pebble.Storage
registerStore *pebble.RegisterStorage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
traces storage.TraceIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
blocksPublisher *models.Publisher[*models.Block]
logsPublisher *models.Publisher[[]*gethTypes.Log]
receiptsPublisher *models.Publisher[[]*models.Receipt]
collector metrics.Collector
replayerConfig replayer.Config
}

func NewEventIngestionEngine(
@@ -67,6 +68,7 @@ func NewEventIngestionEngine(
traces storage.TraceIndexer,
blocksPublisher *models.Publisher[*models.Block],
logsPublisher *models.Publisher[[]*gethTypes.Log],
receiptsPublisher *models.Publisher[[]*models.Receipt],
log zerolog.Logger,
collector metrics.Collector,
replayerConfig replayer.Config,
@@ -76,19 +78,20 @@ func NewEventIngestionEngine(
return &Engine{
EngineStatus: models.NewEngineStatus(),

subscriber: subscriber,
blocksProvider: blocksProvider,
store: store,
registerStore: registerStore,
blocks: blocks,
receipts: receipts,
transactions: transactions,
traces: traces,
log: log,
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
collector: collector,
replayerConfig: replayerConfig,
subscriber: subscriber,
blocksProvider: blocksProvider,
store: store,
registerStore: registerStore,
blocks: blocks,
receipts: receipts,
transactions: transactions,
traces: traces,
log: log,
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
receiptsPublisher: receiptsPublisher,
collector: collector,
replayerConfig: replayerConfig,
}
}

@@ -204,6 +207,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {

// emit block event, receipts and logs, only after we successfully commit the data
e.blocksPublisher.Publish(events.Block())
e.receiptsPublisher.Publish(events.Receipts())
for _, r := range events.Receipts() {
if len(r.Logs) > 0 {
e.logsPublisher.Publish(r.Logs)
5 changes: 5 additions & 0 deletions services/ingestion/engine_test.go
@@ -75,6 +75,7 @@ func TestSerialBlockIngestion(t *testing.T) {
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
models.NewPublisher[[]*models.Receipt](),
zerolog.Nop(),
metrics.NopCollector,
defaultReplayerConfig(),
@@ -154,6 +155,7 @@ func TestSerialBlockIngestion(t *testing.T) {
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
models.NewPublisher[[]*models.Receipt](),
zerolog.Nop(),
metrics.NopCollector,
defaultReplayerConfig(),
@@ -275,6 +277,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
models.NewPublisher[[]*models.Receipt](),
zerolog.Nop(),
metrics.NopCollector,
defaultReplayerConfig(),
@@ -383,6 +386,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
models.NewPublisher[[]*models.Receipt](),
zerolog.Nop(),
metrics.NopCollector,
defaultReplayerConfig(),
@@ -477,6 +481,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
models.NewPublisher[[]*models.Receipt](),
zerolog.Nop(),
metrics.NopCollector,
defaultReplayerConfig(),
98 changes: 98 additions & 0 deletions tests/web3js/eth_streaming_test.js
@@ -1,3 +1,4 @@
const WebSocket = require('ws')
const conf = require('./config')
const helpers = require('./helpers')
const { assert } = require('chai')
@@ -69,6 +70,33 @@ it('streaming of blocks, transactions, logs using filters', async () => {
}
})

let socket = new WebSocket('ws://127.0.0.1:8545')
// give some time for the connection to open
await new Promise((res) => setTimeout(() => res(), 1500))
let payload = `
{
"jsonrpc": "2.0",
"id": 2,
"method": "eth_subscribe",
"params": [
"transactionReceipts",
{
"transactionHashes": []
}
]
}
`
socket.send(payload)

// subscribe to all new receipts being produced by transaction submissions below
let receipts = []
socket.onmessage = (event) => {
let response = JSON.parse(event.data)
if (response.method == 'eth_subscription') {
receipts = receipts.concat(response.params.result)
}
}

let sentHashes = []
// produce events by submitting transactions
for (const { A, B } of testValues) {
@@ -120,4 +148,74 @@ it('streaming of blocks, transactions, logs using filters', async () => {
assert.lengthOf(matchingLogs, 1)
assert.deepEqual(log, matchingLogs[0])
}

assert.equal(10, receipts.length)
for (let txHash of sentHashes) {
let txReceipt = await helpers.callRPCMethod(
'eth_getTransactionReceipt',
[txHash]
)

for (let rcp of receipts) {
if (rcp.transactionHash == txHash) {
assert.deepEqual(rcp, txReceipt.body['result'])
}
}
}

let signedTx = await conf.eoa.signTransaction({
from: conf.eoa.address,
to: contractAddress,
data: deployed.contract.methods.sum(7, 7).encodeABI(),
gas: 1_000_000,
gasPrice: conf.minGasPrice
})

receipts = []
let subID = null
socket.onmessage = (event) => {
let response = JSON.parse(event.data)
if (response.id == 12) {
subID = response.result
}

if (response.method == 'eth_subscription' && response.params.subscription == subID) {
receipts = receipts.concat(response.params.result)
}
}
// Check that the subscription will notify the caller only for the given
// set of tx hashes.
payload = `
{
"jsonrpc": "2.0",
"id": 12,
"method": "eth_subscribe",
"params": [
"transactionReceipts",
{
"transactionHashes": [
"0x7b45084668258f29cfc525494d00ea5171766d1d43436e41cea930380d96bf67",
"0xed970aa258b677d5e772125dd4342f38e5ccf4dec685d38fc5f04f18eff1a939",
"${signedTx.transactionHash}"
]
}
]
}
`
socket.send(payload)

// send the transaction and make sure the interaction was successful
let txReceipt = await web3.eth.sendSignedTransaction(signedTx.rawTransaction)
assert.equal(txReceipt.status, conf.successStatus)
assert.equal(txReceipt.transactionHash, signedTx.transactionHash)

await new Promise((res, rej) => setTimeout(() => res(), 1500))
socket.close(1000, 'finished testing')

assert.equal(1, receipts.length)
let expectedReceipt = await helpers.callRPCMethod(
'eth_getTransactionReceipt',
[signedTx.transactionHash]
)
assert.deepEqual(receipts[0], expectedReceipt.body['result'])
})