From 2f8ec525a864bc48e7a4e657a92a392b9dd17073 Mon Sep 17 00:00:00 2001
From: Ian Dees
Date: Sun, 23 Jun 2024 20:37:25 -0500
Subject: [PATCH] First sketch at outputting Parquet formatted data

---
 task/collect.js | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 106 insertions(+), 3 deletions(-)

diff --git a/task/collect.js b/task/collect.js
index ea8ffdc2..f97994d2 100755
--- a/task/collect.js
+++ b/task/collect.js
@@ -16,6 +16,7 @@ import { mkdirp } from 'mkdirp';
 import S3 from '@aws-sdk/client-s3';
 import { Upload } from '@aws-sdk/lib-storage';
 import archiver from 'archiver';
+import parquetjs from '@dsnp/parquetjs';
 import minimist from 'minimist';
 import { Transform } from 'stream';
 
@@ -108,9 +109,15 @@ async function collect(tmp, collection, oa) {
     const zip = await zip_datas(tmp, collection_data, collection.name);
 
     console.error(`ok - zip created: ${zip}`);
-    await upload_collection(zip, collection.name);
+    await upload_zip_collection(zip, collection.name);
     console.error('ok - archive uploaded');
 
+    const pq = await parquet_datas(tmp, collection_data, collection.name);
+
+    console.error(`ok - parquet created: ${pq}`);
+    await upload_parquet_collection(pq, collection.name);
+    console.error('ok - parquet uploaded');
+
     await oa.cmd('collection', 'update', {
         ':collection': collection.id,
         size: fs.statSync(zip).size
@@ -193,7 +200,7 @@ async function get_source(oa, tmp, data, stats) {
     return path.resolve(tmp, 'sources', dir, source);
 }
 
-async function upload_collection(file, name) {
+async function upload_zip_collection(file, name) {
     const s3uploader = new Upload({
         client: s3,
         params: {
@@ -217,7 +224,6 @@ async function upload_collection(file, name) {
         endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
     });
 
-
     const r2uploader = new Upload({
         client: r2,
         params: {
@@ -231,7 +237,45 @@ async function upload_collection(file, name) {
     await r2uploader.done();
 
     console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.zip`);
+}
+
+async function upload_parquet_collection(file, name) {
+    const s3uploader = new Upload({
+        client: s3,
+        params: {
+            ContentType: 'application/vnd.apache.parquet',
+            Body: fs.createReadStream(file),
+            Bucket: process.env.Bucket,
+            Key: `${process.env.StackName}/collection-${name}.parquet`
+        }
+    });
+
+    await s3uploader.done();
+
+    console.error(`ok - s3://${process.env.Bucket}/${process.env.StackName}/collection-${name}.parquet`);
+
+    const r2 = new S3.S3Client({
+        region: 'auto',
+        credentials: {
+            accessKeyId: process.env.R2_ACCESS_KEY_ID,
+            secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
+        },
+        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
+    });
+
+    const r2uploader = new Upload({
+        client: r2,
+        params: {
+            ContentType: 'application/vnd.apache.parquet',
+            Body: fs.createReadStream(file),
+            Bucket: process.env.R2Bucket,
+            Key: `v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`
+        }
+    });
+
+    await r2uploader.done();
+    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`);
 }
 
 function zip_datas(tmp, datas, name) {
     return new Promise((resolve, reject) => {
@@ -273,3 +317,62 @@ function zip_datas(tmp, datas, name) {
         archive.finalize();
     });
 }
+
+async function parquet_datas(tmp, datas, name) {
+    const schema = new parquetjs.ParquetSchema({
+        source_name: { type: 'UTF8' },
+        geometry: { type: 'BYTE_ARRAY' }, // GeoJSON bytes for now; TODO: Convert to WKB
+        id: { type: 'UTF8', optional: true },
+        pid: { type: 'UTF8', optional: true },
+        number: { type: 'UTF8', optional: true },
+        street: { type: 'UTF8', optional: true },
+        unit: { type: 'UTF8', optional: true },
+        city: { type: 'UTF8', optional: true },
+        postcode: { type: 'UTF8', optional: true },
+        district: { type: 'UTF8', optional: true },
+        region: { type: 'UTF8', optional: true },
+        addrtype: { type: 'UTF8', optional: true },
+        notes: { type: 'UTF8', optional: true }
+    });
+
+    const output = path.resolve(tmp, `${name}.parquet`);
+    const writer = await parquetjs.ParquetWriter.openFile(schema, output);
+
+    for (const data of datas) {
+        const resolved_data_filename = path.resolve(tmp, 'sources', data);
+
+        // Read the file and parse it as linefeed-delimited JSON
+        // (assumes a newline splitter such as the split2 package is imported as `split`)
+        const data_stream = fs.createReadStream(resolved_data_filename);
+        const data_lines = data_stream.pipe(split());
+
+        for await (const line of data_lines) {
+            if (!line) continue;
+
+            const record = JSON.parse(line);
+            const properties = record.properties;
+
+            await writer.appendRow({
+                source_name: data,
+                geometry: Buffer.from(JSON.stringify(record.geometry)), // TODO: Convert to WKB
+                id: properties.id,
+                pid: properties.pid,
+                number: properties.number,
+                street: properties.street,
+                unit: properties.unit,
+                city: properties.city,
+                postcode: properties.postcode,
+                district: properties.district,
+                region: properties.region,
+                addrtype: properties.addrtype,
+                notes: properties.notes
+            });
+        }
+
+        console.error(`ok - ${resolved_data_filename} processed and appended to parquet file`);
+    }
+
+    await writer.close();
+
+    return output;
+}
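
To spot-check the generated file locally, a minimal read-back sketch using the same @dsnp/parquetjs dependency (the file name below is illustrative; run as an ES module):

    import parquetjs from '@dsnp/parquetjs';

    // Open the Parquet file written by parquet_datas() and print the first few rows
    const reader = await parquetjs.ParquetReader.openFile('collection-example.parquet');
    const cursor = reader.getCursor();
    for (let i = 0; i < 5; i++) {
        const row = await cursor.next();
        if (!row) break;
        console.log(row.source_name, row.number, row.street, row.city);
    }
    await reader.close();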