From 3896902cfcd118960ff1c5799e6d79f7efd93188 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Sun, 17 Nov 2024 05:13:22 -0500 Subject: [PATCH] Upgrade datafusion (#2056) --- nautilus_core/Cargo.lock | 100 ++++++++++-------- nautilus_core/persistence/Cargo.toml | 2 +- .../serialization/src/arrow/trade.rs | 25 ++++- 3 files changed, 80 insertions(+), 47 deletions(-) diff --git a/nautilus_core/Cargo.lock b/nautilus_core/Cargo.lock index 5adc6e26d97..c51d1448571 100644 --- a/nautilus_core/Cargo.lock +++ b/nautilus_core/Cargo.lock @@ -1255,9 +1255,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dae5f2abc725737d6e87b6d348a5aa2d0a77e4cf873045f004546da946e6e619" +checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05" dependencies = [ "ahash 0.8.11", "arrow", @@ -1311,9 +1311,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "998761705551f11ffa4ee692cc285b44eb1def6e0d28c4eaf5041b9e2810dc1e" +checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560" dependencies = [ "arrow-schema", "async-trait", @@ -1326,9 +1326,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11986f191e88d950f10a5cc512a598afba27d92e04a0201215ad60785005115a" +checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c" dependencies = [ "ahash 0.8.11", "arrow", @@ -1338,6 +1338,7 @@ dependencies = [ "chrono", "half", "hashbrown 0.14.5", + "indexmap", "instant", "libc", "num_cpus", @@ -1351,9 +1352,9 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "694c9d7ea1b82f95768215c4cb5c2d5c613690624e832a7ee64be563139d582f" +checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685" dependencies = [ "log", "tokio", @@ -1361,9 +1362,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b4cedcd98151e0a297f34021b6b232ff0ebc0f2f18ea5e7446b5ebda99b1a1" +checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361" dependencies = [ "arrow", "chrono", @@ -1382,9 +1383,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8dd114dc0296cacaee98ad3165724529fcca9a65b2875abcd447b9cc02b2b74" +checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246" dependencies = [ "ahash 0.8.11", "arrow", @@ -1394,7 +1395,9 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "datafusion-functions-aggregate-common", + "datafusion-functions-window-common", "datafusion-physical-expr-common", + "indexmap", "paste", "serde_json", "sqlparser", @@ -1404,20 +1407,21 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1ba2bb018218d9260bbd7de6a46a20f61b93d4911dba8aa07735625004c4fb" +checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2" dependencies = [ "arrow", "datafusion-common", + "itertools 0.13.0", "paste", ] [[package]] name = "datafusion-functions" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "547cb780a4ac51fd8e52c0fb9188bc16cea4e35aebf6c454bda0b82a7a417304" +checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162" dependencies = [ "arrow", "arrow-buffer", @@ -1438,9 +1442,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68cf5aa7ebcac08bd04bb709a9a6d4963eafd227da62b628133bc509c40f5a0" +checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741" dependencies = [ "ahash 0.8.11", "arrow", @@ -1452,16 +1456,16 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "half", + "indexmap", "log", "paste", - "sqlparser", ] [[package]] name = "datafusion-functions-aggregate-common" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2285d080dfecdfb8605b0ab2f1a41e2473208dc8e9bd6f5d1dbcfe97f517e6f" +checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143" dependencies = [ "ahash 0.8.11", "arrow", @@ -1473,21 +1477,34 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e78d30ebd6e9f74d4aeddec32744f5a18b5f9584591bc586fb5259c4848bac5" +checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe" dependencies = [ "datafusion-common", "datafusion-expr", + "datafusion-functions-window-common", + "datafusion-physical-expr", "datafusion-physical-expr-common", "log", + "paste", +] + +[[package]] +name = "datafusion-functions-window-common" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b" +dependencies = [ + "datafusion-common", + "datafusion-physical-expr-common", ] [[package]] name = "datafusion-optimizer" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be172c44bf344df707e0c041fa3f41e6dc5fb0976f539c68bc442bca150ee58c" +checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa" dependencies = [ "arrow", "async-trait", @@ -1505,9 +1522,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b86b7fa0b8161c49b0f005b0df193fc6d9b65ceec675f155422cda5d1583ca" +checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335" dependencies = [ "ahash 0.8.11", "arrow", @@ -1516,30 +1533,26 @@ dependencies = [ "arrow-ord", "arrow-schema", "arrow-string", - "base64", "chrono", "datafusion-common", - "datafusion-execution", "datafusion-expr", "datafusion-expr-common", "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", - "hex", "indexmap", "itertools 0.13.0", "log", "paste", "petgraph", - "regex", ] [[package]] name = "datafusion-physical-expr-common" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "242ba8a26351d9ca16295814c46743b0d1b00ec372174bdfbba991d0953dd596" +checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b" dependencies = [ "ahash 0.8.11", "arrow", @@ -1551,13 +1564,15 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25ca088eb904bf1cfc9c5e5653110c70a6eaba43164085a9d180b35b77ce3b8b" +checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe" dependencies = [ + "arrow", "arrow-schema", "datafusion-common", "datafusion-execution", + "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-plan", "itertools 0.13.0", @@ -1565,9 +1580,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4989a53b824abc759685eb643f4d604c2fc2fea4e2c309ac3473bea263ecbbeb" +checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24" dependencies = [ "ahash 0.8.11", "arrow", @@ -1581,8 +1596,8 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", - "datafusion-functions-aggregate", "datafusion-functions-aggregate-common", + "datafusion-functions-window-common", "datafusion-physical-expr", "datafusion-physical-expr-common", "futures", @@ -1600,15 +1615,16 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "42.2.0" +version = "43.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66b9b75b9da10ed656073ac0553708f17eb8fa5a7b065ef9848914c93150ab9e" +checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51" dependencies = [ "arrow", "arrow-array", "arrow-schema", "datafusion-common", "datafusion-expr", + "indexmap", "log", "regex", "sqlparser", @@ -4852,9 +4868,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac" +checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7" dependencies = [ "log", "sqlparser_derive", diff --git a/nautilus_core/persistence/Cargo.toml b/nautilus_core/persistence/Cargo.toml index fb3b9ec194c..45243957e0c 100644 --- a/nautilus_core/persistence/Cargo.toml +++ b/nautilus_core/persistence/Cargo.toml @@ -24,7 +24,7 @@ tokio = { workspace = true } thiserror = { workspace = true } binary-heap-plus = "0.5.0" compare = "0.1.0" -datafusion = { version = "42.2.0", default-features = false, features = ["compression", "regex_expressions", "unicode_expressions", "pyarrow"] } +datafusion = { version = "43.0.0", default-features = false, features = ["compression", "regex_expressions", "unicode_expressions", "pyarrow"] } [dev-dependencies] nautilus-test-kit = { path = "../test_kit" } diff --git a/nautilus_core/serialization/src/arrow/trade.rs b/nautilus_core/serialization/src/arrow/trade.rs index eb7e5faef50..b90178c5162 100644 --- a/nautilus_core/serialization/src/arrow/trade.rs +++ b/nautilus_core/serialization/src/arrow/trade.rs @@ -16,7 +16,7 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use arrow::{ - array::{Int64Array, StringArray, StringBuilder, UInt64Array, UInt8Array}, + array::{Int64Array, StringArray, StringBuilder, StringViewArray, UInt64Array, UInt8Array}, datatypes::{DataType, Field, Schema}, error::ArrowError, record_batch::RecordBatch, @@ -130,10 +130,27 @@ impl DecodeFromRecordBatch for TradeTick { let size_values = extract_column::(cols, "size", 1, DataType::UInt64)?; let aggressor_side_values = extract_column::(cols, "aggressor_side", 2, DataType::UInt8)?; - let trade_id_values = extract_column::(cols, "trade_id", 3, DataType::Utf8)?; let ts_event_values = extract_column::(cols, "ts_event", 4, DataType::UInt64)?; let ts_init_values = extract_column::(cols, "ts_init", 5, DataType::UInt64)?; + // Datafusion reads trade_ids as StringView + let trade_id_values: Vec = if record_batch + .schema() + .field_with_name("trade_id")? + .data_type() + == &DataType::Utf8View + { + extract_column::(cols, "trade_id", 3, DataType::Utf8View)? + .iter() + .map(|id| TradeId::from(id.unwrap())) + .collect() + } else { + extract_column::(cols, "trade_id", 3, DataType::Utf8)? + .iter() + .map(|id| TradeId::from(id.unwrap())) + .collect() + }; + let result: Result, EncodingError> = (0..record_batch.num_rows()) .map(|i| { let price = Price::from_raw(price_values.value(i), price_precision); @@ -146,7 +163,7 @@ impl DecodeFromRecordBatch for TradeTick { format!("Invalid enum value, was {aggressor_side_value}"), ) })?; - let trade_id = TradeId::from(trade_id_values.value(i)); + let trade_id = trade_id_values[i]; let ts_event = ts_event_values.value(i).into(); let ts_init = ts_init_values.value(i).into(); @@ -184,7 +201,7 @@ mod tests { use std::sync::Arc; use arrow::{ - array::{Array, Int64Array, StringArray, UInt64Array, UInt8Array}, + array::{Array, Int64Array, UInt64Array, UInt8Array}, record_batch::RecordBatch, }; use rstest::rstest;