Skip to content

Commit 328e0d2

Browse files
authored
Refactor Reports by Block and Services and enhancements (#62)
## Summary - Added new table creation logic for `staked_suppliers_by_block_and_services`, `staked_apps_by_block_and_services`, and `relay_by_block_and_services`, replacing unique index and upsert constraints to avoid unnecessary overhead. - Implemented retry utility for handling module accounts and integrated it into `handleModuleAccounts`. - Enhanced block report generation by running database functions in parallel for performance improvements. - Refined block processing in `handleBlock` to include timing logs for profiling purposes. ## Issue In **beta**, we have more than **10,000 services**. This caused the indexer to take **over 30 seconds** to index a single block due to the heavy load from reports by block and services. The tables - `staked_suppliers_by_block_and_services` - `staked_apps_by_block_and_services` - `relay_by_block_and_services` were being created through **SubQL** (since they were defined in `schema.graphql`). Because **historical data** was enabled, these tables included the `_block_range` column, and SubQL automatically generated compound indexes for each foreign key combined with `_block_range`. However, historical data does **not** apply to these tables, since we insert new records for every block. This means we don’t need the `_block_range` column or the additional indexes created by SubQL. With this change, the indexer now takes **around 5 seconds** to index a block (down from 30+ seconds). --- We also encountered an issue when fetching **module accounts**: queries sometimes returned no results for the block being indexed. To address this, we added a **retry mechanism** that keeps querying for module accounts until they become available or the retry limit is reached. ## Type of change Select one or more: - [x] New feature, functionality or library - [x] Bug fix - [x] Code health or cleanup - [ ] Documentation - [ ] Other (specify) ## Sanity Checklist - [x] I have tested my changes using the available tooling - [x] I have commented my code - [x] I have performed a self-review of my own code; both comments & source code - [ ] I create and reference any new tickets, if applicable - [ ] I have left TODOs throughout the codebase, if applicable
1 parent 3470fe7 commit 328e0d2

File tree

10 files changed

+155
-147
lines changed

10 files changed

+155
-147
lines changed

schema.graphql

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,35 +1139,6 @@ type EventProofValidityChecked @entity {
11391139
event: Event!
11401140
}
11411141

1142-
# Entity used to store summarized data related to relays by block and service
1143-
type RelayByBlockAndService @entity {
1144-
id: ID!
1145-
relays: BigInt!
1146-
computedUnits: BigInt!
1147-
claimedUpokt: BigInt!
1148-
amount: Int!
1149-
block: Block!
1150-
service: Service!
1151-
}
1152-
1153-
# Entity used to store summarized data related to staked suppliers by block and service
1154-
type StakedSuppliersByBlockAndService @entity {
1155-
id: ID!
1156-
tokens: BigInt!
1157-
amount: Int!
1158-
block: Block!
1159-
service: Service!
1160-
}
1161-
1162-
# Entity used to store summarized data related to staked apps by block and service
1163-
type StakedAppsByBlockAndService @entity {
1164-
id: ID!
1165-
tokens: BigInt!
1166-
amount: Int!
1167-
block: Block!
1168-
service: Service!
1169-
}
1170-
11711142
type Supply @entity {
11721143
id: ID!
11731144
denom: String! @index

src/mappings/bank/moduleAccounts.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
getBlockId,
1010
} from "../utils/ids";
1111
import getQueryClient from "../utils/query_client";
12+
import { retryOnFail } from "../utils/retry";
1213
import { enforceAccountsExists } from "./balanceChange";
1314

1415
export type ExtendedAccount = ModuleAccount & {
@@ -60,7 +61,16 @@ export async function queryModuleAccounts(block: CosmosBlock): Promise<Array<Ext
6061
export async function handleModuleAccounts(block: CosmosBlock): Promise<Set<string>> {
6162
const blockId = getBlockId(block);
6263
const moduleAccountsSet: Set<string> = new Set();
63-
const moduleAccounts = await queryModuleAccounts(block);
64+
65+
// retry for 15 seconds with a delay of 100 milliseconds every time it fails
66+
const moduleAccounts = await retryOnFail(
67+
async () => {
68+
return queryModuleAccounts(block);
69+
},
70+
150,
71+
100,
72+
)
73+
6474
const accounts = [];
6575

6676
for (const moduleAccount of moduleAccounts) {

src/mappings/dbFunctions/reports/apps.ts

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -91,58 +91,55 @@ $$ LANGUAGE plpgsql;
9191
`
9292
}
9393

94-
//TODO: check index exists before running alter table, it will throw an error if exists
95-
// This will two things:
96-
// 1- An unique index by block_id and service_id to enable upsert operation
97-
// 2- A function that receives the height (id) and upsert the values of that block
98-
// for the table staked_apps_by_block_and_services
94+
export const upsertAppsByBlockAndServicesFnName = 'upsert_staked_apps_by_block_and_services'
95+
96+
// Here we are creating a table outside SubQuery to avoid unnecessary indexes
9997
export function upsertAppsByBlockAndServiceFn(dbSchema: string): string {
100-
return `DO $$
101-
BEGIN
102-
IF NOT EXISTS (
103-
SELECT 1
104-
FROM pg_constraint
105-
WHERE conname = 'staked_apps_by_block_and_services_block_service_key'
106-
AND conrelid = '${dbSchema}.staked_apps_by_block_and_services'::regclass
107-
) THEN
108-
ALTER TABLE ${dbSchema}.staked_apps_by_block_and_services
109-
ADD CONSTRAINT staked_apps_by_block_and_services_block_service_key
110-
UNIQUE (block_id, service_id);
111-
END IF;
112-
END;
113-
$$;
98+
return `
99+
CREATE TABLE IF NOT EXISTS ${dbSchema}.staked_apps_by_block_and_services
100+
(
101+
tokens numeric NOT NULL,
102+
amount integer NOT NULL,
103+
block_id numeric NOT NULL,
104+
service_id text NOT NULL,
105+
_id uuid NOT NULL,
106+
CONSTRAINT apps_by_block_and_services_pkey PRIMARY KEY (_id)
107+
);
114108
115-
CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_staked_apps_by_block_and_services(p_block_id bigint)
109+
COMMENT ON TABLE ${dbSchema}.staked_apps_by_block_and_services
110+
IS '@foreignKey (block_id) REFERENCES blocks (id)';
111+
112+
CREATE INDEX IF NOT EXISTS idx_apps_services_block_id
113+
ON ${dbSchema}.staked_apps_by_block_and_services USING btree
114+
(block_id ASC NULLS LAST)
115+
TABLESPACE pg_default;
116+
117+
CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertAppsByBlockAndServicesFnName}(p_block_id bigint)
116118
RETURNS void AS $$
117119
BEGIN
120+
-- Delete existing rows for this block
121+
DELETE FROM ${dbSchema}.staked_apps_by_block_and_services
122+
WHERE block_id = p_block_id;
123+
118124
INSERT INTO ${dbSchema}.staked_apps_by_block_and_services (
119125
_id,
120-
id,
121126
block_id,
122127
service_id,
123128
amount,
124-
tokens,
125-
_block_range
129+
tokens
126130
)
127131
SELECT
128132
uuid_generate_v4(), -- _id (UUID)
129-
CONCAT(p_block_id::text, '-', ss.service_id) AS id,
130133
p_block_id,
131134
ss.service_id,
132135
COUNT(*) AS amount,
133-
SUM(s.stake_amount) AS tokens,
134-
int8range(p_block_id, NULL) AS _block_range -- open-ended: [block_id,)
136+
SUM(s.stake_amount) AS tokens
135137
FROM ${dbSchema}.applications s
136138
INNER JOIN ${dbSchema}.application_services ss ON ss.application_id = s.id
137139
WHERE s.stake_status = 'Staked'
138140
AND s._block_range @> p_block_id
139141
AND ss._block_range @> p_block_id
140-
GROUP BY ss.service_id
141-
ON CONFLICT (block_id, service_id) DO UPDATE
142-
SET
143-
amount = EXCLUDED.amount,
144-
tokens = EXCLUDED.tokens,
145-
_block_range = EXCLUDED._block_range;
142+
GROUP BY ss.service_id;
146143
END;
147144
$$ LANGUAGE plpgsql;
148145
`

src/mappings/dbFunctions/reports/index.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@ BEGIN
2020
PERFORM ${dbSchema}.update_block_unstaking_apps(p_block_id);
2121
PERFORM ${dbSchema}.update_block_unstaking_suppliers(p_block_id);
2222
PERFORM ${dbSchema}.update_block_unstaking_validators(p_block_id);
23-
24-
-- Per-service upserts
25-
PERFORM ${dbSchema}.upsert_relays_by_block_and_services(p_block_id);
26-
PERFORM ${dbSchema}.upsert_staked_apps_by_block_and_services(p_block_id);
27-
PERFORM ${dbSchema}.upsert_staked_suppliers_by_block_and_services(p_block_id);
2823
END;
2924
$$ LANGUAGE plpgsql;
3025
`

src/mappings/dbFunctions/reports/relays.ts

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,60 +28,59 @@ $$ LANGUAGE plpgsql;
2828
`
2929
}
3030

31-
// This will two things:
32-
// 1- An unique index by block_id and service_id to enable upsert operation
33-
// 2- A function that receives the height (id) and upsert the values of that block
34-
// for the table relay_by_block_and_services
31+
export const upsertRelaysByBlockAndServicesFnName = 'upsert_relays_by_block_and_services'
32+
33+
// Here we are creating a table outside SubQuery to avoid unnecessary indexes
3534
export function upsertRelaysByBlockAndServiceFn(dbSchema: string): string {
36-
return `DO $$
37-
BEGIN
38-
IF NOT EXISTS (
39-
SELECT 1
40-
FROM pg_constraint
41-
WHERE conname = 'relay_by_block_and_services_block_service_key'
42-
AND conrelid = '${dbSchema}.relay_by_block_and_services'::regclass
43-
) THEN
44-
ALTER TABLE ${dbSchema}.relay_by_block_and_services
45-
ADD CONSTRAINT relay_by_block_and_services_block_service_key
46-
UNIQUE (block_id, service_id);
47-
END IF;
48-
END;
49-
$$;
35+
return `
36+
CREATE TABLE IF NOT EXISTS ${dbSchema}.relay_by_block_and_services
37+
(
38+
relays numeric NOT NULL,
39+
computed_units numeric NOT NULL,
40+
claimed_upokt numeric NOT NULL,
41+
amount integer NOT NULL,
42+
block_id numeric NOT NULL,
43+
service_id text NOT NULL,
44+
_id uuid NOT NULL,
45+
CONSTRAINT relay_by_block_and_services_pkey PRIMARY KEY (_id)
46+
);
5047
51-
CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_relays_by_block_and_services(p_block_id bigint)
48+
COMMENT ON TABLE ${dbSchema}.relay_by_block_and_services
49+
IS '@foreignKey (block_id) REFERENCES blocks (id)';
50+
51+
CREATE INDEX IF NOT EXISTS idx_relays_services_block_id
52+
ON ${dbSchema}.relay_by_block_and_services USING btree
53+
(block_id ASC NULLS LAST)
54+
TABLESPACE pg_default;
55+
56+
CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertRelaysByBlockAndServicesFnName}(p_block_id bigint)
5257
RETURNS void AS $$
5358
BEGIN
59+
-- Delete existing rows for this block
60+
DELETE FROM ${dbSchema}.relay_by_block_and_services
61+
WHERE block_id = p_block_id;
62+
5463
-- Perform the upsert
5564
INSERT INTO ${dbSchema}.relay_by_block_and_services (
56-
id,
5765
service_id,
5866
block_id,
5967
amount,
6068
relays,
6169
computed_units,
6270
claimed_upokt,
63-
_id,
64-
_block_range
71+
_id
6572
)
6673
SELECT
67-
CONCAT(p_block_id::text, '-', c.service_id) AS id,
6874
c.service_id,
6975
p_block_id,
7076
COUNT(*) AS amount,
7177
SUM(c.num_relays) AS relays,
7278
SUM(c.num_claimed_computed_units) AS computed_units,
7379
SUM(c.claimed_amount) AS claimed_upokt,
74-
uuid_generate_v4() _id,
75-
int8range(p_block_id, NULL) _block_range
80+
uuid_generate_v4() _id
7681
FROM ${dbSchema}.event_claim_settleds c
7782
WHERE c.block_id = p_block_id
78-
GROUP BY c.service_id
79-
ON CONFLICT (block_id, service_id) DO UPDATE
80-
SET
81-
amount = EXCLUDED.amount,
82-
relays = EXCLUDED.relays,
83-
computed_units = EXCLUDED.computed_units,
84-
claimed_upokt = EXCLUDED.claimed_upokt;
83+
GROUP BY c.service_id;
8584
END;
8685
$$ LANGUAGE plpgsql;
8786
`

src/mappings/dbFunctions/reports/suppliers.ts

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -91,58 +91,56 @@ $$ LANGUAGE plpgsql;
9191
`
9292
}
9393

94-
// This will two things:
95-
// 1- An unique index by block_id and service_id to enable upsert operation
96-
// 2- A function that receives the height (id) and upsert the values of that block
97-
// for the table staked_suppliers_by_block_and_services
94+
export const upsertSuppliersByBlockAndServicesFnName = 'upsert_staked_suppliers_by_block_and_services'
95+
96+
// Here we are creating a table outside SubQuery to avoid unnecessary indexes
9897
export function upsertSuppliersByBlockAndServiceFn(dbSchema: string): string {
99-
return `DO $$
100-
BEGIN
101-
IF NOT EXISTS (
102-
SELECT 1
103-
FROM pg_constraint
104-
WHERE conname = 'staked_suppliers_by_block_and_services_block_service_key'
105-
AND conrelid = '${dbSchema}.staked_suppliers_by_block_and_services'::regclass
106-
) THEN
107-
ALTER TABLE ${dbSchema}.staked_suppliers_by_block_and_services
108-
ADD CONSTRAINT staked_suppliers_by_block_and_services_block_service_key
109-
UNIQUE (block_id, service_id);
110-
END IF;
111-
END;
112-
$$;
98+
return `
99+
CREATE TABLE IF NOT EXISTS ${dbSchema}.staked_suppliers_by_block_and_services
100+
(
101+
tokens numeric NOT NULL,
102+
amount integer NOT NULL,
103+
block_id numeric NOT NULL,
104+
service_id text NOT NULL,
105+
_id uuid NOT NULL,
106+
CONSTRAINT staked_suppliers_by_block_and_services_pkey PRIMARY KEY (_id)
107+
);
113108
114-
CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_staked_suppliers_by_block_and_services(p_block_id bigint)
109+
COMMENT ON TABLE ${dbSchema}.staked_suppliers_by_block_and_services
110+
IS '@foreignKey (block_id) REFERENCES blocks (id)';
111+
112+
CREATE INDEX IF NOT EXISTS idx_suppliers_services_block_id
113+
ON ${dbSchema}.staked_suppliers_by_block_and_services USING btree
114+
(block_id ASC NULLS LAST)
115+
TABLESPACE pg_default;
116+
117+
CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertSuppliersByBlockAndServicesFnName}(p_block_id bigint)
115118
RETURNS void AS $$
116119
BEGIN
120+
-- Delete existing rows for this block
121+
DELETE FROM ${dbSchema}.staked_suppliers_by_block_and_services
122+
WHERE block_id = p_block_id;
123+
117124
INSERT INTO ${dbSchema}.staked_suppliers_by_block_and_services (
118125
_id,
119-
id,
120126
block_id,
121127
service_id,
122128
amount,
123-
tokens,
124-
_block_range
129+
tokens
125130
)
126131
SELECT
127132
uuid_generate_v4(), -- _id
128-
CONCAT(p_block_id::text, '-', ss.service_id) AS id,
129133
p_block_id,
130134
ss.service_id,
131135
COUNT(*) AS amount,
132-
SUM(s.stake_amount) AS tokens,
133-
int8range(p_block_id, NULL) AS _block_range -- open-ended: [block_id,)
136+
SUM(s.stake_amount) AS tokens
134137
FROM ${dbSchema}.suppliers s
135138
INNER JOIN ${dbSchema}.supplier_service_configs ss
136139
ON ss.supplier_id = s.id
137140
WHERE s.stake_status = 'Staked'
138141
AND s._block_range @> p_block_id
139142
AND ss._block_range @> p_block_id
140-
GROUP BY ss.service_id
141-
ON CONFLICT (block_id, service_id) DO UPDATE
142-
SET
143-
amount = EXCLUDED.amount,
144-
tokens = EXCLUDED.tokens,
145-
_block_range = EXCLUDED._block_range;
143+
GROUP BY ss.service_id;
146144
END;
147145
$$ LANGUAGE plpgsql;
148146
`

src/mappings/indexer.manager.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,8 @@ function handleByType(typeUrl: string | Array<string>, byTypeMap: MessageByType
8686
// anything primitive types
8787
async function indexPrimitives(block: CosmosBlock) {
8888
await profilerWrap(handleGenesis, "indexPrimitives", "handleGenesis")(block);
89-
90-
await Promise.all([
91-
profilerWrap(handleBlock, "indexPrimitives", "handleGenesis")(block),
92-
profilerWrap(handleSupply, "indexPrimitives", "handleSupply")(block),
93-
]);
89+
await profilerWrap(handleBlock, "indexPrimitives", "handleBlock")(block);
90+
await profilerWrap(handleSupply, "indexPrimitives", "handleSupply")(block);
9491
}
9592

9693
// anything that modifies balances

0 commit comments

Comments
 (0)