
Commit 1a984ce

roeap and rtyler authored
feat: arrow backed log replay and table state (#2037)
# Description

This is still very much a work in progress, opened up early for visibility and discussion. I do hope that we can finally make the switch to arrow-based log handling. Aside from the hoped-for reduction in memory footprint, I believe it also opens us up to many future optimizations. To make the transition we introduce two new structs:

- `Snapshot` - a half-lazy version of the snapshot, which only tries to get the `Protocol` & `Metadata` actions ASAP. These drive all our planning activities, and without them there is not much we can do.
- `EagerSnapshot` - an intermediary structure, which eagerly loads file actions and does log replay to serve as a compatibility layer for the current `DeltaTable` APIs.

One conceptually larger change is how we view the availability of information. Up until now `DeltaTableState` could be initialized empty, containing no useful information for any code to work with. State (snapshots) now always needs to be created valid. The thing that may not yet be initialized is the `DeltaTable`, which now only carries the table configuration and the `LogStore`; the state / snapshot is now optional. Consequently, all code that works against a snapshot no longer needs to handle the case that metadata / schema etc. may not be available. This also has implications for the datafusion integration: we are mostly working against snapshots already, but we should abolish most traits implemented for `DeltaTable`, as it does not provide (and never has provided) the information that is at least required to execute a query.

Some larger notable changes include:

* remove `DeltaTableMetadata` and always use the `Metadata` action.
* arrow and parquet are now required, so the corresponding features were removed. Personally I would also argue that if you cannot read checkpoints, you cannot read delta tables :) - so hopefully users weren't relying on arrow-free builds.

### Major follow-ups:

* (pre-0.17) review the integration with `log_store` and `object_store`. Currently we mostly use `ObjectStore` inside the state handling; what we really use is `head` / `list_from` / `get` - my hope is that we end up with a single abstraction...
* test cleanup - we are currently dealing with test flakiness and have several approaches to scaffolding tests. Since we have the `deltalake-test` crate now, this can be reconciled.
* ...
* do more processing on borrowed data ...
* perform file-heavy operations on arrow data
* update checkpoint writing to leverage the new state handling and arrow ...
* switch to exposing URL in public APIs

## Questions

* should paths be percent-encoded when written to the checkpoint?

# Related Issue(s)

supersedes: #454
supersedes: #1837
closes: #1776
closes: #425 (should also be addressed in the current implementation)
closes: #288 (multi-part checkpoints are deprecated)
related: #435

# Documentation

<!--- Share links to useful documentation --->

---------

Co-authored-by: R. Tyler Croy <[email protected]>
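As a quick illustration (not part of the commit itself): a minimal sketch of the new access pattern, assuming the `DeltaTableBuilder::from_uri`, `DeltaTable::snapshot()` and `files_count()` APIs exercised in the diffs below.

```rust
use deltalake_core::{DeltaTableBuilder, DeltaTableError};

async fn file_count(uri: &str) -> Result<usize, DeltaTableError> {
    // Loading resolves the snapshot; an unloaded table simply carries no state.
    let table = DeltaTableBuilder::from_uri(uri).load().await?;
    // `snapshot()` returns an error instead of silently empty state, so code
    // operating on a snapshot can rely on metadata and schema being present.
    Ok(table.snapshot()?.files_count())
}
```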
1 parent 61ca275 commit 1a984ce

File tree

164 files changed (+6658 −4604 lines)


Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,8 @@ arrow-arith = { version = "49" }
 arrow-array = { version = "49" }
 arrow-buffer = { version = "49" }
 arrow-cast = { version = "49" }
+arrow-ipc = { version = "49" }
+arrow-json = { version = "49" }
 arrow-ord = { version = "49" }
 arrow-row = { version = "49" }
 arrow-schema = { version = "49" }

crates/benchmarks/src/bin/merge.rs

Lines changed: 2 additions & 2 deletions
@@ -193,10 +193,10 @@ async fn benchmark_merge_tpcds(
     merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
 ) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
     let table = DeltaTableBuilder::from_uri(path).load().await?;
-    let file_count = table.state.files().len();
+    let file_count = table.snapshot()?.files_count();
 
     let provider = DeltaTableProvider::try_new(
-        table.state.clone(),
+        table.snapshot()?.clone(),
         table.log_store(),
         DeltaScanConfig {
             file_column_name: Some("file_path".to_string()),
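The benchmark above shows the pattern this commit moves toward: scans are built from a snapshot plus the log store rather than from the table itself. A hedged sketch of how such a provider might be registered with DataFusion (the module paths, `DeltaScanConfig::default()`, and the `SessionContext` wiring are assumptions, not taken from this diff):

```rust
use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use deltalake_core::delta_datafusion::{DeltaScanConfig, DeltaTableProvider};
use deltalake_core::DeltaTableBuilder;

async fn register_delta(ctx: &SessionContext, uri: &str) -> Result<(), DataFusionError> {
    let table = DeltaTableBuilder::from_uri(uri).load().await?;
    // Build the provider from the snapshot and the log store, mirroring the
    // benchmark change above; the scan config here is just the default.
    let provider = DeltaTableProvider::try_new(
        table.snapshot()?.clone(),
        table.log_store(),
        DeltaScanConfig::default(),
    )?;
    let _ = ctx.register_table("delta_table", Arc::new(provider))?;
    Ok(())
}
```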

crates/deltalake-aws/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ url = { workspace = true }
 backoff = { version = "0.4", features = [ "tokio" ] }
 
 [dev-dependencies]
+deltalake-core = { path = "../deltalake-core", features = ["datafusion"] }
 chrono = { workspace = true }
 serial_test = "3"
 deltalake-test = { path = "../deltalake-test" }

crates/deltalake-aws/tests/integration_s3_dynamodb.rs

Lines changed: 5 additions & 5 deletions
@@ -267,12 +267,11 @@ fn add_action(name: &str) -> Action {
     let ts = (SystemTime::now() - Duration::from_secs(1800))
         .duration_since(UNIX_EPOCH)
         .unwrap()
-        .as_secs();
-    Action::Add(Add {
+        .as_millis();
+    Add {
         path: format!("{}.parquet", name),
         size: 396,
         partition_values: HashMap::new(),
-        partition_values_parsed: None,
         modification_time: ts as i64,
         data_change: true,
         stats: None,
@@ -282,7 +281,8 @@ fn add_action(name: &str) -> Action {
         base_row_id: None,
         default_row_commit_version: None,
         clustering_provider: None,
-    })
+    }
+    .into()
 }
 
 async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
@@ -322,7 +322,7 @@ async fn append_to_table(
         table.log_store().as_ref(),
         &actions,
         operation,
-        &table.state,
+        Some(table.snapshot()?),
         metadata,
     )
     .await
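For context on the first hunk: the explicit `Action::Add(...)` wrapper is replaced by `.into()`, which works because the kernel action structs convert into the `Action` enum. A small illustrative sketch of that usage (the `deltalake_core::kernel` import path is an assumption based on the crate layout, not something shown in this diff):

```rust
use deltalake_core::kernel::{Action, Add};

// Wrapping an `Add` into the `Action` enum; `.into()` is equivalent to the
// explicit `Action::Add(add)` used before this change.
fn to_action(add: Add) -> Action {
    add.into()
}
```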

crates/deltalake-core/Cargo.toml

Lines changed: 15 additions & 28 deletions
@@ -19,19 +19,22 @@ features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity
 
 [dependencies]
 # arrow
-arrow = { workspace = true, optional = true }
-arrow-arith = { workspace = true, optional = true }
-arrow-array = { workspace = true, optional = true }
-arrow-buffer = { workspace = true, optional = true }
-arrow-cast = { workspace = true, optional = true }
-arrow-ord = { workspace = true, optional = true }
-arrow-row = { workspace = true, optional = true }
-arrow-schema = { workspace = true, optional = true, features = ["serde"] }
-arrow-select = { workspace = true, optional = true }
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
 parquet = { workspace = true, features = [
     "async",
     "object_store",
-], optional = true }
+] }
+pin-project-lite = "^0.2.7"
 
 # datafusion
 datafusion = { workspace = true, optional = true }
@@ -48,6 +51,7 @@ serde_json = { workspace = true }
 # "stdlib"
 bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "*"
 regex = { workspace = true }
 thiserror = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
@@ -111,18 +115,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"
 
 [features]
-arrow = [
-    "dep:arrow",
-    "arrow-arith",
-    "arrow-array",
-    "arrow-cast",
-    "arrow-ord",
-    "arrow-row",
-    "arrow-schema",
-    "arrow-select",
-    "arrow-buffer",
-]
-default = ["arrow", "parquet"]
+default = []
 datafusion = [
     "dep:datafusion",
     "datafusion-expr",
@@ -131,14 +124,8 @@ datafusion = [
     "datafusion-physical-expr",
     "datafusion-sql",
     "sqlparser",
-    "arrow",
-    "parquet",
 ]
 datafusion-ext = ["datafusion"]
 json = ["parquet/json"]
 python = ["arrow/pyarrow"]
 unity-experimental = ["reqwest", "hyper"]
-
-[[bench]]
-name = "read_checkpoint"
-harness = false

crates/deltalake-core/benches/read_checkpoint.rs

Lines changed: 0 additions & 29 deletions
This file was deleted.

crates/deltalake-core/src/delta_datafusion/expr.rs

Lines changed: 4 additions & 2 deletions
@@ -575,7 +575,8 @@ mod test {
             .cast_to::<DFSchema>(
                 &arrow_schema::DataType::Utf8,
                 &table
-                    .state
+                    .snapshot()
+                    .unwrap()
                     .input_schema()
                     .unwrap()
                     .as_ref()
@@ -612,7 +613,8 @@ mod test {
         assert_eq!(test.expected, actual);
 
         let actual_expr = table
-            .state
+            .snapshot()
+            .unwrap()
            .parse_predicate_expression(actual, &session.state())
            .unwrap();
 
