Commit 137d519

Upgrade parquet, refactor, row count and negative offsets

1 parent 870115b

4 files changed: +109 −105 lines

Cargo.lock

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 3 additions & 3 deletions

@@ -1,7 +1,7 @@
 [package]
 name = "parquet2json"
-description = "A command-line tool for converting Parquet to newline-delimited JSON"
-version = "1.6.1"
+description = "A command-line tool for streaming Parquet as line-delimited JSON"
+version = "2.0.0"
 edition = "2018"
 license = "MIT"
 authors = ["Pieter Raubenheimer <[email protected]>"]
@@ -14,7 +14,7 @@ crossbeam-channel = { version = "0.5.1" }
 clap = "3.1.6"
 lazy_static = { version = "1.4.0" }
 openssl = { version = "0.10", features = ["vendored"] }
-parquet = { version = "10.0.0", features = ["cli"] }
+parquet = { version = "11.0.0", features = ["cli"] }
 regex = { version = "1.5.4" }
 reqwest = { version = "0.11.10", features = ["blocking"] }
 rusoto_core = { version = "0.47.0", default_features = false, features=["rustls"] }

README.md

Lines changed: 21 additions & 16 deletions

@@ -1,8 +1,10 @@
 # parquet2json
 
-A command-line tool for converting [Parquet](https://parquet.apache.org) to [newline-delimited JSON](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON).
+A command-line tool for streaming [Parquet](https://parquet.apache.org) as [line-delimited JSON](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON).
 
-It uses the excellent [Apache Parquet Official Native Rust Implementation](https://github.com/apache/arrow-rs/tree/master/parquet).
+It reads only required ranges from file, HTTP or S3 locations, and supports offset/limit and column selection.
+
+It uses the [Apache Parquet Official Native Rust Implementation](https://github.com/apache/arrow-rs/tree/master/parquet) which has excellent support for compression formats and complex types.
 
 ## How to use it
 
@@ -13,18 +15,21 @@ $ cargo install parquet2json
 $ parquet2json --help
 
 USAGE:
-    parquet2json [OPTIONS] <FILE>
+    parquet2json [OPTIONS] <FILE> <SUBCOMMAND>
 
 ARGS:
     <FILE>    Location of Parquet input file (file path, HTTP or S3 URL)
 
 OPTIONS:
-    -o, --offset <OFFSET>                  Starts outputting from this row [default: 0]
-    -l, --limit <LIMIT>                    Maximum number of rows to output
-    -t, --timeout <TIMEOUT>                Request timeout in seconds [default: 60]
-    -s, --schema-output <SCHEMA_OUTPUT>    Outputs thrift schema only
-    -c, --columns <COLUMNS>                Select columns by name (comma,separated)
-    -h, --help                             Print help information
+    -t, --timeout <TIMEOUT>    Request timeout in seconds [default: 60]
+    -h, --help                 Print help information
+    -V, --version              Print version information
+
+SUBCOMMANDS:
+    cat         Outputs data as JSON lines
+    schema      Outputs the Thrift schema
+    rowcount    Outputs only the total row count
+    help        Print this message or the help of the given subcommand(s)
 ```
 
 ### S3 Settings
@@ -40,23 +45,23 @@ Use it to stream output to files and other tools such as `grep` and [jq](https:/
 #### Output to a file
 
 ```shell
-$ parquet2json ./myfile.pq > output.ndjson
+$ parquet2json ./myfile.pq cat > output.jsonl
 ```
 
-#### Filter with jq
+#### From S3 or HTTP (S3)
 
 ```shell
-$ parquet2json ./myfile.pq | jq 'select(.level==3) | .id'
+$ parquet2json s3://amazon-reviews-pds/parquet/product_category=Gift_Card/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet cat
 ```
 
-#### From S3 or HTTP (S3)
-
 ```shell
-$ parquet2json s3://amazon-reviews-pds/parquet/product_category=Gift_Card/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
+$ parquet2json https://amazon-reviews-pds.s3.us-east-1.amazonaws.com/parquet/product_category%3DGift_Card/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet cat
 ```
 
+#### Filter selected columns with jq
+
 ```shell
-$ parquet2json https://amazon-reviews-pds.s3.us-east-1.amazonaws.com/parquet/product_category%3DGift_Card/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
+$ parquet2json ./myfile.pq cat --columns=url,level | jq 'select(.level==3) | .url'
 ```
 
 ## License
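
Taken together, the README changes document the move to a subcommand-style CLI. A short usage sketch of the new surface (`./myfile.pq` is a placeholder file; the flags are those listed in the help text above):

```shell
# Total row count only (new in this commit)
$ parquet2json ./myfile.pq rowcount

# Thrift schema, previously behind the --schema-output option
$ parquet2json ./myfile.pq schema

# Stream rows as JSON lines, selecting columns and starting near the end of the file
$ parquet2json ./myfile.pq cat --columns=url,level --offset -100 --limit 100
```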

src/main.rs

Lines changed: 74 additions & 75 deletions

@@ -1,9 +1,11 @@
 use core::time::Duration;
+use std::convert::TryInto;
 use std::fs::File;
+use std::ops::Add;
 use std::path::Path;
 use std::sync::Arc;
 
-use clap::{AppSettings, Parser};
+use clap::{AppSettings, Parser, Subcommand};
 use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use parquet::record::reader::RowIter;
 use parquet::schema::printer::print_schema;
@@ -101,25 +103,48 @@ fn get_projection<R: 'static + ChunkReader>(
     }
 }
 
-async fn print_json_from(
-    source: Source,
-    offset: u32,
-    limit: i32,
-    should_output_schema: bool,
-    timeout: Duration,
-    column_names: Option<String>,
+fn output_for_command<R: 'static + ChunkReader>(
+    file_reader: SerializedFileReader<R>,
+    command: &Commands,
 ) {
+    match command {
+        Commands::Cat {
+            offset,
+            limit,
+            columns,
+        } => {
+            let absolute_offset: u32 = if offset.is_negative() {
+                let parquet_metadata = file_reader.metadata();
+                parquet_metadata
+                    .file_metadata()
+                    .num_rows()
+                    .add(offset + 1)
+                    .try_into()
+                    .unwrap()
+            } else {
+                offset.abs().try_into().unwrap()
+            };
+            let projection = get_projection(&file_reader, columns.clone());
+            output_rows(&file_reader, projection, absolute_offset, *limit);
+        }
+        Commands::Schema {} => {
+            output_thrift_schema(&file_reader);
+        }
+        Commands::Rowcount {} => {
+            let parquet_metadata = file_reader.metadata();
+
+            println!("{}", parquet_metadata.file_metadata().num_rows());
+        }
+    }
+}
+
+async fn handle_command(source: Source, timeout: Duration, command: Commands) {
     match source {
         Source::File(path) => {
             let file = File::open(&Path::new(&path)).unwrap();
             let file_reader = SerializedFileReader::new(file).unwrap();
 
-            if should_output_schema {
-                output_thrift_schema(&file_reader);
-            } else {
-                let projection = get_projection(&file_reader, column_names);
-                output_rows(&file_reader, projection, offset, limit);
-            }
+            output_for_command(file_reader, &command);
         }
         Source::Http(url_str) => {
             let mut reader = HttpChunkReader::new_unknown_size(url_str).await;
@@ -128,12 +153,7 @@ async fn print_json_from(
             let blocking_task = tokio::task::spawn_blocking(move || {
                 let file_reader = SerializedFileReader::new(reader).unwrap();
 
-                if should_output_schema {
-                    output_thrift_schema(&file_reader);
-                } else {
-                    let projection = get_projection(&file_reader, column_names);
-                    output_rows(&file_reader, projection, offset, limit);
-                }
+                output_for_command(file_reader, &command);
             });
             blocking_task.await.unwrap();
         }
@@ -153,12 +173,7 @@ async fn print_json_from(
            let blocking_task = tokio::task::spawn_blocking(move || {
                 let file_reader = SerializedFileReader::new(reader).unwrap();
 
-                if should_output_schema {
-                    output_thrift_schema(&file_reader);
-                } else {
-                    let projection = get_projection(&file_reader, column_names);
-                    output_rows(&file_reader, projection, offset, limit);
-                }
+                output_for_command(file_reader, &command);
            });
            blocking_task.await.unwrap();
        }
@@ -172,67 +187,51 @@ struct Cli {
     /// Location of Parquet input file (file path, HTTP or S3 URL)
     file: String,
 
-    /// Starts outputting from this row
-    #[clap(default_value_t = 0, short, long, parse(try_from_str))]
-    offset: u32,
-
-    /// Maximum number of rows to output
-    #[clap(short, long, parse(try_from_str))]
-    limit: Option<i32>,
-
     /// Request timeout in seconds
     #[clap(default_value_t = 60, short, long, parse(try_from_str))]
     timeout: u16,
 
-    /// Outputs thrift schema only
-    #[clap(short, long)]
-    schema_output: Option<bool>,
+    #[clap(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Outputs data as JSON lines
+    Cat {
+        /// Starts outputting from this row (first row: 0, last row: -1)
+        #[clap(default_value_t = 0, short, long, parse(try_from_str))]
+        offset: i64,
+
+        /// Maximum number of rows to output
+        #[clap(short, long, parse(try_from_str), default_value_t = -1)]
+        limit: i32,
 
-    /// Select columns by name (comma,separated)
-    #[clap(short, long)]
-    columns: Option<String>,
+        /// Select columns by name (comma,separated)
+        #[clap(short, long)]
+        columns: Option<String>,
+    },
+
+    /// Outputs the Thrift schema
+    Schema {},
+
+    /// Outputs only the total row count
+    Rowcount {},
 }
 
 #[tokio::main]
 async fn main() {
     let cli = Cli::parse();
-
-    let output_thrift_schema = cli.schema_output.unwrap_or(false);
-    let offset = cli.offset;
-    let limit: i32 = cli.limit.unwrap_or(-1);
-    let timeout = Duration::from_secs(cli.timeout.into());
     let file = cli.file;
-    let column_names = cli.columns;
+    let timeout = Duration::from_secs(cli.timeout.into());
 
-    if file.as_str().starts_with("s3://") {
-        print_json_from(
-            Source::S3(file),
-            offset,
-            limit,
-            output_thrift_schema,
-            timeout,
-            column_names,
-        )
-        .await;
+    let source = if file.as_str().starts_with("s3://") {
+        Source::S3(file)
    } else if file.as_str().starts_with("http") {
-        print_json_from(
-            Source::Http(file),
-            offset,
-            limit,
-            output_thrift_schema,
-            timeout,
-            column_names,
-        )
-        .await;
+        Source::Http(file)
    } else {
-        print_json_from(
-            Source::File(file),
-            offset,
-            limit,
-            output_thrift_schema,
-            timeout,
-            column_names,
-        )
-        .await;
-    }
+        Source::File(file)
+    };
+
+    handle_command(source, timeout, cli.command).await;
 }
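
To make the negative-offset handling above concrete: `output_for_command` resolves a negative offset against the footer metadata as `num_rows + offset + 1`. A hedged example for a hypothetical 1,000-row file (file name and printed output invented for illustration):

```shell
# rowcount reads num_rows straight from the file metadata; no data pages are scanned
$ parquet2json ./myfile.pq rowcount
1000

# --offset -10 resolves to the absolute offset 1000 + (-10) + 1 = 991;
# non-negative offsets pass through unchanged (first row: 0, last row: -1)
$ parquet2json ./myfile.pq cat --offset -10
```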
