Skip to content

Commit 4925450

Browse files
committed
Implement the new 'transactionReceipts' subscription API
1 parent 01387cd commit 4925450

File tree

7 files changed

+201
-27
lines changed

7 files changed

+201
-27
lines changed

api/stream.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package api
33
import (
44
"context"
55
"fmt"
6+
"slices"
67

8+
gethCommon "github.com/ethereum/go-ethereum/common"
79
"github.com/ethereum/go-ethereum/common/hexutil"
810
gethTypes "github.com/ethereum/go-ethereum/core/types"
911
"github.com/ethereum/go-ethereum/eth/filters"
@@ -19,6 +21,9 @@ import (
1921
"github.com/onflow/flow-evm-gateway/storage"
2022
)
2123

24+
// The maximum number of transaction hash criteria allowed in a single subscription
25+
const maxTxHashes = 200
26+
2227
type StreamAPI struct {
2328
logger zerolog.Logger
2429
config config.Config
@@ -27,6 +32,7 @@ type StreamAPI struct {
2732
receipts storage.ReceiptIndexer
2833
blocksPublisher *models.Publisher[*models.Block]
2934
transactionsPublisher *models.Publisher[*gethTypes.Transaction]
35+
receiptsPublisher *models.Publisher[[]*models.Receipt]
3036
logsPublisher *models.Publisher[[]*gethTypes.Log]
3137
}
3238

@@ -38,6 +44,7 @@ func NewStreamAPI(
3844
receipts storage.ReceiptIndexer,
3945
blocksPublisher *models.Publisher[*models.Block],
4046
transactionsPublisher *models.Publisher[*gethTypes.Transaction],
47+
receiptsPublisher *models.Publisher[[]*models.Receipt],
4148
logsPublisher *models.Publisher[[]*gethTypes.Log],
4249
) *StreamAPI {
4350
return &StreamAPI{
@@ -48,6 +55,7 @@ func NewStreamAPI(
4855
receipts: receipts,
4956
blocksPublisher: blocksPublisher,
5057
transactionsPublisher: transactionsPublisher,
58+
receiptsPublisher: receiptsPublisher,
5159
logsPublisher: logsPublisher,
5260
}
5361
}
@@ -121,6 +129,59 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
121129
)
122130
}
123131

132+
// TransactionReceipts creates a subscription that fires transaction
133+
// receipts when transactions are included in blocks.
134+
func (s *StreamAPI) TransactionReceipts(
135+
ctx context.Context,
136+
filter *filters.TransactionReceiptsQuery,
137+
) (*rpc.Subscription, error) {
138+
// Validate transaction hashes limit
139+
if filter != nil && len(filter.TransactionHashes) > maxTxHashes {
140+
return nil, errs.ErrExceedMaxTxHashes
141+
}
142+
143+
var txHashes []gethCommon.Hash
144+
145+
if filter != nil {
146+
txHashes = filter.TransactionHashes
147+
}
148+
149+
return newSubscription(
150+
ctx,
151+
s.logger,
152+
s.receiptsPublisher,
153+
func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*models.Receipt) error {
154+
return func(receipts []*models.Receipt) error {
155+
// Convert to the same format as `eth_getTransactionReceipt`
156+
marshaledReceipts := make([]map[string]any, 0)
157+
158+
for _, receipt := range receipts {
159+
// Check if the subscription is only interested for a given
160+
// set of tx receipts.
161+
if len(txHashes) > 0 && !slices.Contains(txHashes, receipt.TxHash) {
162+
continue
163+
}
164+
165+
tx, err := s.transactions.Get(receipt.TxHash)
166+
if err != nil {
167+
return err
168+
}
169+
170+
txReceipt, err := ethTypes.MarshalReceipt(receipt, tx)
171+
if err != nil {
172+
return err
173+
}
174+
175+
marshaledReceipts = append(marshaledReceipts, txReceipt)
176+
}
177+
178+
// Send a batch of tx receipts in one notification
179+
return notifier.Notify(sub.ID, marshaledReceipts)
180+
}
181+
},
182+
)
183+
}
184+
124185
func (s *StreamAPI) prepareBlockHeader(
125186
block *models.Block,
126187
) (*ethTypes.BlockHeader, error) {

bootstrap/bootstrap.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type Storages struct {
6262
type Publishers struct {
6363
Block *models.Publisher[*models.Block]
6464
Transaction *models.Publisher[*gethTypes.Transaction]
65+
Receipts *models.Publisher[[]*models.Receipt]
6566
Logs *models.Publisher[[]*gethTypes.Log]
6667
}
6768

@@ -105,6 +106,7 @@ func New(config config.Config) (*Bootstrap, error) {
105106
publishers: &Publishers{
106107
Block: models.NewPublisher[*models.Block](),
107108
Transaction: models.NewPublisher[*gethTypes.Transaction](),
109+
Receipts: models.NewPublisher[[]*models.Receipt](),
108110
Logs: models.NewPublisher[[]*gethTypes.Log](),
109111
},
110112
db: db,
@@ -199,6 +201,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
199201
b.storages.Traces,
200202
b.publishers.Block,
201203
b.publishers.Logs,
204+
b.publishers.Receipts,
202205
b.logger,
203206
b.collector,
204207
replayerConfig,
@@ -333,6 +336,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
333336
b.storages.Receipts,
334337
b.publishers.Block,
335338
b.publishers.Transaction,
339+
b.publishers.Receipts,
336340
b.publishers.Logs,
337341
)
338342

models/errors/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var (
1717
ErrRateLimit = errors.New("limit of requests per second reached")
1818
ErrIndexOnlyMode = errors.New("transaction submission not allowed in index-only mode")
1919
ErrExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position")
20+
ErrExceedMaxTxHashes = errors.New("exceed max number of transaction hashes allowed per transactionReceipts subscription")
2021

2122
// General errors
2223

models/events.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
7979
for _, l := range rcp.Logs {
8080
l.BlockNumber = rcp.BlockNumber.Uint64()
8181
l.BlockHash = rcp.BlockHash
82+
l.BlockTimestamp = e.block.Timestamp
8283
l.TxHash = rcp.TxHash
8384
l.TxIndex = rcp.TransactionIndex
8485
l.Index = logIndex

services/ingestion/engine.go

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,21 @@ var _ models.Engine = &Engine{}
4040
type Engine struct {
4141
*models.EngineStatus
4242

43-
subscriber EventSubscriber
44-
blocksProvider *replayer.BlocksProvider
45-
store *pebble.Storage
46-
registerStore *pebble.RegisterStorage
47-
blocks storage.BlockIndexer
48-
receipts storage.ReceiptIndexer
49-
transactions storage.TransactionIndexer
50-
traces storage.TraceIndexer
51-
log zerolog.Logger
52-
evmLastHeight *models.SequentialHeight
53-
blocksPublisher *models.Publisher[*models.Block]
54-
logsPublisher *models.Publisher[[]*gethTypes.Log]
55-
collector metrics.Collector
56-
replayerConfig replayer.Config
43+
subscriber EventSubscriber
44+
blocksProvider *replayer.BlocksProvider
45+
store *pebble.Storage
46+
registerStore *pebble.RegisterStorage
47+
blocks storage.BlockIndexer
48+
receipts storage.ReceiptIndexer
49+
transactions storage.TransactionIndexer
50+
traces storage.TraceIndexer
51+
log zerolog.Logger
52+
evmLastHeight *models.SequentialHeight
53+
blocksPublisher *models.Publisher[*models.Block]
54+
logsPublisher *models.Publisher[[]*gethTypes.Log]
55+
receiptsPublisher *models.Publisher[[]*models.Receipt]
56+
collector metrics.Collector
57+
replayerConfig replayer.Config
5758
}
5859

5960
func NewEventIngestionEngine(
@@ -67,6 +68,7 @@ func NewEventIngestionEngine(
6768
traces storage.TraceIndexer,
6869
blocksPublisher *models.Publisher[*models.Block],
6970
logsPublisher *models.Publisher[[]*gethTypes.Log],
71+
receiptsPublisher *models.Publisher[[]*models.Receipt],
7072
log zerolog.Logger,
7173
collector metrics.Collector,
7274
replayerConfig replayer.Config,
@@ -76,19 +78,20 @@ func NewEventIngestionEngine(
7678
return &Engine{
7779
EngineStatus: models.NewEngineStatus(),
7880

79-
subscriber: subscriber,
80-
blocksProvider: blocksProvider,
81-
store: store,
82-
registerStore: registerStore,
83-
blocks: blocks,
84-
receipts: receipts,
85-
transactions: transactions,
86-
traces: traces,
87-
log: log,
88-
blocksPublisher: blocksPublisher,
89-
logsPublisher: logsPublisher,
90-
collector: collector,
91-
replayerConfig: replayerConfig,
81+
subscriber: subscriber,
82+
blocksProvider: blocksProvider,
83+
store: store,
84+
registerStore: registerStore,
85+
blocks: blocks,
86+
receipts: receipts,
87+
transactions: transactions,
88+
traces: traces,
89+
log: log,
90+
blocksPublisher: blocksPublisher,
91+
logsPublisher: logsPublisher,
92+
receiptsPublisher: receiptsPublisher,
93+
collector: collector,
94+
replayerConfig: replayerConfig,
9295
}
9396
}
9497

@@ -204,6 +207,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
204207

205208
// emit block event and logs, only after we successfully commit the data
206209
e.blocksPublisher.Publish(events.Block())
210+
e.receiptsPublisher.Publish(events.Receipts())
207211
for _, r := range events.Receipts() {
208212
if len(r.Logs) > 0 {
209213
e.logsPublisher.Publish(r.Logs)

services/ingestion/engine_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func TestSerialBlockIngestion(t *testing.T) {
7575
traces,
7676
models.NewPublisher[*models.Block](),
7777
models.NewPublisher[[]*gethTypes.Log](),
78+
models.NewPublisher[[]*models.Receipt](),
7879
zerolog.Nop(),
7980
metrics.NopCollector,
8081
defaultReplayerConfig(),
@@ -154,6 +155,7 @@ func TestSerialBlockIngestion(t *testing.T) {
154155
traces,
155156
models.NewPublisher[*models.Block](),
156157
models.NewPublisher[[]*gethTypes.Log](),
158+
models.NewPublisher[[]*models.Receipt](),
157159
zerolog.Nop(),
158160
metrics.NopCollector,
159161
defaultReplayerConfig(),
@@ -275,6 +277,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
275277
traces,
276278
models.NewPublisher[*models.Block](),
277279
models.NewPublisher[[]*gethTypes.Log](),
280+
models.NewPublisher[[]*models.Receipt](),
278281
zerolog.Nop(),
279282
metrics.NopCollector,
280283
defaultReplayerConfig(),
@@ -383,6 +386,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
383386
traces,
384387
models.NewPublisher[*models.Block](),
385388
models.NewPublisher[[]*gethTypes.Log](),
389+
models.NewPublisher[[]*models.Receipt](),
386390
zerolog.Nop(),
387391
metrics.NopCollector,
388392
defaultReplayerConfig(),
@@ -477,6 +481,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
477481
traces,
478482
models.NewPublisher[*models.Block](),
479483
models.NewPublisher[[]*gethTypes.Log](),
484+
models.NewPublisher[[]*models.Receipt](),
480485
zerolog.Nop(),
481486
metrics.NopCollector,
482487
defaultReplayerConfig(),

tests/web3js/eth_streaming_test.js

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const WebSocket = require('ws')
12
const conf = require('./config')
23
const helpers = require('./helpers')
34
const { assert } = require('chai')
@@ -69,6 +70,33 @@ it('streaming of blocks, transactions, logs using filters', async () => {
6970
}
7071
})
7172

73+
let socket = new WebSocket('ws://127.0.0.1:8545')
74+
// give some time for the connection to open
75+
await new Promise((res) => setTimeout(() => res(), 1500))
76+
let payload = `
77+
{
78+
"jsonrpc": "2.0",
79+
"id": 2,
80+
"method": "eth_subscribe",
81+
"params": [
82+
"transactionReceipts",
83+
{
84+
"transactionHashes": []
85+
}
86+
]
87+
}
88+
`
89+
socket.send(payload)
90+
91+
// subscribe to all new receipts being produced by transaction submissions below
92+
let receipts = []
93+
socket.onmessage = (event) => {
94+
let response = JSON.parse(event.data)
95+
if (response.method == 'eth_subscription') {
96+
receipts = receipts.concat(response.params.result)
97+
}
98+
}
99+
72100
let sentHashes = []
73101
// produce events by submitting transactions
74102
for (const { A, B } of testValues) {
@@ -120,4 +148,74 @@ it('streaming of blocks, transactions, logs using filters', async () => {
120148
assert.lengthOf(matchingLogs, 1)
121149
assert.deepEqual(log, matchingLogs[0])
122150
}
151+
152+
assert.equal(10, receipts.length)
153+
for (let txHash of sentHashes) {
154+
let txReceipt = await helpers.callRPCMethod(
155+
'eth_getTransactionReceipt',
156+
[txHash]
157+
)
158+
159+
for (let rcp of receipts) {
160+
if (rcp.transactionHash == txHash) {
161+
assert.deepEqual(rcp, txReceipt.body['result'])
162+
}
163+
}
164+
}
165+
166+
let signedTx = await conf.eoa.signTransaction({
167+
from: conf.eoa.address,
168+
to: contractAddress,
169+
data: deployed.contract.methods.sum(7, 7).encodeABI(),
170+
gas: 1_000_000,
171+
gasPrice: conf.minGasPrice
172+
})
173+
174+
receipts = []
175+
let subID = null
176+
socket.onmessage = (event) => {
177+
let response = JSON.parse(event.data)
178+
if (response.id == 12) {
179+
subID = response.result
180+
}
181+
182+
if (response.method == 'eth_subscription' && response.params.subscription == subID) {
183+
receipts = receipts.concat(response.params.result)
184+
}
185+
}
186+
// Check that the subscription will notify the caller only for the given
187+
// set of tx hashes.
188+
payload = `
189+
{
190+
"jsonrpc": "2.0",
191+
"id": 12,
192+
"method": "eth_subscribe",
193+
"params": [
194+
"transactionReceipts",
195+
{
196+
"transactionHashes": [
197+
"0x7b45084668258f29cfc525494d00ea5171766d1d43436e41cea930380d96bf67",
198+
"0xed970aa258b677d5e772125dd4342f38e5ccf4dec685d38fc5f04f18eff1a939",
199+
"${signedTx.transactionHash}"
200+
]
201+
}
202+
]
203+
}
204+
`
205+
socket.send(payload)
206+
207+
// send transaction and make sure interaction was success
208+
let txReceipt = await web3.eth.sendSignedTransaction(signedTx.rawTransaction)
209+
assert.equal(txReceipt.status, conf.successStatus)
210+
assert.equal(txReceipt.transactionHash, signedTx.transactionHash)
211+
212+
await new Promise((res, rej) => setTimeout(() => res(), 1500))
213+
socket.close(1000, 'finished testing')
214+
215+
assert.equal(1, receipts.length)
216+
let expectedReceipt = await helpers.callRPCMethod(
217+
'eth_getTransactionReceipt',
218+
[signedTx.transactionHash]
219+
)
220+
assert.deepEqual(receipts[0], expectedReceipt.body['result'])
123221
})

0 commit comments

Comments
 (0)