Skip to content

Commit

Permalink
Add manual retries
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Jan 4, 2024
1 parent 656e62a commit 008a371
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions scripts/load-raw-near-lake.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,63 @@ function normalizeBlockHeight(number) {
return number.toString().padStart(12, '0');
}

async function* blockNumbersStream(client, bucketName, startAfter, pageSize = 50) {
let listObjects;
do {
listObjects = await client.send(
async function withRetries(fn, maxAttempts = 3) {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
console.error(`Attempt ${attempt + 1} failed: `, error);
if (attempt === maxAttempts - 1) {
throw error;
}
}
}
}

// TODO: Not sure why AWS doesn't retry as expected

async function listObjects(client, { bucketName, startAfter, maxKeys }) {
return withRetries(async () => {
return await client.send(
new ListObjectsV2Command({
Bucket: bucketName,
MaxKeys: pageSize,
MaxKeys: maxKeys,
Delimiter: '/',
StartAfter: normalizeBlockHeight(startAfter),
StartAfter: startAfter,
RequestPayer: 'requester',
})
);
});
}

async function getObject(client, { bucketName, key }) {
return withRetries(async () => {
if (Math.random() < 0.01) {
throw new Error('Random error');
}

return await client.send(
new GetObjectCommand({
Bucket: bucketName,
Key: key,
RequestPayer: 'requester',
})
);
const blockNumbers = (listObjects.CommonPrefixes || []).map((p) => parseInt(p.Prefix.split('/')[0]));
});
}

async function* blockNumbersStream(client, bucketName, startAfter, pageSize = 50) {
let listObjectsResult;
do {
listObjectsResult = await listObjects(client, { bucketName, startAfter: normalizeBlockHeight(startAfter), maxKeys: pageSize });
const blockNumbers = (listObjectsResult.CommonPrefixes || []).map((p) => parseInt(p.Prefix.split('/')[0]));

for (const blockNumber of blockNumbers) {
yield blockNumber;
}

startAfter = blockNumbers[blockNumbers.length - 1] + 1;
} while (listObjects.IsTruncated);
} while (listObjectsResult.IsTruncated);
}

async function *blockPromises(client, bucketName, startAfter, limit = 1000) {
Expand All @@ -55,13 +92,7 @@ async function *blockPromises(client, bucketName, startAfter, limit = 1000) {

const promise = (async () => {
const blockHeight = normalizeBlockHeight(blockNumber);
const blockData = await client.send(
new GetObjectCommand({
Bucket: bucketName,
Key: `${blockHeight}/block.json`,
RequestPayer: 'requester',
})
);
const blockData = await getObject(client, { bucketName, key: `${blockHeight}/block.json` });

const blockReadable = blockData.Body;
const chunks = [];
Expand All @@ -73,13 +104,7 @@ async function *blockPromises(client, bucketName, startAfter, limit = 1000) {

return { block: blockBuffer, blockHeight, shards: await Promise.all(
block.chunks.map(async (_, i) => {
const chunkData = await client.send(
new GetObjectCommand({
Bucket: bucketName,
Key: `${blockHeight}/shard_${i}.json`,
RequestPayer: 'requester',
})
);
const chunkData = await getObject(client, { bucketName, key: `${blockHeight}/shard_${i}.json` });

return chunkData.Body;
}))
Expand Down

0 comments on commit 008a371

Please sign in to comment.