Skip to content

Commit

Permalink
WIP: building and reading changes index
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Feb 6, 2024
1 parent 0b6f9a8 commit 658a93f
Showing 1 changed file with 98 additions and 23 deletions.
121 changes: 98 additions & 23 deletions scripts/build-raw-near-lake-index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

const fs = require('fs');
const { writeFile } = require('fs/promises');
const { writeFile, open } = require('fs/promises');
const zlib = require('zlib');
const tar = require('tar-stream');

Expand Down Expand Up @@ -114,44 +114,77 @@ async function writeChangesFile(outPath, changesByAccount) {
const outStream = fs.createWriteStream(outPath);
const buffer = Buffer.alloc(PAGE_SIZE);
let offset = 0;

function writeUInt16(value) {
offset = buffer.writeUInt16LE(value, offset);
}

function writeString(value) {
writeUInt16(value.length);
offset += buffer.write(value, offset);
}

async function flushPage(accountId) {
console.log('Writing', outPath, accountId, offset);

// TODO: Make sure that at least 4 zeros indicate the end of the page
// Fill the rest of the page with zeros
buffer.fill(0, offset);

await new Promise((resolve, reject) => {
outStream.write(buffer, e => e ? reject(e) : resolve());
});
offset = 0;

if (accountId) {
writeString(accountId);
}
}

for (let accountId in changesByAccount) {
offset = buffer.writeUInt8(accountId.length, offset);
offset += buffer.write(accountId, offset);
const accountIdLength = Buffer.byteLength(accountId) + 2;
if (offset + accountIdLength >= PAGE_SIZE) {
await flushPage(accountId);
} else {
writeString(accountId);
}

const accountChanges = changesByAccount[accountId];

// TODO: Align all changes by page size, start new page with account id
// NOTE: This is needed to avoid reading the whole file to find account changes
for (let key in accountChanges) {
const allChanges = accountChanges[key];

// NOTE: Changes arrays are split into chunks of 0xFF items
for (let i = 0; i < allChanges.length + 0xFF; i += 0xFF) {
const changes = allChanges.slice(i, i + 0xFF);
// NOTE: Changes arrays are split into chunks of up to 0xFF items
// TODO: Use 0xFFFF instead of 0xFF
const MAX_CHANGES_PER_RECORD = 0xFF;
for (let i = 0; i < allChanges.length; ) {
let changes = allChanges.slice(i, i + MAX_CHANGES_PER_RECORD);

// TODO: Check max key length
offset = buffer.writeUInt8(key.length, offset);
offset += buffer.write(key, offset);
const keyLength = Buffer.byteLength(key) + 2;
const minChangesLength = 2 + 4 * 8; // 8 changes
if (offset + keyLength + minChangesLength > PAGE_SIZE) {
await flushPage(accountId);
}
writeString(key);

offset = buffer.writeUInt8(changes.length, offset);
const maxChangesLength = Math.floor((buffer.length - offset - 2) / 4);
if (changes.length > maxChangesLength) {
changes = changes.slice(0, maxChangesLength);
}
writeUInt16(changes.length);
for (let change of changes) {
offset = buffer.writeInt32LE(change, offset);

// TODO: Adjust this as needed
if (offset > PAGE_SIZE - 1000) {
console.log('Writing', outPath, offset);
await new Promise((resolve, reject) => {
outStream.write(buffer.slice(0, offset), e => e ? reject(e) : resolve());
});
offset = 0;
}
}
i += changes.length;
}
}

writeUInt16(0);
}

await new Promise((resolve, reject) => {
console.log('Writing', outPath, offset);
outStream.end(buffer.slice(0, offset), e => e ? reject(e) : resolve());
});
await flushPage();
}

function reduceRecursive(items, fn) {
Expand Down Expand Up @@ -236,6 +269,48 @@ function changeKey(type, changeData) {
}
}

/**
 * Streams the records stored in a changes index file.
 *
 * The layout read here is a sequence of accounts, each encoded as:
 *   - account id: UInt16LE byte length followed by UTF-8 bytes
 *   - zero or more records: key (same string encoding), UInt16LE change
 *     count, then that many Int32LE change values
 *   - a zero UInt16LE terminating the account's record list
 * A zero length where an account id is expected (or end of file) ends the stream.
 *
 * NOTE: The same key can be yielded more than once for an account when the
 * file stores its changes in several records; merging is left to the caller.
 *
 * @param {string} inPath - path of the changes file to read
 * @yields {{ accountId: string, key: string, changes: number[] }} one entry per stored record
 * @throws {Error} if the file ends in the middle of a value
 */
async function *readChangesFile(inPath) {
    const file = await open(inPath, 'r');

    // Fills the whole buffer from the current file position.
    // Returns the number of bytes actually read (less than buffer.length only at end of file).
    async function readFully(buffer) {
        let filled = 0;
        while (filled < buffer.length) {
            const { bytesRead } = await file.read({
                buffer,
                offset: filled,
                length: buffer.length - filled,
            });
            if (bytesRead === 0) {
                break;
            }
            filled += bytesRead;
        }
        return filled;
    }

    const int32Buffer = Buffer.alloc(4);
    async function readInt32() {
        const filled = await readFully(int32Buffer);
        if (filled < int32Buffer.length) {
            throw new Error(`Unexpected end of file while reading change value: ${inPath}`);
        }
        // NOTE: Signed read, to match the writeInt32LE used when the file is written
        return int32Buffer.readInt32LE(0);
    }

    const int16Buffer = Buffer.alloc(2);
    async function readUInt16({ allowEof = false } = {}) {
        const filled = await readFully(int16Buffer);
        if (filled === 0 && allowEof) {
            // A clean end of file is treated the same as a zero terminator
            return 0;
        }
        if (filled < int16Buffer.length) {
            throw new Error(`Unexpected end of file while reading length: ${inPath}`);
        }
        return int16Buffer.readUInt16LE(0);
    }

    // Returns null when a zero length (terminator) or end of file is encountered
    async function readString() {
        const length = await readUInt16({ allowEof: true });
        if (length === 0) {
            return null;
        }

        const buffer = Buffer.alloc(length);
        const filled = await readFully(buffer);
        if (filled < length) {
            throw new Error(`Unexpected end of file while reading string: ${inPath}`);
        }
        return buffer.toString('utf-8');
    }

    try {
        // TODO: Iterate through pages
        let accountId;
        while ((accountId = await readString()) !== null) {
            let key;
            while ((key = await readString()) !== null) {
                const count = await readUInt16();
                const changes = [];
                for (let i = 0; i < count; i++) {
                    changes.push(await readInt32());
                }

                yield { accountId, key, changes };
            }
        }
    } finally {
        // Runs on completion, on error, and when the consumer stops iterating early
        await file.close();
    }
}

/**
 * Formats a block height as a fixed-width decimal string.
 *
 * The value is converted with its own `toString()` and left-padded with '0'
 * up to 12 characters; longer values are returned unpadded.
 * NOTE(review): presumably used so that heights sort correctly as strings — confirm with callers.
 *
 * @param {number|bigint|string} number - block height to format
 * @returns {string} zero-padded representation, at least 12 characters long
 */
function normalizeBlockHeight(number) {
    const paddedWidth = 12;
    const digits = number.toString();
    return digits.padStart(paddedWidth, '0');
}
Expand Down

0 comments on commit 658a93f

Please sign in to comment.