Skip to content

Commit

Permalink
Merge branch 'develop' into stacks-2.1
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/event-stream/event-server.ts
  • Loading branch information
zone117x committed Dec 22, 2022
1 parent bb35c2a commit fe0a820
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 60 deletions.
13 changes: 11 additions & 2 deletions src/datastore/pg-notifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as postgres from 'postgres';
import { logError, logger } from '../helpers';
import { DbConfigState } from './common';
import { connectPostgres, PgServer, PgSqlClient } from './connection';

type PgNotificationType =
Expand All @@ -10,7 +11,8 @@ type PgNotificationType =
| 'addressUpdate'
| 'nameUpdate'
| 'tokenMetadataUpdateQueued'
| 'tokensUpdate';
| 'tokensUpdate'
| 'configStateUpdate';

export type PgTxNotificationPayload = {
txId: string;
Expand All @@ -34,6 +36,8 @@ export type PgAddressNotificationPayload = {
blockHeight: number;
};

export type PgConfigStateNotificationPayload = DbConfigState;

export type PgTokenMetadataNotificationPayload = {
queueId: number;
};
Expand All @@ -54,7 +58,8 @@ type PgNotificationPayload =
| PgNftEventNotificationPayload
| PgTokenMetadataNotificationPayload
| PgTokensNotificationPayload
| PgTxNotificationPayload;
| PgTxNotificationPayload
| PgConfigStateNotificationPayload;

type PgNotification = {
type: PgNotificationType;
Expand Down Expand Up @@ -127,6 +132,10 @@ export class PgNotifier {
await this.notify({ type: 'tokensUpdate', payload: payload });
}

public async sendConfigState(payload: PgConfigStateNotificationPayload) {
await this.notify({ type: 'configStateUpdate', payload });
}

public async close() {
await this.listener
?.unlisten()
Expand Down
3 changes: 2 additions & 1 deletion src/datastore/pg-store-event-emitter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import StrictEventEmitter from 'strict-event-emitter-types';
import { DbMempoolStats } from './common';
import { DbConfigState, DbMempoolStats } from './common';

type DataStoreEventEmitter = StrictEventEmitter<
EventEmitter,
Expand All @@ -14,6 +14,7 @@ type DataStoreEventEmitter = StrictEventEmitter<
tokensUpdate: (contractID: string) => void;
tokenMetadataUpdateQueued: (queueId: number) => void;
mempoolStatsUpdate: (mempoolStats: DbMempoolStats) => void;
configStateUpdate: (configState: DbConfigState) => void;
}
>;

Expand Down
8 changes: 8 additions & 0 deletions src/datastore/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
DbBnsZoneFile,
DbBurnchainReward,
DbChainTip,
DbConfigState,
DbEvent,
DbEventTypeId,
DbFtBalance,
Expand Down Expand Up @@ -96,6 +97,7 @@ import {
import {
PgAddressNotificationPayload,
PgBlockNotificationPayload,
PgConfigStateNotificationPayload,
PgMicroblockNotificationPayload,
PgNameNotificationPayload,
PgNftEventNotificationPayload,
Expand Down Expand Up @@ -245,6 +247,12 @@ export class PgStore {
const nftEvent = notification.payload as PgNftEventNotificationPayload;
this.eventEmitter.emit('nftEventUpdate', nftEvent.txId, nftEvent.eventIndex);
break;
case 'configStateUpdate':
this.eventEmitter.emit(
'configStateUpdate',
notification.payload as PgConfigStateNotificationPayload
);
break;
}
});
}
Expand Down
1 change: 1 addition & 0 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1870,6 +1870,7 @@ export class PgWriteStore extends PgStore {
bns_subdomains_imported = ${configState.bns_subdomains_imported},
token_offering_imported = ${configState.token_offering_imported}
`;
await this.notifier?.sendConfigState(configState);
if (queryResult.count !== 1) {
throw new Error(`Unexpected config update row count: ${queryResult.count}`);
}
Expand Down
15 changes: 0 additions & 15 deletions src/event-replay/event-replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ export async function importEventsFromTsv(
if (eventImportMode === EventImportMode.pruned) {
console.log(`Ignoring all prunable events before block height: ${prunedBlockHeight}`);
}
// Look for the TSV's genesis block information for BNS import.
const tsvGenesisBlockData = await findBnsGenesisBlockData(resolvedFilePath);

const db = await PgWriteStore.connect({
usageName: 'import-events',
Expand All @@ -137,15 +135,6 @@ export async function importEventsFromTsv(

await importV1TokenOfferingData(db);

// Import V1 BNS names first. Subdomains will be imported after TSV replay is finished in order to
// keep the size of the `subdomains` table small.
if (process.env.BNS_IMPORT_DIR) {
logger.info(`Using BNS export data from: ${process.env.BNS_IMPORT_DIR}`);
await importV1BnsNames(db, process.env.BNS_IMPORT_DIR, tsvGenesisBlockData);
} else {
logger.warn(`Notice: full BNS functionality requires 'BNS_IMPORT_DIR' to be set.`);
}

// Import TSV chain data
const readStream = fs.createReadStream(resolvedFilePath);
const rawEventsIterator = getRawEventRequests(readStream, status => {
Expand Down Expand Up @@ -186,10 +175,6 @@ export async function importEventsFromTsv(
}
}
await db.finishEventReplay();
if (process.env.BNS_IMPORT_DIR) {
logger.level = defaultLogLevel;
await importV1BnsSubdomains(db, process.env.BNS_IMPORT_DIR, tsvGenesisBlockData);
}
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();
Expand Down
68 changes: 45 additions & 23 deletions src/event-replay/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as readline from 'readline';
import { decodeTransaction, TxPayloadTypeID } from 'stacks-encoding-native-js';
import { DataStoreBnsBlockData } from '../datastore/common';
import { ReverseFileStream } from './reverse-file-stream';
import { CoreNodeBlockMessage } from '../event-stream/core-node-message';

export type BnsGenesisBlock = DataStoreBnsBlockData & {
tx_id: string;
Expand Down Expand Up @@ -42,36 +43,57 @@ export async function findTsvBlockHeight(filePath: string): Promise<number> {
* @returns Genesis block data
*/
export async function findBnsGenesisBlockData(filePath: string): Promise<BnsGenesisBlock> {
const genesisBlockMessage = await getGenesisBlockData(filePath);
const bnsGenesisBlock = getBnsGenesisBlockFromBlockMessage(genesisBlockMessage);
return bnsGenesisBlock;
}

export async function getGenesisBlockData(filePath: string): Promise<CoreNodeBlockMessage> {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity,
});
for await (const line of rl) {
const columns = line.split('\t');
const eventName = columns[2];
if (eventName === '/new_block') {
const payload = JSON.parse(columns[3]);
// Look for block 1
if (payload.block_height === 1) {
for (const tx of payload.transactions) {
const decodedTx = decodeTransaction(tx.raw_tx);
// Look for the only token transfer transaction in the genesis block. This is the one
// that contains all the events, including all BNS name registrations.
if (decodedTx.payload.type_id === TxPayloadTypeID.TokenTransfer) {
rl.close();
return {
index_block_hash: payload.index_block_hash,
parent_index_block_hash: payload.parent_index_block_hash,
microblock_hash: payload.parent_microblock,
microblock_sequence: payload.parent_microblock_sequence,
microblock_canonical: true,
tx_id: decodedTx.tx_id,
tx_index: tx.tx_index,
};
}
try {
for await (const line of rl) {
const columns = line.split('\t');
const eventName = columns[2];
if (eventName === '/new_block') {
const blockMessage = JSON.parse(columns[3]);
if (blockMessage.block_height === 1) {
return blockMessage as CoreNodeBlockMessage;
}
}
}
} finally {
rl.close();
}
throw new Error('Genesis block data not found');
}

export function getBnsGenesisBlockFromBlockMessage(
genesisBlockMessage: CoreNodeBlockMessage
): BnsGenesisBlock {
if (genesisBlockMessage.block_height !== 1) {
throw new Error(
`This block message with height ${genesisBlockMessage.block_height} is not the genesis block message`
);
}
const txs = genesisBlockMessage.transactions;
for (const tx of txs) {
const decodedTx = decodeTransaction(tx.raw_tx);
// Look for the only token transfer transaction in the genesis block. This is the one
// that contains all the events, including all BNS name registrations.
if (decodedTx.payload.type_id === TxPayloadTypeID.TokenTransfer) {
return {
index_block_hash: genesisBlockMessage.index_block_hash,
parent_index_block_hash: genesisBlockMessage.parent_index_block_hash,
microblock_hash: genesisBlockMessage.parent_microblock,
microblock_sequence: genesisBlockMessage.parent_microblock_sequence,
microblock_canonical: true,
tx_id: decodedTx.tx_id,
tx_index: tx.tx_index,
};
}
}
throw new Error('BNS genesis block data not found');
}
Expand Down
25 changes: 23 additions & 2 deletions src/event-stream/event-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import {
createDbTxFromCoreMsg,
getTxDbStatus,
} from '../datastore/helpers';
import { importV1BnsNames, importV1BnsSubdomains } from '../import-v1';
import { getBnsGenesisBlockFromBlockMessage } from '../event-replay/helpers';
import { Pox2ContractIdentifer } from '../pox-helpers';
import { decodePox2PrintEvent } from './pox2-event-parsing';

Expand Down Expand Up @@ -732,6 +734,24 @@ export type EventStreamServer = net.Server & {
closeAsync: () => Promise<void>;
};

export const bnsImportMiddleware = (db: PgWriteStore) => {
return asyncHandler(async (req, res, next) => {
const blockMessage: CoreNodeBlockMessage = req.body;
const bnsDir = process.env.BNS_IMPORT_DIR;
if (blockMessage.block_height === 1 && bnsDir) {
const configState = await db.getConfigState();
if (!configState.bns_names_onchain_imported || !configState.bns_subdomains_imported) {
const bnsGenesisBlock = getBnsGenesisBlockFromBlockMessage(blockMessage);
logger.verbose('Starting V1 BNS names import');
await importV1BnsNames(db, bnsDir, bnsGenesisBlock);
logger.verbose('Starting V1 BNS subdomains import');
await importV1BnsSubdomains(db, bnsDir, bnsGenesisBlock);
}
}
next();
});
};

export async function startEventServer(opts: {
datastore: PgWriteStore;
chainId: ChainID;
Expand Down Expand Up @@ -801,15 +821,16 @@ export async function startEventServer(opts: {
'/new_block',
asyncHandler(async (req, res, next) => {
try {
const msg: CoreNodeBlockMessage = req.body;
await messageHandler.handleBlockMessage(opts.chainId, msg, db);
const blockMessage: CoreNodeBlockMessage = req.body;
await messageHandler.handleBlockMessage(opts.chainId, blockMessage, db);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logError(`error processing core-node /new_block: ${error}`, error);
res.status(500).json({ error: error });
}
}),
bnsImportMiddleware(db),
handleRawEventRequest
);

Expand Down
63 changes: 55 additions & 8 deletions src/tests-bns/event-server-tests.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { ChainID } from '@stacks/transactions';
import { bnsNameCV, httpPostRequest } from '../helpers';
import { EventStreamServer, startEventServer } from '../event-stream/event-server';
import { bnsImportMiddleware, EventStreamServer, startEventServer } from '../event-stream/event-server';
import { TestBlockBuilder, TestMicroblockStreamBuilder } from '../test-utils/test-builders';
import { DbAssetEventTypeId, DbBnsZoneFile } from '../datastore/common';
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { PgSqlClient } from '../datastore/connection';
import { getGenesisBlockData } from '../event-replay/helpers';
import { NextFunction } from 'express';

describe('BNS event server tests', () => {
let db: PgWriteStore;
Expand All @@ -15,7 +17,7 @@ describe('BNS event server tests', () => {
beforeEach(async () => {
process.env.PG_DATABASE = 'postgres';
await cycleMigrations();
db = await PgWriteStore.connect({ usageName: 'tests', withNotifier: false });
db = await PgWriteStore.connect({ usageName: 'tests', withNotifier: true });
client = db.sql;
eventServer = await startEventServer({
datastore: db,
Expand All @@ -26,6 +28,12 @@ describe('BNS event server tests', () => {
});
});

afterEach(async () => {
await eventServer.closeAsync();
await db?.close();
await runMigrations(undefined, 'down');
});

test('namespace-ready called by a contract other than BNS', async () => {
const block = new TestBlockBuilder({
block_height: 1,
Expand Down Expand Up @@ -1027,9 +1035,48 @@ describe('BNS event server tests', () => {
expect(namespaceList.results).toStrictEqual(['ape.mega']);
});

afterEach(async () => {
await eventServer.closeAsync();
await db?.close();
await runMigrations(undefined, 'down');
});
});
test('BNS middleware imports bns when ir receives the genesis block', async () => {
process.env.BNS_IMPORT_DIR = 'src/tests-bns/import-test-files';
const genesisBlock = await getGenesisBlockData('src/tests-event-replay/tsv/mainnet.tsv');
const bnsImportMiddlewareInitialized = bnsImportMiddleware(db);
let mockRequest = {
body: genesisBlock
} as unknown as Partial<Request>;
let mockResponse: Partial<Response> = {};
let nextFunction: NextFunction = jest.fn();
await bnsImportMiddlewareInitialized(mockRequest as any, mockResponse as any, nextFunction)

const configState = await db.getConfigState();
expect(configState.bns_names_onchain_imported).toBe(true)
expect(configState.bns_subdomains_imported).toBe(true)
})

test('BNS middleware is async. /new_block posts return before importing BNS finishes', async () => {
process.env.BNS_IMPORT_DIR = 'src/tests-bns/import-test-files';
const genesisBlock = await getGenesisBlockData('src/tests-event-replay/tsv/mainnet.tsv');

await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: '/new_block',
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(JSON.stringify(genesisBlock), 'utf8'),
throwOnNotOK: true,
});

const configState = await db.getConfigState();
expect(configState.bns_names_onchain_imported).toBe(false)
expect(configState.bns_subdomains_imported).toBe(false)

await new Promise(resolve => {
db.eventEmitter.on('configStateUpdate', (configState) => {
if (configState.bns_names_onchain_imported && configState.bns_subdomains_imported) {
expect(configState.bns_names_onchain_imported).toBe(true)
expect(configState.bns_subdomains_imported).toBe(true);
resolve(undefined);
}
})
})
db.eventEmitter.removeAllListeners('configStateUpdate');
})
})
14 changes: 7 additions & 7 deletions src/tests-bns/v1-import-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ describe('BNS V1 import', () => {
await db.update(block);
});

afterEach(async () => {
await new Promise(resolve => eventServer.close(() => resolve(true)));
await api.terminate();
await db?.close();
await runMigrations(undefined, 'down');
});

test('v1-import', async () => {
const genesis: BnsGenesisBlock = {
index_block_hash: block.block.index_block_hash,
Expand Down Expand Up @@ -161,11 +168,4 @@ describe('BNS V1 import', () => {
if (dbquery.result){
expect(dbquery.result.name).toBe('id.blockstack');}
});

afterEach(async () => {
await new Promise(resolve => eventServer.close(() => resolve(true)));
await api.terminate();
await db?.close();
await runMigrations(undefined, 'down');
});
});
Loading

0 comments on commit fe0a820

Please sign in to comment.