Skip to content

Commit

Permalink
Use array for list of sorted keys and changes
Browse files (browse the repository at this point in the history)
  • Loading branch information
vgrichina committed Feb 10, 2024
1 parent 160a4b7 commit 392b14d
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions scripts/build-raw-near-lake-index.js
Original file line number | Diff line number | Diff line change
Expand Up @@ -22,7 +22,7 @@ async function main() {
console.log('Processing shard', shard);
const allChangesByAccount = await reduceStream(
changesByAccountStream(shard, startBlockNumber, endBlockNumber),
(a, b) => merge(a, b, mergeChanges));
(a, b) => mergeObjects(a, b, mergeChanges));
await writeChanges(`${dstDir}/${shard}/index`, allChangesByAccount);
}

Expand Down Expand Up @@ -64,21 +64,25 @@ async function main() {
for (let { type, change } of state_changes) {
const { account_id, ...changeData } = change;
const accountChanges = changesByAccount[account_id];
// TODO: Avoid conversion to string key
const keyBytes = changeKey(type, changeData);
const key = keyBytes.toString('base64');
const key = changeKey(type, changeData);

if (!accountChanges) {
changesByAccount[account_id] = { [key]: [blockHeight] };
changesByAccount[account_id] = [ { key, changes: [blockHeight] } ];
} else {
if (accountChanges[key] && accountChanges[key].at(-1) != blockHeight) {
accountChanges[key].push(blockHeight);
const index = accountChanges.findIndex(({ key: k }) => k.equals(key));
if (index !== -1) {
accountChanges[index].changes.push(blockHeight);
} else {
accountChanges[key] = [blockHeight];
accountChanges.push({ key, changes: [blockHeight] });
}
}
}
}

for (let accountChanges of Object.values(changesByAccount)) {
accountChanges.sort((a, b) => a.key.compare(b.key));
}

yield changesByAccount;
}
}
Expand All @@ -91,7 +95,7 @@ async function writeChanges(outFolder, changesByAccount) {

for (let accountId in changesByAccount) {
const accountChanges = changesByAccount[accountId];
const totalChanges = Object.values(accountChanges).reduce((sum, changes) => sum + changes.length, 0);
const totalChanges = accountChanges.reduce((sum, { changes }) => sum + changes.length, 0);
if (totalChanges < MIN_CHANGES_PER_FILE) {
continue;
}
Expand Down Expand Up @@ -159,27 +163,21 @@ async function writeChangesFile(outPath, changesByAccount) {
} else {
writeBuffer(accountId);
}
const accountChanges = changesByAccount[accountId];
const sortedKeys = Object.keys(accountChanges).sort();

// NOTE: This is needed to avoid reading the whole file to find account changes
for (let keyStr of sortedKeys) {
// TODO: Avoid conversion to string key
const keyBytes = Buffer.from(keyStr, 'base64');
const allChanges = accountChanges[keyStr];

const accountChanges = changesByAccount[accountId];
for (let { key, changes: allChanges } of accountChanges) {
// NOTE: Changes arrays are split into chunks of up to 0xFF items
// TODO: Use 0xFFFF instead of 0xFF
const MAX_CHANGES_PER_RECORD = 0xFF;
for (let i = 0; i < allChanges.length; ) {
let changes = allChanges.slice(i, i + MAX_CHANGES_PER_RECORD);

const keyLength = keyBytes.length + 2;
const keyLength = key.length + 2;
const minChangesLength = 2 + 4 * 8; // 8 changes
if (offset + keyLength + minChangesLength > PAGE_SIZE) {
await flushPage(accountId);
}
writeBuffer(keyBytes);
writeBuffer(key);

// TODO: Calculate actual varint length
const maxChangesLength = Math.floor((buffer.length - offset - 2) / 4);
Expand Down Expand Up @@ -254,7 +252,7 @@ async function reduceStream(stream, fn) {
return fn(result, reduceRecursive(chunk, fn));
}

function merge(a, b, fn) {
function mergeObjects(a, b, fn) {
for (k in b) {
if (a[k]) {
a[k] = fn(a[k], b[k]);
Expand All @@ -266,18 +264,19 @@ function merge(a, b, fn) {
}

function mergeChanges(a, b) {
return merge(a, b, mergeSortedArrays);
return mergeSortedArrays(a, b, (a, b) => a.key.compare(b.key));
}

function mergeSortedArrays(a, b) {
function mergeSortedArrays(a, b, fn = (a, b) => a < b ? -1 : a > b ? 1 : 0) {
const result = [];
let i = 0;
let j = 0;
while (i < a.length && j < b.length) {
if (a[i] < b[j]) {
const comparison = fn(a[i], b[j]);
if (comparison < 0) {
result.push(a[i]);
i++;
} else if (a[i] > b[j]) {
} else if (comparison > 0) {
result.push(b[j]);
j++;
} else {
Expand Down

0 comments on commit 392b14d

Please sign in to comment.