From 958258e8fbdf3510dfebc60ed5806568002997e7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 6 Jan 2025 00:12:28 +0800 Subject: [PATCH 1/3] - **Refactored SST File Handling**: - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths. - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management. - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths. - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`. - **Enhanced Indexer Management**: - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation. - Updated `ParquetWriter` to handle multiple indexers and file IDs. - Files affected: `index.rs`, `parquet.rs`, `writer.rs`. - **Removed Redundant File ID Handling**: - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`. - Updated related logic to dynamically generate file IDs where necessary. - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`. - **Test Adjustments**: - Updated tests to align with new path and indexer management. - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes. - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`. --- src/mito2/src/access_layer.rs | 94 +++++++++--- src/mito2/src/cache/write_cache.rs | 109 +++++++------- src/mito2/src/compaction.rs | 4 +- src/mito2/src/compaction/compactor.rs | 13 +- src/mito2/src/compaction/picker.rs | 5 - src/mito2/src/compaction/twcs.rs | 5 +- src/mito2/src/compaction/window.rs | 3 +- src/mito2/src/flush.rs | 42 +++--- src/mito2/src/sst/index.rs | 139 ++++++++---------- src/mito2/src/sst/parquet.rs | 107 +++++++++----- src/mito2/src/sst/parquet/writer.rs | 102 +++++++++---- src/mito2/src/test_util/sst_util.rs | 12 +- src/mito2/src/test_util/version_util.rs | 1 - .../src/puffin_manager/fs_puffin_manager.rs | 1 + 14 files changed, 383 insertions(+), 254 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 16d1480a61ed..51dd7a962a7e 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -17,10 +17,12 @@ use std::sync::Arc; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; use object_store::ObjectStore; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::storage::SequenceNumber; +use store_api::storage::{RegionId, SequenceNumber}; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; @@ -30,13 +32,15 @@ use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; pub type AccessLayerRef = Arc; +/// SST write results. +pub type SstInfoArray = SmallVec<[SstInfo; 2]>; /// A layer to access SST files under the same directory. 
pub struct AccessLayer { @@ -121,11 +125,8 @@ impl AccessLayer { &self, request: SstWriteRequest, write_opts: &WriteOptions, - ) -> Result> { - let file_path = location::sst_file_path(&self.region_dir, request.file_id); - let index_file_path = location::index_file_path(&self.region_dir, request.file_id); + ) -> Result { let region_id = request.metadata.region_id; - let file_id = request.file_id; let cache_manager = request.cache_manager.clone(); let sst_info = if let Some(write_cache) = cache_manager.write_cache() { @@ -134,8 +135,9 @@ impl AccessLayer { .write_and_upload_sst( request, SstUploadRequest { - upload_path: file_path, - index_upload_path: index_file_path, + dest_path_provider: RegionFilePathFactory { + region_dir: self.region_dir.clone(), + }, remote_store: self.object_store.clone(), }, write_opts, @@ -144,11 +146,9 @@ impl AccessLayer { } else { // Write cache is disabled. let store = self.object_store.clone(); - let indexer = IndexerBuilder { + let indexer_builder = IndexerBuilderImpl { op_type: request.op_type, - file_id, - file_path: index_file_path, - metadata: &request.metadata, + metadata: request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -156,24 +156,31 @@ impl AccessLayer { inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, bloom_filter_index_config: request.bloom_filter_index_config, - } - .build() - .await; + }; let mut writer = ParquetWriter::new_with_object_store( self.object_store.clone(), - file_path, request.metadata, - indexer, - ); + indexer_builder, + RegionFilePathFactory { + region_dir: self.region_dir.clone(), + }, + ) + .await; writer .write_all(request.source, request.max_sequence, write_opts) .await? }; // Put parquet metadata to cache manager. - if let Some(sst_info) = &sst_info { - if let Some(parquet_metadata) = &sst_info.file_metadata { - cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone()) + if !sst_info.is_empty() { + for sst in &sst_info { + if let Some(parquet_metadata) = &sst.file_metadata { + cache_manager.put_parquet_meta_data( + region_id, + sst.file_id, + parquet_metadata.clone(), + ) + } } } @@ -191,7 +198,6 @@ pub(crate) enum OperationType { /// Contents to build a SST. pub(crate) struct SstWriteRequest { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, pub(crate) metadata: RegionMetadataRef, pub(crate) source: Source, pub(crate) cache_manager: CacheManagerRef, @@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> { Ok(()) } + +/// Path provider for SST file and index file. +pub trait FilePathProvider: Send + Sync { + /// Creates index file path of given file id. + fn build_index_file_path(&self, file_id: FileId) -> String; + + /// Creates SST file path of given file id. + fn build_sst_file_path(&self, file_id: FileId) -> String; +} + +/// Path provider that builds paths in local write cache. 
+#[derive(Clone)] +pub(crate) struct WriteCachePathProvider { + pub(crate) region_id: RegionId, + pub(crate) file_cache: FileCacheRef, +} + +impl FilePathProvider for WriteCachePathProvider { + fn build_index_file_path(&self, file_id: FileId) -> String { + let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + self.file_cache.cache_file_path(puffin_key) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet); + self.file_cache.cache_file_path(parquet_file_key) + } +} + +/// Path provider that builds paths in region storage path. +#[derive(Clone, Debug)] +pub(crate) struct RegionFilePathFactory { + pub(crate) region_dir: String, +} + +impl FilePathProvider for RegionFilePathFactory { + fn build_index_file_path(&self, file_id: FileId) -> String { + location::index_file_path(&self.region_dir, file_id) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + location::sst_file_path(&self.region_dir, file_id) + } +} diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 1e9dfb540093..0ae00b3c6cf2 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -23,7 +23,10 @@ use futures::AsyncWriteExt; use object_store::ObjectStore; use snafu::ResultExt; -use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; +use crate::access_layer::{ + new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest, + WriteCachePathProvider, +}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{ @@ -32,9 +35,9 @@ use crate::metrics::{ }; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::parquet::writer::ParquetWriter; -use crate::sst::parquet::{SstInfo, WriteOptions}; +use crate::sst::parquet::WriteOptions; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// A cache for uploading files to remote object stores. @@ -103,22 +106,21 @@ impl WriteCache { write_request: SstWriteRequest, upload_request: SstUploadRequest, write_opts: &WriteOptions, - ) -> Result> { + ) -> Result { let timer = FLUSH_ELAPSED .with_label_values(&["write_sst"]) .start_timer(); let region_id = write_request.metadata.region_id; - let file_id = write_request.file_id; - let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); let store = self.file_cache.local_store(); - let indexer = IndexerBuilder { + let path_provider = WriteCachePathProvider { + file_cache: self.file_cache.clone(), + region_id, + }; + let indexer = IndexerBuilderImpl { op_type: write_request.op_type, - file_id, - file_path: self.file_cache.cache_file_path(puffin_key), - metadata: &write_request.metadata, + metadata: write_request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -126,17 +128,16 @@ impl WriteCache { inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, bloom_filter_index_config: write_request.bloom_filter_index_config, - } - .build() - .await; + }; // Write to FileCache. 
let mut writer = ParquetWriter::new_with_object_store( self.file_cache.local_store(), - self.file_cache.cache_file_path(parquet_key), write_request.metadata, indexer, - ); + path_provider, + ) + .await; let sst_info = writer .write_all(write_request.source, write_request.max_sequence, write_opts) @@ -145,22 +146,29 @@ impl WriteCache { timer.stop_and_record(); // Upload sst file to remote object store. - let Some(sst_info) = sst_info else { - // No data need to upload. - return Ok(None); - }; + if sst_info.is_empty() { + return Ok(sst_info); + } - let parquet_path = &upload_request.upload_path; let remote_store = &upload_request.remote_store; - self.upload(parquet_key, parquet_path, remote_store).await?; - - if sst_info.index_metadata.file_size > 0 { - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); - let puffin_path = &upload_request.index_upload_path; - self.upload(puffin_key, puffin_path, remote_store).await?; + for sst in &sst_info { + let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet); + let parquet_path = upload_request + .dest_path_provider + .build_sst_file_path(sst.file_id); + self.upload(parquet_key, &parquet_path, remote_store) + .await?; + + if sst.index_metadata.file_size > 0 { + let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin); + let puffin_path = &upload_request + .dest_path_provider + .build_index_file_path(sst.file_id); + self.upload(puffin_key, puffin_path, remote_store).await?; + } } - Ok(Some(sst_info)) + Ok(sst_info) } /// Removes a file from the cache by `index_key`. @@ -319,10 +327,8 @@ impl WriteCache { /// Request to write and upload a SST. pub struct SstUploadRequest { - /// Path to upload the file. - pub upload_path: String, - /// Path to upload the index file. - pub index_upload_path: String, + /// Destination path provider of which SST files in write cache should be uploaded to. + pub dest_path_provider: RegionFilePathFactory, /// Remote object store to upload. pub remote_store: ObjectStore, } @@ -336,11 +342,9 @@ mod tests { use crate::cache::test_util::new_fs_store; use crate::cache::{CacheManager, CacheStrategy}; use crate::region::options::IndexOptions; - use crate::sst::file::FileId; - use crate::sst::location::{index_file_path, sst_file_path}; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::test_util::sst_util::{ - assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle, + assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata, }; use crate::test_util::TestEnv; @@ -351,9 +355,9 @@ mod tests { // and now just use local file system to mock. 
let mut env = TestEnv::new(); let mock_store = env.init_object_store_manager(); - let file_id = FileId::random(); - let upload_path = sst_file_path("test", file_id); - let index_upload_path = index_file_path("test", file_id); + let path_provider = RegionFilePathFactory { + region_dir: "test".to_string(), + }; let local_dir = create_temp_dir(""); let local_store = new_fs_store(local_dir.path().to_str().unwrap()); @@ -373,7 +377,6 @@ mod tests { let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -386,8 +389,7 @@ mod tests { }; let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: path_provider.clone(), remote_store: mock_store.clone(), }; @@ -397,18 +399,22 @@ mod tests { }; // Write to cache and upload sst to mock remote store - write_cache + let sst_info = write_cache .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); //todo(hl): we assume it only creates one file. + + let file_id = sst_info.file_id; + let sst_upload_path = path_provider.build_sst_file_path(file_id); + let index_upload_path = path_provider.build_index_file_path(file_id); // Check write cache contains the key let key = IndexKey::new(region_id, file_id, FileType::Parquet); assert!(write_cache.file_cache.contains_key(&key)); // Check file data - let remote_data = mock_store.read(&upload_path).await.unwrap(); + let remote_data = mock_store.read(&sst_upload_path).await.unwrap(); let cache_data = local_store .read(&write_cache.file_cache.cache_file_path(key)) .await @@ -436,6 +442,7 @@ mod tests { #[tokio::test] async fn test_read_metadata_from_write_cache() { + common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new(); let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); @@ -456,8 +463,7 @@ mod tests { // Create source let metadata = Arc::new(sst_region_metadata()); - let handle = sst_file_handle(0, 1000); - let file_id = handle.file_id(); + let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), new_batch_by_range(&["b", "f"], 0, 40), @@ -467,7 +473,6 @@ mod tests { // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -482,11 +487,10 @@ mod tests { row_group_size: 512, ..Default::default() }; - let upload_path = sst_file_path(&data_home, file_id); - let index_upload_path = index_file_path(&data_home, file_id); let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: RegionFilePathFactory { + region_dir: data_home.clone(), + }, remote_store: mock_store.clone(), }; @@ -494,10 +498,11 @@ mod tests { .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let write_parquet_metadata = sst_info.file_metadata.unwrap(); // Read metadata from write cache + let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000); let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone()) .cache(CacheStrategy::EnableAll(cache_manager.clone())); let reader = builder.build().await.unwrap(); diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index bf8df5fcec7a..21237a340c04 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs 
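// A minimal, self-contained sketch of the `FilePathProvider` idea introduced in this
// patch: the writer is handed a provider instead of concrete paths, so the same write
// path serves both the region directory and the local write cache. `FileId` is
// simplified to a string and the exact layout (".parquet" files in the region dir,
// ".puffin" files under an "index/" subdir) is an assumption for illustration; the
// real crate delegates to `location::sst_file_path` and `location::index_file_path`.
trait FilePathProvider: Send + Sync {
    /// Creates the index (puffin) file path for the given file id.
    fn build_index_file_path(&self, file_id: &str) -> String;

    /// Creates the SST (parquet) file path for the given file id.
    fn build_sst_file_path(&self, file_id: &str) -> String;
}

/// Provider that builds paths under a region's storage directory,
/// mirroring `RegionFilePathFactory` above.
struct RegionFilePathFactory {
    region_dir: String,
}

impl FilePathProvider for RegionFilePathFactory {
    fn build_index_file_path(&self, file_id: &str) -> String {
        format!("{}/index/{}.puffin", self.region_dir, file_id)
    }

    fn build_sst_file_path(&self, file_id: &str) -> String {
        format!("{}/{}.parquet", self.region_dir, file_id)
    }
}

fn main() {
    let provider = RegionFilePathFactory {
        region_dir: "data/region_0".to_string(),
    };
    // The writer asks the provider for a destination per generated file id.
    println!("{}", provider.build_sst_file_path("00000000-0000-0000-0000-000000000001"));
    println!("{}", provider.build_index_file_path("00000000-0000-0000-0000-000000000001"));
}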
@@ -66,7 +66,7 @@ use crate::schedule::remote_job_scheduler::{ CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef, }; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::sst::file::{FileHandle, FileMeta, Level}; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -548,7 +548,6 @@ impl CompactionStatus { #[derive(Debug, Clone)] pub struct CompactionOutput { - pub output_file_id: FileId, /// Compaction output file level. pub output_level: Level, /// Compaction input files. @@ -562,7 +561,6 @@ pub struct CompactionOutput { /// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta]. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SerializedCompactionOutput { - output_file_id: FileId, output_level: Level, inputs: Vec, filter_deleted: bool, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ceeb509bc17e..affbda0f003e 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -271,7 +271,7 @@ impl Compactor for DefaultCompactor { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); info!( - "Compaction region {} output [{}]-> {}", + "Region {} compaction input: [{}]", compaction_region.region_id, output .inputs @@ -279,7 +279,6 @@ impl Compactor for DefaultCompactor { .map(|f| f.file_id().to_string()) .collect::>() .join(","), - output.output_file_id ); let write_opts = WriteOptions { @@ -290,7 +289,6 @@ impl Compactor for DefaultCompactor { let region_metadata = compaction_region.region_metadata.clone(); let sst_layer = compaction_region.access_layer.clone(); let region_id = compaction_region.region_id; - let file_id = output.output_file_id; let cache_manager = compaction_region.cache_manager.clone(); let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -327,7 +325,6 @@ impl Compactor for DefaultCompactor { .write_sst( SstWriteRequest { op_type: OperationType::Compact, - file_id, metadata: region_metadata, source: Source::Reader(reader), cache_manager, @@ -341,9 +338,10 @@ impl Compactor for DefaultCompactor { &write_opts, ) .await? + .into_iter() .map(|sst_info| FileMeta { region_id, - file_id, + file_id: sst_info.file_id, time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, @@ -352,7 +350,8 @@ impl Compactor for DefaultCompactor { num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, - }); + }) + .collect::>(); Ok(file_meta_opt) }); } @@ -369,7 +368,7 @@ impl Compactor for DefaultCompactor { .await .context(JoinSnafu)? 
.into_iter() - .collect::>>()?; + .collect::>>>()?; output_files.extend(metas.into_iter().flatten()); } diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 9397c2bf6470..431973c3b662 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -61,7 +61,6 @@ impl From<&PickerOutput> for SerializedPickerOutput { .outputs .iter() .map(|output| SerializedCompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(), filter_deleted: output.filter_deleted, @@ -91,7 +90,6 @@ impl PickerOutput { .outputs .into_iter() .map(|output| CompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output .inputs @@ -167,14 +165,12 @@ mod tests { let picker_output = PickerOutput { outputs: vec![ CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, output_time_range: None, }, CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, @@ -205,7 +201,6 @@ mod tests { .iter() .zip(picker_output_from_serialized.outputs.iter()) .for_each(|(expected, actual)| { - assert_eq!(expected.output_file_id, actual.output_file_id); assert_eq!(expected.output_level, actual.output_level); expected .inputs diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 8efaa6c65fb6..a4e8913eeffb 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{overlaps, FileHandle, FileId, Level}; +use crate::sst::file::{overlaps, FileHandle, Level}; use crate::sst::version::LevelMeta; const LEVEL_COMPACTED: Level = 1; @@ -134,7 +134,6 @@ impl TwcsPicker { for input in split_inputs { debug_assert!(input.len() > 1); output.push(CompactionOutput { - output_file_id: FileId::random(), output_level: LEVEL_COMPACTED, // always compact to l1 inputs: input, filter_deleted, @@ -373,7 +372,7 @@ mod tests { use super::*; use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::{FileMeta, Level}; + use crate::sst::file::{FileId, FileMeta, Level}; use crate::test_util::NoopFilePurger; #[test] diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 10bdb47297d5..f7ad4af893ee 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::{CompactionRegion, CompactionVersion}; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{FileHandle, FileId}; +use crate::sst::file::FileHandle; /// Compaction picker that splits the time range of all involved files to windows, and merges /// the data segments intersects with those windows of files together so that the output files @@ -132,7 +132,6 @@ fn build_output(windows: BTreeMap)>) -> Vec, inverted_indexer: Option, last_mem_inverted_index: usize, @@ -168,11 +167,15 @@ impl Indexer { } } -pub(crate) struct IndexerBuilder<'a> { 
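// A hedged, self-contained sketch of the async `IndexerBuilder` trait shape that the
// hunk below introduces: `build` takes the file id and index file path per SST, so a
// single builder can create indexers for several output files. Assumes the
// `async_trait` and `tokio` crates; `Indexer` and `FileId` are reduced to simple
// placeholders so the example compiles on its own.
use async_trait::async_trait;

type FileId = String;

#[derive(Default)]
struct Indexer {
    file_id: FileId,
    file_path: String,
}

#[async_trait]
trait IndexerBuilder: Send + Sync {
    /// Builds an indexer for `file_id` that writes index data to `index_file_path`.
    async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer;
}

/// Builder that never creates an index, mirroring the `NoopIndexBuilder` used in tests.
struct NoopIndexBuilder;

#[async_trait]
impl IndexerBuilder for NoopIndexBuilder {
    async fn build(&self, _file_id: FileId, _index_file_path: String) -> Indexer {
        Indexer::default()
    }
}

#[tokio::main]
async fn main() {
    let indexer = NoopIndexBuilder
        .build("file-0".to_string(), "region/index/file-0.puffin".to_string())
        .await;
    // The no-op builder hands back a default indexer that indexes nothing.
    assert!(indexer.file_id.is_empty() && indexer.file_path.is_empty());
}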
+#[async_trait::async_trait] +pub trait IndexerBuilder { + /// Builds indexer of given file id to [index_file_path]. + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer; +} + +pub(crate) struct IndexerBuilderImpl { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, - pub(crate) file_path: String, - pub(crate) metadata: &'a RegionMetadataRef, + pub(crate) metadata: RegionMetadataRef, pub(crate) row_group_size: usize, pub(crate) puffin_manager: SstPuffinManager, pub(crate) intermediate_manager: IntermediateManager, @@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) bloom_filter_index_config: BloomFilterConfig, } -impl<'a> IndexerBuilder<'a> { +#[async_trait::async_trait] +impl IndexerBuilder for IndexerBuilderImpl { /// Sanity check for arguments and create a new [Indexer] if arguments are valid. - pub(crate) async fn build(self) -> Indexer { + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer { let mut indexer = Indexer { - file_id: self.file_id, - file_path: self.file_path.clone(), + file_id, + file_path: index_file_path, region_id: self.metadata.region_id, - ..Default::default() }; - indexer.inverted_indexer = self.build_inverted_indexer(); - indexer.fulltext_indexer = self.build_fulltext_indexer().await; - indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + indexer.inverted_indexer = self.build_inverted_indexer(file_id); + indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() && indexer.bloom_filter_indexer.is_none() @@ -204,11 +207,13 @@ impl<'a> IndexerBuilder<'a> { return Indexer::default(); } - indexer.puffin_manager = Some(self.puffin_manager); + indexer.puffin_manager = Some(self.puffin_manager.clone()); indexer } +} - fn build_inverted_indexer(&self) -> Option { +impl IndexerBuilderImpl { + fn build_inverted_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.inverted_index_config.create_on_flush.auto(), OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(), @@ -217,7 +222,7 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating inverted index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -225,7 +230,7 @@ impl<'a> IndexerBuilder<'a> { if self.metadata.primary_key.is_empty() { debug!( "No tag columns, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -235,7 +240,7 @@ impl<'a> IndexerBuilder<'a> { else { warn!( "Segment row count is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -243,7 +248,7 @@ impl<'a> IndexerBuilder<'a> { let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { warn!( "Row group size is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -254,8 +259,8 @@ impl<'a> IndexerBuilder<'a> { } let indexer = InvertedIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), self.inverted_index_config.mem_threshold_on_create(), segment_row_count, @@ -267,7 
+272,7 @@ impl<'a> IndexerBuilder<'a> { Some(indexer) } - async fn build_fulltext_indexer(&self) -> Option { + async fn build_fulltext_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(), OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(), @@ -276,7 +281,7 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating full-text index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -284,9 +289,9 @@ impl<'a> IndexerBuilder<'a> { let mem_limit = self.fulltext_index_config.mem_threshold_on_create(); let creator = FulltextIndexer::new( &self.metadata.region_id, - &self.file_id, + &file_id, &self.intermediate_manager, - self.metadata, + &self.metadata, self.fulltext_index_config.compress, mem_limit, ) @@ -297,7 +302,7 @@ impl<'a> IndexerBuilder<'a> { if creator.is_none() { debug!( "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return creator; @@ -308,19 +313,19 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create full-text indexer, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } None } - fn build_bloom_filter_indexer(&self) -> Option { + fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(), OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(), @@ -329,15 +334,15 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating bloom filter due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), mem_limit, ); @@ -347,7 +352,7 @@ impl<'a> IndexerBuilder<'a> { if indexer.is_none() { debug!( "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return indexer; @@ -358,12 +363,12 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create bloom filter, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } @@ -489,11 +494,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -502,7 +505,7 @@ mod tests { 
fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -521,11 +524,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -537,18 +538,16 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -560,18 +559,16 @@ mod tests { }, bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -583,7 +580,7 @@ mod tests { ..Default::default() }, } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -602,11 +599,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -615,7 +610,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); @@ -627,11 +622,9 @@ mod tests { with_fulltext: false, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -640,7 +633,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -652,11 +645,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: false, }); - 
let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -665,7 +656,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -684,11 +675,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 0, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -697,7 +686,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 05dafb0edfc3..12d16b7cda3e 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; -use crate::sst::file::FileTimeRange; +use crate::sst::file::{FileId, FileTimeRange}; use crate::sst::index::IndexOutput; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -62,6 +62,8 @@ impl Default for WriteOptions { /// Parquet SST info returned by the writer. pub struct SstInfo { + /// SST file id. + pub file_id: FileId, /// Time range of the SST. The timestamps have the same time unit as the /// data in the SST. 
pub time_range: FileTimeRange, @@ -95,12 +97,13 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; + use crate::access_layer::FilePathProvider; use crate::cache::{CacheManager, CacheStrategy, PageKey}; - use crate::sst::index::Indexer; + use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; - use crate::sst::DEFAULT_WRITE_CONCURRENCY; + use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; use crate::test_util::sst_util::{ assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata, @@ -109,12 +112,38 @@ mod tests { const FILE_DIR: &str = "/"; + #[derive(Clone)] + struct FixedPathProvider { + file_id: FileId, + } + + impl FilePathProvider for FixedPathProvider { + fn build_index_file_path(&self, _file_id: FileId) -> String { + location::index_file_path(FILE_DIR, self.file_id) + } + + fn build_sst_file_path(&self, _file_id: FileId) -> String { + location::sst_file_path(FILE_DIR, self.file_id) + } + } + + struct NoopIndexBuilder; + + #[async_trait::async_trait] + impl IndexerBuilder for NoopIndexBuilder { + async fn build(&self, _file_id: FileId, _path: String) -> Indexer { + Indexer::default() + } + } + #[tokio::test] async fn test_write_read() { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); + let file_path = FixedPathProvider { + file_id: handle.file_id(), + }; let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -126,18 +155,20 @@ mod tests { row_group_size: 50, ..Default::default() }; + let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), + metadata.clone(), + NoopIndexBuilder, file_path, - metadata, - Indexer::default(), - ); + ) + .await; let info = writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); assert_eq!( @@ -168,7 +199,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -183,16 +213,19 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Enable page cache. 
let cache = CacheStrategy::EnableAll(Arc::new( @@ -236,7 +269,6 @@ mod tests { let mut env = crate::test_util::TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -252,16 +284,19 @@ mod tests { // sst info contains the parquet metadata, which is converted from FileMetaData let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; let sst_info = writer .write_all(source, None, &write_opts) .await .unwrap() - .expect("write_all should return sst info"); + .remove(0); let writer_metadata = sst_info.file_metadata.unwrap(); // read the sst file metadata @@ -277,7 +312,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -292,15 +326,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { @@ -330,7 +367,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "z"], 0, 0), @@ -345,15 +381,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); let mut reader = builder.build().await.unwrap(); @@ -365,7 +404,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -380,16 +418,19 @@ mod tests { // Prepare data. 
let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13f7cfb3ec91..0e4eabc96ed1 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -28,6 +28,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::schema::types::ColumnPath; +use smallvec::smallvec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; @@ -35,40 +36,48 @@ use store_api::storage::SequenceNumber; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; +use crate::access_layer::{FilePathProvider, SstInfoArray}; use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; use crate::read::{Batch, Source}; -use crate::sst::index::Indexer; +use crate::sst::file::FileId; +use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// Parquet SST writer. -pub struct ParquetWriter { +pub struct ParquetWriter { + /// Path provider that creates SST and index file paths according to file id. + path_provider: P, writer: Option>>, + /// Current active file id. + current_file: FileId, writer_factory: F, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, - indexer: Indexer, + /// Indexer build that can create indexer for multiple files. + indexer_builder: I, + /// Current active indexer. 
+ current_indexer: Option, bytes_written: Arc, } pub trait WriterFactory { type Writer: AsyncWrite + Send + Unpin; - fn create(&mut self) -> impl Future>; + fn create(&mut self, file_path: &str) -> impl Future>; } pub struct ObjectStoreWriterFactory { - path: String, object_store: ObjectStore, } impl WriterFactory for ObjectStoreWriterFactory { type Writer = Compat; - async fn create(&mut self) -> Result { + async fn create(&mut self, file_path: &str) -> Result { self.object_store - .writer_with(&self.path) + .writer_with(file_path) .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await @@ -77,36 +86,73 @@ impl WriterFactory for ObjectStoreWriterFactory { } } -impl ParquetWriter { - pub fn new_with_object_store( +impl ParquetWriter +where + P: FilePathProvider, + I: IndexerBuilder, +{ + pub async fn new_with_object_store( object_store: ObjectStore, - path: String, metadata: RegionMetadataRef, - indexer: Indexer, - ) -> ParquetWriter { + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { ParquetWriter::new( - ObjectStoreWriterFactory { path, object_store }, + ObjectStoreWriterFactory { object_store }, metadata, - indexer, + indexer_builder, + path_provider, ) + .await } } -impl ParquetWriter +impl ParquetWriter where F: WriterFactory, + I: IndexerBuilder, + P: FilePathProvider, { /// Creates a new parquet SST writer. - pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter { + pub async fn new( + factory: F, + metadata: RegionMetadataRef, + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { + let init_file = FileId::random(); + let index_file_path = path_provider.build_index_file_path(init_file); + let indexer = indexer_builder.build(init_file, index_file_path).await; + ParquetWriter { + path_provider, writer: None, + current_file: init_file, writer_factory: factory, metadata, - indexer, + indexer_builder, + current_indexer: Some(indexer), bytes_written: Arc::new(AtomicUsize::new(0)), } } + async fn get_or_create_indexer(&mut self) -> &mut Indexer { + match self.current_indexer { + None => { + self.current_file = FileId::random(); + let index_file_path = self.path_provider.build_index_file_path(self.current_file); + let indexer = self + .indexer_builder + .build(self.current_file, index_file_path) + .await; + self.current_indexer = Some(indexer); + // safety: self.current_indexer already set above. + self.current_indexer.as_mut().unwrap() + } + Some(ref mut indexer) => indexer, + } + } + /// Iterates source and writes all rows to Parquet file. /// /// Returns the [SstInfo] if the SST is written. @@ -115,7 +161,7 @@ where mut source: Source, override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, - ) -> Result> { + ) -> Result { let write_format = WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); @@ -128,24 +174,24 @@ where match res { Ok(batch) => { stats.update(&batch); - self.indexer.update(&batch).await; + self.get_or_create_indexer().await.update(&batch).await; } Err(e) => { - self.indexer.abort().await; + self.get_or_create_indexer().await.abort().await; return Err(e); } } } - let index_output = self.indexer.finish().await; + let index_output = self.get_or_create_indexer().await.finish().await; if stats.num_rows == 0 { - return Ok(None); + return Ok(smallvec![]); } let Some(mut arrow_writer) = self.writer.take() else { // No batch actually written. 
- return Ok(None); + return Ok(smallvec![]); }; arrow_writer.flush().await.context(WriteParquetSnafu)?; @@ -159,15 +205,18 @@ where // convert FileMetaData to ParquetMetaData let parquet_metadata = parse_parquet_metadata(file_meta)?; + let file_id = self.current_file; + // object_store.write will make sure all bytes are written or an error is raised. - Ok(Some(SstInfo { + Ok(smallvec![SstInfo { + file_id, time_range, file_size, num_rows: stats.num_rows, num_row_groups: parquet_metadata.num_row_groups() as u64, file_metadata: Some(Arc::new(parquet_metadata)), index_metadata: index_output, - })) + }]) } /// Customizes per-column config according to schema and maybe column cardinality. @@ -229,8 +278,9 @@ where let props_builder = Self::customize_column_config(props_builder, &self.metadata); let writer_props = props_builder.build(); + let sst_file_path = self.path_provider.build_sst_file_path(self.current_file); let writer = SizeAwareWriter::new( - self.writer_factory.create().await?, + self.writer_factory.create(&sst_file_path).await?, self.bytes_written.clone(), ); let arrow_writer = diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 63c3fc09d621..63b1d2e7a0c0 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,7 +14,6 @@ //! Utilities for testing SSTs. -use std::num::NonZeroU64; use std::sync::Arc; use api::v1::{OpType, SemanticType}; @@ -100,13 +99,13 @@ pub fn new_source(batches: &[Batch]) -> Source { Source::Reader(Box::new(reader)) } -/// Creates a new [FileHandle] for a SST. -pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { +/// Creates a SST file handle with provided file id +pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle { let file_purger = new_noop_file_purger(); FileHandle::new( FileMeta { region_id: REGION_ID, - file_id: FileId::random(), + file_id, time_range: ( Timestamp::new_millisecond(start_ms), Timestamp::new_millisecond(end_ms), @@ -123,6 +122,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { ) } +/// Creates a new [FileHandle] for a SST. +pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { + sst_file_handle_with_file_id(FileId::random(), start_ms, end_ms) +} + pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { assert!(end >= start); let pk = new_primary_key(tags); diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 68534d34eeb8..9b98fbf026f4 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -15,7 +15,6 @@ //! Utilities to mock version. use std::collections::HashMap; -use std::num::NonZeroU64; use std::sync::Arc; use api::v1::value::ValueData; diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 52190f92fb28..c03a86aaf672 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager; use crate::puffin_manager::PuffinManager; /// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem. +#[derive(Clone)] pub struct FsPuffinManager { /// The stager. 
stager: S, From dc6af2244e57037637f113e81796d8e9d720d360 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 5 Feb 2025 05:37:53 +0000 Subject: [PATCH 2/3] chore: merge main --- src/mito2/src/flush.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index d2c7f6ffa276..ffd6e896a388 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -347,7 +347,6 @@ impl RegionFlushTask { } let max_sequence = mem.stats().max_sequence(); - let file_id = FileId::random(); let iter = mem.iter(None, None, None)?; let source = Source::Iter(iter); From 711bc18457b7c469a000391108c342b3a0a8f22e Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 5 Feb 2025 05:56:06 +0000 Subject: [PATCH 3/3] refactor/generate-file-id-in-parquet-writer: **Enhance Logging in Compactor** - Updated `compactor.rs` to improve logging of compaction process. - Added `itertools::Itertools` for efficient string joining. - Moved logging of compaction inputs and outputs to the async block for better context. - Enhanced log message to include both input and output file names for better traceability. --- src/mito2/src/compaction/compactor.rs | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 25747d4a1355..6bda8c578f0b 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -20,6 +20,7 @@ use api::v1::region::compact_request; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{info, warn}; use common_time::TimeToLive; +use itertools::Itertools; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -278,18 +279,6 @@ impl Compactor for DefaultCompactor { for output in picker_output.outputs.drain(..) { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); - - info!( - "Region {} compaction input: [{}]", - compaction_region.region_id, - output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .collect::>() - .join(","), - ); - let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, ..Default::default() @@ -318,6 +307,11 @@ impl Compactor for DefaultCompactor { .max() .flatten(); futs.push(async move { + let input_file_names = output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .join(","); let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), sst_layer: sst_layer.clone(), @@ -330,7 +324,7 @@ impl Compactor for DefaultCompactor { } .build_sst_reader() .await?; - let file_meta_opt = sst_layer + let output_files = sst_layer .write_sst( SstWriteRequest { op_type: OperationType::Compact, @@ -361,7 +355,13 @@ impl Compactor for DefaultCompactor { sequence: max_sequence, }) .collect::>(); - Ok(file_meta_opt) + let output_file_names = + output_files.iter().map(|f| f.file_id.to_string()).join(","); + info!( + "Region {} compaction inputs: [{}], outputs: [{}]", + region_id, input_file_names, output_file_names + ); + Ok(output_files) }); } let mut output_files = Vec::with_capacity(futs.len());
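// The last commit above rewrites the compaction log line to join input and output
// file names with `itertools::Itertools::join`. A small sketch of that idiom,
// assuming the `itertools` crate; the file ids are plain strings here.
use itertools::Itertools;

fn main() {
    let input_file_names = ["a3c1", "b7f2", "c9d4"].iter().join(",");
    let output_file_names = ["e5a6"].iter().join(",");
    println!(
        "Region {} compaction inputs: [{}], outputs: [{}]",
        42, input_file_names, output_file_names
    );
}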
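// Sketch of the calling-convention change made in the first commit: `write_all` and
// `write_sst` now return a `SmallVec<[SstInfo; 2]>` (aliased as `SstInfoArray`)
// instead of `Option<SstInfo>`, so callers iterate the results and treat "no data"
// as an empty array. Assumes the `smallvec` crate; `SstInfo` is trimmed to two fields.
use smallvec::{smallvec, SmallVec};

struct SstInfo {
    file_id: String,
    num_rows: usize,
}

type SstInfoArray = SmallVec<[SstInfo; 2]>;

fn write_all(num_rows: usize) -> SstInfoArray {
    if num_rows == 0 {
        // Nothing was written; previously this was `Ok(None)`.
        return smallvec![];
    }
    smallvec![SstInfo {
        file_id: "generated-by-writer".to_string(),
        num_rows,
    }]
}

fn main() {
    for sst in write_all(200) {
        println!("uploaded {} ({} rows)", sst.file_id, sst.num_rows);
    }
    assert!(write_all(0).is_empty());
}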