Skip to content

Commit 68f4e9b

Browse files
committed
Refactor the indexing process and improve configuration management.
We were trigger a lot of promises in parallel, and for some reason when we do this, Sequelize in the background hangs the transactions. Lowering the number of parallel promises looks like giving more free space into the event loop, which allows To Sequelize to do his job. - Reordered asynchronous operations in `indexer.manager.ts` for better handling of event processing. - Introduced a new function `sanitize_cmd` in `shared.sh` to redact sensitive command-line values for logging. - Updated default values for `BATCH_SIZE` and `CONCURRENCY` in `db.ts`, and added a log entry for global config parameters. - Enhanced `node-entrypoint.sh` to include additional environment variables and sanitized command logging for execution safety. - Fixed minor formatting issues in `project.ts` handler configuration.
1 parent 4d3e591 commit 68f4e9b

File tree

5 files changed

+46
-18
lines changed

5 files changed

+46
-18
lines changed

project.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ const project: CosmosProject = {
435435
// and parallel operations.
436436
handler: "indexingHandler",
437437
kind: CosmosHandlerKind.Block,
438-
}
438+
},
439439
],
440440
},
441441
},

scripts/node-entrypoint.sh

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ set -e
1010
# all this is to be able to join WATCH and normal execution in a single dockerfile
1111
cmd="env NODE_OPTIONS=$NODE_OPTIONS \
1212
NODE_ENV=$NODE_ENV \
13-
CHAIN_ID=$CHAIN_ID"
13+
CHAIN_ID=$CHAIN_ID \
14+
BATCH_SIZE=$BATCH_SIZE \
15+
START_BLOCK=$START_BLOCK \
16+
DB_SCHEMA=$DB_SCHEMA"
1417

1518
if [ "$NODE_ENV" = "test" ]
1619
then
@@ -37,7 +40,10 @@ EOF
3740
DB_PASS=$DB_PASS \
3841
DB_DATABASE=$DB_DATABASE \
3942
DB_HOST=$DB_HOST \
40-
DB_PORT=$DB_PORT"
43+
DB_PORT=$DB_PORT \
44+
POCKETDEX_DB_PAGE_LIMIT=$POCKETDEX_DB_PAGE_LIMIT \
45+
POCKETDEX_DB_BATCH_SIZE=$POCKETDEX_DB_BATCH_SIZE \
46+
POCKETDEX_DB_BULK_WRITE_CONCURRENCY=$POCKETDEX_DB_BULK_WRITE_CONCURRENCY"
4147

4248
if [ "$NODE_ENV" = "development" ]
4349
then
@@ -58,9 +64,15 @@ EOF
5864
# move the dist folder to the mounted folder in run time
5965
update_project
6066
params=$(get_params)
67+
68+
6169
# run the main node
6270
cmd="$cmd node ./node_modules/@subql/node-cosmos/bin/run $params $@"
6371
fi
6472
fi
6573

74+
# Sanitize and log the command before execution
75+
sanitized_cmd=$(sanitize_cmd "$cmd")
76+
info_log "Executing command: $sanitized_cmd"
77+
6678
eval $cmd

scripts/shared.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,8 @@ get_env_file_name(){
7777
fi
7878
echo "$file"
7979
}
80+
81+
# Function to sanitize sensitive values in the command string for logging
82+
sanitize_cmd() {
83+
echo "$1" | sed "s/DB_PASS=[^ ]*/DB_PASS=****/g"
84+
}

src/mappings/indexer.manager.ts

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -690,21 +690,31 @@ async function _indexingHandler(block: CosmosBlock): Promise<void> {
690690
logger.warn(`[indexer.manager] unhandledEventTypes=${stringify(Array.from(unhandledEventTypes))} eventsByType=${stringify(eventsByType, jsonArrayCounter, 0)}`);
691691
}
692692

693+
// let's serialize to maybe avoid issues due to event loop saturation?
693694
await profilerWrap(indexPrimitives, "indexingHandler", "indexPrimitives")(block);
694695

695-
await Promise.all([
696-
profilerWrap(handleTransactions, "indexPrimitives", "handleTransactions")(block.transactions),
697-
profilerWrap(handleMessages, "indexPrimitives", "handleMessages")(block.messages),
698-
profilerWrap(handleEvents, "indexPrimitives", "handleEvents")(filteredEvents),
699-
profilerWrap(indexBalances, "indexingHandler", "indexBalances")(block, msgsByType as MessageByType, eventsByType),
700-
profilerWrap(indexParams, "indexingHandler", "indexParams")(msgsByType as MessageByType),
701-
profilerWrap(indexGrants, "indexingHandler", "indexGrants")(msgsByType as MessageByType, eventsByType),
702-
profilerWrap(indexService, "indexingHandler", "indexService")(msgsByType as MessageByType, eventsByType),
703-
profilerWrap(indexValidators, "indexingHandler", "indexValidators")(msgsByType as MessageByType, eventsByType),
704-
profilerWrap(indexStake, "indexingHandler", "indexStake")(msgsByType as MessageByType, eventsByType),
705-
profilerWrap(indexRelays, "indexingHandler", "indexRelays")(msgsByType as MessageByType, eventsByType),
706-
profilerWrap(indexMigrationAccounts, "indexingHandler", "indexMigrationAccounts")(msgsByType as MessageByType),
707-
]);
696+
// let this happen first because it is massive
697+
await profilerWrap(handleEvents, "indexPrimitives", "handleEvents")(filteredEvents);
698+
699+
await profilerWrap(handleTransactions, "indexPrimitives", "handleTransactions")(block.transactions);
700+
701+
await profilerWrap(handleMessages, "indexPrimitives", "handleMessages")(block.messages);
702+
703+
await profilerWrap(indexBalances, "indexingHandler", "indexBalances")(block, msgsByType as MessageByType, eventsByType);
704+
705+
await profilerWrap(indexParams, "indexingHandler", "indexParams")(msgsByType as MessageByType);
706+
707+
await profilerWrap(indexGrants, "indexingHandler", "indexGrants")(msgsByType as MessageByType, eventsByType);
708+
709+
await profilerWrap(indexService, "indexingHandler", "indexService")(msgsByType as MessageByType, eventsByType);
710+
711+
await profilerWrap(indexValidators, "indexingHandler", "indexValidators")(msgsByType as MessageByType, eventsByType);
712+
713+
await profilerWrap(indexStake, "indexingHandler", "indexStake")(msgsByType as MessageByType, eventsByType);
714+
715+
await profilerWrap(indexRelays, "indexingHandler", "indexRelays")(msgsByType as MessageByType, eventsByType);
716+
717+
await profilerWrap(indexMigrationAccounts, "indexingHandler", "indexMigrationAccounts")(msgsByType as MessageByType);
708718

709719
await profilerWrap(generateReports, "indexingHandler", "generateReports")(block);
710720
}

src/mappings/utils/db.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ type Transformer<T> = (doc: T) => any;
2727

2828
// PAGE_LIMIT should be less or equal to --query-limit=<value> otherwise Subql will throw an error.
2929
const PAGE_LIMIT = isNumber(process.env.POCKETDEX_DB_PAGE_LIMIT) ? Number(process.env.POCKETDEX_DB_PAGE_LIMIT) : 1000;
30-
const BATCH_SIZE = isNumber(process.env.POCKETDEX_DB_BATCH_SIZE) ? Number(process.env.POCKETDEX_DB_BATCH_SIZE) : 15000;
31-
const CONCURRENCY = isNumber(process.env.POCKETDEX_DB_BULK_WRITE_CONCURRENCY) ? Number(process.env.POCKETDEX_DB_BULK_WRITE_CONCURRENCY) : 10;
30+
const BATCH_SIZE = isNumber(process.env.POCKETDEX_DB_BATCH_SIZE) ? Number(process.env.POCKETDEX_DB_BATCH_SIZE) : 5000;
31+
const CONCURRENCY = isNumber(process.env.POCKETDEX_DB_BULK_WRITE_CONCURRENCY) ? Number(process.env.POCKETDEX_DB_BULK_WRITE_CONCURRENCY) : 5;
3232

33+
logger.info(`[Global] PAGE_LIMIT=${PAGE_LIMIT} BATCH_SIZE=${BATCH_SIZE} CONCURRENCY=${CONCURRENCY}`);
3334

3435
/**
3536
* Retrieves the store model associated with the given name.

0 commit comments

Comments
 (0)