Add Parquet as an output format for collections #394

Draft · wants to merge 1 commit into master
task/collect.js: 101 additions & 3 deletions
@@ -16,6 +16,7 @@ import { mkdirp } from 'mkdirp';
import S3 from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import archiver from 'archiver';
import parquetjs from '@dsnp/parquetjs';
import minimist from 'minimist';
import { Transform } from 'stream';
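
For context, the new dependency follows an open/append/close pattern. A minimal sketch of the @dsnp/parquetjs calls this patch relies on (the name/count fields below are illustrative only, not from this PR):

import parquetjs from '@dsnp/parquetjs';

// Columns are declared up front; every appended row must match them.
const schema = new parquetjs.ParquetSchema({
    name: { type: 'UTF8' },
    count: { type: 'INT32', optional: true }
});

// openFile, appendRow, and close all return promises.
const writer = await parquetjs.ParquetWriter.openFile(schema, '/tmp/example.parquet');
await writer.appendRow({ name: 'example', count: 1 });
await writer.close();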

@@ -108,9 +109,15 @@ async function collect(tmp, collection, oa) {
const zip = await zip_datas(tmp, collection_data, collection.name);

console.error(`ok - zip created: ${zip}`);
- await upload_collection(zip, collection.name);
+ await upload_zip_collection(zip, collection.name);
console.error('ok - archive uploaded');

const pq = await parquet_datas(tmp, collection_data, collection.name);

console.error(`ok - parquet created: ${pq}`);
await upload_parquet_collection(pq, collection.name);
console.error('ok - parquet uploaded');

await oa.cmd('collection', 'update', {
':collection': collection.id,
size: fs.statSync(zip).size
@@ -193,7 +200,7 @@ async function get_source(oa, tmp, data, stats) {
return path.resolve(tmp, 'sources', dir, source);
}

- async function upload_collection(file, name) {
+ async function upload_zip_collection(file, name) {
const s3uploader = new Upload({
client: s3,
params: {
@@ -217,7 +224,6 @@ async function upload_collection(file, name) {
endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
});


const r2uploader = new Upload({
client: r2,
params: {
@@ -231,7 +237,45 @@ async function upload_collection(file, name) {
await r2uploader.done();

console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.zip`);
}

async function upload_parquet_collection(file, name) {
    const s3uploader = new Upload({
        client: s3,
        params: {
            ContentType: 'application/vnd.apache.parquet',
            Body: fs.createReadStream(file),
            Bucket: process.env.Bucket,
            Key: `${process.env.StackName}/collection-${name}.parquet`
        }
    });

    await s3uploader.done();

    console.error(`ok - s3://${process.env.Bucket}/${process.env.StackName}/collection-${name}.parquet`);

    const r2 = new S3.S3Client({
        region: 'auto',
        credentials: {
            accessKeyId: process.env.R2_ACCESS_KEY_ID,
            secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
        },
        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
    });

    const r2uploader = new Upload({
        client: r2,
        params: {
            ContentType: 'application/vnd.apache.parquet',
            Body: fs.createReadStream(file),
            Bucket: process.env.R2Bucket,
            Key: `v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`
        }
    });

    await r2uploader.done();

    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`);
}

function zip_datas(tmp, datas, name) {
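
upload_zip_collection and upload_parquet_collection now duplicate the S3 and R2 upload logic wholesale. A possible follow-up (a hypothetical helper, not part of this commit) would parameterize a single uploader by key and content type:

// Hypothetical refactor sketch: one uploader covering both buckets.
async function upload_collection_file(file, key, contentType) {
    const r2 = new S3.S3Client({
        region: 'auto',
        credentials: {
            accessKeyId: process.env.R2_ACCESS_KEY_ID,
            secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
        },
        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
    });

    const targets = [
        { client: s3, Bucket: process.env.Bucket, Key: `${process.env.StackName}/${key}` },
        { client: r2, Bucket: process.env.R2Bucket, Key: `v2.openaddresses.io/${process.env.StackName}/${key}` }
    ];

    for (const target of targets) {
        // A fresh stream per upload: a single ReadStream cannot be consumed twice.
        const uploader = new Upload({
            client: target.client,
            params: {
                ContentType: contentType,
                Body: fs.createReadStream(file),
                Bucket: target.Bucket,
                Key: target.Key
            }
        });
        await uploader.done();
        console.error(`ok - uploaded ${target.Bucket}/${target.Key}`);
    }
}

The two existing functions would then reduce to calls like upload_collection_file(pq, `collection-${name}.parquet`, 'application/vnd.apache.parquet').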
@@ -273,3 +317,57 @@ function zip_datas(tmp, datas, name) {
archive.finalize();
});
}

async function parquet_datas(tmp, datas, name) {
    const schema = new parquetjs.ParquetSchema({
        source_name: { type: 'UTF8' },
        geometry: { type: 'BYTE_ARRAY', optional: true }, // serialized GeoJSON for now
        id: { type: 'UTF8', optional: true },
        pid: { type: 'UTF8', optional: true },
        number: { type: 'UTF8', optional: true },
        street: { type: 'UTF8', optional: true },
        unit: { type: 'UTF8', optional: true },
        city: { type: 'UTF8', optional: true },
        postcode: { type: 'UTF8', optional: true },
        district: { type: 'UTF8', optional: true },
        region: { type: 'UTF8', optional: true },
        addrtype: { type: 'UTF8', optional: true },
        notes: { type: 'UTF8', optional: true }
    });

    const output = path.resolve(tmp, `${name}.parquet`);
    const writer = await parquetjs.ParquetWriter.openFile(schema, output);

    for (const data of datas) {
        const resolved_data_filename = path.resolve(tmp, 'sources', data);

        // Read the file and parse it as linefeed-delimited GeoJSON
        const data_lines = fs.createReadStream(resolved_data_filename).pipe(split());

        // for await drains each stream in order, so every row has been
        // appended before the writer is closed below
        for await (const line of data_lines) {
            if (!line.trim()) continue;
            const record = JSON.parse(line);
            const properties = record.properties;
            await writer.appendRow({
                source_name: data,
                geometry: Buffer.from(JSON.stringify(record.geometry)), // TODO: Convert to WKB
                id: properties.id,
                pid: properties.pid,
                number: properties.number,
                street: properties.street,
                unit: properties.unit,
                city: properties.city,
                postcode: properties.postcode,
                district: properties.district,
                region: properties.region,
                addrtype: properties.addrtype,
                notes: properties.notes
            });
        }

        console.error(`ok - ${resolved_data_filename} processed and appended to parquet file`);
    }

    await writer.close();
    return output;
}
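
For the WKB TODO in parquet_datas, one option (an assumption; the package is not imported by this patch) is the wkx library, which parses GeoJSON and serializes it to a WKB Buffer suitable for the BYTE_ARRAY column:

import wkx from 'wkx';

// Hypothetical helper for the TODO above: GeoJSON geometry -> WKB Buffer.
function geometry_to_wkb(geometry) {
    if (!geometry) return null;
    return wkx.Geometry.parseGeoJSON(geometry).toWkb();
}

The appendRow call would then pass geometry: geometry_to_wkb(record.geometry) instead of the serialized-JSON placeholder.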