Skip to content

Commit 688a0c8

Browse files
committed
fix: performance update unlocking entityIndexerStateDAL DAL between different accounts
1 parent 05da833 commit 688a0c8

File tree

4 files changed

+92
-43
lines changed

4 files changed

+92
-43
lines changed

packages/core/src/storage/entityIndexStorage.ts

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export type EntityIndexStorageOptions<Entity> = LevelStorageOptions<
3030
}
3131

3232
export type EntityIndexStorageInvokeOptions = StorageCommonOptions & {
33-
atomic?: boolean
33+
atomic?: string | boolean
3434
}
3535
export type EntityIndexStorageCallOptions = EntityIndexStorageInvokeOptions
3636
export type EntityIndexStorageSaveOptions<V> = StoragePutOptions<string, V> &
@@ -67,7 +67,7 @@ export class EntityIndexStorage<
6767

6868
constructor(
6969
protected options: EntityIndexStorageOptions<Entity>,
70-
protected atomicOpMutex: Mutex = new Mutex(),
70+
protected atomicOpMutex: Record<string, Mutex> = {},
7171
protected storage: LevelStorage<string | Entity> = new LevelStorage({
7272
...options,
7373
path: path.join(options.path, options.name),
@@ -93,8 +93,8 @@ export class EntityIndexStorage<
9393
}
9494
}
9595

96-
async acquire(): Promise<() => void> {
97-
return this.getAtomicOpMutex(true)
96+
async acquire(id?: string): Promise<() => void> {
97+
return this.getAtomicOpMutex(id || true)
9898
}
9999

100100
async getCount(options?: EntityIndexStorageCallOptions): Promise<number> {
@@ -664,10 +664,39 @@ export class EntityIndexStorage<
664664
return item.value
665665
}
666666

667-
protected getAtomicOpMutex(
668-
atomic = !!this.options.entityStore,
667+
protected async getAtomicOpMutex(
668+
atomic: string | boolean = !!this.options.entityStore,
669669
): Promise<() => void> {
670-
return atomic ? this.atomicOpMutex.acquire() : this.noopMutex
670+
if (!atomic) return this.noopMutex
671+
672+
const mainMutex = this.getMutexById('main')
673+
const mainPromise = mainMutex.acquire()
674+
if (atomic === true) return mainPromise
675+
676+
const idMutex = this.getMutexById(atomic)
677+
const idPromise = idMutex.acquire()
678+
const promises = [mainPromise, idPromise]
679+
680+
// @note: Only wait for id mutex but lock and release both: main and id mutexs
681+
await idPromise
682+
683+
return async () => {
684+
const releaseAll = await Promise.all(promises)
685+
for (const release of releaseAll) {
686+
release()
687+
}
688+
}
689+
}
690+
691+
protected getMutexById(id: string): Mutex {
692+
let mutex = this.atomicOpMutex[id]
693+
694+
if (!mutex) {
695+
mutex = new Mutex()
696+
this.atomicOpMutex[id] = mutex
697+
}
698+
699+
return mutex
671700
}
672701

673702
protected async getCountDelta(

packages/core/src/storage/entityStorage.ts

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/* eslint-disable @typescript-eslint/no-empty-function */
22
import path from 'node:path'
33

4-
import { Mutex } from '../utils/index.js'
54
import { StorageGetOptions } from './baseStorage.js'
65
import {
76
EntityIndexStorage,
@@ -32,18 +31,17 @@ export type EntityStorageOptions<Entity> = EntityIndexStorageOptions<Entity> & {
3231
count?: boolean
3332
}
3433

35-
export type EntityStorageInvokeOptions = { atomic?: boolean }
34+
export type EntityStorageInvokeOptions = { atomic?: string | boolean }
3635
export type EntityStorageCallOptions = EntityStorageInvokeOptions
3736
export type EntityStorageGetStreamOptions<K, V> = StorageGetOptions<K, V> &
3837
EntityStorageInvokeOptions
39-
38+
export type EntityStorageSaveOptions = EntityStorageInvokeOptions
39+
export type EntityStorageRemoveOptions = EntityStorageInvokeOptions
4040
/**
4141
* Defines the storage handler class for different entities.
4242
*/
4343
export class EntityStorage<Entity> extends EntityIndexStorage<Entity, Entity> {
4444
protected byIndex: Record<string, EntityIndexStorage<Entity, Entity>> = {}
45-
protected atomicOpMutex: Mutex
46-
protected noopMutex = Promise.resolve((): void => {})
4745

4846
static AddressLength = EntityIndexStorage.AddressLength
4947
static TimestampLength = EntityIndexStorage.TimestampLength
@@ -54,15 +52,14 @@ export class EntityStorage<Entity> extends EntityIndexStorage<Entity, Entity> {
5452
protected options: EntityStorageOptions<Entity>,
5553
protected StorageClass: typeof LevelStorage = LevelStorage,
5654
) {
57-
const atomicOpMutex = new Mutex()
55+
const atomicOpMutex = {}
5856
const basePath = path.join(options.path, options.name)
5957
const storage = new LevelStorage({ ...options, path: basePath })
6058

6159
const opts = { ...options, sublevel: 'main' }
6260
super(opts, atomicOpMutex, storage)
6361

6462
this.options = opts
65-
this.atomicOpMutex = atomicOpMutex
6663

6764
const { indexes } = this.options
6865
if (!indexes) return
@@ -79,7 +76,7 @@ export class EntityStorage<Entity> extends EntityIndexStorage<Entity, Entity> {
7976
entityStore: this,
8077
count: false,
8178
},
82-
this.atomicOpMutex,
79+
atomicOpMutex,
8380
storage,
8481
)
8582
}
@@ -96,8 +93,13 @@ export class EntityStorage<Entity> extends EntityIndexStorage<Entity, Entity> {
9693
)
9794
}
9895

99-
async save(entities: Entity | Entity[]): Promise<void> {
100-
const release = await this.getAtomicOpMutex(true)
96+
async save(
97+
entities: Entity | Entity[],
98+
options?: EntityStorageSaveOptions,
99+
): Promise<void> {
100+
const atomicTop = options?.atomic !== undefined ? options.atomic : true
101+
const release = await this.getAtomicOpMutex(atomicTop)
102+
101103
const atomic = false
102104
const batch = this.getBatch()
103105

@@ -144,8 +146,13 @@ export class EntityStorage<Entity> extends EntityIndexStorage<Entity, Entity> {
144146
}
145147
}
146148

147-
async remove(entities: Entity | Entity[]): Promise<void> {
148-
const release = await this.getAtomicOpMutex(true)
149+
async remove(
150+
entities: Entity | Entity[],
151+
options?: EntityStorageRemoveOptions,
152+
): Promise<void> {
153+
const atomicTop = options?.atomic !== undefined ? options.atomic : true
154+
const release = await this.getAtomicOpMutex(atomicTop)
155+
149156
const atomic = false
150157
const batch = this.getBatch()
151158

packages/framework/src/services/indexer/src/accountEntityIndexer.ts

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ export class BaseAccountEntityIndexer<T extends ParsedEntity<unknown>> {
184184

185185
// @note: Update the state of the request to ready (mark for processing)
186186
// @note: Bulk write for boosting performance (more RAM comsumption)
187-
await this.entityIndexerStateDAL.save(readyRanges)
187+
await this.entityIndexerStateDAL.save(readyRanges, { atomic: account })
188188
}
189189

190190
protected async onEntityResponse(request: EntityRequest): Promise<void> {
@@ -203,11 +203,14 @@ export class BaseAccountEntityIndexer<T extends ParsedEntity<unknown>> {
203203
if (!pendingRange) return
204204

205205
// @note: Update the state of the request to ready (mark for processing)
206-
await this.entityIndexerStateDAL.save({
207-
...pendingRange,
208-
requestNonce,
209-
state: Ready,
210-
})
206+
await this.entityIndexerStateDAL.save(
207+
{
208+
...pendingRange,
209+
requestNonce,
210+
state: Ready,
211+
},
212+
{ atomic: account },
213+
)
211214
}
212215

213216
protected async fetchAllRanges({
@@ -309,8 +312,8 @@ export class BaseAccountEntityIndexer<T extends ParsedEntity<unknown>> {
309312
// @note: Ordering is important for not causing
310313
// race conditions issues on pending ranges calculation due
311314
// to empty processed entries in db
312-
await this.entityIndexerStateDAL.save(newStates)
313-
await this.entityIndexerStateDAL.remove(oldStates)
315+
await this.entityIndexerStateDAL.save(newStates, { atomic: account })
316+
await this.entityIndexerStateDAL.remove(oldStates, { atomic: account })
314317

315318
return mergedRanges
316319
}
@@ -349,11 +352,14 @@ export class BaseAccountEntityIndexer<T extends ParsedEntity<unknown>> {
349352
})
350353

351354
// @note: Update the state of the request to processed (mark for compaction)
352-
await this.entityIndexerStateDAL.save({
353-
...range,
354-
requestNonce: undefined,
355-
state: Processed,
356-
})
355+
await this.entityIndexerStateDAL.save(
356+
{
357+
...range,
358+
requestNonce: undefined,
359+
state: Processed,
360+
},
361+
{ atomic: account },
362+
)
357363

358364
// @note: Remove the request state on the entity fetcher
359365
await remove()
@@ -367,28 +373,35 @@ export class BaseAccountEntityIndexer<T extends ParsedEntity<unknown>> {
367373
}
368374

369375
protected async fetchRangeByDate(dateRange: AccountDateRange): Promise<void> {
376+
const { account } = this.config
370377
const { Pending, Ready } = EntityIndexerStateCode
371378

372379
// @note: Do the request and get the nonce
373380
const nonce = await this.entityFetcher.fetchAccountEntitiesByDate(dateRange)
374381

375382
// @note: Save the pending state of the request
376-
await this.entityIndexerStateDAL.save({
377-
...dateRange,
378-
requestNonce: nonce,
379-
state: Pending,
380-
})
383+
await this.entityIndexerStateDAL.save(
384+
{
385+
...dateRange,
386+
requestNonce: nonce,
387+
state: Pending,
388+
},
389+
{ atomic: account },
390+
)
381391

382392
// @note: Wait till the request is complete
383393
await this.entityFetcher.awaitRequestComplete(nonce)
384394

385395
// @note: Update the state to ready
386396
// (in some cases, the response comes before saving the pending state, so we must always check it here too)
387-
await this.entityIndexerStateDAL.save({
388-
...dateRange,
389-
requestNonce: nonce,
390-
state: Ready,
391-
})
397+
await this.entityIndexerStateDAL.save(
398+
{
399+
...dateRange,
400+
requestNonce: nonce,
401+
state: Ready,
402+
},
403+
{ atomic: account },
404+
)
392405
}
393406

394407
protected async calculateRangesToFetch(

packages/indexer-example/src/domain/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export default class MainDomain
4848
if (this.context.supportedBlockchains.includes(BlockchainChain.Solana))
4949
accountIndexerConfigs.push({
5050
blockchainId: BlockchainChain.Solana,
51-
account: 'DkxNXPCuJDpYA4rKKpqm1Jwv4tD9SNRcpEH2rf9juicd', // alephTokenSol,
51+
account: alephTokenSol,
5252
meta: 2,
5353
index: {
5454
transactions: {

0 commit comments

Comments
 (0)