@@ -20,15 +20,18 @@ const DEFAULT_BLOCKS_PER_POLL = 10
20
20
const DEFAULT_TRIGGER_INTERVAL = 1000
21
21
22
22
type Poller struct {
23
- rpc rpc.IRPCClient
24
- blocksPerPoll int64
25
- triggerIntervalMs int64
26
- storage storage.IStorage
27
- lastPolledBlock * big.Int
28
- pollFromBlock * big.Int
29
- pollUntilBlock * big.Int
30
- parallelPollers int
31
- workModeChan chan WorkMode
23
+ rpc rpc.IRPCClient
24
+ blocksPerPoll int64
25
+ triggerIntervalMs int64
26
+ storage storage.IStorage
27
+ lastPolledBlock * big.Int
28
+ lastPolledBlockMutex sync.RWMutex
29
+ pollFromBlock * big.Int
30
+ pollUntilBlock * big.Int
31
+ parallelPollers int
32
+ workModeChan chan WorkMode
33
+ currentWorkMode WorkMode
34
+ workModeMutex sync.RWMutex
32
35
}
33
36
34
37
type BlockNumberWithError struct {
@@ -117,6 +120,15 @@ func (p *Poller) Start(ctx context.Context) {
117
120
if ! ok {
118
121
return
119
122
}
123
+
124
+ // Do not poll if not in backfill mode
125
+ p .workModeMutex .RLock ()
126
+ if p .currentWorkMode != WorkModeBackfill {
127
+ p .workModeMutex .RUnlock ()
128
+ continue
129
+ }
130
+ p .workModeMutex .RUnlock ()
131
+
120
132
blockRangeMutex .Lock ()
121
133
blockNumbers , err := p .getNextBlockRange (pollCtx )
122
134
blockRangeMutex .Unlock ()
@@ -149,10 +161,26 @@ func (p *Poller) Start(ctx context.Context) {
149
161
p .shutdown (cancel , tasks , & wg )
150
162
return
151
163
case workMode := <- p .workModeChan :
152
- if workMode == WorkModeLive {
153
- log .Info ().Msg ("Switching to live mode, stopping poller" )
154
- p .shutdown (cancel , tasks , & wg )
155
- return
164
+ p .workModeMutex .RLock ()
165
+ currentWorkMode := p .currentWorkMode
166
+ p .workModeMutex .RUnlock ()
167
+ if workMode != currentWorkMode && workMode != "" {
168
+ log .Info ().Msgf ("Poller work mode changing from %s to %s" , currentWorkMode , workMode )
169
+ p .workModeMutex .Lock ()
170
+ changedToBackfillFromLive := currentWorkMode == WorkModeLive && workMode == WorkModeBackfill
171
+ p .currentWorkMode = workMode
172
+ p .workModeMutex .Unlock ()
173
+ if changedToBackfillFromLive {
174
+ lastBlockInMainStorage , err := p .storage .MainStorage .GetMaxBlockNumber (p .rpc .GetChainID ())
175
+ if err != nil {
176
+ log .Error ().Err (err ).Msg ("Error getting last block in main storage" )
177
+ } else {
178
+ p .lastPolledBlockMutex .Lock ()
179
+ p .lastPolledBlock = lastBlockInMainStorage
180
+ p .lastPolledBlockMutex .Unlock ()
181
+ log .Debug ().Msgf ("Switching to backfill mode, updating last polled block to %s" , p .lastPolledBlock .String ())
182
+ }
183
+ }
156
184
}
157
185
case <- ticker .C :
158
186
select {
0 commit comments