Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1139,35 +1139,6 @@ type EventProofValidityChecked @entity {
event: Event!
}

# Entity used to store summarized data related to relays by block and service
type RelayByBlockAndService @entity {
id: ID!
relays: BigInt!
computedUnits: BigInt!
claimedUpokt: BigInt!
amount: Int!
block: Block!
service: Service!
}

# Entity used to store summarized data related to staked suppliers by block and service
type StakedSuppliersByBlockAndService @entity {
id: ID!
tokens: BigInt!
amount: Int!
block: Block!
service: Service!
}

# Entity used to store summarized data related to staked apps by block and service
type StakedAppsByBlockAndService @entity {
id: ID!
tokens: BigInt!
amount: Int!
block: Block!
service: Service!
}

type Supply @entity {
id: ID!
denom: String! @index
Expand Down
12 changes: 11 additions & 1 deletion src/mappings/bank/moduleAccounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
getBlockId,
} from "../utils/ids";
import getQueryClient from "../utils/query_client";
import { retryOnFail } from "../utils/retry";
import { enforceAccountsExists } from "./balanceChange";

export type ExtendedAccount = ModuleAccount & {
Expand Down Expand Up @@ -60,7 +61,16 @@ export async function queryModuleAccounts(block: CosmosBlock): Promise<Array<Ext
export async function handleModuleAccounts(block: CosmosBlock): Promise<Set<string>> {
const blockId = getBlockId(block);
const moduleAccountsSet: Set<string> = new Set();
const moduleAccounts = await queryModuleAccounts(block);

// retry for 15 seconds with a delay of 100 milliseconds every time it fails
const moduleAccounts = await retryOnFail(
async () => {
return queryModuleAccounts(block);
},
150,
100,
)

const accounts = [];

for (const moduleAccount of moduleAccounts) {
Expand Down
61 changes: 29 additions & 32 deletions src/mappings/dbFunctions/reports/apps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,58 +91,55 @@ $$ LANGUAGE plpgsql;
`
}

//TODO: check index exists before running alter table, it will throw an error if exists
// This will two things:
// 1- An unique index by block_id and service_id to enable upsert operation
// 2- A function that receives the height (id) and upsert the values of that block
// for the table staked_apps_by_block_and_services
export const upsertAppsByBlockAndServicesFnName = 'upsert_staked_apps_by_block_and_services'

// Here we are creating a table outside SubQuery to avoid unnecessary indexes
export function upsertAppsByBlockAndServiceFn(dbSchema: string): string {
return `DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'staked_apps_by_block_and_services_block_service_key'
AND conrelid = '${dbSchema}.staked_apps_by_block_and_services'::regclass
) THEN
ALTER TABLE ${dbSchema}.staked_apps_by_block_and_services
ADD CONSTRAINT staked_apps_by_block_and_services_block_service_key
UNIQUE (block_id, service_id);
END IF;
END;
$$;
return `
CREATE TABLE IF NOT EXISTS ${dbSchema}.staked_apps_by_block_and_services
(
tokens numeric NOT NULL,
amount integer NOT NULL,
block_id numeric NOT NULL,
service_id text NOT NULL,
_id uuid NOT NULL,
CONSTRAINT apps_by_block_and_services_pkey PRIMARY KEY (_id)
);

CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_staked_apps_by_block_and_services(p_block_id bigint)
COMMENT ON TABLE ${dbSchema}.staked_apps_by_block_and_services
IS '@foreignKey (block_id) REFERENCES blocks (id)';

CREATE INDEX IF NOT EXISTS idx_apps_services_block_id
ON ${dbSchema}.staked_apps_by_block_and_services USING btree
(block_id ASC NULLS LAST)
TABLESPACE pg_default;

CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertAppsByBlockAndServicesFnName}(p_block_id bigint)
RETURNS void AS $$
BEGIN
-- Delete existing rows for this block
DELETE FROM ${dbSchema}.staked_apps_by_block_and_services
WHERE block_id = p_block_id;

INSERT INTO ${dbSchema}.staked_apps_by_block_and_services (
_id,
id,
block_id,
service_id,
amount,
tokens,
_block_range
tokens
)
SELECT
uuid_generate_v4(), -- _id (UUID)
CONCAT(p_block_id::text, '-', ss.service_id) AS id,
p_block_id,
ss.service_id,
COUNT(*) AS amount,
SUM(s.stake_amount) AS tokens,
int8range(p_block_id, NULL) AS _block_range -- open-ended: [block_id,)
SUM(s.stake_amount) AS tokens
FROM ${dbSchema}.applications s
INNER JOIN ${dbSchema}.application_services ss ON ss.application_id = s.id
WHERE s.stake_status = 'Staked'
AND s._block_range @> p_block_id
AND ss._block_range @> p_block_id
GROUP BY ss.service_id
ON CONFLICT (block_id, service_id) DO UPDATE
SET
amount = EXCLUDED.amount,
tokens = EXCLUDED.tokens,
_block_range = EXCLUDED._block_range;
GROUP BY ss.service_id;
END;
$$ LANGUAGE plpgsql;
`
Expand Down
5 changes: 0 additions & 5 deletions src/mappings/dbFunctions/reports/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ BEGIN
PERFORM ${dbSchema}.update_block_unstaking_apps(p_block_id);
PERFORM ${dbSchema}.update_block_unstaking_suppliers(p_block_id);
PERFORM ${dbSchema}.update_block_unstaking_validators(p_block_id);

-- Per-service upserts
PERFORM ${dbSchema}.upsert_relays_by_block_and_services(p_block_id);
PERFORM ${dbSchema}.upsert_staked_apps_by_block_and_services(p_block_id);
PERFORM ${dbSchema}.upsert_staked_suppliers_by_block_and_services(p_block_id);
END;
$$ LANGUAGE plpgsql;
`
Expand Down
63 changes: 31 additions & 32 deletions src/mappings/dbFunctions/reports/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,60 +28,59 @@ $$ LANGUAGE plpgsql;
`
}

// This will two things:
// 1- An unique index by block_id and service_id to enable upsert operation
// 2- A function that receives the height (id) and upsert the values of that block
// for the table relay_by_block_and_services
export const upsertRelaysByBlockAndServicesFnName = 'upsert_relays_by_block_and_services'

// Here we are creating a table outside SubQuery to avoid unnecessary indexes
export function upsertRelaysByBlockAndServiceFn(dbSchema: string): string {
return `DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'relay_by_block_and_services_block_service_key'
AND conrelid = '${dbSchema}.relay_by_block_and_services'::regclass
) THEN
ALTER TABLE ${dbSchema}.relay_by_block_and_services
ADD CONSTRAINT relay_by_block_and_services_block_service_key
UNIQUE (block_id, service_id);
END IF;
END;
$$;
return `
CREATE TABLE IF NOT EXISTS ${dbSchema}.relay_by_block_and_services
(
relays numeric NOT NULL,
computed_units numeric NOT NULL,
claimed_upokt numeric NOT NULL,
amount integer NOT NULL,
block_id numeric NOT NULL,
service_id text NOT NULL,
_id uuid NOT NULL,
CONSTRAINT relay_by_block_and_services_pkey PRIMARY KEY (_id)
);

CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_relays_by_block_and_services(p_block_id bigint)
COMMENT ON TABLE ${dbSchema}.relay_by_block_and_services
IS '@foreignKey (block_id) REFERENCES blocks (id)';

CREATE INDEX IF NOT EXISTS idx_relays_services_block_id
ON ${dbSchema}.relay_by_block_and_services USING btree
(block_id ASC NULLS LAST)
TABLESPACE pg_default;

CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertRelaysByBlockAndServicesFnName}(p_block_id bigint)
RETURNS void AS $$
BEGIN
-- Delete existing rows for this block
DELETE FROM ${dbSchema}.relay_by_block_and_services
WHERE block_id = p_block_id;

-- Perform the upsert
INSERT INTO ${dbSchema}.relay_by_block_and_services (
id,
service_id,
block_id,
amount,
relays,
computed_units,
claimed_upokt,
_id,
_block_range
_id
)
SELECT
CONCAT(p_block_id::text, '-', c.service_id) AS id,
c.service_id,
p_block_id,
COUNT(*) AS amount,
SUM(c.num_relays) AS relays,
SUM(c.num_claimed_computed_units) AS computed_units,
SUM(c.claimed_amount) AS claimed_upokt,
uuid_generate_v4() _id,
int8range(p_block_id, NULL) _block_range
uuid_generate_v4() _id
FROM ${dbSchema}.event_claim_settleds c
WHERE c.block_id = p_block_id
GROUP BY c.service_id
ON CONFLICT (block_id, service_id) DO UPDATE
SET
amount = EXCLUDED.amount,
relays = EXCLUDED.relays,
computed_units = EXCLUDED.computed_units,
claimed_upokt = EXCLUDED.claimed_upokt;
GROUP BY c.service_id;
END;
$$ LANGUAGE plpgsql;
`
Expand Down
60 changes: 29 additions & 31 deletions src/mappings/dbFunctions/reports/suppliers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,58 +91,56 @@ $$ LANGUAGE plpgsql;
`
}

// This will two things:
// 1- An unique index by block_id and service_id to enable upsert operation
// 2- A function that receives the height (id) and upsert the values of that block
// for the table staked_suppliers_by_block_and_services
export const upsertSuppliersByBlockAndServicesFnName = 'upsert_staked_suppliers_by_block_and_services'

// Here we are creating a table outside SubQuery to avoid unnecessary indexes
export function upsertSuppliersByBlockAndServiceFn(dbSchema: string): string {
return `DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'staked_suppliers_by_block_and_services_block_service_key'
AND conrelid = '${dbSchema}.staked_suppliers_by_block_and_services'::regclass
) THEN
ALTER TABLE ${dbSchema}.staked_suppliers_by_block_and_services
ADD CONSTRAINT staked_suppliers_by_block_and_services_block_service_key
UNIQUE (block_id, service_id);
END IF;
END;
$$;
return `
CREATE TABLE IF NOT EXISTS ${dbSchema}.staked_suppliers_by_block_and_services
(
tokens numeric NOT NULL,
amount integer NOT NULL,
block_id numeric NOT NULL,
service_id text NOT NULL,
_id uuid NOT NULL,
CONSTRAINT staked_suppliers_by_block_and_services_pkey PRIMARY KEY (_id)
);

CREATE OR REPLACE FUNCTION ${dbSchema}.upsert_staked_suppliers_by_block_and_services(p_block_id bigint)
COMMENT ON TABLE ${dbSchema}.staked_suppliers_by_block_and_services
IS '@foreignKey (block_id) REFERENCES blocks (id)';

CREATE INDEX IF NOT EXISTS idx_suppliers_services_block_id
ON ${dbSchema}.staked_suppliers_by_block_and_services USING btree
(block_id ASC NULLS LAST)
TABLESPACE pg_default;

CREATE OR REPLACE FUNCTION ${dbSchema}.${upsertSuppliersByBlockAndServicesFnName}(p_block_id bigint)
RETURNS void AS $$
BEGIN
-- Delete existing rows for this block
DELETE FROM ${dbSchema}.staked_suppliers_by_block_and_services
WHERE block_id = p_block_id;

INSERT INTO ${dbSchema}.staked_suppliers_by_block_and_services (
_id,
id,
block_id,
service_id,
amount,
tokens,
_block_range
tokens
)
SELECT
uuid_generate_v4(), -- _id
CONCAT(p_block_id::text, '-', ss.service_id) AS id,
p_block_id,
ss.service_id,
COUNT(*) AS amount,
SUM(s.stake_amount) AS tokens,
int8range(p_block_id, NULL) AS _block_range -- open-ended: [block_id,)
SUM(s.stake_amount) AS tokens
FROM ${dbSchema}.suppliers s
INNER JOIN ${dbSchema}.supplier_service_configs ss
ON ss.supplier_id = s.id
WHERE s.stake_status = 'Staked'
AND s._block_range @> p_block_id
AND ss._block_range @> p_block_id
GROUP BY ss.service_id
ON CONFLICT (block_id, service_id) DO UPDATE
SET
amount = EXCLUDED.amount,
tokens = EXCLUDED.tokens,
_block_range = EXCLUDED._block_range;
GROUP BY ss.service_id;
END;
$$ LANGUAGE plpgsql;
`
Expand Down
7 changes: 2 additions & 5 deletions src/mappings/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,8 @@ function handleByType(typeUrl: string | Array<string>, byTypeMap: MessageByType
// anything primitive types
async function indexPrimitives(block: CosmosBlock) {
await profilerWrap(handleGenesis, "indexPrimitives", "handleGenesis")(block);

await Promise.all([
profilerWrap(handleBlock, "indexPrimitives", "handleGenesis")(block),
profilerWrap(handleSupply, "indexPrimitives", "handleSupply")(block),
]);
await profilerWrap(handleBlock, "indexPrimitives", "handleBlock")(block);
await profilerWrap(handleSupply, "indexPrimitives", "handleSupply")(block);
}

// anything that modifies balances
Expand Down
Loading