Skip to content

Commit

Permalink
Merge branch 'fix/dupe-msg-handling' into 13-determine-cycle-number-more-robustly
Browse files Browse the repository at this point in the history

# Conflicts:
#	src/pg/pg-store.ts
  • Loading branch information
janniks committed Oct 28, 2024
2 parents 871d784 + eda79d0 commit 2d5c994
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 60 deletions.
4 changes: 4 additions & 0 deletions migrations/1729684505755_block_proposals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('block_proposals', ['block_hash']);
pgm.createIndex('block_proposals', ['index_block_hash']);
pgm.createIndex('block_proposals', ['reward_cycle']);

pgm.createConstraint('block_proposals', 'block_proposals_block_hash_unique', {
unique: ['block_hash'],
});
}
4 changes: 4 additions & 0 deletions migrations/1729684505756_block_responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('block_responses', ['received_at']);
pgm.createIndex('block_responses', ['signer_sighash']);
pgm.createIndex('block_responses', ['accepted']);

pgm.createConstraint('block_responses', 'block_responses_signer_key_sighash_unique', {
unique: ['signer_key', 'signer_sighash'],
});
}
4 changes: 4 additions & 0 deletions migrations/1729684505758_mock_proposals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_proposals', ['stacks_tip']);
pgm.createIndex('mock_proposals', ['index_block_hash']);
pgm.createIndex('mock_proposals', ['burn_block_height']);

pgm.createConstraint('mock_proposals', 'mock_proposals_idb_unique', {
unique: ['index_block_hash'],
});
}
4 changes: 4 additions & 0 deletions migrations/1729684505759_mock_signature.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_signatures', ['stacks_tip']);
pgm.createIndex('mock_signatures', ['index_block_hash']);
pgm.createIndex('mock_signatures', ['burn_block_height']);

pgm.createConstraint('mock_signatures', 'mock_signatures_signer_key_idb_unique', {
unique: ['signer_key', 'index_block_hash'],
});
}
8 changes: 8 additions & 0 deletions migrations/1729684505760_mock_blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_blocks', ['index_block_hash']);
pgm.createIndex('mock_blocks', ['burn_block_height']);

pgm.createConstraint('mock_blocks', 'mock_blocks_idb_unique', {
unique: ['index_block_hash'],
});

// Mock block signer signatures
pgm.createTable('mock_block_signer_signatures', {
id: {
Expand Down Expand Up @@ -99,4 +103,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_block_signer_signatures', ['stacks_tip']);
pgm.createIndex('mock_block_signer_signatures', ['stacks_tip_height']);
pgm.createIndex('mock_block_signer_signatures', ['index_block_hash']);

pgm.createConstraint('mock_block_signer_signatures', 'mock_block_signers_idb_unique', {
unique: ['index_block_hash', 'signer_key'],
});
}
47 changes: 40 additions & 7 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,18 @@ export class ChainhookPgStore extends BasePgStoreModule {
network_id: messageData.mock_proposal.peer_info.network_id,
index_block_hash: normalizeHexString(messageData.mock_proposal.peer_info.index_block_hash),
};
await sql`
const result = await sql`
INSERT INTO mock_blocks ${sql(dbMockBlock)}
ON CONFLICT ON CONSTRAINT mock_blocks_idb_unique DO NOTHING
`;

if (result.count === 0) {
logger.info(
`Skipped inserting duplicate mock block height=${dbMockBlock.stacks_tip_height}, hash=${dbMockBlock.stacks_tip}`
);
return;
}

for (const batch of batchIterate(messageData.mock_signatures, 500)) {
const sigs = batch.map(sig => {
const dbSig: DbMockBlockSignerSignature = {
Expand Down Expand Up @@ -235,9 +243,15 @@ export class ChainhookPgStore extends BasePgStoreModule {
// Metadata fields
metadata_server_version: messageData.metadata.server_version,
};
await sql`
const result = await sql`
INSERT INTO mock_signatures ${sql(dbMockSignature)}
ON CONFLICT ON CONSTRAINT mock_signatures_signer_key_idb_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
`Skipped inserting duplicate mock signature height=${dbMockSignature.stacks_tip_height}, hash=${dbMockSignature.stacks_tip}, signer=${dbMockSignature.signer_key}`
);
}
}

private async applyMockProposal(
Expand All @@ -258,9 +272,15 @@ export class ChainhookPgStore extends BasePgStoreModule {
network_id: messageData.network_id,
index_block_hash: normalizeHexString(messageData.index_block_hash),
};
await sql`
const result = await sql`
INSERT INTO mock_proposals ${sql(dbMockProposal)}
ON CONFLICT ON CONSTRAINT mock_proposals_idb_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
`Skipped inserting duplicate mock proposal height=${dbMockProposal.stacks_tip_height}, hash=${dbMockProposal.stacks_tip}`
);
}
}

private async applyBlockProposal(
Expand All @@ -279,9 +299,15 @@ export class ChainhookPgStore extends BasePgStoreModule {
reward_cycle: messageData.reward_cycle,
burn_block_height: messageData.burn_height,
};
await sql`
const result = await sql`
INSERT INTO block_proposals ${sql(dbBlockProposal)}
ON CONFLICT ON CONSTRAINT block_proposals_block_hash_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
`Skipped inserting duplicate block proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}`
);
}
}

private async applyBlockResponse(
Expand All @@ -307,7 +333,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
}
}

const dbBlockProposal: DbBlockResponse = {
const dbBlockResponse: DbBlockResponse = {
received_at: unixTimeMillisecondsToISO(receivedAt),
signer_key: normalizeHexString(signerPubkey),
accepted: accepted,
Expand All @@ -319,9 +345,16 @@ export class ChainhookPgStore extends BasePgStoreModule {
reject_code: rejectCode,
chain_id: accepted ? null : messageData.data.chain_id,
};
await sql`
INSERT INTO block_responses ${sql(dbBlockProposal)}
const result = await sql`
INSERT INTO block_responses ${sql(dbBlockResponse)}
ON CONFLICT ON CONSTRAINT block_responses_signer_key_sighash_unique DO NOTHING
`;

if (result.count === 0) {
logger.info(
`Skipped inserting duplicate block response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}`
);
}
}

private async updateStacksBlock(
Expand Down
68 changes: 15 additions & 53 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ export class PgStore extends BasePgStore {
// The `blocks` table (and its associated block_signer_signatures table) is the source of truth that is
// never missing blocks and does not contain duplicate rows per block.
//
// The block_proposals and block_responses tables can have duplicate rows. Duplicates can be detected in
// block_proposals using the block_hash column. Duplicates can be detected in block_responses by looking
// at (signer_key, signer_sighash). For both tables filter duplicates by using only the first row (the
// oldest id column).
//
// Each block has a known set of signer_keys which can be determined by first looking up the block's
// cycle_number from the `block_proposals` table matching on block_hash, then using cycle_number to look
// up the set of signer_keys from the reward_set_signers table (matching cycle_number with reward_cycle).
Expand Down Expand Up @@ -158,16 +153,6 @@ export class PgStore extends BasePgStore {
LIMIT ${limit}
OFFSET ${offset}
),
filtered_block_proposals AS (
SELECT DISTINCT ON (block_hash) id, block_hash, received_at, reward_cycle AS cycle_number
FROM block_proposals
ORDER BY block_hash, id
),
filtered_block_responses AS (
SELECT DISTINCT ON (signer_key, signer_sighash) *
FROM block_responses
ORDER BY signer_key, signer_sighash, id
),
block_signers AS (
SELECT
lb.id AS block_id,
Expand All @@ -189,10 +174,10 @@ export class PgStore extends BasePgStore {
END AS signer_status,
EXTRACT(MILLISECOND FROM (fbr.received_at - bp.received_at)) AS response_time_ms
FROM latest_blocks lb
LEFT JOIN filtered_block_proposals bp ON lb.block_hash = bp.block_hash
LEFT JOIN block_proposals bp ON lb.block_hash = bp.block_hash
LEFT JOIN reward_set_signers rs ON lb.cycle_number = rs.cycle_number
LEFT JOIN block_signer_signatures bss ON lb.block_height = bss.block_height AND rs.signer_key = bss.signer_key
LEFT JOIN filtered_block_responses fbr ON fbr.signer_key = rs.signer_key AND fbr.signer_sighash = lb.block_hash
LEFT JOIN block_responses fbr ON fbr.signer_key = rs.signer_key AND fbr.signer_sighash = lb.block_hash
),
signer_state_aggregation AS (
SELECT
Expand Down Expand Up @@ -285,36 +270,24 @@ export class PgStore extends BasePgStore {
WHERE rss.cycle_number = ${cycleNumber}
),
proposal_data AS (
-- Fetch the first (oldest) proposal for each block_hash for the given cycle
-- Select all proposals for the given cycle
SELECT
bp.block_hash,
bp.block_height,
bp.received_at AS proposal_received_at
FROM block_proposals bp
WHERE bp.reward_cycle = ${cycleNumber}
AND bp.id = (
-- Select the earliest proposal for each block_hash
SELECT MIN(sub_bp.id)
FROM block_proposals sub_bp
WHERE sub_bp.block_hash = bp.block_hash
)
),
response_data AS (
-- Fetch the first (oldest) response for each (signer_key, signer_sighash) pair
SELECT DISTINCT ON (br.signer_key, br.signer_sighash)
-- Select responses associated with the proposals from the given cycle
SELECT
br.signer_key,
br.signer_sighash,
br.accepted,
br.received_at,
br.id
FROM block_responses br
WHERE br.id = (
-- Select the earliest response for each signer_sighash and signer_key
SELECT MIN(sub_br.id)
FROM block_responses sub_br
WHERE sub_br.signer_key = br.signer_key
AND sub_br.signer_sighash = br.signer_sighash
)
JOIN proposal_data pd ON br.signer_sighash = pd.block_hash -- Only responses linked to selected proposals
),
signer_proposal_data AS (
-- Cross join signers with proposals and left join filtered responses
Expand Down Expand Up @@ -376,7 +349,7 @@ export class PgStore extends BasePgStore {
}[]
>`
WITH signer_data AS (
-- Fetch the signer for the given cycle
-- Fetch the specific signer for the given cycle
SELECT
rss.signer_key,
rss.signer_weight,
Expand All @@ -386,39 +359,28 @@ export class PgStore extends BasePgStore {
AND rss.signer_key = ${normalizeHexString(signerId)}
),
proposal_data AS (
-- Fetch the first (oldest) proposal for each block_hash for the given cycle
-- Select all proposals for the given cycle
SELECT
bp.block_hash,
bp.block_height,
bp.received_at AS proposal_received_at
FROM block_proposals bp
WHERE bp.reward_cycle = ${cycleNumber}
AND bp.id = (
-- Select the earliest proposal for each block_hash
SELECT MIN(sub_bp.id)
FROM block_proposals sub_bp
WHERE sub_bp.block_hash = bp.block_hash
)
),
response_data AS (
-- Fetch the first (oldest) response for each (signer_key, signer_sighash) pair
SELECT DISTINCT ON (br.signer_key, br.signer_sighash)
-- Select all responses for the proposals in the given cycle
SELECT
br.signer_key,
br.signer_sighash,
br.accepted,
br.received_at,
br.id
FROM block_responses br
WHERE br.id = (
-- Select the earliest response for each signer_sighash and signer_key
SELECT MIN(sub_br.id)
FROM block_responses sub_br
WHERE sub_br.signer_key = br.signer_key
AND sub_br.signer_sighash = br.signer_sighash
)
JOIN proposal_data pd ON br.signer_sighash = pd.block_hash
WHERE br.signer_key = ${normalizeHexString(signerId)} -- Filter for the specific signer
),
signer_proposal_data AS (
-- Cross join signers with proposals and left join filtered responses
-- Cross join the specific signer with proposals and left join filtered responses
SELECT
sd.signer_key,
pd.block_hash,
Expand All @@ -427,13 +389,13 @@ export class PgStore extends BasePgStore {
rd.received_at AS response_received_at,
EXTRACT(MILLISECOND FROM (rd.received_at - pd.proposal_received_at)) AS response_time_ms
FROM signer_data sd
CROSS JOIN proposal_data pd -- Cross join to associate all signers with all proposals
CROSS JOIN proposal_data pd
LEFT JOIN response_data rd
ON pd.block_hash = rd.signer_sighash
AND sd.signer_key = rd.signer_key -- Match signers with their corresponding responses
),
aggregated_data AS (
-- Aggregate the proposal and response data by signer
-- Aggregate the proposal and response data for the specific signer
SELECT
spd.signer_key,
COUNT(CASE WHEN spd.accepted = true THEN 1 END)::integer AS proposals_accepted_count,
Expand Down

0 comments on commit 2d5c994

Please sign in to comment.