Skip to content

Commit 60fbdac

Browse files
committed
refactor: restore the ability to add kv metadata into the generated file sink
1 parent 391e074 commit 60fbdac

File tree

5 files changed

+30
-4
lines changed

5 files changed

+30
-4
lines changed

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ use std::collections::{BTreeMap, HashMap};
2222
use std::fmt::{self, Display};
2323
use std::str::FromStr;
2424

25+
#[cfg(feature = "parquet")]
26+
use parquet::file::metadata::KeyValue;
27+
2528
use crate::error::_config_err;
2629
use crate::parsers::CompressionTypeVariant;
2730
use crate::{DataFusionError, FileType, Result};
@@ -1368,6 +1371,10 @@ pub struct TableParquetOptions {
13681371
pub global: ParquetOptions,
13691372
/// Column specific options. Default usage is parquet.XX::column.
13701373
pub column_specific_options: HashMap<String, ColumnOptions>,
1374+
/// Optional, additional metadata to be inserted into the key_value_metadata
1375+
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
1376+
#[cfg(feature = "parquet")]
1377+
pub key_value_metadata: Option<Vec<KeyValue>>,
13711378
}
13721379

13731380
impl ConfigField for TableParquetOptions {

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
8888
.set_created_by(created_by.clone())
8989
.set_column_index_truncate_length(*column_index_truncate_length)
9090
.set_data_page_row_count_limit(*data_page_row_count_limit)
91-
.set_bloom_filter_enabled(*bloom_filter_enabled);
91+
.set_bloom_filter_enabled(*bloom_filter_enabled)
92+
.set_key_value_metadata(parquet_options.key_value_metadata.clone());
9293

9394
if let Some(encoding) = &encoding {
9495
builder = builder.set_encoding(parse_encoding_string(encoding)?);

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,7 +1136,7 @@ mod tests {
11361136
};
11371137
use parquet::arrow::arrow_reader::ArrowReaderOptions;
11381138
use parquet::arrow::ParquetRecordBatchStreamBuilder;
1139-
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
1139+
use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
11401140
use parquet::file::page_index::index::Index;
11411141
use tokio::fs::File;
11421142
use tokio::io::AsyncWrite;
@@ -1865,7 +1865,13 @@ mod tests {
18651865
};
18661866
let parquet_sink = Arc::new(ParquetSink::new(
18671867
file_sink_config,
1868-
TableParquetOptions::default(),
1868+
TableParquetOptions {
1869+
key_value_metadata: Some(vec![KeyValue {
1870+
key: "my-data".into(),
1871+
value: Some("stuff".to_string()),
1872+
}]),
1873+
..Default::default()
1874+
},
18691875
));
18701876

18711877
// create data
@@ -1899,7 +1905,10 @@ mod tests {
18991905
let (
19001906
path,
19011907
FileMetaData {
1902-
num_rows, schema, ..
1908+
num_rows,
1909+
schema,
1910+
key_value_metadata,
1911+
..
19031912
},
19041913
) = written.take(1).next().unwrap();
19051914
let path_parts = path.parts().collect::<Vec<_>>();
@@ -1915,6 +1924,13 @@ mod tests {
19151924
"output file metadata should contain col b"
19161925
);
19171926

1927+
let key_value_metadata = key_value_metadata.unwrap();
1928+
let my_metadata = key_value_metadata
1929+
.iter()
1930+
.filter(|kv| kv.key == "my-data")
1931+
.collect::<Vec<_>>();
1932+
assert_eq!(my_metadata.len(), 1);
1933+
19181934
Ok(())
19191935
}
19201936

datafusion/proto/src/generated/pbjson.rs

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
974974
.unwrap()
975975
.unwrap(),
976976
column_specific_options,
977+
key_value_metadata: None,
977978
})
978979
}
979980
}

0 commit comments

Comments
 (0)