Commit 5b08db3

connec authored and alamb committed
Support newlines_in_values CSV option (apache#11533)
* feat!: support `newlines_in_values` CSV option

  This significantly simplifies the UX when dealing with large CSV files that must support newlines in (quoted) values. By default, large CSV files will be repartitioned into multiple parallel range scans. This is great for performance in the common case but when large CSVs contain newlines in values the parallel scan will fail due to splitting on newlines within quotes rather than actual line terminators.

  With the current implementation, this behaviour can be controlled by the session-level `datafusion.optimizer.repartition_file_scans` and `datafusion.optimizer.repartition_file_min_size` settings.

  This commit introduces a `newlines_in_values` option to `CsvOptions` and plumbs it through to `CsvExec`, which includes it in the test for whether parallel execution is supported. This provides a convenient and searchable way to disable file scan repartitioning on a per-CSV basis.

  BREAKING CHANGE: This adds new public fields to types with all public fields, which is a breaking change.

* docs: normalise `newlines_in_values` documentation

* test: add/fix sqllogictests for `newlines_in_values`

* docs: document `datafusion.catalog.newlines_in_values`

* fix: typo in config.md

* chore: suppress lint on too many arguments for `CsvExec::new`

* fix: always checkout `*.slt` with LF line endings

  This is a bit of a stab in the dark, but it might fix multiline tests on Windows.

* fix: always checkout `newlines_in_values.csv` with `LF` line endings

  The default git behaviour of converting line endings for checked out files causes the `csv_files.slt` test to fail when testing `newlines_in_values`. This appears to be due to the quoted newlines being converted to CRLF, which are not then normalised when the CSV is read. Assuming that the sqllogictests do normalise line endings in the expected output, this could then lead to a "spurious" diff from the actual output.

---------

Co-authored-by: Andrew Lamb <[email protected]>
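To make the failure mode described in the commit message concrete, here is a minimal, self-contained sketch. It is not DataFusion's scanner: `SAMPLE`, `next_line_start`, and `record_starts` are hypothetical names, and the sketch assumes `"` is the quote character with no separate escape character. It shows a range scan that begins mid-file and resynchronises on the next `\n`, which lands inside a quoted value rather than at a record boundary.

```rust
/// Small CSV whose quoted values contain newlines (an illustrative sample).
const SAMPLE: &[u8] = b"id,message\n1,\"hello\nworld\"\n2,\"something\nelse\"\n";

/// Naive resynchronisation: the byte after the next `\n` at or after `offset`.
fn next_line_start(data: &[u8], offset: usize) -> Option<usize> {
    data[offset..]
        .iter()
        .position(|&b| b == b'\n')
        .map(|i| offset + i + 1)
}

/// Quote-aware scan from the start of the data: start offset of every record.
fn record_starts(data: &[u8]) -> Vec<usize> {
    let mut starts = vec![0];
    let mut in_quotes = false;
    for (i, &b) in data.iter().enumerate() {
        match b {
            // A doubled quote ("") toggles twice, so the state stays correct.
            b'"' => in_quotes = !in_quotes,
            // Only an unquoted newline ends a record.
            b'\n' if !in_quotes && i + 1 < data.len() => starts.push(i + 1),
            _ => {}
        }
    }
    starts
}

fn main() {
    // Pretend a parallel range scan was handed a range starting at byte 15,
    // which falls inside the quoted value of the first data record.
    let naive = next_line_start(SAMPLE, 15).expect("a newline follows byte 15");
    let starts = record_starts(SAMPLE);

    println!("naive scan would start at byte {naive}");
    println!("true record starts: {starts:?}");
    println!("text at naive start: {:?}", String::from_utf8_lossy(&SAMPLE[naive..]));

    // The naive start is not a record boundary, so the range is misparsed.
    assert!(!starts.contains(&naive));
}
```

Finding the true boundaries requires scanning from the beginning of the file, which is why disabling repartitioning is the simple remedy for such files.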
1 parent 8bab5b7 commit 5b08db3

23 files changed: +250 -3 lines changed

.gitattributes

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 .github/ export-ignore
+datafusion/core/tests/data/newlines_in_values.csv text eol=lf
 datafusion/proto/src/generated/prost.rs linguist-generated
 datafusion/proto/src/generated/pbjson.rs linguist-generated
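The commit message attributes the need for `eol=lf` to quoted newlines being converted to CRLF on checkout. A rough sketch of that effect follows; `quoted_value` is a hypothetical helper that only handles a single quoted field per record, and it is not how DataFusion or the sqllogictest runner reads the file.

```rust
/// Extract the text between the first and last `"` of a record, if present.
fn quoted_value(record: &str) -> Option<&str> {
    let start = record.find('"')? + 1;
    let end = record.rfind('"')?;
    // A lone quote gives start > end; treat that as "no quoted value".
    (start <= end).then(|| &record[start..end])
}

fn main() {
    // The record as committed, with an LF inside the quoted value.
    let lf = "1,\"hello\nworld\"";
    // The same record after a line-ending conversion to CRLF.
    let crlf = lf.replace('\n', "\r\n");

    // The value read back now contains an extra `\r`, so it no longer
    // matches an expectation written with LF only.
    assert_eq!(quoted_value(lf), Some("hello\nworld"));
    assert_eq!(quoted_value(&crlf), Some("hello\r\nworld"));
    assert_ne!(quoted_value(lf), quoted_value(&crlf));
    println!("LF value: {:?}, CRLF value: {:?}", quoted_value(lf), quoted_value(&crlf));
}
```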

datafusion/common/src/config.rs

Lines changed: 30 additions & 0 deletions
@@ -184,6 +184,16 @@ config_namespace! {
         /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
         /// if not specified explicitly in the statement.
         pub has_header: bool, default = false
+
+        /// Specifies whether newlines in (quoted) CSV values are supported.
+        ///
+        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
+        /// if not specified explicitly in the statement.
+        ///
+        /// Parsing newlines in quoted values may be affected by execution behaviour such as
+        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+        /// parsed successfully, which may reduce performance.
+        pub newlines_in_values: bool, default = false
     }
 }

@@ -1593,6 +1603,14 @@ config_namespace! {
         pub quote: u8, default = b'"'
         pub escape: Option<u8>, default = None
         pub double_quote: Option<bool>, default = None
+        /// Specifies whether newlines in (quoted) values are supported.
+        ///
+        /// Parsing newlines in quoted values may be affected by execution behaviour such as
+        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+        /// parsed successfully, which may reduce performance.
+        ///
+        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+        pub newlines_in_values: Option<bool>, default = None
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
         pub schema_infer_max_rec: usize, default = 100
         pub date_format: Option<String>, default = None
@@ -1665,6 +1683,18 @@ impl CsvOptions {
         self
     }

+    /// Specifies whether newlines in (quoted) values are supported.
+    ///
+    /// Parsing newlines in quoted values may be affected by execution behaviour such as
+    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+    /// parsed successfully, which may reduce performance.
+    ///
+    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
+        self.newlines_in_values = Some(newlines_in_values);
+        self
+    }
+
     /// Set a `CompressionTypeVariant` of CSV
     /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
     pub fn with_file_compression_type(

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 50 additions & 0 deletions
@@ -233,6 +233,18 @@ impl CsvFormat {
         self
     }

+    /// Specifies whether newlines in (quoted) values are supported.
+    ///
+    /// Parsing newlines in quoted values may be affected by execution behaviour such as
+    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+    /// parsed successfully, which may reduce performance.
+    ///
+    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
+        self.options.newlines_in_values = Some(newlines_in_values);
+        self
+    }
+
     /// Set a `FileCompressionType` of CSV
     /// - defaults to `FileCompressionType::UNCOMPRESSED`
     pub fn with_file_compression_type(
@@ -330,6 +342,9 @@ impl FileFormat for CsvFormat {
             self.options.quote,
             self.options.escape,
             self.options.comment,
+            self.options
+                .newlines_in_values
+                .unwrap_or(state.config_options().catalog.newlines_in_values),
             self.options.compression.into(),
         );
         Ok(Arc::new(exec))
@@ -1052,6 +1067,41 @@ mod tests {
         Ok(())
     }

+    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+    #[tokio::test]
+    async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(n_partitions);
+        let csv_options = CsvReadOptions::default()
+            .has_header(true)
+            .newlines_in_values(true);
+        let ctx = SessionContext::new_with_config(config);
+        let testdata = arrow_test_data();
+        ctx.register_csv(
+            "aggr",
+            &format!("{testdata}/csv/aggregate_test_100.csv"),
+            csv_options,
+        )
+        .await?;
+
+        let query = "select sum(c3) from aggr;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+        #[rustfmt::skip]
+        let expected = ["+--------------+",
+            "| sum(aggr.c3) |",
+            "+--------------+",
+            "| 781          |",
+            "+--------------+"];
+        assert_batches_eq!(expected, &query_result);
+        assert_eq!(1, actual_partitions); // csv won't be scanned in parallel when newlines_in_values is set
+
+        Ok(())
+    }
+
     /// Read a single empty csv file in parallel
     ///
     /// empty_0_byte.csv:
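The `unwrap_or` in the `impl FileFormat for CsvFormat` hunk above resolves a per-file `Option<bool>` against the session-level catalog default. A minimal stand-alone sketch of that pattern follows; `CatalogDefaults`, `FileOptions`, and `resolve` are hypothetical stand-ins for illustration, not DataFusion types.

```rust
/// Hypothetical stand-in for the session-level catalog defaults.
struct CatalogDefaults {
    newlines_in_values: bool,
}

/// Hypothetical stand-in for per-file CSV options: `None` means "not set".
#[derive(Default)]
struct FileOptions {
    newlines_in_values: Option<bool>,
}

impl FileOptions {
    /// Builder-style setter mirroring the shape of `with_newlines_in_values`.
    fn with_newlines_in_values(mut self, value: bool) -> Self {
        self.newlines_in_values = Some(value);
        self
    }

    /// An explicit per-file value wins; otherwise fall back to the session default.
    fn resolve(&self, catalog: &CatalogDefaults) -> bool {
        self.newlines_in_values
            .unwrap_or(catalog.newlines_in_values)
    }
}

fn main() {
    let catalog = CatalogDefaults { newlines_in_values: true };

    let unset = FileOptions::default();
    let explicit = FileOptions::default().with_newlines_in_values(false);

    // Unset inherits the session default; an explicit `false` overrides it.
    assert!(unset.resolve(&catalog));
    assert!(!explicit.resolve(&catalog));
    println!("unset -> {}, explicit -> {}", unset.resolve(&catalog), explicit.resolve(&catalog));
}
```

As far as the `options.rs` hunks in this commit show, `CsvReadOptions` stores a plain `bool` and always forwards it through `with_newlines_in_values`, so on that path the value appears to be explicit every time and the fallback would only apply where the option is left as `None`.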

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 22 additions & 0 deletions
@@ -63,6 +63,14 @@ pub struct CsvReadOptions<'a> {
     pub escape: Option<u8>,
     /// If enabled, lines beginning with this byte are ignored.
     pub comment: Option<u8>,
+    /// Specifies whether newlines in (quoted) values are supported.
+    ///
+    /// Parsing newlines in quoted values may be affected by execution behaviour such as
+    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+    /// parsed successfully, which may reduce performance.
+    ///
+    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+    pub newlines_in_values: bool,
     /// An optional schema representing the CSV files. If None, CSV reader will try to infer it
     /// based on data in file.
     pub schema: Option<&'a Schema>,
@@ -95,6 +103,7 @@ impl<'a> CsvReadOptions<'a> {
             delimiter: b',',
             quote: b'"',
             escape: None,
+            newlines_in_values: false,
             file_extension: DEFAULT_CSV_EXTENSION,
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
@@ -133,6 +142,18 @@ impl<'a> CsvReadOptions<'a> {
         self
     }

+    /// Specifies whether newlines in (quoted) values are supported.
+    ///
+    /// Parsing newlines in quoted values may be affected by execution behaviour such as
+    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+    /// parsed successfully, which may reduce performance.
+    ///
+    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+    pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
+        self.newlines_in_values = newlines_in_values;
+        self
+    }
+
     /// Specify the file extension for CSV file selection
     pub fn file_extension(mut self, file_extension: &'a str) -> Self {
         self.file_extension = file_extension;
@@ -490,6 +511,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_delimiter(self.delimiter)
             .with_quote(self.quote)
             .with_escape(self.escape)
+            .with_newlines_in_values(self.newlines_in_values)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
             .with_file_compression_type(self.file_compression_type.to_owned());

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 24 additions & 3 deletions
@@ -59,6 +59,7 @@ pub struct CsvExec {
     quote: u8,
     escape: Option<u8>,
     comment: Option<u8>,
+    newlines_in_values: bool,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Compression type of the file associated with CsvExec
@@ -68,13 +69,15 @@ pub struct CsvExec {

 impl CsvExec {
     /// Create a new CSV reader execution plan provided base and specific configurations
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         base_config: FileScanConfig,
         has_header: bool,
         delimiter: u8,
         quote: u8,
         escape: Option<u8>,
         comment: Option<u8>,
+        newlines_in_values: bool,
         file_compression_type: FileCompressionType,
     ) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
@@ -91,6 +94,7 @@ impl CsvExec {
             delimiter,
             quote,
             escape,
+            newlines_in_values,
             metrics: ExecutionPlanMetricsSet::new(),
             file_compression_type,
             cache,
@@ -126,6 +130,17 @@ impl CsvExec {
         self.escape
     }

+    /// Specifies whether newlines in (quoted) values are supported.
+    ///
+    /// Parsing newlines in quoted values may be affected by execution behaviour such as
+    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+    /// parsed successfully, which may reduce performance.
+    ///
+    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+    pub fn newlines_in_values(&self) -> bool {
+        self.newlines_in_values
+    }
+
     fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
         Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
     }
@@ -196,15 +211,15 @@ impl ExecutionPlan for CsvExec {
     /// Redistribute files across partitions according to their size
     /// See comments on [`FileGroupPartitioner`] for more detail.
     ///
-    /// Return `None` if can't get repartitioned(empty/compressed file).
+    /// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set).
     fn repartitioned(
         &self,
         target_partitions: usize,
         config: &ConfigOptions,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         let repartition_file_min_size = config.optimizer.repartition_file_min_size;
-        // Parallel execution on compressed CSV file is not supported yet.
-        if self.file_compression_type.is_compressed() {
+        // Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet.
+        if self.file_compression_type.is_compressed() || self.newlines_in_values {
             return Ok(None);
         }
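The guard added to `repartitioned` in the hunk above can be sketched in isolation as follows. `CsvScan` and its fields are hypothetical simplifications, and the even byte-range split is an assumption for illustration only; the real splitting is done by `FileGroupPartitioner`, whose logic is not shown in this commit.

```rust
/// Hypothetical, simplified stand-in for `CsvExec`.
struct CsvScan {
    compressed: bool,
    newlines_in_values: bool,
    file_size: usize,
}

impl CsvScan {
    /// Returns byte ranges for a parallel scan, or `None` when the file
    /// must be read as a single partition.
    fn repartitioned(&self, target_partitions: usize) -> Option<Vec<(usize, usize)>> {
        // Same shape as the guard in the diff: either condition disables splitting.
        if self.compressed || self.newlines_in_values {
            return None;
        }
        if self.file_size == 0 || target_partitions == 0 {
            return None;
        }
        // Assumed splitting strategy: equal-sized ranges, rounding the chunk size up.
        let chunk = (self.file_size + target_partitions - 1) / target_partitions;
        Some(
            (0..self.file_size)
                .step_by(chunk)
                .map(|start| (start, (start + chunk).min(self.file_size)))
                .collect(),
        )
    }
}

fn main() {
    let plain = CsvScan { compressed: false, newlines_in_values: false, file_size: 100 };
    let multiline = CsvScan { compressed: false, newlines_in_values: true, file_size: 100 };

    println!("plain: {:?}", plain.repartitioned(4));
    println!("multiline: {:?}", multiline.repartitioned(4));

    // A file flagged with `newlines_in_values` is never split.
    assert!(multiline.repartitioned(4).is_none());
}
```

Returning `None` keeps the existing plan, which matches the new test's expectation of a single partition when the option is set.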

@@ -589,6 +604,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -658,6 +674,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -727,6 +744,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -793,6 +811,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             file_compression_type.to_owned(),
         );
         assert_eq!(14, csv.base_config.file_schema.fields().len());
@@ -858,6 +877,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             file_compression_type.to_owned(),
         );
         assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -953,6 +973,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             file_compression_type.to_owned(),
         );

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 3 additions & 0 deletions
@@ -1472,6 +1472,7 @@ pub(crate) mod tests {
             b'"',
             None,
             None,
+            false,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -1496,6 +1497,7 @@ pub(crate) mod tests {
             b'"',
             None,
             None,
+            false,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -3770,6 +3772,7 @@ pub(crate) mod tests {
                     b'"',
                     None,
                     None,
+                    false,
                     compression_type,
                 )),
                 vec![("a".to_string(), "a".to_string())],

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 3 additions & 0 deletions
@@ -186,6 +186,7 @@ fn try_swapping_with_csv(
             csv.quote(),
             csv.escape(),
             csv.comment(),
+            csv.newlines_in_values(),
             csv.file_compression_type,
         )) as _
     })
@@ -1700,6 +1701,7 @@ mod tests {
             0,
             None,
             None,
+            false,
             FileCompressionType::UNCOMPRESSED,
         ))
     }
@@ -1723,6 +1725,7 @@ mod tests {
             0,
             None,
             None,
+            false,
             FileCompressionType::UNCOMPRESSED,
         ))
     }

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 1 addition & 0 deletions
@@ -1503,6 +1503,7 @@ mod tests {
             b'"',
             None,
             None,
+            false,
             FileCompressionType::UNCOMPRESSED,
         ))
     }

datafusion/core/src/test/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
         b'"',
         None,
         None,
+        false,
         FileCompressionType::UNCOMPRESSED,
     )))
 }
@@ -283,6 +284,7 @@ pub fn csv_exec_sorted(
         0,
         None,
         None,
+        false,
         FileCompressionType::UNCOMPRESSED,
     ))
 }
@@ -339,6 +341,7 @@ pub fn csv_exec_ordered(
         b'"',
         None,
         None,
+        false,
         FileCompressionType::UNCOMPRESSED,
     ))
 }
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+id,message
+1,"hello
+world"
+2,"something
+else"
+3,"
+many
+lines
+make
+good test
+"
+4,unquoted
+value,end
