Skip to content

Commit 5912025

Browse files
authored
Add support for reading CSV files with comments (apache#10467)
This patch adds support for parsing CSV files containing comment lines. Closes apache#10262.
1 parent 29fda88 commit 5912025

File tree

20 files changed

+156
-1
lines changed

20 files changed

+156
-1
lines changed

datafusion-examples/examples/csv_opener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ async fn main() -> Result<()> {
4848
b',',
4949
b'"',
5050
object_store,
51+
Some(b'#'),
5152
);
5253

5354
let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);

datafusion/common/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,6 +1567,7 @@ config_namespace! {
15671567
pub timestamp_tz_format: Option<String>, default = None
15681568
pub time_format: Option<String>, default = None
15691569
pub null_value: Option<String>, default = None
1570+
pub comment: Option<u8>, default = None
15701571
}
15711572
}
15721573

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ impl CsvFormat {
147147
self.options.has_header
148148
}
149149

150+
/// Lines beginning with this byte are ignored.
151+
pub fn with_comment(mut self, comment: Option<u8>) -> Self {
152+
self.options.comment = comment;
153+
self
154+
}
155+
150156
/// The character separating values within a row.
151157
/// - default to ','
152158
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
@@ -252,6 +258,7 @@ impl FileFormat for CsvFormat {
252258
self.options.delimiter,
253259
self.options.quote,
254260
self.options.escape,
261+
self.options.comment,
255262
self.options.compression.into(),
256263
);
257264
Ok(Arc::new(exec))
@@ -300,7 +307,7 @@ impl CsvFormat {
300307
pin_mut!(stream);
301308

302309
while let Some(chunk) = stream.next().await.transpose()? {
303-
let format = arrow::csv::reader::Format::default()
310+
let mut format = arrow::csv::reader::Format::default()
304311
.with_header(
305312
first_chunk
306313
&& self
@@ -310,6 +317,10 @@ impl CsvFormat {
310317
)
311318
.with_delimiter(self.options.delimiter);
312319

320+
if let Some(comment) = self.options.comment {
321+
format = format.with_comment(comment);
322+
}
323+
313324
let (Schema { fields, .. }, records_read) =
314325
format.infer_schema(chunk.reader(), Some(records_to_read))?;
315326

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> {
6161
pub quote: u8,
6262
/// An optional escape character. Defaults to None.
6363
pub escape: Option<u8>,
64+
/// If enabled, lines beginning with this byte are ignored.
65+
pub comment: Option<u8>,
6466
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
6567
/// based on data in file.
6668
pub schema: Option<&'a Schema>,
@@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> {
9799
table_partition_cols: vec![],
98100
file_compression_type: FileCompressionType::UNCOMPRESSED,
99101
file_sort_order: vec![],
102+
comment: None,
100103
}
101104
}
102105

@@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> {
106109
self
107110
}
108111

112+
/// Specify comment char to use for CSV read
113+
pub fn comment(mut self, comment: u8) -> Self {
114+
self.comment = Some(comment);
115+
self
116+
}
117+
109118
/// Specify delimiter to use for CSV read
110119
pub fn delimiter(mut self, delimiter: u8) -> Self {
111120
self.delimiter = delimiter;
@@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
477486
let file_format = CsvFormat::default()
478487
.with_options(table_options.csv)
479488
.with_has_header(self.has_header)
489+
.with_comment(self.comment)
480490
.with_delimiter(self.delimiter)
481491
.with_quote(self.quote)
482492
.with_escape(self.escape)

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub struct CsvExec {
5858
delimiter: u8,
5959
quote: u8,
6060
escape: Option<u8>,
61+
comment: Option<u8>,
6162
/// Execution metrics
6263
metrics: ExecutionPlanMetricsSet,
6364
/// Compression type of the file associated with CsvExec
@@ -73,6 +74,7 @@ impl CsvExec {
7374
delimiter: u8,
7475
quote: u8,
7576
escape: Option<u8>,
77+
comment: Option<u8>,
7678
file_compression_type: FileCompressionType,
7779
) -> Self {
7880
let (projected_schema, projected_statistics, projected_output_ordering) =
@@ -92,6 +94,7 @@ impl CsvExec {
9294
metrics: ExecutionPlanMetricsSet::new(),
9395
file_compression_type,
9496
cache,
97+
comment,
9598
}
9699
}
97100

@@ -113,6 +116,11 @@ impl CsvExec {
113116
self.quote
114117
}
115118

119+
/// Lines beginning with this byte are ignored.
120+
pub fn comment(&self) -> Option<u8> {
121+
self.comment
122+
}
123+
116124
/// The escape character
117125
pub fn escape(&self) -> Option<u8> {
118126
self.escape
@@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec {
234242
quote: self.quote,
235243
escape: self.escape,
236244
object_store,
245+
comment: self.comment,
237246
});
238247

239248
let opener = CsvOpener {
@@ -265,9 +274,11 @@ pub struct CsvConfig {
265274
quote: u8,
266275
escape: Option<u8>,
267276
object_store: Arc<dyn ObjectStore>,
277+
comment: Option<u8>,
268278
}
269279

270280
impl CsvConfig {
281+
#[allow(clippy::too_many_arguments)]
271282
/// Returns a [`CsvConfig`]
272283
pub fn new(
273284
batch_size: usize,
@@ -277,6 +288,7 @@ impl CsvConfig {
277288
delimiter: u8,
278289
quote: u8,
279290
object_store: Arc<dyn ObjectStore>,
291+
comment: Option<u8>,
280292
) -> Self {
281293
Self {
282294
batch_size,
@@ -287,6 +299,7 @@ impl CsvConfig {
287299
quote,
288300
escape: None,
289301
object_store,
302+
comment,
290303
}
291304
}
292305
}
@@ -309,6 +322,9 @@ impl CsvConfig {
309322
if let Some(escape) = self.escape {
310323
builder = builder.with_escape(escape)
311324
}
325+
if let Some(comment) = self.comment {
326+
builder = builder.with_comment(comment);
327+
}
312328

313329
builder
314330
}
@@ -570,6 +586,7 @@ mod tests {
570586
b',',
571587
b'"',
572588
None,
589+
None,
573590
file_compression_type.to_owned(),
574591
);
575592
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -636,6 +653,7 @@ mod tests {
636653
b',',
637654
b'"',
638655
None,
656+
None,
639657
file_compression_type.to_owned(),
640658
);
641659
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -702,6 +720,7 @@ mod tests {
702720
b',',
703721
b'"',
704722
None,
723+
None,
705724
file_compression_type.to_owned(),
706725
);
707726
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -765,6 +784,7 @@ mod tests {
765784
b',',
766785
b'"',
767786
None,
787+
None,
768788
file_compression_type.to_owned(),
769789
);
770790
assert_eq!(14, csv.base_config.file_schema.fields().len());
@@ -827,6 +847,7 @@ mod tests {
827847
b',',
828848
b'"',
829849
None,
850+
None,
830851
file_compression_type.to_owned(),
831852
);
832853
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -921,6 +942,7 @@ mod tests {
921942
b',',
922943
b'"',
923944
None,
945+
None,
924946
file_compression_type.to_owned(),
925947
);
926948

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,6 +1471,7 @@ pub(crate) mod tests {
14711471
b',',
14721472
b'"',
14731473
None,
1474+
None,
14741475
FileCompressionType::UNCOMPRESSED,
14751476
))
14761477
}
@@ -1494,6 +1495,7 @@ pub(crate) mod tests {
14941495
b',',
14951496
b'"',
14961497
None,
1498+
None,
14971499
FileCompressionType::UNCOMPRESSED,
14981500
))
14991501
}
@@ -3767,6 +3769,7 @@ pub(crate) mod tests {
37673769
b',',
37683770
b'"',
37693771
None,
3772+
None,
37703773
compression_type,
37713774
)),
37723775
vec![("a".to_string(), "a".to_string())],

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ fn try_swapping_with_csv(
185185
csv.delimiter(),
186186
csv.quote(),
187187
csv.escape(),
188+
csv.comment(),
188189
csv.file_compression_type,
189190
)) as _
190191
})
@@ -1686,6 +1687,7 @@ mod tests {
16861687
0,
16871688
0,
16881689
None,
1690+
None,
16891691
FileCompressionType::UNCOMPRESSED,
16901692
))
16911693
}
@@ -1708,6 +1710,7 @@ mod tests {
17081710
0,
17091711
0,
17101712
None,
1713+
None,
17111714
FileCompressionType::UNCOMPRESSED,
17121715
))
17131716
}

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,7 @@ mod tests {
15021502
0,
15031503
b'"',
15041504
None,
1505+
None,
15051506
FileCompressionType::UNCOMPRESSED,
15061507
))
15071508
}

datafusion/core/src/test/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
9898
b',',
9999
b'"',
100100
None,
101+
None,
101102
FileCompressionType::UNCOMPRESSED,
102103
)))
103104
}
@@ -282,6 +283,7 @@ pub fn csv_exec_sorted(
282283
0,
283284
0,
284285
None,
286+
None,
285287
FileCompressionType::UNCOMPRESSED,
286288
))
287289
}
@@ -337,6 +339,7 @@ pub fn csv_exec_ordered(
337339
0,
338340
b'"',
339341
None,
342+
None,
340343
FileCompressionType::UNCOMPRESSED,
341344
))
342345
}

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ message CsvOptions {
397397
string timestamp_tz_format = 10; // Optional timestamp with timezone format
398398
string time_format = 11; // Optional time format
399399
string null_value = 12; // Optional representation of null value
400+
bytes comment = 13; // Optional comment character as a byte
400401
}
401402

402403
// Options controlling CSV format

0 commit comments

Comments
 (0)