diff --git a/influxdb3_write/src/catalog.rs b/influxdb3_write/src/catalog.rs
index 747187b9330..e1632388a90 100644
--- a/influxdb3_write/src/catalog.rs
+++ b/influxdb3_write/src/catalog.rs
@@ -123,13 +123,13 @@ impl Catalog {
                 db
             }
             None => {
-                info!("return new db {}", db_name);
                 let mut inner = self.inner.write();
                 if inner.databases.len() >= Self::NUM_DBS_LIMIT {
                     return Err(Error::TooManyDbs);
                 }
+                info!("return new db {}", db_name);
                 let db = Arc::new(DatabaseSchema::new(db_name));
                 inner.databases.insert(db.name.clone(), Arc::clone(&db));
                 db
diff --git a/influxdb3_write/src/wal.rs b/influxdb3_write/src/wal.rs
index b3eeb2a0986..3c0d053addd 100644
--- a/influxdb3_write/src/wal.rs
+++ b/influxdb3_write/src/wal.rs
@@ -663,6 +663,7 @@ mod tests {
     use crate::catalog::Catalog;
     use crate::LpWriteOp;
     use crate::Precision;
+    use std::sync::Arc;
 
     #[test]
     fn segment_writer_reader() {
@@ -808,7 +809,7 @@ mod tests {
 
         // Reopen the wal and make sure it loads the precision via
         // `load_buffer_from_segment`
-        let catalog = Catalog::default();
+        let catalog = Arc::new(Catalog::default());
         let wal = WalImpl::new(dir).unwrap();
         let schema = schema::SchemaBuilder::new()
             .tag("host")
diff --git a/influxdb3_write/src/write_buffer/buffer_segment.rs b/influxdb3_write/src/write_buffer/buffer_segment.rs
index cb3139af75c..cdba670680c 100644
--- a/influxdb3_write/src/write_buffer/buffer_segment.rs
+++ b/influxdb3_write/src/write_buffer/buffer_segment.rs
@@ -2,11 +2,12 @@
 //! single WAL segment. Only one segment should be open for writes in the write buffer at any
 //! given time.
 
-use crate::catalog::{Catalog, DatabaseSchema};
+use crate::catalog::Catalog;
 use crate::chunk::BufferChunk;
 use crate::paths::ParquetFilePath;
 use crate::write_buffer::flusher::BufferedWriteResult;
 use crate::write_buffer::table_buffer::{Builder, Result as TableBufferResult, TableBuffer};
+use crate::write_buffer::DatabaseSchema;
 use crate::write_buffer::{
     parse_validate_and_update_catalog, Error, TableBatch, ValidSegmentedData,
 };
@@ -108,23 +109,20 @@ impl OpenBufferSegment {
                 .buffered_data
                 .database_buffers
                 .entry(db_name.to_string())
-                .or_insert_with(|| {
-                    let db_schema = self
-                        .catalog
-                        .db_schema(&db_name)
-                        .expect("db schema should exist");
-                    DatabaseBuffer {
-                        table_buffers: HashMap::new(),
-                        db_schema,
-                    }
+                .or_insert_with(|| DatabaseBuffer {
+                    table_buffers: HashMap::new(),
                 });
+            let schema = self
+                .catalog
+                .db_schema(&db_name)
+                .expect("database should exist in schema");
 
             for (table_name, table_batch) in db_batch.table_batches {
                 // TODO: for now we'll just have the number of rows represent the segment size. The entire
                 // buffer is going to get refactored to use different structures, so this will change.
                 self.segment_size += table_batch.rows.len();
 
-                db_buffer.buffer_table_batch(table_name, &self.segment_key, table_batch);
+                db_buffer.buffer_table_batch(table_name, &self.segment_key, table_batch, &schema);
             }
         }
 
@@ -177,7 +175,7 @@ impl OpenBufferSegment {
 }
 
 pub(crate) fn load_buffer_from_segment(
-    catalog: &Catalog,
+    catalog: &Arc<Catalog>,
     mut segment_reader: Box<dyn WalSegmentReader>,
 ) -> Result<(BufferedData, usize)> {
     let mut segment_size = 0;
@@ -201,12 +199,10 @@ pub(crate) fn load_buffer_from_segment(
                     let db_name = &write.db_name;
 
                     if !buffered_data.database_buffers.contains_key(db_name) {
-                        let db_schema = catalog.db_schema(db_name).expect("db schema should exist");
                         buffered_data.database_buffers.insert(
                             db_name.clone(),
                             DatabaseBuffer {
                                 table_buffers: HashMap::new(),
-                                db_schema,
                             },
                         );
                     }
@@ -221,12 +217,20 @@ pub(crate) fn load_buffer_from_segment(
                     }
 
                     let segment_data = validated_write.valid_segmented_data.pop().unwrap();
+                    let schema = catalog
+                        .db_schema(db_name)
+                        .expect("database exists in schema");
 
                     for (table_name, table_batch) in segment_data.table_batches {
                         // TODO: for now we'll just have the number of rows represent the segment size. The entire
                         // buffer is going to get refactored to use different structures, so this will change.
                         segment_size += table_batch.rows.len();
 
-                        db_buffer.buffer_table_batch(table_name, &segment_key, table_batch);
+                        db_buffer.buffer_table_batch(
+                            table_name,
+                            &segment_key,
+                            table_batch,
+                            &schema,
+                        );
                     }
                 }
@@ -318,7 +322,6 @@ impl BufferedData {
 #[derive(Debug)]
 struct DatabaseBuffer {
     table_buffers: HashMap<String, TableBuffer>,
-    db_schema: Arc<DatabaseSchema>,
 }
 
 impl DatabaseBuffer {
@@ -327,19 +330,20 @@ impl DatabaseBuffer {
         table_name: String,
         segment_key: &PartitionKey,
         table_batch: TableBatch,
+        schema: &Arc<DatabaseSchema>,
     ) {
         if !self.table_buffers.contains_key(&table_name) {
-            // TODO: this check shouldn't be necessary. If the table doesn't exist in the catalog
-            // and we've gotten here, it means we're dropping a write.
-            if let Some(table) = self.db_schema.get_table(&table_name) {
+            if let Some(table) = schema.get_table(&table_name) {
                 self.table_buffers.insert(
                     table_name.clone(),
                     TableBuffer::new(segment_key.clone(), &table.index_columns()),
                 );
             } else {
-                return;
+                // Sanity check panic in case this isn't true
+                unreachable!("table should exist in schema");
             }
         }
+
        let table_buffer = self
            .table_buffers
            .get_mut(&table_name)
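For context on the call pattern these `buffer_segment.rs` changes move to, here is a minimal, self-contained Rust sketch. The `Catalog`, `DatabaseSchema`, and `DatabaseBuffer` below are simplified stand-ins, not the real `influxdb3_write` types: it only illustrates resolving the schema once per database batch from a shared `Arc<Catalog>` and passing it by reference into each `buffer_table_batch` call, instead of caching it inside the buffer.

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
struct DatabaseSchema {
    tables: Vec<String>,
}

impl DatabaseSchema {
    fn get_table(&self, name: &str) -> Option<&String> {
        self.tables.iter().find(|t| t.as_str() == name)
    }
}

#[derive(Debug, Default)]
struct Catalog {
    databases: HashMap<String, Arc<DatabaseSchema>>,
}

impl Catalog {
    fn db_schema(&self, db: &str) -> Option<Arc<DatabaseSchema>> {
        self.databases.get(db).cloned()
    }
}

#[derive(Debug, Default)]
struct DatabaseBuffer {
    // No cached db_schema field; rows are keyed by table name only.
    table_buffers: HashMap<String, Vec<String>>,
}

impl DatabaseBuffer {
    // The schema is supplied by the caller for every batch rather than stored on self.
    fn buffer_table_batch(
        &mut self,
        table_name: String,
        rows: Vec<String>,
        schema: &Arc<DatabaseSchema>,
    ) {
        if schema.get_table(&table_name).is_none() {
            // Mirrors the diff's sanity check: the catalog update that precedes
            // buffering should already have created the table.
            unreachable!("table should exist in schema");
        }
        self.table_buffers.entry(table_name).or_default().extend(rows);
    }
}

fn main() {
    let mut catalog = Catalog::default();
    catalog.databases.insert(
        "mydb".to_string(),
        Arc::new(DatabaseSchema {
            tables: vec!["cpu".to_string()],
        }),
    );
    let catalog = Arc::new(catalog);

    let mut db_buffer = DatabaseBuffer::default();
    // Look the schema up once per database batch, then reuse it for every table batch.
    let schema = catalog
        .db_schema("mydb")
        .expect("database should exist in schema");
    db_buffer.buffer_table_batch(
        "cpu".to_string(),
        vec!["cpu,host=a usage=1".to_string()],
        &schema,
    );
    println!("{db_buffer:?}");
}
```

The sketch also reflects why the test in `wal.rs` now wraps the catalog in `Arc::new(...)`: once the buffer no longer owns a schema handle, the caller holds the catalog behind an `Arc` and hands out schema references per batch.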