diff --git a/Cargo.lock b/Cargo.lock index 29b213b86e0..ccfffc18279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -882,6 +882,30 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitcode" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee1bce7608560cd4bf0296a4262d0dbf13e6bcec5ff2105724c8ab88cc7fc784" +dependencies = [ + "arrayvec", + "bitcode_derive", + "bytemuck", + "glam", + "serde", +] + +[[package]] +name = "bitcode_derive" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a539389a13af092cd345a2b47ae7dec12deb306d660b2223d25cd3419b253ebe" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -975,6 +999,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytemuck" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" + [[package]] name = "byteorder" version = "1.5.0" @@ -2457,6 +2487,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "glam" +version = "0.29.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc46dd3ec48fdd8e693a98d2b8bafae273a2d54c1de02a2a7e3d57d501f39677" + [[package]] name = "glob" version = "0.3.1" @@ -3340,6 +3376,7 @@ name = "influxdb3_wal" version = "0.1.0" dependencies = [ "async-trait", + "bitcode", "byteorder", "bytes", "crc32fast", @@ -3355,7 +3392,6 @@ dependencies = [ "parking_lot", "schema", "serde", - "serde_json", "serde_with", "thiserror 1.0.69", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 968a5cf33b1..c4655d5e923 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ async-trait = "0.1" backtrace = "0.3" base64 = "0.22.0" bimap = "0.6.3" +bitcode = { version = "0.6.3", features = ["serde"] } byteorder = "1.3.4" bytes = "1.9" chrono = "0.4" @@ -182,6 +183,7 @@ inherits = "release" codegen-units = 16 lto = false incremental = true +debug = 1 # This profile extends the `quick-release` profile with debuginfo turned on in order to # produce more human friendly symbols for profiling tools diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index dcbcf847a93..2baa26df374 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -40,6 +40,7 @@ use std::{collections::HashMap, path::Path, str::FromStr}; use std::{num::NonZeroUsize, sync::Arc}; use thiserror::Error; use tokio::net::TcpListener; +use tokio::time::Instant; use tokio_util::sync::CancellationToken; use trace_exporters::TracingConfig; use trace_http::ctx::TraceHeaderParser; @@ -359,6 +360,7 @@ fn ensure_directory_exists(p: &Path) { } pub async fn command(config: Config) -> Result<()> { + let startup_timer = Instant::now(); let num_cpus = num_cpus::get(); let build_malloc_conf = build_malloc_conf(); info!( @@ -542,7 +544,7 @@ pub async fn command(config: Config) -> Result<()> { } else { builder.build() }; - serve(server, frontend_shutdown).await?; + serve(server, frontend_shutdown, startup_timer).await?; Ok(()) } diff --git a/influxdb3_cache/src/last_cache/snapshots/influxdb3_cache__last_cache__tests__catalog_initialization.snap b/influxdb3_cache/src/last_cache/snapshots/influxdb3_cache__last_cache__tests__catalog_initialization.snap index 0d053bf59ce..0c260145180 100644 --- a/influxdb3_cache/src/last_cache/snapshots/influxdb3_cache__last_cache__tests__catalog_initialization.snap +++ b/influxdb3_cache/src/last_cache/snapshots/influxdb3_cache__last_cache__tests__catalog_initialization.snap @@ -1,6 +1,7 @@ --- source: influxdb3_cache/src/last_cache/mod.rs expression: caches +snapshot_kind: text --- [ { @@ -11,9 +12,7 @@ expression: caches 0, 1 ], - "value_columns": { - "type": "all_non_key_columns" - }, + "value_columns": "all_non_key_columns", "count": 1, "ttl": 600 }, @@ -25,11 +24,12 @@ expression: caches 6 ], "value_columns": { - "type": "explicit", - "columns": [ - 8, - 7 - ] + "explicit": { + "columns": [ + 8, + 7 + ] + } }, "count": 5, "ttl": 60 @@ -40,11 +40,12 @@ expression: caches "name": "test_cache_3", "key_columns": [], "value_columns": { - "type": "explicit", - "columns": [ - 9, - 7 - ] + "explicit": { + "columns": [ + 9, + 7 + ] + } }, "count": 10, "ttl": 500 diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 4bd116aec46..3ed17f95ff4 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -845,7 +845,7 @@ pub struct LastCacheCreatedResponse { /// A last cache will either store values for an explicit set of columns, or will accept all /// non-key columns #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] -#[serde(tag = "type", rename_all = "snake_case")] +#[serde(rename_all = "snake_case")] pub enum LastCacheValueColumnsDef { /// Explicit list of column names Explicit { columns: Vec }, @@ -1220,8 +1220,9 @@ mod tests { "name": "cache_name", "key_columns": [0, 1], "value_columns": { - "type": "explicit", - "columns": [2, 3] + "explicit": { + "columns": [2, 3] + } }, "ttl": 120, "count": 5 diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index e7559cb07e2..48abbbad7c3 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -35,12 +35,14 @@ use iox_query::QueryDatabase; use iox_query_params::StatementParams; use iox_time::TimeProvider; use observability_deps::tracing::error; +use observability_deps::tracing::info; use service::hybrid; use std::convert::Infallible; use std::fmt::Debug; use std::sync::Arc; use thiserror::Error; use tokio::net::TcpListener; +use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tower::Layer; use trace::ctx::SpanContext; @@ -174,7 +176,11 @@ impl Server { } } -pub async fn serve(server: Server, shutdown: CancellationToken) -> Result<()> +pub async fn serve( + server: Server, + shutdown: CancellationToken, + startup_timer: Instant, +) -> Result<()> where T: TimeProvider, { @@ -206,6 +212,9 @@ where let hybrid_make_service = hybrid(rest_service, grpc_service); let addr = AddrIncoming::from_listener(server.listener)?; + let timer_end = Instant::now(); + let startup_time = timer_end.duration_since(startup_timer); + info!("Server Startup Time: {}ms", startup_time.as_millis()); hyper::server::Builder::new(addr, Http::new()) .tcp_nodelay(true) .serve(hybrid_make_service) @@ -764,6 +773,7 @@ mod tests { } async fn setup_server(start_time: i64) -> (String, CancellationToken, Arc) { + let server_start_time = tokio::time::Instant::now(); let trace_header_parser = trace_http::ctx::TraceHeaderParser::new(); let metrics = Arc::new(metric::Registry::new()); let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); @@ -856,7 +866,7 @@ mod tests { let frontend_shutdown = CancellationToken::new(); let shutdown = frontend_shutdown.clone(); - tokio::spawn(async move { serve(server, frontend_shutdown).await }); + tokio::spawn(async move { serve(server, frontend_shutdown, server_start_time).await }); (format!("http://{addr}"), shutdown, write_buffer) } diff --git a/influxdb3_wal/Cargo.toml b/influxdb3_wal/Cargo.toml index f21e799dc86..3d46b44efdb 100644 --- a/influxdb3_wal/Cargo.toml +++ b/influxdb3_wal/Cargo.toml @@ -18,6 +18,7 @@ influxdb3_id = { path = "../influxdb3_id" } # crates.io dependencies async-trait.workspace = true +bitcode.workspace = true bytes.workspace = true byteorder.workspace = true crc32fast.workspace = true @@ -27,7 +28,6 @@ indexmap.workspace = true object_store.workspace = true parking_lot.workspace = true serde.workspace = true -serde_json.workspace = true serde_with.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index e2de1d8b801..50f1545b1d2 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -40,6 +40,9 @@ pub enum Error { #[error("deserialize error: {0}")] Serialize(#[from] crate::serialize::Error), + #[error("join error: {0}")] + Join(#[from] tokio::task::JoinError), + #[error("object store error: {0}")] ObjectStoreError(#[from] ::object_store::Error), @@ -426,7 +429,7 @@ impl LastCacheDefinition { /// A last cache will either store values for an explicit set of columns, or will accept all /// non-key columns #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] -#[serde(tag = "type", rename_all = "snake_case")] +#[serde(rename_all = "snake_case")] pub enum LastCacheValueColumnsDef { /// Explicit list of column names Explicit { columns: Vec }, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 65d080e83aa..fa2d08cf51e 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -104,9 +104,22 @@ impl WalObjectStore { .last_snapshot_sequence_number() }; + async fn get_contents( + object_store: Arc, + path: Path, + ) -> Result { + let file_bytes = object_store.get(&path).await?.bytes().await?; + Ok(verify_file_type_and_deserialize(file_bytes)?) + } + + let mut replay_tasks = Vec::new(); for path in paths { - let file_bytes = self.object_store.get(&path).await?.bytes().await?; - let wal_contents = verify_file_type_and_deserialize(file_bytes)?; + let object_store = Arc::clone(&self.object_store); + replay_tasks.push(tokio::spawn(get_contents(object_store, path))); + } + + for wal_contents in replay_tasks { + let wal_contents = wal_contents.await??; // add this to the snapshot tracker, so we know what to clear out later if the replay // was a wal file that had a snapshot @@ -120,6 +133,7 @@ impl WalObjectStore { )); match wal_contents.snapshot { + // This branch uses so much time None => self.file_notifier.notify(wal_contents), Some(snapshot_details) => { let snapshot_info = { diff --git a/influxdb3_wal/src/serialize.rs b/influxdb3_wal/src/serialize.rs index 71ef0b077c8..ddee1b4edda 100644 --- a/influxdb3_wal/src/serialize.rs +++ b/influxdb3_wal/src/serialize.rs @@ -17,8 +17,8 @@ pub enum Error { #[error("crc32 checksum mismatch")] Crc32Mismatch, - #[error("Serde error: {0}")] - Serde(#[from] serde_json::Error), + #[error("bitcode error: {0}")] + Bitcode(#[from] bitcode::Error), #[error("IO error: {0}")] Io(#[from] std::io::Error), @@ -32,6 +32,7 @@ pub(crate) type Result = std::result::Result; /// The first bytes written into a wal file to identify it and its version. const FILE_TYPE_IDENTIFIER: &[u8] = b"idb3.001"; +#[inline(always)] pub fn verify_file_type_and_deserialize(b: Bytes) -> Result { let contents = b.to_vec(); @@ -61,7 +62,7 @@ pub fn verify_file_type_and_deserialize(b: Bytes) -> Result { } // Deserialize the data into a WalContents - let contents: WalContents = serde_json::from_slice(data)?; + let contents: WalContents = bitcode::deserialize(data)?; Ok(contents) } @@ -70,8 +71,8 @@ pub(crate) fn serialize_to_file_bytes(contents: &WalContents) -> Result> let mut buf = Vec::new(); buf.extend_from_slice(FILE_TYPE_IDENTIFIER); - // serialize the contents into json bytes - let data = serde_json::to_vec(contents)?; + // serialize the contents into bitcode bytes + let data = bitcode::serialize(contents)?; // calculate the crc32 checksum let mut hasher = crc32fast::Hasher::new(); diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 7adc4fc0db0..d3216a1fb2e 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -513,11 +513,10 @@ impl BufferState { let sort_key = table_def .series_key .iter() - .map(|c| table_def.column_id_to_name_unchecked(c).to_string()) - .collect::>(); + .map(|c| Arc::clone(&table_def.column_id_to_name_unchecked(c))); let index_columns = table_def.index_column_ids(); - TableBuffer::new(index_columns, SortKey::from(sort_key)) + TableBuffer::new(index_columns, SortKey::from_columns(sort_key)) }); for (chunk_time, chunk) in table_chunks.chunk_time_to_chunk { table_buffer.buffer_chunk(chunk_time, chunk.rows);