diff --git a/schema.graphql b/schema.graphql index ddb399e..9280652 100644 --- a/schema.graphql +++ b/schema.graphql @@ -1139,35 +1139,6 @@ type EventProofValidityChecked @entity { event: Event! } -# Entity used to store summarized data related to relays by block and service -type RelayByBlockAndService @entity { - id: ID! - relays: BigInt! - computedUnits: BigInt! - claimedUpokt: BigInt! - amount: Int! - block: Block! - service: Service! -} - -# Entity used to store summarized data related to staked suppliers by block and service -type StakedSuppliersByBlockAndService @entity { - id: ID! - tokens: BigInt! - amount: Int! - block: Block! - service: Service! -} - -# Entity used to store summarized data related to staked apps by block and service -type StakedAppsByBlockAndService @entity { - id: ID! - tokens: BigInt! - amount: Int! - block: Block! - service: Service! -} - type Supply @entity { id: ID! denom: String! @index diff --git a/src/mappings/bank/moduleAccounts.ts b/src/mappings/bank/moduleAccounts.ts index 04c0335..94b5635 100644 --- a/src/mappings/bank/moduleAccounts.ts +++ b/src/mappings/bank/moduleAccounts.ts @@ -9,6 +9,7 @@ import { getBlockId, } from "../utils/ids"; import getQueryClient from "../utils/query_client"; +import { retryOnFail } from "../utils/retry"; import { enforceAccountsExists } from "./balanceChange"; export type ExtendedAccount = ModuleAccount & { @@ -60,7 +61,16 @@ export async function queryModuleAccounts(block: CosmosBlock): Promise> { const blockId = getBlockId(block); const moduleAccountsSet: Set = new Set(); - const moduleAccounts = await queryModuleAccounts(block); + + // retry for 15 seconds with a delay of 100 milliseconds every time it fails + const moduleAccounts = await retryOnFail( + async () => { + return queryModuleAccounts(block); + }, + 150, + 100, + ) + const accounts = []; for (const moduleAccount of moduleAccounts) { diff --git a/src/mappings/dbFunctions/reports/apps.ts b/src/mappings/dbFunctions/reports/apps.ts 
index 893a92c..dde35a9 100644 --- a/src/mappings/dbFunctions/reports/apps.ts +++ b/src/mappings/dbFunctions/reports/apps.ts @@ -91,58 +91,55 @@ $$ LANGUAGE plpgsql; ` } -//TODO: check index exists before running alter table, it will throw an error if exists -// This will two things: -// 1- An unique index by block_id and service_id to enable upsert operation -// 2- A function that receives the height (id) and upsert the values of that block -// for the table staked_apps_by_block_and_services +export const upsertAppsByBlockAndServicesFnName = 'upsert_staked_apps_by_block_and_services' + +// Here we are creating a table outside SubQuery to avoid unnecessary indexes export function upsertAppsByBlockAndServiceFn(dbSchema: string): string { - return `DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 - FROM pg_constraint - WHERE conname = 'staked_apps_by_block_and_services_block_service_key' - AND conrelid = '${dbSchema}.staked_apps_by_block_and_services'::regclass - ) THEN - ALTER TABLE ${dbSchema}.staked_apps_by_block_and_services - ADD CONSTRAINT staked_apps_by_block_and_services_block_service_key - UNIQUE (block_id, service_id); - END IF; -END; -$$; + return ` +CREATE TABLE IF NOT EXISTS ${dbSchema}.staked_apps_by_block_and_services +( + tokens numeric NOT NULL, + amount integer NOT NULL, + block_id numeric NOT NULL, + service_id text NOT NULL, + _id uuid NOT NULL, + CONSTRAINT apps_by_block_and_services_pkey PRIMARY KEY (_id) +); -CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_staked_apps_by_block_and_services(p_block_id bigint) +COMMENT ON TABLE ${dbSchema}.staked_apps_by_block_and_services + IS '@foreignKey (block_id) REFERENCES blocks (id)'; + +CREATE INDEX IF NOT EXISTS idx_apps_services_block_id + ON ${dbSchema}.staked_apps_by_block_and_services USING btree + (block_id ASC NULLS LAST) + TABLESPACE pg_default; + +CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertAppsByBlockAndServicesFnName}(p_block_id bigint) RETURNS void AS $$ BEGIN + -- Delete existing rows for this 
block + DELETE FROM ${dbSchema}.staked_apps_by_block_and_services + WHERE block_id = p_block_id; + INSERT INTO ${dbSchema}.staked_apps_by_block_and_services ( _id, - id, block_id, service_id, amount, - tokens, - _block_range + tokens ) SELECT uuid_generate_v4(), -- _id (UUID) - CONCAT(p_block_id::text, '-', ss.service_id) AS id, p_block_id, ss.service_id, COUNT(*) AS amount, - SUM(s.stake_amount) AS tokens, - int8range(p_block_id, NULL) AS _block_range -- open-ended: [block_id,) + SUM(s.stake_amount) AS tokens FROM ${dbSchema}.applications s INNER JOIN ${dbSchema}.application_services ss ON ss.application_id = s.id WHERE s.stake_status = 'Staked' AND s._block_range @> p_block_id AND ss._block_range @> p_block_id - GROUP BY ss.service_id - ON CONFLICT (block_id, service_id) DO UPDATE - SET - amount = EXCLUDED.amount, - tokens = EXCLUDED.tokens, - _block_range = EXCLUDED._block_range; + GROUP BY ss.service_id; END; $$ LANGUAGE plpgsql; ` diff --git a/src/mappings/dbFunctions/reports/index.ts b/src/mappings/dbFunctions/reports/index.ts index 51d5c22..894f089 100644 --- a/src/mappings/dbFunctions/reports/index.ts +++ b/src/mappings/dbFunctions/reports/index.ts @@ -20,11 +20,6 @@ BEGIN PERFORM ${dbSchema}.update_block_unstaking_apps(p_block_id); PERFORM ${dbSchema}.update_block_unstaking_suppliers(p_block_id); PERFORM ${dbSchema}.update_block_unstaking_validators(p_block_id); - - -- Per-service upserts - PERFORM ${dbSchema}.upsert_relays_by_block_and_services(p_block_id); - PERFORM ${dbSchema}.upsert_staked_apps_by_block_and_services(p_block_id); - PERFORM ${dbSchema}.upsert_staked_suppliers_by_block_and_services(p_block_id); END; $$ LANGUAGE plpgsql; ` diff --git a/src/mappings/dbFunctions/reports/relays.ts b/src/mappings/dbFunctions/reports/relays.ts index ea15701..f385803 100644 --- a/src/mappings/dbFunctions/reports/relays.ts +++ b/src/mappings/dbFunctions/reports/relays.ts @@ -28,60 +28,59 @@ $$ LANGUAGE plpgsql; ` } -// This will two things: -// 1- An unique index 
by block_id and service_id to enable upsert operation -// 2- A function that receives the height (id) and upsert the values of that block -// for the table relay_by_block_and_services +export const upsertRelaysByBlockAndServicesFnName = 'upsert_relays_by_block_and_services' + +// Here we are creating a table outside SubQuery to avoid unnecessary indexes export function upsertRelaysByBlockAndServiceFn(dbSchema: string): string { - return `DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 - FROM pg_constraint - WHERE conname = 'relay_by_block_and_services_block_service_key' - AND conrelid = '${dbSchema}.relay_by_block_and_services'::regclass - ) THEN - ALTER TABLE ${dbSchema}.relay_by_block_and_services - ADD CONSTRAINT relay_by_block_and_services_block_service_key - UNIQUE (block_id, service_id); - END IF; -END; -$$; + return ` +CREATE TABLE IF NOT EXISTS ${dbSchema}.relay_by_block_and_services +( + relays numeric NOT NULL, + computed_units numeric NOT NULL, + claimed_upokt numeric NOT NULL, + amount integer NOT NULL, + block_id numeric NOT NULL, + service_id text NOT NULL, + _id uuid NOT NULL, + CONSTRAINT relay_by_block_and_services_pkey PRIMARY KEY (_id) +); -CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_relays_by_block_and_services(p_block_id bigint) +COMMENT ON TABLE ${dbSchema}.relay_by_block_and_services + IS '@foreignKey (block_id) REFERENCES blocks (id)'; + +CREATE INDEX IF NOT EXISTS idx_relays_services_block_id + ON ${dbSchema}.relay_by_block_and_services USING btree + (block_id ASC NULLS LAST) + TABLESPACE pg_default; + +CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertRelaysByBlockAndServicesFnName}(p_block_id bigint) RETURNS void AS $$ BEGIN + -- Delete existing rows for this block + DELETE FROM ${dbSchema}.relay_by_block_and_services + WHERE block_id = p_block_id; + -- Perform the upsert INSERT INTO ${dbSchema}.relay_by_block_and_services ( - id, service_id, block_id, amount, relays, computed_units, claimed_upokt, - _id, - _block_range + _id ) SELECT - 
CONCAT(p_block_id::text, '-', c.service_id) AS id, c.service_id, p_block_id, COUNT(*) AS amount, SUM(c.num_relays) AS relays, SUM(c.num_claimed_computed_units) AS computed_units, SUM(c.claimed_amount) AS claimed_upokt, - uuid_generate_v4() _id, - int8range(p_block_id, NULL) _block_range + uuid_generate_v4() _id FROM ${dbSchema}.event_claim_settleds c WHERE c.block_id = p_block_id - GROUP BY c.service_id - ON CONFLICT (block_id, service_id) DO UPDATE - SET - amount = EXCLUDED.amount, - relays = EXCLUDED.relays, - computed_units = EXCLUDED.computed_units, - claimed_upokt = EXCLUDED.claimed_upokt; + GROUP BY c.service_id; END; $$ LANGUAGE plpgsql; ` diff --git a/src/mappings/dbFunctions/reports/suppliers.ts b/src/mappings/dbFunctions/reports/suppliers.ts index 73a9db9..0fca8e5 100644 --- a/src/mappings/dbFunctions/reports/suppliers.ts +++ b/src/mappings/dbFunctions/reports/suppliers.ts @@ -91,58 +91,56 @@ $$ LANGUAGE plpgsql; ` } -// This will two things: -// 1- An unique index by block_id and service_id to enable upsert operation -// 2- A function that receives the height (id) and upsert the values of that block -// for the table staked_suppliers_by_block_and_services +export const upsertSuppliersByBlockAndServicesFnName = 'upsert_staked_suppliers_by_block_and_services' + +// Here we are creating a table outside SubQuery to avoid unnecessary indexes export function upsertSuppliersByBlockAndServiceFn(dbSchema: string): string { - return `DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 - FROM pg_constraint - WHERE conname = 'staked_suppliers_by_block_and_services_block_service_key' - AND conrelid = '${dbSchema}.staked_suppliers_by_block_and_services'::regclass - ) THEN - ALTER TABLE ${dbSchema}.staked_suppliers_by_block_and_services - ADD CONSTRAINT staked_suppliers_by_block_and_services_block_service_key - UNIQUE (block_id, service_id); - END IF; -END; -$$; + return ` +CREATE TABLE IF NOT EXISTS ${dbSchema}.staked_suppliers_by_block_and_services +( + tokens numeric NOT 
NULL, + amount integer NOT NULL, + block_id numeric NOT NULL, + service_id text NOT NULL, + _id uuid NOT NULL, + CONSTRAINT staked_suppliers_by_block_and_services_pkey PRIMARY KEY (_id) +); -CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_staked_suppliers_by_block_and_services(p_block_id bigint) +COMMENT ON TABLE ${dbSchema}.staked_suppliers_by_block_and_services + IS '@foreignKey (block_id) REFERENCES blocks (id)'; + +CREATE INDEX IF NOT EXISTS idx_suppliers_services_block_id + ON ${dbSchema}.staked_suppliers_by_block_and_services USING btree + (block_id ASC NULLS LAST) + TABLESPACE pg_default; + +CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertSuppliersByBlockAndServicesFnName}(p_block_id bigint) RETURNS void AS $$ BEGIN + -- Delete existing rows for this block + DELETE FROM ${dbSchema}.staked_suppliers_by_block_and_services + WHERE block_id = p_block_id; + INSERT INTO ${dbSchema}.staked_suppliers_by_block_and_services ( _id, - id, block_id, service_id, amount, - tokens, - _block_range + tokens ) SELECT uuid_generate_v4(), -- _id - CONCAT(p_block_id::text, '-', ss.service_id) AS id, p_block_id, ss.service_id, COUNT(*) AS amount, - SUM(s.stake_amount) AS tokens, - int8range(p_block_id, NULL) AS _block_range -- open-ended: [block_id,) + SUM(s.stake_amount) AS tokens FROM ${dbSchema}.suppliers s INNER JOIN ${dbSchema}.supplier_service_configs ss ON ss.supplier_id = s.id WHERE s.stake_status = 'Staked' AND s._block_range @> p_block_id AND ss._block_range @> p_block_id - GROUP BY ss.service_id - ON CONFLICT (block_id, service_id) DO UPDATE - SET - amount = EXCLUDED.amount, - tokens = EXCLUDED.tokens, - _block_range = EXCLUDED._block_range; + GROUP BY ss.service_id; END; $$ LANGUAGE plpgsql; ` diff --git a/src/mappings/indexer.manager.ts b/src/mappings/indexer.manager.ts index 2660a50..0164b52 100644 --- a/src/mappings/indexer.manager.ts +++ b/src/mappings/indexer.manager.ts @@ -86,11 +86,8 @@ function handleByType(typeUrl: string | Array, byTypeMap: MessageByType // 
anything primitive types async function indexPrimitives(block: CosmosBlock) { await profilerWrap(handleGenesis, "indexPrimitives", "handleGenesis")(block); - - await Promise.all([ - profilerWrap(handleBlock, "indexPrimitives", "handleGenesis")(block), - profilerWrap(handleSupply, "indexPrimitives", "handleSupply")(block), - ]); + await profilerWrap(handleBlock, "indexPrimitives", "handleBlock")(block); + await profilerWrap(handleSupply, "indexPrimitives", "handleSupply")(block); } // anything that modifies balances diff --git a/src/mappings/pocket/reports.ts b/src/mappings/pocket/reports.ts index 2f42986..732a391 100644 --- a/src/mappings/pocket/reports.ts +++ b/src/mappings/pocket/reports.ts @@ -1,6 +1,9 @@ import { CosmosBlock } from "@subql/types-cosmos"; import { updateBlockReportsFnName } from "../dbFunctions/reports"; import { getDbSchema, getSequelize } from "../utils/db"; +import { upsertSuppliersByBlockAndServicesFnName } from "../dbFunctions/reports/suppliers"; +import { upsertAppsByBlockAndServicesFnName } from "../dbFunctions/reports/apps"; +import { upsertRelaysByBlockAndServicesFnName } from "../dbFunctions/reports/relays"; export async function handleAddBlockReports(block: CosmosBlock): Promise { logger.info(`[handleAddBlockReports] Generating block #${block.header.height} reports...`); @@ -8,17 +11,36 @@ export async function handleAddBlockReports(block: CosmosBlock): Promise { const sequelize = getSequelize('Block') const dbSchema = getDbSchema() - const query = `SELECT ${dbSchema}.${updateBlockReportsFnName}(${block.header.height}::bigint)` + const defaultOptions = { + // we need to run it in the subql transaction, otherwise it will not find the block, and the function will do nothing + transaction: store.context.transaction, + // to avoid sequelize trying to format the results; this function returns a void, so this is not needed + raw: true, + // runs directly in the write pool + useMaster: true, + } - await sequelize.query( - query, - { - // whe
need to run it in the subql transaction, otherwise it will not find the block, and the function will do nothing - transaction: store.context.transaction, - // to avoid sequelize try to format the results; this function returns a void, so this is not needed - raw: true, - // runs directly in the write pool - useMaster: true, - } - ); + // we can run those functions in parallel because they do not modify the same records + await Promise.all([ + // mutate current block + sequelize.query( + `SELECT ${dbSchema}.${updateBlockReportsFnName}(${block.header.height}::bigint);`, + defaultOptions + ), + // insert new records of suppliers by services and block + sequelize.query( + `SELECT ${dbSchema}.${upsertSuppliersByBlockAndServicesFnName}(${block.header.height}::bigint);`, + defaultOptions + ), + // insert new records of apps by services and block + sequelize.query( + `SELECT ${dbSchema}.${upsertAppsByBlockAndServicesFnName}(${block.header.height}::bigint);`, + defaultOptions + ), + // insert new records of relays by services and block + sequelize.query( + `SELECT ${dbSchema}.${upsertRelaysByBlockAndServicesFnName}(${block.header.height}::bigint);`, + defaultOptions + ), + ]) } diff --git a/src/mappings/primitives/block.ts b/src/mappings/primitives/block.ts index 239d938..6046986 100644 --- a/src/mappings/primitives/block.ts +++ b/src/mappings/primitives/block.ts @@ -20,6 +20,7 @@ export async function handleBlock(block: CosmosBlock): Promise { const { header: { chainId, time } } = block.block; const timestamp = new Date(time.getTime()); + const dateBeforeProcessBlock = new Date(); // CosmosBlock has hash and addresses as Uint8array which is not the expected value on the graphql schema/db model, // so here we get a parsed version of its data that match the expected values base on words ending const processedBlock = processBlockJson( @@ -28,6 +29,8 @@ export async function handleBlock(block: CosmosBlock): Promise { PREFIX, ) as ConvertedBlockJson; + 
logger.info(`[handleBlock] block processed in ${new Date().getTime() - dateBeforeProcessBlock.getTime()}ms`); + const blockMetadata = BlockMetadata.create({ id, blockId: processedBlock.blockId as unknown as BlockId, @@ -35,8 +38,12 @@ export async function handleBlock(block: CosmosBlock): Promise { lastCommit: processedBlock.block.lastCommit as unknown as BlockLastCommit, }); + const dateBeforeGetSize = new Date(); + const size = getBlockByteSize(block); + logger.info(`[handleBlock] block size computed in ${new Date().getTime() - dateBeforeGetSize.getTime()}ms`); + const blockEntity = Block.create({ id, chainId, diff --git a/src/mappings/utils/retry.ts b/src/mappings/utils/retry.ts new file mode 100644 index 0000000..6f3caac --- /dev/null +++ b/src/mappings/utils/retry.ts @@ -0,0 +1,12 @@ +export async function retryOnFail(fn: () => Promise, maxRetries: number, delay: number, retries = 0): Promise { + try { + return await fn(); + } catch (e) { + if (retries < maxRetries) { + await new Promise(resolve => setTimeout(resolve, delay)); + return retryOnFail(fn, maxRetries, delay, retries + 1); + } else { + throw new Error(`[retryFunction] max retries reached, last error: ${e instanceof Error ? e.message : e}`); + } + } +}