Skip to content

Commit

Permalink
feat: ingest block-push events (#60)
Browse files Browse the repository at this point in the history
* feat: ingest block-push events

* fix: block_proposals are accepted on matching block_push

* feat: add `push_time_ms` to block_proposal responses
  • Loading branch information
zone117x authored Nov 22, 2024
1 parent 6fe690f commit 47f524d
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 43 deletions.
46 changes: 46 additions & 0 deletions migrations/1732111992784_block_pushes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';

// No column-type shorthands are used by this migration (export consumed by node-pg-migrate).
export const shorthands: ColumnDefinitions | undefined = undefined;

/**
 * Create the `block_pushes` table: one row per block-push event observed from a miner.
 * A block hash may appear at most once (see the unique constraint below).
 */
export function up(pgm: MigrationBuilder): void {
  // Shared spec for required binary (hash / key) columns.
  const requiredBytea = { type: 'bytea', notNull: true } as const;

  pgm.createTable('block_pushes', {
    // Surrogate primary key.
    id: { type: 'bigserial', primaryKey: true },
    // When this push event was ingested.
    received_at: { type: 'timestamptz', notNull: true },
    // Public key of the miner that pushed the block.
    miner_key: { ...requiredBytea },
    block_height: { type: 'integer', notNull: true },
    // Timestamp taken from the pushed block itself.
    block_time: { type: 'timestamptz', notNull: true },
    // AKA signer_sighash
    block_hash: { ...requiredBytea },
    index_block_hash: { ...requiredBytea },
  });

  // Point-lookup and time/height-scan access paths.
  for (const column of ['received_at', 'block_height', 'block_hash', 'index_block_hash']) {
    pgm.createIndex('block_pushes', [column]);
  }

  // At most one push row per block hash; duplicate pushes are rejected/ignored upstream.
  // NOTE(review): this constraint already backs block_hash with a unique index, so the
  // plain index on block_hash above is redundant — kept to preserve existing behavior.
  pgm.createConstraint('block_pushes', 'block_pushes_block_hash_unique', {
    unique: ['block_hash'],
  });
}
1 change: 1 addition & 0 deletions src/api/routes/block-proposals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export function parseDbBlockProposalData(r: DbBlockProposalQueryResponse): Block
block_time: r.block_time,
cycle_number: r.cycle_number,
status: r.status,
push_time_ms: r.push_time_ms,

// cycle data
total_signer_count: r.total_signer_count,
Expand Down
55 changes: 32 additions & 23 deletions src/api/routes/socket-io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,39 @@ export const SocketIORoutes: FastifyPluginAsync<
const blockProposalNs = io.of('/block-proposals') as BlockProposalSocketNamespace;

const signerMessageListener = (msg: SignerMessagesEventPayload) => {
if (blockProposalNs.sockets.size === 0) {
return;
}
// Use Set to get a unique list of block hashes
const blockHashes = new Set<string>(
msg.map(m => ('proposal' in m ? m.proposal.blockHash : m.response.blockHash))
);
const proposalBroadcasts = Array.from(blockHashes).map(blockHash => {
return db
.sqlTransaction(async sql => {
const results = await db.getBlockProposal({
sql,
blockHash,
});
if (results.length > 0) {
const blockProposal = parseDbBlockProposalData(results[0]);
blockProposalNs.emit('blockProposal', blockProposal);
}
try {
if (blockProposalNs.sockets.size === 0) {
return;
}
// Use Set to get a unique list of block hashes
const blockHashes = new Set<string>(
msg.map(m => {
if ('proposal' in m) return m.proposal.blockHash;
else if ('response' in m) return m.response.blockHash;
else if ('push' in m) return m.push.blockHash;
else throw new Error(`Invalid signer message type: ${JSON.stringify(m)}`);
})
.catch((error: unknown) => {
logger.error(error, `Failed to broadcast block proposal for block hash ${blockHash}`);
});
});
void Promise.allSettled(proposalBroadcasts);
);
const proposalBroadcasts = Array.from(blockHashes).map(blockHash => {
return db
.sqlTransaction(async sql => {
const results = await db.getBlockProposal({
sql,
blockHash,
});
if (results.length > 0) {
const blockProposal = parseDbBlockProposalData(results[0]);
blockProposalNs.emit('blockProposal', blockProposal);
}
})
.catch((error: unknown) => {
logger.error(error, `Failed to broadcast block proposal for block hash ${blockHash}`);
});
});
void Promise.allSettled(proposalBroadcasts);
} catch (error) {
logger.error(error, 'Failed to broadcast block proposal');
}
};

fastify.addHook('onListen', () => {
Expand Down
5 changes: 5 additions & 0 deletions src/api/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ export const BlockProposalsEntrySchema = Type.Object({
}
),

push_time_ms: Type.Union([Type.Null(), Type.Integer()], {
description:
'Time duration (in milliseconds) taken between when the block was proposed and when it was pushed',
}),

// cycle data
total_signer_count: Type.Integer({
description: 'Total number of signers expected for this proposal',
Expand Down
36 changes: 34 additions & 2 deletions src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { addAbortListener } from 'node:events';
import { addAbortListener, EventEmitter } from 'node:events';
import { parseISO, sub, isValid, Duration } from 'date-fns';

export const isDevEnv = process.env.NODE_ENV === 'development';
Expand All @@ -24,6 +24,8 @@ export function normalizeHexString(hexString: string): string {
return hexString.startsWith('0x') ? hexString : '0x' + hexString;
}

const DisposeSymbol: typeof Symbol.dispose = Symbol.dispose ?? Symbol.for('nodejs.dispose');

export function sleep(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
Expand All @@ -32,7 +34,7 @@ export function sleep(ms: number, signal?: AbortSignal): Promise<void> {
}
const disposable = signal ? addAbortListener(signal, onAbort) : undefined;
const timeout = setTimeout(() => {
disposable?.[Symbol.dispose ?? (Symbol.for('nodejs.dispose') as typeof Symbol.dispose)]();
disposable?.[DisposeSymbol]();
resolve();
}, ms);
function onAbort() {
Expand Down Expand Up @@ -84,3 +86,33 @@ export type BlockIdParam =
| { type: 'height'; height: number }
| { type: 'hash'; hash: string }
| { type: 'latest'; latest: true };

/**
 * Similar to `node:events.once` but with a predicate to filter events and supports typed EventEmitters.
 *
 * Resolves with the first set of event arguments for which `predicate` returns true,
 * detaching the listener afterwards. Rejects with the abort reason if `signal` aborts
 * first. If `predicate` throws, the promise rejects with that error — previously the
 * exception escaped into `emitter.emit()` and left this promise pending forever with
 * the listener still attached.
 */
export function waitForEvent<T extends Record<string, any[]>, K extends keyof T>(
  emitter: EventEmitter<T>,
  event: K,
  predicate: (...args: T[K]) => boolean,
  signal?: AbortSignal
): Promise<T[K]> {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) {
      reject(signal.reason as Error);
      return;
    }
    const disposable = signal ? addAbortListener(signal, onAbort) : undefined;
    const cleanup = () => {
      disposable?.[DisposeSymbol]();
      (emitter as EventEmitter).off(event as string, handler);
    };
    const handler = (...args: T[K]) => {
      try {
        if (!predicate(...args)) return;
        cleanup();
        resolve(args);
      } catch (error) {
        // A throwing predicate must settle the promise, not crash the emitter.
        cleanup();
        reject(error as Error);
      }
    };
    (emitter as EventEmitter).on(event as string, handler);
    function onAbort() {
      (emitter as EventEmitter).off(event as string, handler);
      reject((signal?.reason as Error) ?? new Error('Aborted'));
    }
  });
}
72 changes: 67 additions & 5 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import {
} from '@hirosystems/api-toolkit';
import { StacksEvent, StacksPayload } from '@hirosystems/chainhook-client';
import {
BlockProposalEventArgs,
BlockResponseEventArgs,
DbBlock,
DbBlockProposal,
DbBlockPush,
DbBlockResponse,
DbBlockSignerSignature,
DbMockBlock,
Expand Down Expand Up @@ -41,6 +40,11 @@ type BlockResponseData = Extract<
{ type: 'BlockResponse' }
>['data'];

// Payload of a `BlockPushed` signer message, extracted from the
// SignerMessage discriminated union by its `type` tag.
type BlockPushedData = Extract<
  SignerMessage['payload']['data']['message'],
  { type: 'BlockPushed' }
>['data'];

type MockProposalData = Extract<
SignerMessage['payload']['data']['message'],
{ type: 'MockProposal' }
Expand Down Expand Up @@ -175,7 +179,20 @@ export class ChainhookPgStore extends BasePgStoreModule {
break;
}
case 'BlockPushed': {
this.logger.info(`Ignoring BlockPushed StackerDB event`);
const res = await this.applyBlockPush(
sql,
event.received_at_ms,
event.payload.data.pubkey,
event.payload.data.message.data
);
if (res.applied) {
appliedResults.push({
push: {
receiptTimestamp: event.received_at_ms,
blockHash: res.blockHash,
},
});
}
break;
}
case 'MockProposal': {
Expand Down Expand Up @@ -402,6 +419,19 @@ export class ChainhookPgStore extends BasePgStoreModule {
return proposal;
}

/**
 * Delete the `block_pushes` row for the given block hash and return the deleted row
 * (without the internal surrogate `id` column).
 * @throws Error if no push exists for the hash.
 */
async deleteBlockPush(sql: PgSqlClient, blockHash: string): Promise<DbBlockPush> {
  // Row type corrected: this query returns block_push rows, not block_proposal rows
  // (was mistakenly typed `DbBlockProposal[]`, copy-pasted from the proposal variant).
  const result = await sql<(DbBlockPush & { id: unknown })[]>`
    DELETE FROM block_pushes WHERE block_hash = ${blockHash} RETURNING *
  `;
  if (result.length === 0) {
    throw new Error(`Block push not found for hash ${blockHash}`);
  }
  // Strip the `id` column via rest-destructuring rather than spread-then-delete.
  const { id: _id, ...blockPush } = result[0];
  return blockPush;
}

async deleteBlockResponses(sql: PgSqlClient, blockHash: string): Promise<DbBlockResponse[]> {
const result = await sql<DbBlockResponse[]>`
DELETE FROM block_responses WHERE signer_sighash = ${blockHash} RETURNING *
Expand All @@ -414,6 +444,38 @@ export class ChainhookPgStore extends BasePgStoreModule {
});
}

/**
 * Record a `BlockPushed` signer message in the `block_pushes` table.
 * Duplicate pushes for an already-seen block hash hit the unique constraint and are
 * skipped, so only the first observed push per block is stored.
 * @returns `{ applied: true, blockHash }` when a new row was inserted, `{ applied: false }` otherwise.
 */
private async applyBlockPush(
  sql: PgSqlClient,
  receivedAt: number,
  minerPubkey: string,
  messageData: BlockPushedData
): Promise<{ applied: false } | { applied: true; blockHash: string }> {
  const { block } = messageData;
  const blockHash = normalizeHexString(block.block_hash);
  const push: DbBlockPush = {
    received_at: unixTimeMillisecondsToISO(receivedAt),
    miner_key: normalizeHexString(minerPubkey),
    block_height: block.header.chain_length,
    block_time: unixTimeSecondsToISO(block.header.timestamp),
    block_hash: blockHash,
    index_block_hash: normalizeHexString(block.index_block_hash),
  };
  const insertResult = await sql`
    INSERT INTO block_pushes ${sql(push)}
    ON CONFLICT ON CONSTRAINT block_pushes_block_hash_unique DO NOTHING
  `;

  // count === 0 means the unique constraint suppressed the insert (duplicate push).
  if (insertResult.count === 0) {
    this.logger.info(
      `Skipped inserting duplicate block push hash=${push.block_hash}, miner=${push.miner_key}`
    );
    return { applied: false };
  }
  this.logger.info(
    `ChainhookPgStore apply block_push hash=${push.block_hash}, miner=${push.miner_key}`
  );
  return { applied: true, blockHash };
}

private async applyBlockResponse(
sql: PgSqlClient,
receivedAt: number,
Expand Down Expand Up @@ -644,7 +706,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
await this.rollBackBlockSignerSignatures(sql, blockHeight);
}

private async rollBackBlock(sql: PgSqlClient, blockHeight: number) {
async rollBackBlock(sql: PgSqlClient, blockHeight: number) {
const res = await sql`
DELETE FROM blocks WHERE block_height = ${blockHeight}
`;
Expand All @@ -656,7 +718,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
}
}

private async rollBackBlockSignerSignatures(sql: PgSqlClient, blockHeight: number) {
async rollBackBlockSignerSignatures(sql: PgSqlClient, blockHeight: number) {
const res = await sql`
DELETE FROM block_signer_signatures WHERE block_height = ${blockHeight}
`;
Expand Down
20 changes: 18 additions & 2 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,15 @@ export class PgStore extends BasePgStore {
-- Proposal status
CASE
WHEN block_pushes.block_hash IS NOT NULL THEN 'accepted'
WHEN bp.block_height > ct.block_height THEN 'pending'
WHEN b.block_hash IS NULL THEN 'rejected'
WHEN b.block_hash = bp.block_hash THEN 'accepted'
ELSE 'rejected'
END AS status,
(EXTRACT(EPOCH FROM (block_pushes.received_at - bp.received_at)) * 1000)::integer AS push_time_ms,
-- Aggregate cycle data from reward_set_signers
COUNT(DISTINCT rss.signer_key)::integer AS total_signer_count,
SUM(rss.signer_weight)::integer AS total_signer_weight,
Expand Down Expand Up @@ -186,6 +189,9 @@ export class PgStore extends BasePgStore {
LEFT JOIN blocks b
ON b.block_height = bp.block_height
LEFT JOIN block_pushes
ON block_pushes.block_hash = bp.block_hash
LEFT JOIN reward_set_signers rss
ON rss.cycle_number = bp.reward_cycle
Expand All @@ -203,7 +209,9 @@ export class PgStore extends BasePgStore {
bp.block_time,
bp.reward_cycle,
ct.block_height,
b.block_hash
b.block_hash,
block_pushes.block_hash,
block_pushes.received_at
ORDER BY bp.received_at DESC
`;
Expand All @@ -223,12 +231,15 @@ export class PgStore extends BasePgStore {
-- Proposal status
CASE
WHEN block_pushes.block_hash IS NOT NULL THEN 'accepted'
WHEN bp.block_height > ct.block_height THEN 'pending'
WHEN b.block_hash IS NULL THEN 'rejected'
WHEN b.block_hash = bp.block_hash THEN 'accepted'
ELSE 'rejected'
END AS status,
(EXTRACT(EPOCH FROM (block_pushes.received_at - bp.received_at)) * 1000)::integer AS push_time_ms,
-- Aggregate cycle data from reward_set_signers
COUNT(DISTINCT rss.signer_key)::integer AS total_signer_count,
SUM(rss.signer_weight)::integer AS total_signer_weight,
Expand Down Expand Up @@ -276,6 +287,9 @@ export class PgStore extends BasePgStore {
LEFT JOIN blocks b
ON b.block_height = bp.block_height
LEFT JOIN block_pushes
ON block_pushes.block_hash = bp.block_hash
LEFT JOIN reward_set_signers rss
ON rss.cycle_number = bp.reward_cycle
Expand All @@ -289,7 +303,9 @@ export class PgStore extends BasePgStore {
GROUP BY
bp.id,
ct.block_height,
b.block_hash
b.block_hash,
block_pushes.block_hash,
block_pushes.received_at
LIMIT 1
`;
Expand Down
Loading

0 comments on commit 47f524d

Please sign in to comment.