From 2381cc6f1d6c289a09840024b99fada990c0a23b Mon Sep 17 00:00:00 2001
From: Michael Gattozzi
Date: Thu, 16 May 2024 11:08:43 -0400
Subject: [PATCH] fix: make DB Buffer use the up to date schema (#25001)

Alternate Title: The DB Schema only ever has one table

This is a story of subtle bugs, gnashing of teeth, and hair pulling.
Gather round as I tell you the tale of an Arc that pointed to an
outdated schema.

In #24954 we introduced an Index for the database, as this will allow
us to perform faster queries. When we added that code, this check was
added:

```rust
if !self.table_buffers.contains_key(&table_name) {
    // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
    // and we've gotten here, it means we're dropping a write.
    if let Some(table) = self.db_schema.get_table(&table_name) {
        self.table_buffers.insert(
            table_name.clone(),
            TableBuffer::new(segment_key.clone(), &table.index_columns()),
        );
    } else {
        return;
    }
}
```

Adding the return there let us continue on with our day and make the
tests pass. However, just because the tests passed didn't mean the code
was correct, as I would soon find out. With follow-up ticket #24955
created, we merged the changes and I began to debug the issue.

Note that we assumed we were dropping a single write due to limits,
because the limits test is what failed. What followed was a few days of
chasing to prove that the limits weren't what was failing. It took a
while, but the conclusion was that the limits weren't the cause; the
chase did, however, expose the fact that a Database only ever had one
table, which was weird.

I then began to dig into this a bit more. Why would there only be one
table? We weren't just dropping one write, we were dropping all but
*one* write, or so it seemed. Many printlns/hours later it became clear
that we were actually updating the schema! It existed in the Catalog,
but not behind the pointer to the schema in the DatabaseBuffer struct.
So what gives?

Well, we need to look at [another piece of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L540-L541).
In the `validate_or_insert_schema_and_partitions` function for the
WriteBuffer we have this bit of code:

```rust
// The (potentially updated) DatabaseSchema to return to the caller.
let mut schema = Cow::Borrowed(schema);
```

Here we pass in a reference to the schema in the catalog. However, when
we [go a bit further down](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L565-L568) we see this code:

```rust
let schema = match schema {
    Cow::Owned(s) => Some(s),
    Cow::Borrowed(_) => None,
};
```

What this means is that if we make a change, we clone the original and
update the clone; we *aren't* making a change to the original schema.
When we go back up the call stack we get to [this bit of code](https://github.com/influxdata/influxdb/blob/8f72bf06e13e2269db973b9c2852e092239c701e/influxdb3_write/src/write_buffer/mod.rs#L456-L460):

```rust
if let Some(schema) = result.schema.take() {
    debug!("replacing schema for {:?}", schema);
    catalog.replace_database(sequence, Arc::new(schema))?;
}
```

We are updating the catalog with the new schema, but how does that
work?

```rust
inner.databases.insert(db.name.clone(), db);
```

Oh. Oh no. We're just overwriting it. Which means that the
DatabaseBuffer holds an Arc to the *old* schema, not the *new* one, and
so the buffer only ever gets the first copy of the schema with the
first new table, but *none* of the others.
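
To make the failure mode concrete, here is a minimal, self-contained
sketch of the Arc behaviour. The `Schema` struct and the bare `HashMap`
standing in for the real Catalog/DatabaseSchema types are hypothetical,
not the actual influxdb3 code; the point is only that cloning an `Arc`
hands out a pointer to the *current* allocation, so replacing the map
entry later never updates that clone:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for DatabaseSchema, just to show the Arc behaviour.
struct Schema {
    tables: Vec<String>,
}

fn main() {
    // Hypothetical stand-in for the Catalog's map of databases.
    let mut catalog: HashMap<String, Arc<Schema>> = HashMap::new();
    catalog.insert(
        "db".to_string(),
        Arc::new(Schema { tables: vec!["cpu".to_string()] }),
    );

    // The buffer grabs its own Arc to the schema, like DatabaseBuffer did.
    let buffer_schema = Arc::clone(catalog.get("db").unwrap());

    // "Updating" the schema clones the data and overwrites the catalog entry
    // with a brand new Arc, just like `inner.databases.insert(db.name.clone(), db)`.
    let mut tables = catalog.get("db").unwrap().tables.clone();
    tables.push("mem".to_string());
    catalog.insert("db".to_string(), Arc::new(Schema { tables }));

    // The catalog sees both tables...
    assert_eq!(catalog.get("db").unwrap().tables, vec!["cpu", "mem"]);
    // ...but the buffer's Arc still points at the old allocation and only
    // ever sees the first table.
    assert_eq!(buffer_schema.tables, vec!["cpu"]);
}
```

This is exactly why asking the Catalog for the schema at write time,
rather than caching an Arc inside DatabaseBuffer, fixes the problem.
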
The solution is to make sure that the buffer is passed the current
schema so that it can use the most up to date version from the catalog.
This commit makes those changes to make sure it works.

This was a very subtle mutability/pointer bug: the borrow checking was
valid and some writes were still making it in, but luckily we caught
it. It does mean, though, that until this fix is in, the changes
between the Index PR and now should be considered subtly broken and
shouldn't be used for anything beyond writing to a single table per DB.

TL;DR: we should ask the Catalog what the schema is, as it contains the
up to date version of it.

Closes #24955
---
 influxdb3_write/src/catalog.rs             |  2 +-
 influxdb3_write/src/wal.rs                 |  3 +-
 .../src/write_buffer/buffer_segment.rs     | 44 ++++++++++---------
 3 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/influxdb3_write/src/catalog.rs b/influxdb3_write/src/catalog.rs
index 747187b9330..e1632388a90 100644
--- a/influxdb3_write/src/catalog.rs
+++ b/influxdb3_write/src/catalog.rs
@@ -123,13 +123,13 @@ impl Catalog {
                 db
             }
             None => {
-                info!("return new db {}", db_name);
                 let mut inner = self.inner.write();
 
                 if inner.databases.len() >= Self::NUM_DBS_LIMIT {
                     return Err(Error::TooManyDbs);
                 }
 
+                info!("return new db {}", db_name);
                 let db = Arc::new(DatabaseSchema::new(db_name));
                 inner.databases.insert(db.name.clone(), Arc::clone(&db));
                 db
diff --git a/influxdb3_write/src/wal.rs b/influxdb3_write/src/wal.rs
index b3eeb2a0986..3c0d053addd 100644
--- a/influxdb3_write/src/wal.rs
+++ b/influxdb3_write/src/wal.rs
@@ -663,6 +663,7 @@ mod tests {
     use crate::catalog::Catalog;
     use crate::LpWriteOp;
     use crate::Precision;
+    use std::sync::Arc;
 
     #[test]
     fn segment_writer_reader() {
@@ -808,7 +809,7 @@
 
         // Reopen the wal and make sure it loads the precision via
         // `load_buffer_from_segment`
-        let catalog = Catalog::default();
+        let catalog = Arc::new(Catalog::default());
         let wal = WalImpl::new(dir).unwrap();
         let schema = schema::SchemaBuilder::new()
             .tag("host")
diff --git a/influxdb3_write/src/write_buffer/buffer_segment.rs b/influxdb3_write/src/write_buffer/buffer_segment.rs
index cb3139af75c..cdba670680c 100644
--- a/influxdb3_write/src/write_buffer/buffer_segment.rs
+++ b/influxdb3_write/src/write_buffer/buffer_segment.rs
@@ -2,11 +2,12 @@
 //! single WAL segment. Only one segment should be open for writes in the write buffer at any
 //! given time.
 
-use crate::catalog::{Catalog, DatabaseSchema};
+use crate::catalog::Catalog;
 use crate::chunk::BufferChunk;
 use crate::paths::ParquetFilePath;
 use crate::write_buffer::flusher::BufferedWriteResult;
 use crate::write_buffer::table_buffer::{Builder, Result as TableBufferResult, TableBuffer};
+use crate::write_buffer::DatabaseSchema;
 use crate::write_buffer::{
     parse_validate_and_update_catalog, Error, TableBatch, ValidSegmentedData,
 };
@@ -108,23 +109,20 @@ impl OpenBufferSegment {
             .buffered_data
             .database_buffers
             .entry(db_name.to_string())
-            .or_insert_with(|| {
-                let db_schema = self
-                    .catalog
-                    .db_schema(&db_name)
-                    .expect("db schema should exist");
-                DatabaseBuffer {
-                    table_buffers: HashMap::new(),
-                    db_schema,
-                }
+            .or_insert_with(|| DatabaseBuffer {
+                table_buffers: HashMap::new(),
             });
 
+        let schema = self
+            .catalog
+            .db_schema(&db_name)
+            .expect("database should exist in schema");
         for (table_name, table_batch) in db_batch.table_batches {
             // TODO: for now we'll just have the number of rows represent the segment size. The entire
             // buffer is going to get refactored to use different structures, so this will change.
             self.segment_size += table_batch.rows.len();
 
-            db_buffer.buffer_table_batch(table_name, &self.segment_key, table_batch);
+            db_buffer.buffer_table_batch(table_name, &self.segment_key, table_batch, &schema);
         }
     }
 
@@ -177,7 +175,7 @@ impl OpenBufferSegment {
 }
 
 pub(crate) fn load_buffer_from_segment(
-    catalog: &Catalog,
+    catalog: &Arc<Catalog>,
     mut segment_reader: Box<dyn WalSegmentReader>,
 ) -> Result<(BufferedData, usize)> {
     let mut segment_size = 0;
@@ -201,12 +199,10 @@ pub(crate) fn load_buffer_from_segment(
             let db_name = &write.db_name;
 
             if !buffered_data.database_buffers.contains_key(db_name) {
-                let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
                 buffered_data.database_buffers.insert(
                     db_name.clone(),
                     DatabaseBuffer {
                         table_buffers: HashMap::new(),
-                        db_schema,
                     },
                 );
             }
@@ -221,12 +217,20 @@ pub(crate) fn load_buffer_from_segment(
             }
 
             let segment_data = validated_write.valid_segmented_data.pop().unwrap();
+            let schema = catalog
+                .db_schema(db_name)
+                .expect("database exists in schema");
             for (table_name, table_batch) in segment_data.table_batches {
                 // TODO: for now we'll just have the number of rows represent the segment size. The entire
                 // buffer is going to get refactored to use different structures, so this will change.
                 segment_size += table_batch.rows.len();
 
-                db_buffer.buffer_table_batch(table_name, &segment_key, table_batch);
+                db_buffer.buffer_table_batch(
+                    table_name,
+                    &segment_key,
+                    table_batch,
+                    &schema,
+                );
             }
         }
     }
@@ -318,7 +322,6 @@ impl BufferedData {
 #[derive(Debug)]
 struct DatabaseBuffer {
     table_buffers: HashMap<String, TableBuffer>,
-    db_schema: Arc<DatabaseSchema>,
 }
 
 impl DatabaseBuffer {
@@ -327,19 +330,20 @@ fn buffer_table_batch(
         table_name: String,
         segment_key: &PartitionKey,
         table_batch: TableBatch,
+        schema: &Arc<DatabaseSchema>,
     ) {
         if !self.table_buffers.contains_key(&table_name) {
-            // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
-            // and we've gotten here, it means we're dropping a write.
-            if let Some(table) = self.db_schema.get_table(&table_name) {
+            if let Some(table) = schema.get_table(&table_name) {
                 self.table_buffers.insert(
                     table_name.clone(),
                     TableBuffer::new(segment_key.clone(), &table.index_columns()),
                 );
             } else {
-                return;
+                // Sanity check panic in case this isn't true
+                unreachable!("table should exist in schema");
            }
         }
+
         let table_buffer = self
             .table_buffers
             .get_mut(&table_name)