Skip to content

Commit 407106e

Browse files
committed
Track Flow fees params updates in its own subscriber service
1 parent a09baeb commit 407106e

File tree

7 files changed

+396
-78
lines changed

7 files changed

+396
-78
lines changed

bootstrap/bootstrap.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,17 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
182182
ValidateResults: true,
183183
}
184184

185+
feeParamsSubscriber := ingestion.NewFeeParamsEventSubscriber(
186+
b.logger,
187+
b.client,
188+
chainID,
189+
nextCadenceHeight,
190+
)
191+
185192
// initialize event ingestion engine
186193
b.events = ingestion.NewEventIngestionEngine(
187194
subscriber,
195+
feeParamsSubscriber,
188196
blocksProvider,
189197
b.storages.Storage,
190198
b.storages.Registers,

models/events.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ type CadenceEvents struct {
5252
transactions []Transaction // transactions in the EVM block
5353
txEventPayloads []events.TransactionEventPayload // EVM.TransactionExecuted event payloads
5454
receipts []*Receipt // receipts for transactions
55-
feeParameters *FeeParameters // updates to Flow fees parameters
5655
}
5756

5857
// NewCadenceEvents decodes the events into evm types.
@@ -135,15 +134,6 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
135134
e.txEventPayloads = append(e.txEventPayloads, *txEventPayload)
136135
e.receipts = append(e.receipts, receipt)
137136
}
138-
139-
if isFeeParametersChangedEvent(val) {
140-
feeParameters, err := decodeFeeParametersChangedEvent(val)
141-
if err != nil {
142-
return nil, err
143-
}
144-
145-
e.feeParameters = feeParameters
146-
}
147137
}
148138

149139
// safety check, we have a missing block in the events
@@ -202,11 +192,6 @@ func (c *CadenceEvents) Receipts() []*Receipt {
202192
return c.receipts
203193
}
204194

205-
// FeeParameters returns any updates to the Flow fees parameters.
206-
func (c *CadenceEvents) FeeParameters() *FeeParameters {
207-
return c.feeParameters
208-
}
209-
210195
// Empty checks if there is an EVM block included in the events.
211196
// If there are no evm block or transactions events this is a heartbeat event.
212197
func (c *CadenceEvents) Empty() bool {
@@ -279,3 +264,34 @@ func NewBlockEventsError(err error) BlockEvents {
279264
Err: err,
280265
}
281266
}
267+
268+
type FeeParamsEvents struct {
269+
FeeParameters *FeeParameters // updates to Flow fees parameters
270+
Err error
271+
}
272+
273+
func NewFeeParamsEvents(events flow.BlockEvents) *FeeParamsEvents {
274+
for _, event := range events.Events {
275+
val := event.Value
276+
if isFeeParametersChangedEvent(val) {
277+
feeParameters, err := decodeFeeParametersChangedEvent(val)
278+
return &FeeParamsEvents{
279+
FeeParameters: feeParameters,
280+
Err: err,
281+
}
282+
}
283+
}
284+
285+
return &FeeParamsEvents{
286+
Err: fmt.Errorf(
287+
"could not find any %s events",
288+
FeeParametersChangedQualifiedIdentifier,
289+
),
290+
}
291+
}
292+
293+
func NewFeeParamsEventsError(err error) *FeeParamsEvents {
294+
return &FeeParamsEvents{
295+
Err: err,
296+
}
297+
}

services/ingestion/engine.go

Lines changed: 66 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,27 @@ var _ models.Engine = &Engine{}
4040
type Engine struct {
4141
*models.EngineStatus
4242

43-
subscriber EventSubscriber
44-
blocksProvider *replayer.BlocksProvider
45-
store *pebble.Storage
46-
registerStore *pebble.RegisterStorage
47-
blocks storage.BlockIndexer
48-
receipts storage.ReceiptIndexer
49-
transactions storage.TransactionIndexer
50-
traces storage.TraceIndexer
51-
feeParameters storage.FeeParametersIndexer
52-
log zerolog.Logger
53-
evmLastHeight *models.SequentialHeight
54-
blocksPublisher *models.Publisher[*models.Block]
55-
logsPublisher *models.Publisher[[]*gethTypes.Log]
56-
collector metrics.Collector
57-
replayerConfig replayer.Config
43+
subscriber EventSubscriber
44+
feeParamsSubscriber FeeParamsSubscriber
45+
blocksProvider *replayer.BlocksProvider
46+
store *pebble.Storage
47+
registerStore *pebble.RegisterStorage
48+
blocks storage.BlockIndexer
49+
receipts storage.ReceiptIndexer
50+
transactions storage.TransactionIndexer
51+
traces storage.TraceIndexer
52+
feeParameters storage.FeeParametersIndexer
53+
log zerolog.Logger
54+
evmLastHeight *models.SequentialHeight
55+
blocksPublisher *models.Publisher[*models.Block]
56+
logsPublisher *models.Publisher[[]*gethTypes.Log]
57+
collector metrics.Collector
58+
replayerConfig replayer.Config
5859
}
5960

6061
func NewEventIngestionEngine(
6162
subscriber EventSubscriber,
63+
feeParamsSubscriber FeeParamsSubscriber,
6264
blocksProvider *replayer.BlocksProvider,
6365
store *pebble.Storage,
6466
registerStore *pebble.RegisterStorage,
@@ -78,20 +80,21 @@ func NewEventIngestionEngine(
7880
return &Engine{
7981
EngineStatus: models.NewEngineStatus(),
8082

81-
subscriber: subscriber,
82-
blocksProvider: blocksProvider,
83-
store: store,
84-
registerStore: registerStore,
85-
blocks: blocks,
86-
receipts: receipts,
87-
transactions: transactions,
88-
traces: traces,
89-
feeParameters: feeParameters,
90-
log: log,
91-
blocksPublisher: blocksPublisher,
92-
logsPublisher: logsPublisher,
93-
collector: collector,
94-
replayerConfig: replayerConfig,
83+
subscriber: subscriber,
84+
feeParamsSubscriber: feeParamsSubscriber,
85+
blocksProvider: blocksProvider,
86+
store: store,
87+
registerStore: registerStore,
88+
blocks: blocks,
89+
receipts: receipts,
90+
transactions: transactions,
91+
traces: traces,
92+
feeParameters: feeParameters,
93+
log: log,
94+
blocksPublisher: blocksPublisher,
95+
logsPublisher: logsPublisher,
96+
collector: collector,
97+
replayerConfig: replayerConfig,
9598
}
9699
}
97100

@@ -123,6 +126,7 @@ func (e *Engine) Run(ctx context.Context) error {
123126
defer e.MarkStopped()
124127

125128
events := e.subscriber.Subscribe(ctx)
129+
feeParamsEvents := e.feeParamsSubscriber.Subscribe(ctx)
126130

127131
for {
128132
select {
@@ -145,6 +149,20 @@ func (e *Engine) Run(ctx context.Context) error {
145149
e.log.Error().Err(err).Msg("failed to process EVM events")
146150
return err
147151
}
152+
case feeParams, ok := <-feeParamsEvents:
153+
if !ok {
154+
return nil
155+
}
156+
if feeParams.Err != nil {
157+
return fmt.Errorf(
158+
"failure in FeeParametersChanged event subscription with: %w",
159+
feeParams.Err,
160+
)
161+
}
162+
if err := e.processFeeParamsEvents(feeParams); err != nil {
163+
e.log.Error().Err(err).Msg("failed to process FeeParametersChanged events")
164+
return err
165+
}
148166
}
149167
}
150168
}
@@ -172,6 +190,25 @@ func (e *Engine) withBatch(f func(batch *pebbleDB.Batch) error) error {
172190
return nil
173191
}
174192

193+
func (e *Engine) processFeeParamsEvents(events *models.FeeParamsEvents) error {
194+
if events == nil || events.FeeParameters == nil {
195+
return nil
196+
}
197+
198+
err := e.withBatch(
199+
func(batch *pebbleDB.Batch) error {
200+
return e.feeParameters.Store(events.FeeParameters, batch)
201+
},
202+
)
203+
if err != nil {
204+
return fmt.Errorf("failed to update fee parameters during events ingestion: %w", err)
205+
}
206+
207+
e.log.Info().Msg("updated fee parameters")
208+
209+
return nil
210+
}
211+
175212
// processEvents converts the events to block and transactions and indexes them.
176213
//
177214
// BlockEvents are received by the access node API and contain Cadence height (always a single Flow block),
@@ -220,16 +257,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
220257

221258
// indexEvents will replay the evm transactions using the block events and index all results.
222259
func (e *Engine) indexEvents(events *models.CadenceEvents, batch *pebbleDB.Batch) error {
223-
if events.FeeParameters() != nil {
224-
if err := e.feeParameters.Store(events.FeeParameters(), batch); err != nil {
225-
return fmt.Errorf(
226-
"failed to update fee parameters for height: %d, during events ingestion: %w",
227-
events.CadenceHeight(),
228-
err,
229-
)
230-
}
231-
}
232-
233260
// if heartbeat interval with no data still update the cadence height
234261
if events.Empty() {
235262
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil {

services/ingestion/engine_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,15 @@ func TestSerialBlockIngestion(t *testing.T) {
6565
return eventsChan
6666
})
6767

68+
feeParamsSubscriber := &mocks.FeeParamsSubscriber{}
69+
feeParamsSubscriber.On("Subscribe", mock.Anything).
70+
Return(func(ctx context.Context) <-chan *models.FeeParamsEvents {
71+
return make(chan *models.FeeParamsEvents)
72+
})
73+
6874
engine := NewEventIngestionEngine(
6975
subscriber,
76+
feeParamsSubscriber,
7077
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
7178
store,
7279
registerStore,
@@ -146,8 +153,15 @@ func TestSerialBlockIngestion(t *testing.T) {
146153
return eventsChan
147154
})
148155

156+
feeParamsSubscriber := &mocks.FeeParamsSubscriber{}
157+
feeParamsSubscriber.On("Subscribe", mock.Anything).
158+
Return(func(ctx context.Context) <-chan *models.FeeParamsEvents {
159+
return make(chan *models.FeeParamsEvents)
160+
})
161+
149162
engine := NewEventIngestionEngine(
150163
subscriber,
164+
feeParamsSubscriber,
151165
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
152166
store,
153167
registerStore,
@@ -270,8 +284,15 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
270284

271285
feeParams := &storageMock.FeeParametersIndexer{}
272286

287+
feeParamsSubscriber := &mocks.FeeParamsSubscriber{}
288+
feeParamsSubscriber.On("Subscribe", mock.Anything).
289+
Return(func(ctx context.Context) <-chan *models.FeeParamsEvents {
290+
return make(chan *models.FeeParamsEvents)
291+
})
292+
273293
engine := NewEventIngestionEngine(
274294
subscriber,
295+
feeParamsSubscriber,
275296
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
276297
store,
277298
registerStore,
@@ -366,6 +387,12 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
366387
return eventsChan
367388
})
368389

390+
feeParamsSubscriber := &mocks.FeeParamsSubscriber{}
391+
feeParamsSubscriber.On("Subscribe", mock.Anything).
392+
Return(func(ctx context.Context) <-chan *models.FeeParamsEvents {
393+
return make(chan *models.FeeParamsEvents)
394+
})
395+
369396
txCdc, txEvent, transaction, res, err := newTransaction(nextHeight)
370397
require.NoError(t, err)
371398
blockCdc, _, blockEvent, err := newBlock(nextHeight, []gethCommon.Hash{res.TxHash})
@@ -383,6 +410,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
383410

384411
engine := NewEventIngestionEngine(
385412
subscriber,
413+
feeParamsSubscriber,
386414
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
387415
store,
388416
registerStore,
@@ -477,8 +505,15 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
477505
}).
478506
Once()
479507

508+
feeParamsSubscriber := &mocks.FeeParamsSubscriber{}
509+
feeParamsSubscriber.On("Subscribe", mock.Anything).
510+
Return(func(ctx context.Context) <-chan *models.FeeParamsEvents {
511+
return make(chan *models.FeeParamsEvents)
512+
})
513+
480514
engine := NewEventIngestionEngine(
481515
subscriber,
516+
feeParamsSubscriber,
482517
replayer.NewBlocksProvider(blocks, flowGo.Emulator, nil),
483518
store,
484519
registerStore,

services/ingestion/event_subscriber.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
146146
blockEventsStream, errChan, err = r.client.SubscribeEventsByBlockHeight(
147147
ctx,
148148
height,
149-
blocksEventFilter(r.chain),
149+
evmEventFilter(r.chain),
150150
access.WithHeartbeatInterval(1),
151151
)
152152

@@ -552,29 +552,6 @@ func (r *RPCEventSubscriber) recover(
552552
return models.NewBlockEventsError(err)
553553
}
554554

555-
// blocksEventFilter defines the full set of events we subscribe to:
556-
// - A.{evm}.EVM.BlockExecuted
557-
// - A.{evm}.EVM.TransactionExecuted,
558-
// - A.{flow_fees}.FlowFees.FeeParametersChanged,
559-
// where {evm} is the EVM deployed contract address, which depends on the
560-
// configured chain ID and {flow_fees} is the FlowFees deployed contract
561-
// address for the configured chain ID.
562-
func blocksEventFilter(chainID flowGo.ChainID) flow.EventFilter {
563-
contracts := systemcontracts.SystemContractsForChain(chainID)
564-
flowFeesAddress := common.Address(contracts.FlowFees.Address)
565-
eventFilter := evmEventFilter(chainID)
566-
567-
feeParametersChangedEvent := common.NewAddressLocation(
568-
nil,
569-
flowFeesAddress,
570-
models.FeeParametersChangedQualifiedIdentifier,
571-
).ID()
572-
573-
eventFilter.EventTypes = append(eventFilter.EventTypes, feeParametersChangedEvent)
574-
575-
return eventFilter
576-
}
577-
578555
// evmEventFilter defines the EVM-related events we subscribe to:
579556
// - A.{evm}.EVM.BlockExecuted,
580557
// - A.{evm}.EVM.TransactionExecuted,

0 commit comments

Comments
 (0)