@@ -9,6 +9,7 @@ import spread from 'lodash/spread'
9
9
import sub from 'subleveldown'
10
10
import wait from 'src/utils/wait'
11
11
import { EventEmitter } from 'events'
12
+ import { Migration } from 'src/db/migrations'
12
13
import { Mutex } from 'async-mutex'
13
14
import { TenSecondsMs } from 'src/constants'
14
15
import { config as globalConfig } from 'src/config'
@@ -36,6 +37,10 @@ type QueueItem = {
36
37
cb : any
37
38
}
38
39
40
+ type MigrationIndexEntry = {
41
+ index : number
42
+ }
43
+
39
44
// this are options that leveldb createReadStream accepts
40
45
export type KeyFilter = {
41
46
gt ?: string
@@ -59,8 +64,9 @@ class BaseDb extends EventEmitter {
59
64
batchSize : number = 10
60
65
batchTimeLimit : number = 2 * 1000
61
66
batchQueue : QueueItem [ ] = [ ]
67
+ private readonly dbMigrationKey : string = '_dbMigrationIndex'
62
68
63
- constructor ( prefix : string , _namespace ?: string ) {
69
+ constructor ( prefix : string , _namespace ?: string , migrations ?: Migration [ ] ) {
64
70
super ( )
65
71
if ( ! prefix ) {
66
72
throw new Error ( 'db prefix is required' )
@@ -121,7 +127,7 @@ class BaseDb extends EventEmitter {
121
127
} )
122
128
123
129
this . pollBatchQueue ( )
124
- this . _migration ( )
130
+ this . _migrate ( migrations )
125
131
. then ( ( ) => {
126
132
this . ready = true
127
133
this . logger . debug ( 'db ready' )
@@ -132,15 +138,36 @@ class BaseDb extends EventEmitter {
132
138
} )
133
139
}
134
140
135
- // To add a migration, implement the shouldMigrate and migration functions in the child class.
136
141
// Migrations are memory intensive. Ensure there is no unintentional memory overflow.
137
142
// * Use stream instead of storing all entries at once
138
143
// * Bypass the mutex
139
- async _migration ( ) : Promise < void > {
140
- // Check if migration is needed
141
- if ( ! this . shouldMigrate ( ) ) return
144
+ // This may take minutes or hours to complete
145
+ private async _migrate ( migrations ?: Migration [ ] ) : Promise < void > {
146
+ if ( ! migrations ?. length ) {
147
+ this . logger . debug ( 'no migrations to process' )
148
+ return
149
+ }
150
+
151
+ const currentMigrationIndex = await this . getMigrationIndex ( ) ?? 0
152
+ const lastMigrationIndex = migrations . length - 1
153
+ if ( currentMigrationIndex > lastMigrationIndex ) {
154
+ this . logger . debug ( `no migration required, currentMigrationIndex: ${ currentMigrationIndex } ` )
155
+ return
156
+ }
157
+
158
+ this . logger . debug ( `processing migrations from ${ currentMigrationIndex } to ${ lastMigrationIndex } ` )
159
+ for ( let i = currentMigrationIndex ; i <= lastMigrationIndex ; i ++ ) {
160
+ const migration : Migration = migrations [ i ]
161
+ this . logger . debug ( `processing migration ${ i } ` )
162
+ await this . _processMigration ( migration )
163
+ await this . putMigrationIndex ( i + 1 )
164
+ this . logger . debug ( `completed migration ${ i } ` )
165
+ }
166
+ this . logger . debug ( 'migrations complete' )
167
+ }
142
168
143
- // Perform migration
169
+ private async _processMigration ( migration : Migration ) : Promise < void > {
170
+ let migrationCount = 0
144
171
return await new Promise ( ( resolve , reject ) => {
145
172
const s = this . db . createReadStream ( { } )
146
173
s . on ( 'data' , async ( key : any , value : any ) => {
@@ -154,15 +181,15 @@ class BaseDb extends EventEmitter {
154
181
return
155
182
}
156
183
157
- // Call the child class migration function
158
184
try {
159
- await this . migration ( key , value )
185
+ await this . _migrateEntry ( migration , key , value )
186
+ migrationCount ++
160
187
} catch ( err ) {
161
188
s . emit ( 'error' , err )
162
189
}
163
190
} )
164
191
. on ( 'end' , ( ) => {
165
- this . logger . debug ( ' DB migration complete' )
192
+ this . logger . debug ( ` DB migration complete. migrated ${ migrationCount } entries` )
166
193
s . destroy ( )
167
194
resolve ( )
168
195
} )
@@ -174,15 +201,29 @@ class BaseDb extends EventEmitter {
174
201
} )
175
202
}
176
203
177
- shouldMigrate ( ) : boolean {
178
- // Optional
179
- // Must return true in child class to perform migration
180
- return false
204
+ // Ensure that these DB reads and writes do not use the mutex since a migration is memory-intensive
205
+ private async _migrateEntry ( migration : Migration , key : string , value : any ) : Promise < void > {
206
+ const { key : migrationKey , value : migrationValue , migratedValue } = migration
207
+ if (
208
+ value ?. [ migrationKey ] === undefined ||
209
+ value ?. [ migrationKey ] === migrationValue
210
+ ) {
211
+ const { value : updatedValue } = await this . _getUpdateData ( key , value )
212
+ updatedValue [ migrationKey ] = migratedValue
213
+ return this . db . put ( key , updatedValue )
214
+ }
215
+ }
216
+
217
+ private async getMigrationIndex ( ) : Promise < number | undefined > {
218
+ const migrationEntry : MigrationIndexEntry = await this . getById ( this . dbMigrationKey )
219
+ return migrationEntry ?. index
181
220
}
182
221
183
- async migration ( key : string , value : any ) : Promise < void > {
184
- // Optional
185
- // Must be implemented in child class to perform migration
222
+ private async putMigrationIndex ( updatedMigrationIndex : number ) : Promise < void > {
223
+ const data : MigrationIndexEntry = {
224
+ index : updatedMigrationIndex
225
+ }
226
+ return this . _updateSingle ( this . dbMigrationKey , data )
186
227
}
187
228
188
229
protected async tilReady ( ) : Promise < boolean > {
0 commit comments