From fe0a8201b0f9886eba9e221746f6b1e471e8c42b Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 22 Dec 2022 17:02:06 +0100 Subject: [PATCH] Merge branch 'develop' into stacks-2.1 # Conflicts: # src/event-stream/event-server.ts --- src/datastore/pg-notifier.ts | 13 +++- src/datastore/pg-store-event-emitter.ts | 3 +- src/datastore/pg-store.ts | 8 +++ src/datastore/pg-write-store.ts | 1 + src/event-replay/event-replay.ts | 15 ---- src/event-replay/helpers.ts | 68 ++++++++++++------- src/event-stream/event-server.ts | 25 ++++++- src/tests-bns/event-server-tests.ts | 63 ++++++++++++++--- src/tests-bns/v1-import-tests.ts | 14 ++-- src/tests-event-replay/import-export-tests.ts | 12 +++- 10 files changed, 162 insertions(+), 60 deletions(-) diff --git a/src/datastore/pg-notifier.ts b/src/datastore/pg-notifier.ts index 9abe937381..ef6def194f 100644 --- a/src/datastore/pg-notifier.ts +++ b/src/datastore/pg-notifier.ts @@ -1,5 +1,6 @@ import * as postgres from 'postgres'; import { logError, logger } from '../helpers'; +import { DbConfigState } from './common'; import { connectPostgres, PgServer, PgSqlClient } from './connection'; type PgNotificationType = @@ -10,7 +11,8 @@ type PgNotificationType = | 'addressUpdate' | 'nameUpdate' | 'tokenMetadataUpdateQueued' - | 'tokensUpdate'; + | 'tokensUpdate' + | 'configStateUpdate'; export type PgTxNotificationPayload = { txId: string; @@ -34,6 +36,8 @@ export type PgAddressNotificationPayload = { blockHeight: number; }; +export type PgConfigStateNotificationPayload = DbConfigState; + export type PgTokenMetadataNotificationPayload = { queueId: number; }; @@ -54,7 +58,8 @@ type PgNotificationPayload = | PgNftEventNotificationPayload | PgTokenMetadataNotificationPayload | PgTokensNotificationPayload - | PgTxNotificationPayload; + | PgTxNotificationPayload + | PgConfigStateNotificationPayload; type PgNotification = { type: PgNotificationType; @@ -127,6 +132,10 @@ export class PgNotifier { await this.notify({ type: 'tokensUpdate', payload: payload }); } + public async sendConfigState(payload: PgConfigStateNotificationPayload) { + await this.notify({ type: 'configStateUpdate', payload }); + } + public async close() { await this.listener ?.unlisten() diff --git a/src/datastore/pg-store-event-emitter.ts b/src/datastore/pg-store-event-emitter.ts index b52e7d31f2..8813af606b 100644 --- a/src/datastore/pg-store-event-emitter.ts +++ b/src/datastore/pg-store-event-emitter.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; import StrictEventEmitter from 'strict-event-emitter-types'; -import { DbMempoolStats } from './common'; +import { DbConfigState, DbMempoolStats } from './common'; type DataStoreEventEmitter = StrictEventEmitter< EventEmitter, @@ -14,6 +14,7 @@ type DataStoreEventEmitter = StrictEventEmitter< tokensUpdate: (contractID: string) => void; tokenMetadataUpdateQueued: (queueId: number) => void; mempoolStatsUpdate: (mempoolStats: DbMempoolStats) => void; + configStateUpdate: (configState: DbConfigState) => void; } >; diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 3f79a5daaf..7787a8f339 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -29,6 +29,7 @@ import { DbBnsZoneFile, DbBurnchainReward, DbChainTip, + DbConfigState, DbEvent, DbEventTypeId, DbFtBalance, @@ -96,6 +97,7 @@ import { import { PgAddressNotificationPayload, PgBlockNotificationPayload, + PgConfigStateNotificationPayload, PgMicroblockNotificationPayload, PgNameNotificationPayload, PgNftEventNotificationPayload, @@ -245,6 +247,12 @@ export class PgStore { const nftEvent = notification.payload as PgNftEventNotificationPayload; this.eventEmitter.emit('nftEventUpdate', nftEvent.txId, nftEvent.eventIndex); break; + case 'configStateUpdate': + this.eventEmitter.emit( + 'configStateUpdate', + notification.payload as PgConfigStateNotificationPayload + ); + break; } }); } diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index 25da3e96c1..43643ab4d1 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -1870,6 +1870,7 @@ export class PgWriteStore extends PgStore { bns_subdomains_imported = ${configState.bns_subdomains_imported}, token_offering_imported = ${configState.token_offering_imported} `; + await this.notifier?.sendConfigState(configState); if (queryResult.count !== 1) { throw new Error(`Unexpected config update row count: ${queryResult.count}`); } diff --git a/src/event-replay/event-replay.ts b/src/event-replay/event-replay.ts index 0f1ce1cef7..9ced8e7584 100644 --- a/src/event-replay/event-replay.ts +++ b/src/event-replay/event-replay.ts @@ -118,8 +118,6 @@ export async function importEventsFromTsv( if (eventImportMode === EventImportMode.pruned) { console.log(`Ignoring all prunable events before block height: ${prunedBlockHeight}`); } - // Look for the TSV's genesis block information for BNS import. - const tsvGenesisBlockData = await findBnsGenesisBlockData(resolvedFilePath); const db = await PgWriteStore.connect({ usageName: 'import-events', @@ -137,15 +135,6 @@ export async function importEventsFromTsv( await importV1TokenOfferingData(db); - // Import V1 BNS names first. Subdomains will be imported after TSV replay is finished in order to - // keep the size of the `subdomains` table small. - if (process.env.BNS_IMPORT_DIR) { - logger.info(`Using BNS export data from: ${process.env.BNS_IMPORT_DIR}`); - await importV1BnsNames(db, process.env.BNS_IMPORT_DIR, tsvGenesisBlockData); - } else { - logger.warn(`Notice: full BNS functionality requires 'BNS_IMPORT_DIR' to be set.`); - } - // Import TSV chain data const readStream = fs.createReadStream(resolvedFilePath); const rawEventsIterator = getRawEventRequests(readStream, status => { @@ -186,10 +175,6 @@ export async function importEventsFromTsv( } } await db.finishEventReplay(); - if (process.env.BNS_IMPORT_DIR) { - logger.level = defaultLogLevel; - await importV1BnsSubdomains(db, process.env.BNS_IMPORT_DIR, tsvGenesisBlockData); - } console.log(`Event import and playback successful.`); await eventServer.closeAsync(); await db.close(); diff --git a/src/event-replay/helpers.ts b/src/event-replay/helpers.ts index e293ba02c7..0a1d6fbb97 100644 --- a/src/event-replay/helpers.ts +++ b/src/event-replay/helpers.ts @@ -4,6 +4,7 @@ import * as readline from 'readline'; import { decodeTransaction, TxPayloadTypeID } from 'stacks-encoding-native-js'; import { DataStoreBnsBlockData } from '../datastore/common'; import { ReverseFileStream } from './reverse-file-stream'; +import { CoreNodeBlockMessage } from '../event-stream/core-node-message'; export type BnsGenesisBlock = DataStoreBnsBlockData & { tx_id: string; @@ -42,36 +43,57 @@ export async function findTsvBlockHeight(filePath: string): Promise { * @returns Genesis block data */ export async function findBnsGenesisBlockData(filePath: string): Promise { + const genesisBlockMessage = await getGenesisBlockData(filePath); + const bnsGenesisBlock = getBnsGenesisBlockFromBlockMessage(genesisBlockMessage); + return bnsGenesisBlock; +} + +export async function getGenesisBlockData(filePath: string): Promise { const rl = readline.createInterface({ input: fs.createReadStream(filePath), crlfDelay: Infinity, }); - for await (const line of rl) { - const columns = line.split('\t'); - const eventName = columns[2]; - if (eventName === '/new_block') { - const payload = JSON.parse(columns[3]); - // Look for block 1 - if (payload.block_height === 1) { - for (const tx of payload.transactions) { - const decodedTx = decodeTransaction(tx.raw_tx); - // Look for the only token transfer transaction in the genesis block. This is the one - // that contains all the events, including all BNS name registrations. - if (decodedTx.payload.type_id === TxPayloadTypeID.TokenTransfer) { - rl.close(); - return { - index_block_hash: payload.index_block_hash, - parent_index_block_hash: payload.parent_index_block_hash, - microblock_hash: payload.parent_microblock, - microblock_sequence: payload.parent_microblock_sequence, - microblock_canonical: true, - tx_id: decodedTx.tx_id, - tx_index: tx.tx_index, - }; - } + try { + for await (const line of rl) { + const columns = line.split('\t'); + const eventName = columns[2]; + if (eventName === '/new_block') { + const blockMessage = JSON.parse(columns[3]); + if (blockMessage.block_height === 1) { + return blockMessage as CoreNodeBlockMessage; } } } + } finally { + rl.close(); + } + throw new Error('Genesis block data not found'); +} + +export function getBnsGenesisBlockFromBlockMessage( + genesisBlockMessage: CoreNodeBlockMessage +): BnsGenesisBlock { + if (genesisBlockMessage.block_height !== 1) { + throw new Error( + `This block message with height ${genesisBlockMessage.block_height} is not the genesis block message` + ); + } + const txs = genesisBlockMessage.transactions; + for (const tx of txs) { + const decodedTx = decodeTransaction(tx.raw_tx); + // Look for the only token transfer transaction in the genesis block. This is the one + // that contains all the events, including all BNS name registrations. + if (decodedTx.payload.type_id === TxPayloadTypeID.TokenTransfer) { + return { + index_block_hash: genesisBlockMessage.index_block_hash, + parent_index_block_hash: genesisBlockMessage.parent_index_block_hash, + microblock_hash: genesisBlockMessage.parent_microblock, + microblock_sequence: genesisBlockMessage.parent_microblock_sequence, + microblock_canonical: true, + tx_id: decodedTx.tx_id, + tx_index: tx.tx_index, + }; + } } throw new Error('BNS genesis block data not found'); } diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 59edb08872..42489d8e08 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -68,6 +68,8 @@ import { createDbTxFromCoreMsg, getTxDbStatus, } from '../datastore/helpers'; +import { importV1BnsNames, importV1BnsSubdomains } from '../import-v1'; +import { getBnsGenesisBlockFromBlockMessage } from '../event-replay/helpers'; import { Pox2ContractIdentifer } from '../pox-helpers'; import { decodePox2PrintEvent } from './pox2-event-parsing'; @@ -732,6 +734,24 @@ export type EventStreamServer = net.Server & { closeAsync: () => Promise; }; +export const bnsImportMiddleware = (db: PgWriteStore) => { + return asyncHandler(async (req, res, next) => { + const blockMessage: CoreNodeBlockMessage = req.body; + const bnsDir = process.env.BNS_IMPORT_DIR; + if (blockMessage.block_height === 1 && bnsDir) { + const configState = await db.getConfigState(); + if (!configState.bns_names_onchain_imported || !configState.bns_subdomains_imported) { + const bnsGenesisBlock = getBnsGenesisBlockFromBlockMessage(blockMessage); + logger.verbose('Starting V1 BNS names import'); + await importV1BnsNames(db, bnsDir, bnsGenesisBlock); + logger.verbose('Starting V1 BNS subdomains import'); + await importV1BnsSubdomains(db, bnsDir, bnsGenesisBlock); + } + } + next(); + }); +}; + export async function startEventServer(opts: { datastore: PgWriteStore; chainId: ChainID; @@ -801,8 +821,8 @@ export async function startEventServer(opts: { '/new_block', asyncHandler(async (req, res, next) => { try { - const msg: CoreNodeBlockMessage = req.body; - await messageHandler.handleBlockMessage(opts.chainId, msg, db); + const blockMessage: CoreNodeBlockMessage = req.body; + await messageHandler.handleBlockMessage(opts.chainId, blockMessage, db); res.status(200).json({ result: 'ok' }); next(); } catch (error) { @@ -810,6 +830,7 @@ export async function startEventServer(opts: { res.status(500).json({ error: error }); } }), + bnsImportMiddleware(db), handleRawEventRequest ); diff --git a/src/tests-bns/event-server-tests.ts b/src/tests-bns/event-server-tests.ts index c3b3984e4d..72cd26214f 100644 --- a/src/tests-bns/event-server-tests.ts +++ b/src/tests-bns/event-server-tests.ts @@ -1,11 +1,13 @@ import { ChainID } from '@stacks/transactions'; import { bnsNameCV, httpPostRequest } from '../helpers'; -import { EventStreamServer, startEventServer } from '../event-stream/event-server'; +import { bnsImportMiddleware, EventStreamServer, startEventServer } from '../event-stream/event-server'; import { TestBlockBuilder, TestMicroblockStreamBuilder } from '../test-utils/test-builders'; import { DbAssetEventTypeId, DbBnsZoneFile } from '../datastore/common'; import { PgWriteStore } from '../datastore/pg-write-store'; import { cycleMigrations, runMigrations } from '../datastore/migrations'; import { PgSqlClient } from '../datastore/connection'; +import { getGenesisBlockData } from '../event-replay/helpers'; +import { NextFunction } from 'express'; describe('BNS event server tests', () => { let db: PgWriteStore; @@ -15,7 +17,7 @@ describe('BNS event server tests', () => { beforeEach(async () => { process.env.PG_DATABASE = 'postgres'; await cycleMigrations(); - db = await PgWriteStore.connect({ usageName: 'tests', withNotifier: false }); + db = await PgWriteStore.connect({ usageName: 'tests', withNotifier: true }); client = db.sql; eventServer = await startEventServer({ datastore: db, @@ -26,6 +28,12 @@ describe('BNS event server tests', () => { }); }); + afterEach(async () => { + await eventServer.closeAsync(); + await db?.close(); + await runMigrations(undefined, 'down'); + }); + test('namespace-ready called by a contract other than BNS', async () => { const block = new TestBlockBuilder({ block_height: 1, @@ -1027,9 +1035,48 @@ describe('BNS event server tests', () => { expect(namespaceList.results).toStrictEqual(['ape.mega']); }); - afterEach(async () => { - await eventServer.closeAsync(); - await db?.close(); - await runMigrations(undefined, 'down'); - }); -}); + test('BNS middleware imports bns when ir receives the genesis block', async () => { + process.env.BNS_IMPORT_DIR = 'src/tests-bns/import-test-files'; + const genesisBlock = await getGenesisBlockData('src/tests-event-replay/tsv/mainnet.tsv'); + const bnsImportMiddlewareInitialized = bnsImportMiddleware(db); + let mockRequest = { + body: genesisBlock + } as unknown as Partial; + let mockResponse: Partial = {}; + let nextFunction: NextFunction = jest.fn(); + await bnsImportMiddlewareInitialized(mockRequest as any, mockResponse as any, nextFunction) + + const configState = await db.getConfigState(); + expect(configState.bns_names_onchain_imported).toBe(true) + expect(configState.bns_subdomains_imported).toBe(true) + }) + + test('BNS middleware is async. /new_block posts return before importing BNS finishes', async () => { + process.env.BNS_IMPORT_DIR = 'src/tests-bns/import-test-files'; + const genesisBlock = await getGenesisBlockData('src/tests-event-replay/tsv/mainnet.tsv'); + + await httpPostRequest({ + host: '127.0.0.1', + port: eventServer.serverAddress.port, + path: '/new_block', + headers: { 'Content-Type': 'application/json' }, + body: Buffer.from(JSON.stringify(genesisBlock), 'utf8'), + throwOnNotOK: true, + }); + + const configState = await db.getConfigState(); + expect(configState.bns_names_onchain_imported).toBe(false) + expect(configState.bns_subdomains_imported).toBe(false) + + await new Promise(resolve => { + db.eventEmitter.on('configStateUpdate', (configState) => { + if (configState.bns_names_onchain_imported && configState.bns_subdomains_imported) { + expect(configState.bns_names_onchain_imported).toBe(true) + expect(configState.bns_subdomains_imported).toBe(true); + resolve(undefined); + } + }) + }) + db.eventEmitter.removeAllListeners('configStateUpdate'); + }) +}) diff --git a/src/tests-bns/v1-import-tests.ts b/src/tests-bns/v1-import-tests.ts index 9d649591a9..f7c22b5f8f 100644 --- a/src/tests-bns/v1-import-tests.ts +++ b/src/tests-bns/v1-import-tests.ts @@ -28,6 +28,13 @@ describe('BNS V1 import', () => { await db.update(block); }); + afterEach(async () => { + await new Promise(resolve => eventServer.close(() => resolve(true))); + await api.terminate(); + await db?.close(); + await runMigrations(undefined, 'down'); + }); + test('v1-import', async () => { const genesis: BnsGenesisBlock = { index_block_hash: block.block.index_block_hash, @@ -161,11 +168,4 @@ describe('BNS V1 import', () => { if (dbquery.result){ expect(dbquery.result.name).toBe('id.blockstack');} }); - - afterEach(async () => { - await new Promise(resolve => eventServer.close(() => resolve(true))); - await api.terminate(); - await db?.close(); - await runMigrations(undefined, 'down'); - }); }); diff --git a/src/tests-event-replay/import-export-tests.ts b/src/tests-event-replay/import-export-tests.ts index 9b63e44c05..464235e2d9 100644 --- a/src/tests-event-replay/import-export-tests.ts +++ b/src/tests-event-replay/import-export-tests.ts @@ -17,6 +17,10 @@ describe('import/export tests', () => { }); }); + afterEach(async () => { + await db?.close(); + }); + test('event import and export cycle', async () => { // Import from mocknet TSV await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true); @@ -95,7 +99,11 @@ describe('import/export tests', () => { await expect(databaseHasData({ ignoreMigrationTables: true })).resolves.toBe(false); }); - afterEach(async () => { - await db?.close(); + test('Bns import occurs', async () => { + process.env.BNS_IMPORT_DIR = 'src/tests-bns/import-test-files'; + await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true); + const configState = await db.getConfigState(); + expect(configState.bns_names_onchain_imported).toBe(true); + expect(configState.bns_subdomains_imported).toBe(true); }); });