Skip to content

Commit 4ec3a0d

Browse files
authored
Handle nested event processing and optimize balance queries (#64)
- Added support for parsing nested "application" JSON in _handleApplicationUnbondingBeginEvent to correctly handle missing operator addresses in events. - Introduced a helper function `chunkArray` to optimize batch balance queries. - Refactored balance querying logic in updateBalances to ensure scalability by splitting requests into manageable batches of 1000. - Improved error handling to better identify and process cases with missing or invalid data.
1 parent 328e0d2 commit 4ec3a0d

File tree

2 files changed

+105
-27
lines changed

2 files changed

+105
-27
lines changed

src/mappings/bank/balanceChange.ts

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import {
2-
CosmosEvent,
3-
} from "@subql/types-cosmos";
1+
import { CosmosEvent } from "@subql/types-cosmos";
42
import { findIndex } from "lodash";
53
import { parseCoins } from "../../cosmjs/utils";
64
import {
@@ -10,7 +8,12 @@ import {
108
import { AccountProps } from "../../types/models/Account";
119
import type { BalanceProps } from "../../types/models/Balance";
1210
import { ModuleAccountProps } from "../../types/models/ModuleAccount";
13-
import { fetchPaginatedRecords, getSequelize, getStoreModel, optimizedBulkCreate } from "../utils/db";
11+
import {
12+
fetchPaginatedRecords,
13+
getSequelize,
14+
getStoreModel,
15+
optimizedBulkCreate,
16+
} from "../utils/db";
1417
import {
1518
generateDeterministicUUID,
1619
getBlockId,
@@ -103,26 +106,41 @@ export async function enforceAccountsExists(accounts: Array<EnforceAccountExiste
103106
export const CoinReceiveType = "coin_received";
104107
export const CoinSpentType = "coin_spent";
105108

109+
function chunkArray<T>(arr: T[], size: number): T[][] {
110+
if (size <= 0) throw new Error("size must be > 0");
111+
const chunks: T[][] = [];
112+
for (let i = 0; i < arr.length; i += size) {
113+
chunks.push(arr.slice(i, i + size));
114+
}
115+
return chunks;
116+
}
117+
106118
export async function updateBalances(
107119
addressDenomEntries: Array<[string, bigint]>,
108120
blockId: ReturnType<typeof getBlockId>
109121
): Promise<void> {
110122
const ids = addressDenomEntries.map(([id]) => id)
111123

112-
const currentBalancesMap: Record<string, bigint> = await fetchPaginatedRecords<Balance>({
113-
fetchFn: (options) => Balance.getByFields(
114-
[
115-
['id', 'in', ids],
116-
],
117-
options
118-
)
119-
})
120-
.then((balances: Array<Balance>) => balances.reduce((acc, record) => ({
121-
...acc,
122-
[record.id]: record.amount
123-
}),
124-
{}
125-
))
124+
const batches = chunkArray(ids, 1000);
125+
126+
const balances = [];
127+
128+
for (const batch of batches) {
129+
const balancesBatch = await fetchPaginatedRecords<Balance>({
130+
fetchFn: (options) => Balance.getByFields(
131+
[
132+
["id", "in", batch],
133+
],
134+
options,
135+
),
136+
});
137+
balances.push(...balancesBatch);
138+
}
139+
140+
const currentBalancesMap: Record<string, bigint> = balances.reduce((acc, record) => ({
141+
...acc,
142+
[record.id]: record.amount,
143+
}), {});
126144

127145
const balancesToSaveWithOptimize: Array<BalanceProps> = []
128146

src/mappings/pocket/applications.ts

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import { toHex } from "@cosmjs/encoding";
2-
import { CosmosEvent, CosmosMessage, CosmosTransaction } from "@subql/types-cosmos";
2+
import {
3+
CosmosEvent,
4+
CosmosMessage,
5+
CosmosTransaction,
6+
} from "@subql/types-cosmos";
37
import { Coin } from "../../client/cosmos/base/v1beta1/coin";
48
import {
59
ApplicationUnbondingReason as ApplicationUnbondingReasonSDKType,
610
applicationUnbondingReasonFromJSON,
711
} from "../../client/pocket/application/event";
812
import {
913
Application,
10-
ApplicationGateway, ApplicationUnbondingReason,
14+
ApplicationGateway,
15+
ApplicationUnbondingReason,
1116
EventApplicationUnbondingBegin as EventApplicationUnbondingBeginEntity,
1217
EventApplicationUnbondingEnd as EventApplicationUnbondingEndEntity,
1318
EventTransferBegin as EventTransferBeginEntity,
@@ -37,8 +42,12 @@ import {
3742
} from "../../types/proto-interfaces/pocket/application/tx";
3843
import { ApplicationSDKType } from "../../types/proto-interfaces/pocket/application/types";
3944
import { MsgClaimMorseApplication } from "../../types/proto-interfaces/pocket/migration/tx";
40-
import {ApplicationServiceConfig as ApplicationServiceConfigType} from "../../types/proto-interfaces/pocket/shared/service"
41-
import { getSequelize, getStoreModel, optimizedBulkCreate } from "../utils/db";
45+
import { ApplicationServiceConfig as ApplicationServiceConfigType } from "../../types/proto-interfaces/pocket/shared/service";
46+
import {
47+
getSequelize,
48+
getStoreModel,
49+
optimizedBulkCreate,
50+
} from "../utils/db";
4251
import {
4352
getAppDelegatedToGatewayId,
4453
getBlockId,
@@ -47,10 +56,17 @@ import {
4756
getStakeServiceId,
4857
messageId,
4958
} from "../utils/ids";
59+
import { parseJson } from "../utils/json";
5060
import { getDenomAndAmount } from "../utils/primitives";
51-
import { Ed25519, pubKeyToAddress } from "../utils/pub_key";
61+
import {
62+
Ed25519,
63+
pubKeyToAddress,
64+
} from "../utils/pub_key";
5265
import { updateMorseClaimableAccounts } from "./migration";
53-
import { fetchAllApplicationGatewayByApplicationId, fetchAllApplicationServiceByApplicationId } from "./pagination";
66+
import {
67+
fetchAllApplicationGatewayByApplicationId,
68+
fetchAllApplicationServiceByApplicationId,
69+
} from "./pagination";
5470

5571
function getAppUnbondingReasonFromSDK(item: ApplicationUnbondingReasonSDKType | string | number): ApplicationUnbondingReason {
5672
switch (item) {
@@ -553,8 +569,42 @@ async function _handleTransferApplicationErrorEvent(
553569
async function _handleApplicationUnbondingBeginEvent(
554570
event: CosmosEvent,
555571
) {
572+
/**
573+
* {
574+
* "type":"pocket.application.EventApplicationUnbondingBegin",
575+
* "attributes":[
576+
* {
577+
* "key":"application",
578+
* "value":"{\"address\":\"pokt1jz6hcaz3pjrshw9atqelktuzf8zkklqr0jj2mn\",\"stake\":{\"denom\":\"upokt\",\"amount\":\"979738\"},\"service_configs\":[{\"service_id\":\"avax\"}],\"delegatee_gateway_addresses\":[\"pokt1lf0kekv9zcv9v3wy4v6jx2wh7v4665s8e0sl9s\"],\"pending_undelegations\":{},\"unstake_session_end_height\":\"461280\",\"pending_transfer\":null}",
579+
* "index":false
580+
* },
581+
* {
582+
* "key":"reason",
583+
* "value":"\"APPLICATION_UNBONDING_REASON_BELOW_MIN_STAKE\"",
584+
* "index":false
585+
* },
586+
* {
587+
* "key":"session_end_height",
588+
* "value":"\"461280\"",
589+
* "index":false
590+
* },
591+
* {
592+
* "key":"unbonding_end_height",
593+
* "value":"\"461340\"",
594+
* "index":false
595+
* },
596+
* {
597+
* "key":"mode",
598+
* "value":"EndBlock",
599+
* "index":false
600+
* }
601+
* ]
602+
* }
603+
*/
556604
const msg = event.msg as CosmosMessage<MsgUnstakeApplication>;
557605

606+
let address = msg ? msg.msg.decodedMsg.address : "";
607+
558608
let unstakingEndHeight = BigInt(0), sessionEndHeight = BigInt(0), reason: number | null = null;
559609

560610
for (const attribute of event.event.attributes) {
@@ -569,6 +619,12 @@ async function _handleApplicationUnbondingBeginEvent(
569619
if (attribute.key === "reason") {
570620
reason = applicationUnbondingReasonFromJSON((attribute.value as unknown as string).replaceAll("\"", ""));
571621
}
622+
623+
if (!msg && attribute.key === "application") {
624+
// now this is a block event?
625+
const application: ApplicationSDKType = parseJson(attribute.value as unknown as string);
626+
address = application.address;
627+
}
572628
}
573629

574630
if (unstakingEndHeight === BigInt(0)) {
@@ -583,10 +639,14 @@ async function _handleApplicationUnbondingBeginEvent(
583639
throw new Error(`[handleApplicationUnbondingBeginEvent] reason not found in event`);
584640
}
585641

586-
const application = await Application.get(msg.msg.decodedMsg.address);
642+
if (address === "") {
643+
throw new Error(`[handleApplicationUnbondingBeginEvent] address not found in event`);
644+
}
645+
646+
const application = await Application.get(address);
587647

588648
if (!application) {
589-
throw new Error(`[handleApplicationUnbondingBeginEvent] application not found for operator address ${msg.msg.decodedMsg.address}`);
649+
throw new Error(`[handleApplicationUnbondingBeginEvent] application not found for operator address ${address}`);
590650
}
591651

592652
application.unstakingEndHeight = unstakingEndHeight;
@@ -599,7 +659,7 @@ async function _handleApplicationUnbondingBeginEvent(
599659
application.save(),
600660
EventApplicationUnbondingBeginEntity.create({
601661
id: eventId,
602-
applicationId: msg.msg.decodedMsg.address,
662+
applicationId: address,
603663
blockId: getBlockId(event.block),
604664
unstakingEndHeight,
605665
sessionEndHeight,

0 commit comments

Comments
 (0)