From 958258e8fbdf3510dfebc60ed5806568002997e7 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <mrsatangel@gmail.com>
Date: Mon, 6 Jan 2025 00:12:28 +0800
Subject: [PATCH 1/4] refactor(mito2): introduce FilePathProvider and async IndexerBuilder for SST writes

 - **Refactored SST File Handling**:
   - Introduced a `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths; a simplified sketch follows below.
   - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
   - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
   - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.
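
 A minimal, self-contained sketch of the `FilePathProvider` shape (names mirror the diff below, but `FileId` and the `location` path helpers are simplified to plain strings here, and the path layout is only illustrative):

```rust
// Illustrative stand-in for the trait added in access_layer.rs; the real code
// takes a FileId and delegates to location::sst_file_path / index_file_path.
pub trait FilePathProvider: Send + Sync {
    /// Creates the index (puffin) file path for the given file id.
    fn build_index_file_path(&self, file_id: &str) -> String;

    /// Creates the SST (parquet) file path for the given file id.
    fn build_sst_file_path(&self, file_id: &str) -> String;
}

/// Builds paths under a region directory, analogous to `RegionFilePathFactory`.
pub struct RegionDirProvider {
    pub region_dir: String,
}

impl FilePathProvider for RegionDirProvider {
    fn build_index_file_path(&self, file_id: &str) -> String {
        format!("{}/index/{}.puffin", self.region_dir, file_id)
    }

    fn build_sst_file_path(&self, file_id: &str) -> String {
        format!("{}/{}.parquet", self.region_dir, file_id)
    }
}

fn main() {
    let provider = RegionDirProvider {
        region_dir: "data/region_0".to_string(),
    };
    // The writer asks the provider for a path each time it opens a new output
    // file, instead of receiving a single pre-computed path in the request.
    println!("{}", provider.build_sst_file_path("0000-0001"));
    println!("{}", provider.build_index_file_path("0000-0001"));
}
```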

 - **Enhanced Indexer Management**:
   - Introduced an `IndexerBuilder` trait with an async `build(file_id, index_file_path)` method and moved the existing builder into `IndexerBuilderImpl`, so indexers are created dynamically per output file (see the sketch below).
   - Updated `ParquetWriter` to handle multiple indexers and file IDs.
   - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.
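
 A simplified sketch of per-file indexer creation (the patch itself uses `#[async_trait::async_trait]` and returns the crate's `Indexer`; this stand-alone version relies on native `async fn` in traits and uses the `futures` crate only to drive the future):

```rust
use futures::executor::block_on;

/// Trivial stand-in for the crate's Indexer.
struct Indexer {
    file_id: String,
    file_path: String,
}

trait IndexerBuilder {
    /// Builds an indexer for the given file id that writes to `index_file_path`.
    async fn build(&self, file_id: String, index_file_path: String) -> Indexer;
}

struct IndexerBuilderImpl;

impl IndexerBuilder for IndexerBuilderImpl {
    async fn build(&self, file_id: String, index_file_path: String) -> Indexer {
        // The real implementation decides here which indexes (inverted,
        // full-text, bloom filter) to create based on config and metadata.
        Indexer {
            file_id,
            file_path: index_file_path,
        }
    }
}

fn main() {
    let builder = IndexerBuilderImpl;
    // The ParquetWriter can now call this once per output file instead of
    // being handed a single pre-built Indexer.
    let indexer = block_on(builder.build("f1".into(), "/tmp/f1.puffin".into()));
    println!("index for {} goes to {}", indexer.file_id, indexer.file_path);
}
```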

 - **Removed Redundant File ID Handling**:
   - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
   - Updated related logic to generate file IDs dynamically where necessary and to read them back from the returned `SstInfo` (see the sketch below).
   - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.
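
 A sketch of how callers now consume the writer's output: file ids come from the returned `SstInfo` entries rather than from a pre-generated `file_id` in the request (the types are simplified stand-ins for the crate's `SstInfo` and `FileMeta`):

```rust
struct SstInfo {
    file_id: u64,
    file_size: u64,
    num_rows: usize,
}

struct FileMeta {
    file_id: u64,
    file_size: u64,
    num_rows: u64,
    level: u8,
}

/// Stand-in for AccessLayer::write_sst: the writer decides how many files it
/// produces and assigns each output file its own id.
fn write_sst() -> Vec<SstInfo> {
    vec![
        SstInfo { file_id: 1, file_size: 4096, num_rows: 200 },
        SstInfo { file_id: 2, file_size: 2048, num_rows: 100 },
    ]
}

fn main() {
    let mut flushed_bytes: u64 = 0;
    // Mirrors the flush.rs hunk below: every returned SstInfo becomes a FileMeta.
    let file_metas: Vec<FileMeta> = write_sst()
        .into_iter()
        .map(|sst| {
            flushed_bytes += sst.file_size;
            FileMeta {
                file_id: sst.file_id,
                file_size: sst.file_size,
                num_rows: sst.num_rows as u64,
                level: 0,
            }
        })
        .collect();
    let total_rows: u64 = file_metas.iter().map(|m| m.num_rows).sum();
    println!(
        "flushed {flushed_bytes} bytes, {total_rows} rows into {} files",
        file_metas.len()
    );
    println!(
        "first file {} ({} bytes) at level {}",
        file_metas[0].file_id, file_metas[0].file_size, file_metas[0].level
    );
}
```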

 - **Test Adjustments**:
   - Updated tests to align with new path and indexer management.
   - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
   - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.
---
 src/mito2/src/access_layer.rs                 |  94 +++++++++---
 src/mito2/src/cache/write_cache.rs            | 109 +++++++-------
 src/mito2/src/compaction.rs                   |   4 +-
 src/mito2/src/compaction/compactor.rs         |  13 +-
 src/mito2/src/compaction/picker.rs            |   5 -
 src/mito2/src/compaction/twcs.rs              |   5 +-
 src/mito2/src/compaction/window.rs            |   3 +-
 src/mito2/src/flush.rs                        |  42 +++---
 src/mito2/src/sst/index.rs                    | 139 ++++++++----------
 src/mito2/src/sst/parquet.rs                  | 107 +++++++++-----
 src/mito2/src/sst/parquet/writer.rs           | 102 +++++++++----
 src/mito2/src/test_util/sst_util.rs           |  12 +-
 src/mito2/src/test_util/version_util.rs       |   1 -
 .../src/puffin_manager/fs_puffin_manager.rs   |   1 +
 14 files changed, 383 insertions(+), 254 deletions(-)

diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 16d1480a61ed..51dd7a962a7e 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -17,10 +17,12 @@ use std::sync::Arc;
 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};
 use object_store::ObjectStore;
+use smallvec::SmallVec;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
-use store_api::storage::SequenceNumber;
+use store_api::storage::{RegionId, SequenceNumber};
 
+use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
 use crate::cache::write_cache::SstUploadRequest;
 use crate::cache::CacheManagerRef;
 use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
@@ -30,13 +32,15 @@ use crate::region::options::IndexOptions;
 use crate::sst::file::{FileHandle, FileId, FileMeta};
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
-use crate::sst::index::IndexerBuilder;
+use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::location;
 use crate::sst::parquet::reader::ParquetReaderBuilder;
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::{SstInfo, WriteOptions};
 
 pub type AccessLayerRef = Arc<AccessLayer>;
+/// SST write results.
+pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
 
 /// A layer to access SST files under the same directory.
 pub struct AccessLayer {
@@ -121,11 +125,8 @@ impl AccessLayer {
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
-    ) -> Result<Option<SstInfo>> {
-        let file_path = location::sst_file_path(&self.region_dir, request.file_id);
-        let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
+    ) -> Result<SstInfoArray> {
         let region_id = request.metadata.region_id;
-        let file_id = request.file_id;
         let cache_manager = request.cache_manager.clone();
 
         let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
@@ -134,8 +135,9 @@ impl AccessLayer {
                 .write_and_upload_sst(
                     request,
                     SstUploadRequest {
-                        upload_path: file_path,
-                        index_upload_path: index_file_path,
+                        dest_path_provider: RegionFilePathFactory {
+                            region_dir: self.region_dir.clone(),
+                        },
                         remote_store: self.object_store.clone(),
                     },
                     write_opts,
@@ -144,11 +146,9 @@ impl AccessLayer {
         } else {
             // Write cache is disabled.
             let store = self.object_store.clone();
-            let indexer = IndexerBuilder {
+            let indexer_builder = IndexerBuilderImpl {
                 op_type: request.op_type,
-                file_id,
-                file_path: index_file_path,
-                metadata: &request.metadata,
+                metadata: request.metadata.clone(),
                 row_group_size: write_opts.row_group_size,
                 puffin_manager: self.puffin_manager_factory.build(store),
                 intermediate_manager: self.intermediate_manager.clone(),
@@ -156,24 +156,31 @@ impl AccessLayer {
                 inverted_index_config: request.inverted_index_config,
                 fulltext_index_config: request.fulltext_index_config,
                 bloom_filter_index_config: request.bloom_filter_index_config,
-            }
-            .build()
-            .await;
+            };
             let mut writer = ParquetWriter::new_with_object_store(
                 self.object_store.clone(),
-                file_path,
                 request.metadata,
-                indexer,
-            );
+                indexer_builder,
+                RegionFilePathFactory {
+                    region_dir: self.region_dir.clone(),
+                },
+            )
+            .await;
             writer
                 .write_all(request.source, request.max_sequence, write_opts)
                 .await?
         };
 
         // Put parquet metadata to cache manager.
-        if let Some(sst_info) = &sst_info {
-            if let Some(parquet_metadata) = &sst_info.file_metadata {
-                cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
+        if !sst_info.is_empty() {
+            for sst in &sst_info {
+                if let Some(parquet_metadata) = &sst.file_metadata {
+                    cache_manager.put_parquet_meta_data(
+                        region_id,
+                        sst.file_id,
+                        parquet_metadata.clone(),
+                    )
+                }
             }
         }
 
@@ -191,7 +198,6 @@ pub(crate) enum OperationType {
 /// Contents to build a SST.
 pub(crate) struct SstWriteRequest {
     pub(crate) op_type: OperationType,
-    pub(crate) file_id: FileId,
     pub(crate) metadata: RegionMetadataRef,
     pub(crate) source: Source,
     pub(crate) cache_manager: CacheManagerRef,
@@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> {
 
     Ok(())
 }
+
+/// Path provider for SST file and index file.
+pub trait FilePathProvider: Send + Sync {
+    /// Creates the index file path for the given file id.
+    fn build_index_file_path(&self, file_id: FileId) -> String;
+
+    /// Creates the SST file path for the given file id.
+    fn build_sst_file_path(&self, file_id: FileId) -> String;
+}
+
+/// Path provider that builds paths in the local write cache.
+#[derive(Clone)]
+pub(crate) struct WriteCachePathProvider {
+    pub(crate) region_id: RegionId,
+    pub(crate) file_cache: FileCacheRef,
+}
+
+impl FilePathProvider for WriteCachePathProvider {
+    fn build_index_file_path(&self, file_id: FileId) -> String {
+        let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
+        self.file_cache.cache_file_path(puffin_key)
+    }
+
+    fn build_sst_file_path(&self, file_id: FileId) -> String {
+        let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet);
+        self.file_cache.cache_file_path(parquet_file_key)
+    }
+}
+
+/// Path provider that builds paths under the region storage directory.
+#[derive(Clone, Debug)]
+pub(crate) struct RegionFilePathFactory {
+    pub(crate) region_dir: String,
+}
+
+impl FilePathProvider for RegionFilePathFactory {
+    fn build_index_file_path(&self, file_id: FileId) -> String {
+        location::index_file_path(&self.region_dir, file_id)
+    }
+
+    fn build_sst_file_path(&self, file_id: FileId) -> String {
+        location::sst_file_path(&self.region_dir, file_id)
+    }
+}
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 1e9dfb540093..0ae00b3c6cf2 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -23,7 +23,10 @@ use futures::AsyncWriteExt;
 use object_store::ObjectStore;
 use snafu::ResultExt;
 
-use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
+use crate::access_layer::{
+    new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
+    WriteCachePathProvider,
+};
 use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
 use crate::error::{self, Result};
 use crate::metrics::{
@@ -32,9 +35,9 @@ use crate::metrics::{
 };
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::PuffinManagerFactory;
-use crate::sst::index::IndexerBuilder;
+use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::parquet::writer::ParquetWriter;
-use crate::sst::parquet::{SstInfo, WriteOptions};
+use crate::sst::parquet::WriteOptions;
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
 
 /// A cache for uploading files to remote object stores.
@@ -103,22 +106,21 @@ impl WriteCache {
         write_request: SstWriteRequest,
         upload_request: SstUploadRequest,
         write_opts: &WriteOptions,
-    ) -> Result<Option<SstInfo>> {
+    ) -> Result<SstInfoArray> {
         let timer = FLUSH_ELAPSED
             .with_label_values(&["write_sst"])
             .start_timer();
 
         let region_id = write_request.metadata.region_id;
-        let file_id = write_request.file_id;
-        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
-        let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
 
         let store = self.file_cache.local_store();
-        let indexer = IndexerBuilder {
+        let path_provider = WriteCachePathProvider {
+            file_cache: self.file_cache.clone(),
+            region_id,
+        };
+        let indexer = IndexerBuilderImpl {
             op_type: write_request.op_type,
-            file_id,
-            file_path: self.file_cache.cache_file_path(puffin_key),
-            metadata: &write_request.metadata,
+            metadata: write_request.metadata.clone(),
             row_group_size: write_opts.row_group_size,
             puffin_manager: self.puffin_manager_factory.build(store),
             intermediate_manager: self.intermediate_manager.clone(),
@@ -126,17 +128,16 @@ impl WriteCache {
             inverted_index_config: write_request.inverted_index_config,
             fulltext_index_config: write_request.fulltext_index_config,
             bloom_filter_index_config: write_request.bloom_filter_index_config,
-        }
-        .build()
-        .await;
+        };
 
         // Write to FileCache.
         let mut writer = ParquetWriter::new_with_object_store(
             self.file_cache.local_store(),
-            self.file_cache.cache_file_path(parquet_key),
             write_request.metadata,
             indexer,
-        );
+            path_provider,
+        )
+        .await;
 
         let sst_info = writer
             .write_all(write_request.source, write_request.max_sequence, write_opts)
@@ -145,22 +146,29 @@ impl WriteCache {
         timer.stop_and_record();
 
         // Upload sst file to remote object store.
-        let Some(sst_info) = sst_info else {
-            // No data need to upload.
-            return Ok(None);
-        };
+        if sst_info.is_empty() {
+            return Ok(sst_info);
+        }
 
-        let parquet_path = &upload_request.upload_path;
         let remote_store = &upload_request.remote_store;
-        self.upload(parquet_key, parquet_path, remote_store).await?;
-
-        if sst_info.index_metadata.file_size > 0 {
-            let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
-            let puffin_path = &upload_request.index_upload_path;
-            self.upload(puffin_key, puffin_path, remote_store).await?;
+        for sst in &sst_info {
+            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
+            let parquet_path = upload_request
+                .dest_path_provider
+                .build_sst_file_path(sst.file_id);
+            self.upload(parquet_key, &parquet_path, remote_store)
+                .await?;
+
+            if sst.index_metadata.file_size > 0 {
+                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
+                let puffin_path = &upload_request
+                    .dest_path_provider
+                    .build_index_file_path(sst.file_id);
+                self.upload(puffin_key, puffin_path, remote_store).await?;
+            }
         }
 
-        Ok(Some(sst_info))
+        Ok(sst_info)
     }
 
     /// Removes a file from the cache by `index_key`.
@@ -319,10 +327,8 @@ impl WriteCache {
 
 /// Request to write and upload a SST.
 pub struct SstUploadRequest {
-    /// Path to upload the file.
-    pub upload_path: String,
-    /// Path to upload the index file.
-    pub index_upload_path: String,
+    /// Destination path provider that decides where SST files in the write cache are uploaded to.
+    pub dest_path_provider: RegionFilePathFactory,
     /// Remote object store to upload.
     pub remote_store: ObjectStore,
 }
@@ -336,11 +342,9 @@ mod tests {
     use crate::cache::test_util::new_fs_store;
     use crate::cache::{CacheManager, CacheStrategy};
     use crate::region::options::IndexOptions;
-    use crate::sst::file::FileId;
-    use crate::sst::location::{index_file_path, sst_file_path};
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::test_util::sst_util::{
-        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
+        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
         sst_region_metadata,
     };
     use crate::test_util::TestEnv;
@@ -351,9 +355,9 @@ mod tests {
         // and now just use local file system to mock.
         let mut env = TestEnv::new();
         let mock_store = env.init_object_store_manager();
-        let file_id = FileId::random();
-        let upload_path = sst_file_path("test", file_id);
-        let index_upload_path = index_file_path("test", file_id);
+        let path_provider = RegionFilePathFactory {
+            region_dir: "test".to_string(),
+        };
 
         let local_dir = create_temp_dir("");
         let local_store = new_fs_store(local_dir.path().to_str().unwrap());
@@ -373,7 +377,6 @@ mod tests {
 
         let write_request = SstWriteRequest {
             op_type: OperationType::Flush,
-            file_id,
             metadata,
             source,
             storage: None,
@@ -386,8 +389,7 @@ mod tests {
         };
 
         let upload_request = SstUploadRequest {
-            upload_path: upload_path.clone(),
-            index_upload_path: index_upload_path.clone(),
+            dest_path_provider: path_provider.clone(),
             remote_store: mock_store.clone(),
         };
 
@@ -397,18 +399,22 @@ mod tests {
         };
 
         // Write to cache and upload sst to mock remote store
-        write_cache
+        let sst_info = write_cache
             .write_and_upload_sst(write_request, upload_request, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0); // TODO(hl): we assume it only creates one file.
+
+        let file_id = sst_info.file_id;
+        let sst_upload_path = path_provider.build_sst_file_path(file_id);
+        let index_upload_path = path_provider.build_index_file_path(file_id);
 
         // Check write cache contains the key
         let key = IndexKey::new(region_id, file_id, FileType::Parquet);
         assert!(write_cache.file_cache.contains_key(&key));
 
         // Check file data
-        let remote_data = mock_store.read(&upload_path).await.unwrap();
+        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
         let cache_data = local_store
             .read(&write_cache.file_cache.cache_file_path(key))
             .await
@@ -436,6 +442,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_read_metadata_from_write_cache() {
+        common_telemetry::init_default_ut_logging();
         let mut env = TestEnv::new();
         let data_home = env.data_home().display().to_string();
         let mock_store = env.init_object_store_manager();
@@ -456,8 +463,7 @@ mod tests {
 
         // Create source
         let metadata = Arc::new(sst_region_metadata());
-        let handle = sst_file_handle(0, 1000);
-        let file_id = handle.file_id();
+
         let source = new_source(&[
             new_batch_by_range(&["a", "d"], 0, 60),
             new_batch_by_range(&["b", "f"], 0, 40),
@@ -467,7 +473,6 @@ mod tests {
         // Write to local cache and upload sst to mock remote store
         let write_request = SstWriteRequest {
             op_type: OperationType::Flush,
-            file_id,
             metadata,
             source,
             storage: None,
@@ -482,11 +487,10 @@ mod tests {
             row_group_size: 512,
             ..Default::default()
         };
-        let upload_path = sst_file_path(&data_home, file_id);
-        let index_upload_path = index_file_path(&data_home, file_id);
         let upload_request = SstUploadRequest {
-            upload_path: upload_path.clone(),
-            index_upload_path: index_upload_path.clone(),
+            dest_path_provider: RegionFilePathFactory {
+                region_dir: data_home.clone(),
+            },
             remote_store: mock_store.clone(),
         };
 
@@ -494,10 +498,11 @@ mod tests {
             .write_and_upload_sst(write_request, upload_request, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0);
         let write_parquet_metadata = sst_info.file_metadata.unwrap();
 
         // Read metadata from write cache
+        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
         let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
             .cache(CacheStrategy::EnableAll(cache_manager.clone()));
         let reader = builder.build().await.unwrap();
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index bf8df5fcec7a..21237a340c04 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -66,7 +66,7 @@ use crate::schedule::remote_job_scheduler::{
     CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
 };
 use crate::schedule::scheduler::SchedulerRef;
-use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
+use crate::sst::file::{FileHandle, FileMeta, Level};
 use crate::sst::version::LevelMeta;
 use crate::worker::WorkerListener;
 
@@ -548,7 +548,6 @@ impl CompactionStatus {
 
 #[derive(Debug, Clone)]
 pub struct CompactionOutput {
-    pub output_file_id: FileId,
     /// Compaction output file level.
     pub output_level: Level,
     /// Compaction input files.
@@ -562,7 +561,6 @@ pub struct CompactionOutput {
 /// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta].
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct SerializedCompactionOutput {
-    output_file_id: FileId,
     output_level: Level,
     inputs: Vec<FileMeta>,
     filter_deleted: bool,
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index ceeb509bc17e..affbda0f003e 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -271,7 +271,7 @@ impl Compactor for DefaultCompactor {
             compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
 
             info!(
-                "Compaction region {} output [{}]-> {}",
+                "Region {} compaction input: [{}]",
                 compaction_region.region_id,
                 output
                     .inputs
@@ -279,7 +279,6 @@ impl Compactor for DefaultCompactor {
                     .map(|f| f.file_id().to_string())
                     .collect::<Vec<_>>()
                     .join(","),
-                output.output_file_id
             );
 
             let write_opts = WriteOptions {
@@ -290,7 +289,6 @@ impl Compactor for DefaultCompactor {
             let region_metadata = compaction_region.region_metadata.clone();
             let sst_layer = compaction_region.access_layer.clone();
             let region_id = compaction_region.region_id;
-            let file_id = output.output_file_id;
             let cache_manager = compaction_region.cache_manager.clone();
             let storage = compaction_region.region_options.storage.clone();
             let index_options = compaction_region
@@ -327,7 +325,6 @@ impl Compactor for DefaultCompactor {
                     .write_sst(
                         SstWriteRequest {
                             op_type: OperationType::Compact,
-                            file_id,
                             metadata: region_metadata,
                             source: Source::Reader(reader),
                             cache_manager,
@@ -341,9 +338,10 @@ impl Compactor for DefaultCompactor {
                         &write_opts,
                     )
                     .await?
+                    .into_iter()
                     .map(|sst_info| FileMeta {
                         region_id,
-                        file_id,
+                        file_id: sst_info.file_id,
                         time_range: sst_info.time_range,
                         level: output.output_level,
                         file_size: sst_info.file_size,
@@ -352,7 +350,8 @@ impl Compactor for DefaultCompactor {
                         num_rows: sst_info.num_rows as u64,
                         num_row_groups: sst_info.num_row_groups,
                         sequence: max_sequence,
-                    });
+                    })
+                    .collect::<Vec<_>>();
                 Ok(file_meta_opt)
             });
         }
@@ -369,7 +368,7 @@ impl Compactor for DefaultCompactor {
                 .await
                 .context(JoinSnafu)?
                 .into_iter()
-                .collect::<Result<Vec<_>>>()?;
+                .collect::<Result<Vec<Vec<_>>>>()?;
             output_files.extend(metas.into_iter().flatten());
         }
 
diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs
index 9397c2bf6470..431973c3b662 100644
--- a/src/mito2/src/compaction/picker.rs
+++ b/src/mito2/src/compaction/picker.rs
@@ -61,7 +61,6 @@ impl From<&PickerOutput> for SerializedPickerOutput {
             .outputs
             .iter()
             .map(|output| SerializedCompactionOutput {
-                output_file_id: output.output_file_id,
                 output_level: output.output_level,
                 inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(),
                 filter_deleted: output.filter_deleted,
@@ -91,7 +90,6 @@ impl PickerOutput {
             .outputs
             .into_iter()
             .map(|output| CompactionOutput {
-                output_file_id: output.output_file_id,
                 output_level: output.output_level,
                 inputs: output
                     .inputs
@@ -167,14 +165,12 @@ mod tests {
         let picker_output = PickerOutput {
             outputs: vec![
                 CompactionOutput {
-                    output_file_id: FileId::random(),
                     output_level: 0,
                     inputs: inputs_file_handle.clone(),
                     filter_deleted: false,
                     output_time_range: None,
                 },
                 CompactionOutput {
-                    output_file_id: FileId::random(),
                     output_level: 0,
                     inputs: inputs_file_handle.clone(),
                     filter_deleted: false,
@@ -205,7 +201,6 @@ mod tests {
             .iter()
             .zip(picker_output_from_serialized.outputs.iter())
             .for_each(|(expected, actual)| {
-                assert_eq!(expected.output_file_id, actual.output_file_id);
                 assert_eq!(expected.output_level, actual.output_level);
                 expected
                     .inputs
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 8efaa6c65fb6..a4e8913eeffb 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion;
 use crate::compaction::picker::{Picker, PickerOutput};
 use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
 use crate::compaction::{get_expired_ssts, CompactionOutput};
-use crate::sst::file::{overlaps, FileHandle, FileId, Level};
+use crate::sst::file::{overlaps, FileHandle, Level};
 use crate::sst::version::LevelMeta;
 
 const LEVEL_COMPACTED: Level = 1;
@@ -134,7 +134,6 @@ impl TwcsPicker {
             for input in split_inputs {
                 debug_assert!(input.len() > 1);
                 output.push(CompactionOutput {
-                    output_file_id: FileId::random(),
                     output_level: LEVEL_COMPACTED, // always compact to l1
                     inputs: input,
                     filter_deleted,
@@ -373,7 +372,7 @@ mod tests {
 
     use super::*;
     use crate::compaction::test_util::{new_file_handle, new_file_handles};
-    use crate::sst::file::{FileMeta, Level};
+    use crate::sst::file::{FileId, FileMeta, Level};
     use crate::test_util::NoopFilePurger;
 
     #[test]
diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs
index 10bdb47297d5..f7ad4af893ee 100644
--- a/src/mito2/src/compaction/window.rs
+++ b/src/mito2/src/compaction/window.rs
@@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket;
 use crate::compaction::compactor::{CompactionRegion, CompactionVersion};
 use crate::compaction::picker::{Picker, PickerOutput};
 use crate::compaction::{get_expired_ssts, CompactionOutput};
-use crate::sst::file::{FileHandle, FileId};
+use crate::sst::file::FileHandle;
 
 /// Compaction picker that splits the time range of all involved files to windows, and merges
 /// the data segments intersects with those windows of files together so that the output files
@@ -132,7 +132,6 @@ fn build_output(windows: BTreeMap<i64, (i64, Vec<FileHandle>)>) -> Vec<Compactio
         );
 
         let output = CompactionOutput {
-            output_file_id: FileId::random(),
             output_level: 1,
             inputs: files,
             filter_deleted: false,
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index a0400deb5bc0..0d36555b8eb9 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -45,7 +45,7 @@ use crate::request::{
     SenderWriteRequest, WorkerRequest,
 };
 use crate::schedule::scheduler::{Job, SchedulerRef};
-use crate::sst::file::{FileId, FileMeta};
+use crate::sst::file::FileMeta;
 use crate::sst::parquet::WriteOptions;
 use crate::worker::WorkerListener;
 
@@ -347,14 +347,12 @@ impl RegionFlushTask {
             }
 
             let max_sequence = mem.stats().max_sequence();
-            let file_id = FileId::random();
             let iter = mem.iter(None, None)?;
             let source = Source::Iter(iter);
 
             // Flush to level 0.
             let write_request = SstWriteRequest {
                 op_type: OperationType::Flush,
-                file_id,
                 metadata: version.metadata.clone(),
                 source,
                 cache_manager: self.cache_manager.clone(),
@@ -365,29 +363,31 @@ impl RegionFlushTask {
                 fulltext_index_config: self.engine_config.fulltext_index.clone(),
                 bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
             };
-            let Some(sst_info) = self
+
+            let ssts_written = self
                 .access_layer
                 .write_sst(write_request, &write_opts)
-                .await?
-            else {
+                .await?;
+            if ssts_written.is_empty() {
                 // No data written.
                 continue;
-            };
+            }
 
-            flushed_bytes += sst_info.file_size;
-            let file_meta = FileMeta {
-                region_id: self.region_id,
-                file_id,
-                time_range: sst_info.time_range,
-                level: 0,
-                file_size: sst_info.file_size,
-                available_indexes: sst_info.index_metadata.build_available_indexes(),
-                index_file_size: sst_info.index_metadata.file_size,
-                num_rows: sst_info.num_rows as u64,
-                num_row_groups: sst_info.num_row_groups,
-                sequence: NonZeroU64::new(max_sequence),
-            };
-            file_metas.push(file_meta);
+            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
+                flushed_bytes += sst_info.file_size;
+                FileMeta {
+                    region_id: self.region_id,
+                    file_id: sst_info.file_id,
+                    time_range: sst_info.time_range,
+                    level: 0,
+                    file_size: sst_info.file_size,
+                    available_indexes: sst_info.index_metadata.build_available_indexes(),
+                    index_file_size: sst_info.index_metadata.file_size,
+                    num_rows: sst_info.num_rows as u64,
+                    num_row_groups: sst_info.num_row_groups,
+                    sequence: NonZeroU64::new(max_sequence),
+                }
+            }));
         }
 
         if !file_metas.is_empty() {
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index dc0f0978f84c..0266b3f1dfdf 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -105,7 +105,6 @@ pub struct Indexer {
     file_id: FileId,
     file_path: String,
     region_id: RegionId,
-
     puffin_manager: Option<SstPuffinManager>,
     inverted_indexer: Option<InvertedIndexer>,
     last_mem_inverted_index: usize,
@@ -168,11 +167,15 @@ impl Indexer {
     }
 }
 
-pub(crate) struct IndexerBuilder<'a> {
+#[async_trait::async_trait]
+pub trait IndexerBuilder {
+    /// Builds an indexer for the given file id that writes to `index_file_path`.
+    async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer;
+}
+
+pub(crate) struct IndexerBuilderImpl {
     pub(crate) op_type: OperationType,
-    pub(crate) file_id: FileId,
-    pub(crate) file_path: String,
-    pub(crate) metadata: &'a RegionMetadataRef,
+    pub(crate) metadata: RegionMetadataRef,
     pub(crate) row_group_size: usize,
     pub(crate) puffin_manager: SstPuffinManager,
     pub(crate) intermediate_manager: IntermediateManager,
@@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> {
     pub(crate) bloom_filter_index_config: BloomFilterConfig,
 }
 
-impl<'a> IndexerBuilder<'a> {
+#[async_trait::async_trait]
+impl IndexerBuilder for IndexerBuilderImpl {
     /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
-    pub(crate) async fn build(self) -> Indexer {
+    async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer {
         let mut indexer = Indexer {
-            file_id: self.file_id,
-            file_path: self.file_path.clone(),
+            file_id,
+            file_path: index_file_path,
             region_id: self.metadata.region_id,
-
             ..Default::default()
         };
 
-        indexer.inverted_indexer = self.build_inverted_indexer();
-        indexer.fulltext_indexer = self.build_fulltext_indexer().await;
-        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer();
+        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
+        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
+        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
         if indexer.inverted_indexer.is_none()
             && indexer.fulltext_indexer.is_none()
             && indexer.bloom_filter_indexer.is_none()
@@ -204,11 +207,13 @@ impl<'a> IndexerBuilder<'a> {
             return Indexer::default();
         }
 
-        indexer.puffin_manager = Some(self.puffin_manager);
+        indexer.puffin_manager = Some(self.puffin_manager.clone());
         indexer
     }
+}
 
-    fn build_inverted_indexer(&self) -> Option<InvertedIndexer> {
+impl IndexerBuilderImpl {
+    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
         let create = match self.op_type {
             OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
             OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
@@ -217,7 +222,7 @@ impl<'a> IndexerBuilder<'a> {
         if !create {
             debug!(
                 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
             return None;
         }
@@ -225,7 +230,7 @@ impl<'a> IndexerBuilder<'a> {
         if self.metadata.primary_key.is_empty() {
             debug!(
                 "No tag columns, skip creating index, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
             return None;
         }
@@ -235,7 +240,7 @@ impl<'a> IndexerBuilder<'a> {
         else {
             warn!(
                 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
             return None;
         };
@@ -243,7 +248,7 @@ impl<'a> IndexerBuilder<'a> {
         let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
             warn!(
                 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
             return None;
         };
@@ -254,8 +259,8 @@ impl<'a> IndexerBuilder<'a> {
         }
 
         let indexer = InvertedIndexer::new(
-            self.file_id,
-            self.metadata,
+            file_id,
+            &self.metadata,
             self.intermediate_manager.clone(),
             self.inverted_index_config.mem_threshold_on_create(),
             segment_row_count,
@@ -267,7 +272,7 @@ impl<'a> IndexerBuilder<'a> {
         Some(indexer)
     }
 
-    async fn build_fulltext_indexer(&self) -> Option<FulltextIndexer> {
+    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
         let create = match self.op_type {
             OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
             OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
@@ -276,7 +281,7 @@ impl<'a> IndexerBuilder<'a> {
         if !create {
             debug!(
                 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
             return None;
         }
@@ -284,9 +289,9 @@ impl<'a> IndexerBuilder<'a> {
         let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
         let creator = FulltextIndexer::new(
             &self.metadata.region_id,
-            &self.file_id,
+            &file_id,
             &self.intermediate_manager,
-            self.metadata,
+            &self.metadata,
             self.fulltext_index_config.compress,
             mem_limit,
         )
@@ -297,7 +302,7 @@ impl<'a> IndexerBuilder<'a> {
                 if creator.is_none() {
                     debug!(
                         "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
-                        self.metadata.region_id, self.file_id,
+                        self.metadata.region_id, file_id,
                     );
                 }
                 return creator;
@@ -308,19 +313,19 @@ impl<'a> IndexerBuilder<'a> {
         if cfg!(any(test, feature = "test")) {
             panic!(
                 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
-                self.metadata.region_id, self.file_id, err
+                self.metadata.region_id, file_id, err
             );
         } else {
             warn!(
                 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
         }
 
         None
     }
 
-    fn build_bloom_filter_indexer(&self) -> Option<BloomFilterIndexer> {
+    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
         let create = match self.op_type {
             OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
             OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
@@ -329,15 +334,15 @@ impl<'a> IndexerBuilder<'a> {
         if !create {
             debug!(
                 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
             return None;
         }
 
         let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
         let indexer = BloomFilterIndexer::new(
-            self.file_id,
-            self.metadata,
+            file_id,
+            &self.metadata,
             self.intermediate_manager.clone(),
             mem_limit,
         );
@@ -347,7 +352,7 @@ impl<'a> IndexerBuilder<'a> {
                 if indexer.is_none() {
                     debug!(
                         "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
-                        self.metadata.region_id, self.file_id,
+                        self.metadata.region_id, file_id,
                     );
                 }
                 return indexer;
@@ -358,12 +363,12 @@ impl<'a> IndexerBuilder<'a> {
         if cfg!(any(test, feature = "test")) {
             panic!(
                 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
-                self.metadata.region_id, self.file_id, err
+                self.metadata.region_id, file_id, err
             );
         } else {
             warn!(
                 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
-                self.metadata.region_id, self.file_id,
+                self.metadata.region_id, file_id,
             );
         }
 
@@ -489,11 +494,9 @@ mod tests {
             with_fulltext: true,
             with_skipping_bloom: true,
         });
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Flush,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata,
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager,
@@ -502,7 +505,7 @@ mod tests {
             fulltext_index_config: FulltextIndexConfig::default(),
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_some());
@@ -521,11 +524,9 @@ mod tests {
             with_fulltext: true,
             with_skipping_bloom: true,
         });
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Flush,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager.clone(),
@@ -537,18 +538,16 @@ mod tests {
             fulltext_index_config: FulltextIndexConfig::default(),
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_none());
         assert!(indexer.fulltext_indexer.is_some());
         assert!(indexer.bloom_filter_indexer.is_some());
 
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Compact,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager.clone(),
@@ -560,18 +559,16 @@ mod tests {
             },
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_some());
         assert!(indexer.fulltext_indexer.is_none());
         assert!(indexer.bloom_filter_indexer.is_some());
 
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Compact,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata,
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager,
@@ -583,7 +580,7 @@ mod tests {
                 ..Default::default()
             },
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_some());
@@ -602,11 +599,9 @@ mod tests {
             with_fulltext: true,
             with_skipping_bloom: true,
         });
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Flush,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager.clone(),
@@ -615,7 +610,7 @@ mod tests {
             fulltext_index_config: FulltextIndexConfig::default(),
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_none());
@@ -627,11 +622,9 @@ mod tests {
             with_fulltext: false,
             with_skipping_bloom: true,
         });
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Flush,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager.clone(),
@@ -640,7 +633,7 @@ mod tests {
             fulltext_index_config: FulltextIndexConfig::default(),
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_some());
@@ -652,11 +645,9 @@ mod tests {
             with_fulltext: true,
             with_skipping_bloom: false,
         });
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Flush,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata: metadata.clone(),
             row_group_size: 1024,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager,
@@ -665,7 +656,7 @@ mod tests {
             fulltext_index_config: FulltextIndexConfig::default(),
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_some());
@@ -684,11 +675,9 @@ mod tests {
             with_fulltext: true,
             with_skipping_bloom: true,
         });
-        let indexer = IndexerBuilder {
+        let indexer = IndexerBuilderImpl {
             op_type: OperationType::Flush,
-            file_id: FileId::random(),
-            file_path: "test".to_string(),
-            metadata: &metadata,
+            metadata,
             row_group_size: 0,
             puffin_manager: factory.build(mock_object_store()),
             intermediate_manager: intm_manager,
@@ -697,7 +686,7 @@ mod tests {
             fulltext_index_config: FulltextIndexConfig::default(),
             bloom_filter_index_config: BloomFilterConfig::default(),
         }
-        .build()
+        .build(FileId::random(), "test".to_string())
         .await;
 
         assert!(indexer.inverted_indexer.is_none());
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 05dafb0edfc3..12d16b7cda3e 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 use common_base::readable_size::ReadableSize;
 use parquet::file::metadata::ParquetMetaData;
 
-use crate::sst::file::FileTimeRange;
+use crate::sst::file::{FileId, FileTimeRange};
 use crate::sst::index::IndexOutput;
 use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 
@@ -62,6 +62,8 @@ impl Default for WriteOptions {
 
 /// Parquet SST info returned by the writer.
 pub struct SstInfo {
+    /// SST file id.
+    pub file_id: FileId,
     /// Time range of the SST. The timestamps have the same time unit as the
     /// data in the SST.
     pub time_range: FileTimeRange,
@@ -95,12 +97,13 @@ mod tests {
     use tokio_util::compat::FuturesAsyncWriteCompatExt;
 
     use super::*;
+    use crate::access_layer::FilePathProvider;
     use crate::cache::{CacheManager, CacheStrategy, PageKey};
-    use crate::sst::index::Indexer;
+    use crate::sst::index::{Indexer, IndexerBuilder};
     use crate::sst::parquet::format::WriteFormat;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
-    use crate::sst::DEFAULT_WRITE_CONCURRENCY;
+    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
         new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
@@ -109,12 +112,38 @@ mod tests {
 
     const FILE_DIR: &str = "/";
 
+    #[derive(Clone)]
+    struct FixedPathProvider {
+        file_id: FileId,
+    }
+
+    impl FilePathProvider for FixedPathProvider {
+        fn build_index_file_path(&self, _file_id: FileId) -> String {
+            location::index_file_path(FILE_DIR, self.file_id)
+        }
+
+        fn build_sst_file_path(&self, _file_id: FileId) -> String {
+            location::sst_file_path(FILE_DIR, self.file_id)
+        }
+    }
+
+    struct NoopIndexBuilder;
+
+    #[async_trait::async_trait]
+    impl IndexerBuilder for NoopIndexBuilder {
+        async fn build(&self, _file_id: FileId, _path: String) -> Indexer {
+            Indexer::default()
+        }
+    }
+
     #[tokio::test]
     async fn test_write_read() {
         let mut env = TestEnv::new();
         let object_store = env.init_object_store_manager();
         let handle = sst_file_handle(0, 1000);
-        let file_path = handle.file_path(FILE_DIR);
+        let file_path = FixedPathProvider {
+            file_id: handle.file_id(),
+        };
         let metadata = Arc::new(sst_region_metadata());
         let source = new_source(&[
             new_batch_by_range(&["a", "d"], 0, 60),
@@ -126,18 +155,20 @@ mod tests {
             row_group_size: 50,
             ..Default::default()
         };
+
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
+            metadata.clone(),
+            NoopIndexBuilder,
             file_path,
-            metadata,
-            Indexer::default(),
-        );
+        )
+        .await;
 
         let info = writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0);
         assert_eq!(200, info.num_rows);
         assert!(info.file_size > 0);
         assert_eq!(
@@ -168,7 +199,6 @@ mod tests {
         let mut env = TestEnv::new();
         let object_store = env.init_object_store_manager();
         let handle = sst_file_handle(0, 1000);
-        let file_path = handle.file_path(FILE_DIR);
         let metadata = Arc::new(sst_region_metadata());
         let source = new_source(&[
             new_batch_by_range(&["a", "d"], 0, 60),
@@ -183,16 +213,19 @@ mod tests {
         // Prepare data.
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
-            file_path,
             metadata.clone(),
-            Indexer::default(),
-        );
+            NoopIndexBuilder,
+            FixedPathProvider {
+                file_id: handle.file_id(),
+            },
+        )
+        .await;
 
         writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0);
 
         // Enable page cache.
         let cache = CacheStrategy::EnableAll(Arc::new(
@@ -236,7 +269,6 @@ mod tests {
         let mut env = crate::test_util::TestEnv::new();
         let object_store = env.init_object_store_manager();
         let handle = sst_file_handle(0, 1000);
-        let file_path = handle.file_path(FILE_DIR);
         let metadata = Arc::new(sst_region_metadata());
         let source = new_source(&[
             new_batch_by_range(&["a", "d"], 0, 60),
@@ -252,16 +284,19 @@ mod tests {
         // sst info contains the parquet metadata, which is converted from FileMetaData
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
-            file_path,
             metadata.clone(),
-            Indexer::default(),
-        );
+            NoopIndexBuilder,
+            FixedPathProvider {
+                file_id: handle.file_id(),
+            },
+        )
+        .await;
 
         let sst_info = writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
-            .expect("write_all should return sst info");
+            .remove(0);
         let writer_metadata = sst_info.file_metadata.unwrap();
 
         // read the sst file metadata
@@ -277,7 +312,6 @@ mod tests {
         let mut env = TestEnv::new();
         let object_store = env.init_object_store_manager();
         let handle = sst_file_handle(0, 1000);
-        let file_path = handle.file_path(FILE_DIR);
         let metadata = Arc::new(sst_region_metadata());
         let source = new_source(&[
             new_batch_by_range(&["a", "d"], 0, 60),
@@ -292,15 +326,18 @@ mod tests {
         // Prepare data.
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
-            file_path,
             metadata.clone(),
-            Indexer::default(),
-        );
+            NoopIndexBuilder,
+            FixedPathProvider {
+                file_id: handle.file_id(),
+            },
+        )
+        .await;
         writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0);
 
         // Predicate
         let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
@@ -330,7 +367,6 @@ mod tests {
         let mut env = TestEnv::new();
         let object_store = env.init_object_store_manager();
         let handle = sst_file_handle(0, 1000);
-        let file_path = handle.file_path(FILE_DIR);
         let metadata = Arc::new(sst_region_metadata());
         let source = new_source(&[
             new_batch_by_range(&["a", "z"], 0, 0),
@@ -345,15 +381,18 @@ mod tests {
         // Prepare data.
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
-            file_path,
             metadata.clone(),
-            Indexer::default(),
-        );
+            NoopIndexBuilder,
+            FixedPathProvider {
+                file_id: handle.file_id(),
+            },
+        )
+        .await;
         writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0);
 
         let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
         let mut reader = builder.build().await.unwrap();
@@ -365,7 +404,6 @@ mod tests {
         let mut env = TestEnv::new();
         let object_store = env.init_object_store_manager();
         let handle = sst_file_handle(0, 1000);
-        let file_path = handle.file_path(FILE_DIR);
         let metadata = Arc::new(sst_region_metadata());
         let source = new_source(&[
             new_batch_by_range(&["a", "d"], 0, 60),
@@ -380,16 +418,19 @@ mod tests {
         // Prepare data.
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
-            file_path,
             metadata.clone(),
-            Indexer::default(),
-        );
+            NoopIndexBuilder,
+            FixedPathProvider {
+                file_id: handle.file_id(),
+            },
+        )
+        .await;
 
         writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
-            .unwrap();
+            .remove(0);
 
         // Predicate
         let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 13f7cfb3ec91..0e4eabc96ed1 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -28,6 +28,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::schema::types::ColumnPath;
+use smallvec::smallvec;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
@@ -35,40 +36,48 @@ use store_api::storage::SequenceNumber;
 use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
 
+use crate::access_layer::{FilePathProvider, SstInfoArray};
 use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
 use crate::read::{Batch, Source};
-use crate::sst::index::Indexer;
+use crate::sst::file::FileId;
+use crate::sst::index::{Indexer, IndexerBuilder};
 use crate::sst::parquet::format::WriteFormat;
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
 
 /// Parquet SST writer.
-pub struct ParquetWriter<F: WriterFactory> {
+pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
+    /// Path provider that creates SST and index file paths based on the file id.
+    path_provider: P,
     writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
+    /// Current active file id.
+    current_file: FileId,
     writer_factory: F,
     /// Region metadata of the source and the target SST.
     metadata: RegionMetadataRef,
-    indexer: Indexer,
+    /// Indexer builder that can create indexers for multiple files.
+    indexer_builder: I,
+    /// Current active indexer.
+    current_indexer: Option<Indexer>,
     bytes_written: Arc<AtomicUsize>,
 }
 
 pub trait WriterFactory {
     type Writer: AsyncWrite + Send + Unpin;
-    fn create(&mut self) -> impl Future<Output = Result<Self::Writer>>;
+    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
 }
 
 pub struct ObjectStoreWriterFactory {
-    path: String,
     object_store: ObjectStore,
 }
 
 impl WriterFactory for ObjectStoreWriterFactory {
     type Writer = Compat<FuturesAsyncWriter>;
 
-    async fn create(&mut self) -> Result<Self::Writer> {
+    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
         self.object_store
-            .writer_with(&self.path)
+            .writer_with(file_path)
             .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
             .concurrent(DEFAULT_WRITE_CONCURRENCY)
             .await
@@ -77,36 +86,73 @@ impl WriterFactory for ObjectStoreWriterFactory {
     }
 }
 
-impl ParquetWriter<ObjectStoreWriterFactory> {
-    pub fn new_with_object_store(
+impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
+where
+    P: FilePathProvider,
+    I: IndexerBuilder,
+{
+    pub async fn new_with_object_store(
         object_store: ObjectStore,
-        path: String,
         metadata: RegionMetadataRef,
-        indexer: Indexer,
-    ) -> ParquetWriter<ObjectStoreWriterFactory> {
+        indexer_builder: I,
+        path_provider: P,
+    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
         ParquetWriter::new(
-            ObjectStoreWriterFactory { path, object_store },
+            ObjectStoreWriterFactory { object_store },
             metadata,
-            indexer,
+            indexer_builder,
+            path_provider,
         )
+        .await
     }
 }
 
-impl<F> ParquetWriter<F>
+impl<F, I, P> ParquetWriter<F, I, P>
 where
     F: WriterFactory,
+    I: IndexerBuilder,
+    P: FilePathProvider,
 {
     /// Creates a new parquet SST writer.
-    pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter<F> {
+    pub async fn new(
+        factory: F,
+        metadata: RegionMetadataRef,
+        indexer_builder: I,
+        path_provider: P,
+    ) -> ParquetWriter<F, I, P> {
+        let init_file = FileId::random();
+        let index_file_path = path_provider.build_index_file_path(init_file);
+        let indexer = indexer_builder.build(init_file, index_file_path).await;
+
         ParquetWriter {
+            path_provider,
             writer: None,
+            current_file: init_file,
             writer_factory: factory,
             metadata,
-            indexer,
+            indexer_builder,
+            current_indexer: Some(indexer),
             bytes_written: Arc::new(AtomicUsize::new(0)),
         }
     }
 
+    async fn get_or_create_indexer(&mut self) -> &mut Indexer {
+        match self.current_indexer {
+            None => {
+                self.current_file = FileId::random();
+                let index_file_path = self.path_provider.build_index_file_path(self.current_file);
+                let indexer = self
+                    .indexer_builder
+                    .build(self.current_file, index_file_path)
+                    .await;
+                self.current_indexer = Some(indexer);
+                // safety: self.current_indexer already set above.
+                self.current_indexer.as_mut().unwrap()
+            }
+            Some(ref mut indexer) => indexer,
+        }
+    }
+
     /// Iterates source and writes all rows to Parquet file.
     ///
     /// Returns the [SstInfo] if the SST is written.
@@ -115,7 +161,7 @@ where
         mut source: Source,
         override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
         opts: &WriteOptions,
-    ) -> Result<Option<SstInfo>> {
+    ) -> Result<SstInfoArray> {
         let write_format =
             WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
         let mut stats = SourceStats::default();
@@ -128,24 +174,24 @@ where
             match res {
                 Ok(batch) => {
                     stats.update(&batch);
-                    self.indexer.update(&batch).await;
+                    self.get_or_create_indexer().await.update(&batch).await;
                 }
                 Err(e) => {
-                    self.indexer.abort().await;
+                    self.get_or_create_indexer().await.abort().await;
                     return Err(e);
                 }
             }
         }
 
-        let index_output = self.indexer.finish().await;
+        let index_output = self.get_or_create_indexer().await.finish().await;
 
         if stats.num_rows == 0 {
-            return Ok(None);
+            return Ok(smallvec![]);
         }
 
         let Some(mut arrow_writer) = self.writer.take() else {
             // No batch actually written.
-            return Ok(None);
+            return Ok(smallvec![]);
         };
 
         arrow_writer.flush().await.context(WriteParquetSnafu)?;
@@ -159,15 +205,18 @@ where
         // convert FileMetaData to ParquetMetaData
         let parquet_metadata = parse_parquet_metadata(file_meta)?;
 
+        let file_id = self.current_file;
+
         // object_store.write will make sure all bytes are written or an error is raised.
-        Ok(Some(SstInfo {
+        Ok(smallvec![SstInfo {
+            file_id,
             time_range,
             file_size,
             num_rows: stats.num_rows,
             num_row_groups: parquet_metadata.num_row_groups() as u64,
             file_metadata: Some(Arc::new(parquet_metadata)),
             index_metadata: index_output,
-        }))
+        }])
     }
 
     /// Customizes per-column config according to schema and maybe column cardinality.
@@ -229,8 +278,9 @@ where
             let props_builder = Self::customize_column_config(props_builder, &self.metadata);
             let writer_props = props_builder.build();
 
+            let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
             let writer = SizeAwareWriter::new(
-                self.writer_factory.create().await?,
+                self.writer_factory.create(&sst_file_path).await?,
                 self.bytes_written.clone(),
             );
             let arrow_writer =
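With `write_all` returning an `SstInfoArray` (a `SmallVec` of `SstInfo`) instead of `Option<SstInfo>`, callers iterate the result rather than unwrapping a single value. A hedged usage fragment, assuming the writer, source, and options from the surrounding tests and using only fields shown in the hunk above:

```rust
// Each returned entry describes one SST file the writer produced.
let ssts = writer.write_all(source, None, &write_opts).await?;
for sst in &ssts {
    println!(
        "wrote SST {}: {} rows in {} row groups, {} bytes",
        sst.file_id, sst.num_rows, sst.num_row_groups, sst.file_size
    );
}
```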
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 63c3fc09d621..63b1d2e7a0c0 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -14,7 +14,6 @@
 
 //! Utilities for testing SSTs.
 
-use std::num::NonZeroU64;
 use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
@@ -100,13 +99,13 @@ pub fn new_source(batches: &[Batch]) -> Source {
     Source::Reader(Box::new(reader))
 }
 
-/// Creates a new [FileHandle] for a SST.
-pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
+/// Creates a SST file handle with the provided file id.
+pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle {
     let file_purger = new_noop_file_purger();
     FileHandle::new(
         FileMeta {
             region_id: REGION_ID,
-            file_id: FileId::random(),
+            file_id,
             time_range: (
                 Timestamp::new_millisecond(start_ms),
                 Timestamp::new_millisecond(end_ms),
@@ -123,6 +122,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
     )
 }
 
+/// Creates a new [FileHandle] for a SST.
+pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
+    sst_file_handle_with_file_id(FileId::random(), start_ms, end_ms)
+}
+
 pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
     assert!(end >= start);
     let pk = new_primary_key(tags);
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 68534d34eeb8..9b98fbf026f4 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -15,7 +15,6 @@
 //! Utilities to mock version.
 
 use std::collections::HashMap;
-use std::num::NonZeroU64;
 use std::sync::Arc;
 
 use api::v1::value::ValueData;
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
index 52190f92fb28..c03a86aaf672 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
@@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager;
 use crate::puffin_manager::PuffinManager;
 
 /// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem.
+#[derive(Clone)]
 pub struct FsPuffinManager<S, F> {
     /// The stager.
     stager: S,

From c344c6aaa13bd32174bed100531a2506e8d85975 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <mrsatangel@gmail.com>
Date: Mon, 6 Jan 2025 11:43:46 +0800
Subject: [PATCH 2/4]  - **Removed Output Size Enforcement in `twcs.rs`:**    -
 Deleted the `enforce_max_output_size` function and related logic to simplify
 compaction input handling.

 - **Added Max File Size Option in `parquet.rs`:**
   - Introduced `max_file_size` in `WriteOptions` to control the maximum size of output files.

 - **Refactored Indexer Management in `parquet/writer.rs`:**
   - Changed `current_indexer` from an `Option` to a direct `Indexer` type.
   - Implemented `roll_to_next_file` to handle file transitions when exceeding `max_file_size`.
   - Simplified indexer initialization and management logic.
---
 src/mito2/src/compaction/twcs.rs    | 95 +----------------------------
 src/mito2/src/sst/parquet.rs        |  3 +
 src/mito2/src/sst/parquet/writer.rs | 39 ++++++------
 3 files changed, 24 insertions(+), 113 deletions(-)

diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index a4e8913eeffb..2cf2f730e2cf 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -118,20 +118,7 @@ impl TwcsPicker {
                 continue;
             };
 
-            let split_inputs = if !filter_deleted
-                && let Some(max_output_file_size) = self.max_output_file_size
-            {
-                let len_before_split = inputs.len();
-                let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
-                if maybe_split.len() != len_before_split {
-                    info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
-                }
-                maybe_split
-            } else {
-                inputs
-            };
-
-            for input in split_inputs {
+            for input in inputs {
                 debug_assert!(input.len() > 1);
                 output.push(CompactionOutput {
                     output_level: LEVEL_COMPACTED, // always compact to l1
@@ -145,43 +132,6 @@ impl TwcsPicker {
     }
 }
 
-/// Limits the size of compaction output in a naive manner.
-/// todo(hl): we can find the output file size more precisely by checking the time range
-/// of each row group and adding the sizes of those non-overlapping row groups. But now
-/// we'd better not to expose the SST details in this level.
-fn enforce_max_output_size(
-    inputs: Vec<Vec<FileHandle>>,
-    max_output_file_size: u64,
-) -> Vec<Vec<FileHandle>> {
-    inputs
-        .into_iter()
-        .flat_map(|input| {
-            debug_assert!(input.len() > 1);
-            let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
-            if estimated_output_size < max_output_file_size {
-                // total file size does not exceed the threshold, just return the original input.
-                return vec![input];
-            }
-            let mut splits = vec![];
-            let mut new_input = vec![];
-            let mut new_input_size = 0;
-            for f in input {
-                if new_input_size + f.size() > max_output_file_size {
-                    splits.push(std::mem::take(&mut new_input));
-                    new_input_size = 0;
-                }
-                new_input_size += f.size();
-                new_input.push(f);
-            }
-            if !new_input.is_empty() {
-                splits.push(new_input);
-            }
-            splits
-        })
-        .filter(|p| p.len() > 1)
-        .collect()
-}
-
 /// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
 /// the solution with minimum overhead according to files sizes to be merged.
 /// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
@@ -368,12 +318,10 @@ fn find_latest_window_in_seconds<'a>(
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
-    use std::sync::Arc;
 
     use super::*;
     use crate::compaction::test_util::{new_file_handle, new_file_handles};
-    use crate::sst::file::{FileId, FileMeta, Level};
-    use crate::test_util::NoopFilePurger;
+    use crate::sst::file::{FileId, Level};
 
     #[test]
     fn test_get_latest_window_in_seconds() {
@@ -741,44 +689,5 @@ mod tests {
         .check();
     }
 
-    fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
-        inputs
-            .iter()
-            .map(|(start, end, size)| {
-                FileHandle::new(
-                    FileMeta {
-                        region_id: Default::default(),
-                        file_id: Default::default(),
-                        time_range: (
-                            Timestamp::new_millisecond(*start),
-                            Timestamp::new_millisecond(*end),
-                        ),
-                        level: 0,
-                        file_size: *size,
-                        available_indexes: Default::default(),
-                        index_file_size: 0,
-                        num_rows: 0,
-                        num_row_groups: 0,
-                        sequence: None,
-                    },
-                    Arc::new(NoopFilePurger),
-                )
-            })
-            .collect()
-    }
-
-    #[test]
-    fn test_limit_output_size() {
-        let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
-        let runs = find_sorted_runs(&mut files);
-        assert_eq!(6, runs.len());
-        let files_to_merge = reduce_runs(runs, 2);
-
-        let enforced = enforce_max_output_size(files_to_merge, 2);
-        assert_eq!(2, enforced.len());
-        assert_eq!(2, enforced[0].len());
-        assert_eq!(2, enforced[1].len());
-    }
-
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
 }
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 12d16b7cda3e..85b1a789dba1 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -49,6 +49,8 @@ pub struct WriteOptions {
     pub write_buffer_size: ReadableSize,
     /// Row group size.
     pub row_group_size: usize,
+    /// Max single output file size.
+    pub max_file_size: usize,
 }
 
 impl Default for WriteOptions {
@@ -56,6 +58,7 @@ impl Default for WriteOptions {
         WriteOptions {
             write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             row_group_size: DEFAULT_ROW_GROUP_SIZE,
+            max_file_size: usize::MAX,
         }
     }
 }
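Callers opt into size-based rolling by setting the new field when building write options. A small sketch; the 16 MiB figure is an arbitrary example, and at this point in the series the field is a plain `usize` (a later patch relaxes it to `Option<usize>`):

```rust
use crate::sst::parquet::WriteOptions;

// Roll over to a new SST file once roughly 16 MiB has been written.
// The limit is soft: the size is only observed when the arrow writer
// flushes to the underlying object-store writer.
let write_opts = WriteOptions {
    max_file_size: 16 * 1024 * 1024,
    ..Default::default()
};
```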
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 0e4eabc96ed1..03aeee807dd4 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -59,7 +59,7 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
     /// Indexer builder that can create indexers for multiple files.
     indexer_builder: I,
     /// Current active indexer.
-    current_indexer: Option<Indexer>,
+    current_indexer: Indexer,
     bytes_written: Arc<AtomicUsize>,
 }
 
@@ -131,26 +131,22 @@ where
             writer_factory: factory,
             metadata,
             indexer_builder,
-            current_indexer: Some(indexer),
+            current_indexer: indexer,
             bytes_written: Arc::new(AtomicUsize::new(0)),
         }
     }
 
-    async fn get_or_create_indexer(&mut self) -> &mut Indexer {
-        match self.current_indexer {
-            None => {
-                self.current_file = FileId::random();
-                let index_file_path = self.path_provider.build_index_file_path(self.current_file);
-                let indexer = self
-                    .indexer_builder
-                    .build(self.current_file, index_file_path)
-                    .await;
-                self.current_indexer = Some(indexer);
-                // safety: self.current_indexer already set above.
-                self.current_indexer.as_mut().unwrap()
-            }
-            Some(ref mut indexer) => indexer,
-        }
+    async fn roll_to_next_file(&mut self) {
+        self.current_file = FileId::random();
+        let index_file_path = self.path_provider.build_index_file_path(self.current_file);
+        let indexer = self
+            .indexer_builder
+            .build(self.current_file, index_file_path)
+            .await;
+        self.current_indexer = indexer;
+
+        // maybe_init_writer will create a new file on the next write.
+        self.writer = None;
     }
 
     /// Iterates source and writes all rows to Parquet file.
@@ -174,16 +170,19 @@ where
             match res {
                 Ok(batch) => {
                     stats.update(&batch);
-                    self.get_or_create_indexer().await.update(&batch).await;
+                    self.current_indexer.update(&batch).await;
+                    if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size {
+                        self.roll_to_next_file().await;
+                    }
                 }
                 Err(e) => {
-                    self.get_or_create_indexer().await.abort().await;
+                    self.current_indexer.abort().await;
                     return Err(e);
                 }
             }
         }
 
-        let index_output = self.get_or_create_indexer().await.finish().await;
+        let index_output = self.current_indexer.finish().await;
 
         if stats.num_rows == 0 {
             return Ok(smallvec![]);

From 2c5ace52ddcfc42b1b62a3cf308ce246da900142 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <mrsatangel@gmail.com>
Date: Mon, 6 Jan 2025 14:34:12 +0800
Subject: [PATCH 3/4]  **Refactor Parquet Writer Initialization and File
 Handling**  - Updated `ParquetWriter` in `writer.rs` to handle
 `current_indexer` as an `Option`, allowing for more flexible initialization
 and management.  - Introduced `finish_current_file` method to encapsulate
 logic for completing and transitioning between SST files, improving code
 clarity and maintainability.  - Enhanced error handling and logging with
 `debug` statements for better traceability during file operations.

---
 src/mito2/src/sst/parquet/writer.rs | 114 ++++++++++++++++------------
 1 file changed, 65 insertions(+), 49 deletions(-)

diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 03aeee807dd4..55189bc9e7a0 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -15,11 +15,13 @@
 //! Parquet writer.
 
 use std::future::Future;
+use std::mem;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use common_telemetry::debug;
 use common_time::Timestamp;
 use datatypes::arrow::datatypes::SchemaRef;
 use object_store::{FuturesAsyncWriter, ObjectStore};
@@ -59,7 +61,7 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
     /// Indexer builder that can create indexers for multiple files.
     indexer_builder: I,
     /// Current active indexer.
-    current_indexer: Indexer,
+    current_indexer: Option<Indexer>,
     bytes_written: Arc<AtomicUsize>,
 }
 
@@ -121,8 +123,6 @@ where
         path_provider: P,
     ) -> ParquetWriter<F, I, P> {
         let init_file = FileId::random();
-        let index_file_path = path_provider.build_index_file_path(init_file);
-        let indexer = indexer_builder.build(init_file, index_file_path).await;
 
         ParquetWriter {
             path_provider,
@@ -131,22 +131,50 @@ where
             writer_factory: factory,
             metadata,
             indexer_builder,
-            current_indexer: indexer,
+            current_indexer: None,
             bytes_written: Arc::new(AtomicUsize::new(0)),
         }
     }
 
-    async fn roll_to_next_file(&mut self) {
-        self.current_file = FileId::random();
-        let index_file_path = self.path_provider.build_index_file_path(self.current_file);
-        let indexer = self
-            .indexer_builder
-            .build(self.current_file, index_file_path)
-            .await;
-        self.current_indexer = indexer;
-
+    /// Finishes the current SST file and its index file.
+    async fn finish_current_file(
+        &mut self,
+        ssts: &mut SstInfoArray,
+        stats: &mut SourceStats,
+    ) -> Result<()> {
         // maybe_init_writer will create a new file on the next write.
-        self.writer = None;
+        if let Some(mut current_writer) = mem::take(&mut self.writer) {
+            let stats = mem::take(stats);
+            // At least one row has been written.
+            assert!(stats.num_rows > 0);
+
+            // Finish indexer and writer.
+            // safety: the writer and the indexer are either both present or both absent.
+            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
+            current_writer.flush().await.context(WriteParquetSnafu)?;
+
+            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
+            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
+
+            // Safety: num rows > 0 so we must have min/max.
+            let time_range = stats.time_range.unwrap();
+
+            // convert FileMetaData to ParquetMetaData
+            let parquet_metadata = parse_parquet_metadata(file_meta)?;
+            ssts.push(SstInfo {
+                file_id: self.current_file,
+                time_range,
+                file_size,
+                num_rows: stats.num_rows,
+                num_row_groups: parquet_metadata.num_row_groups() as u64,
+                file_metadata: Some(Arc::new(parquet_metadata)),
+                index_metadata: index_output,
+            });
+            self.current_file = FileId::random();
+            self.bytes_written.store(0, Ordering::Relaxed)
+        };
+
+        Ok(())
     }
 
     /// Iterates source and writes all rows to Parquet file.
@@ -158,6 +186,7 @@ where
         override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
         opts: &WriteOptions,
     ) -> Result<SstInfoArray> {
+        let mut results = smallvec![];
         let write_format =
             WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
         let mut stats = SourceStats::default();
@@ -170,52 +199,31 @@ where
             match res {
                 Ok(batch) => {
                     stats.update(&batch);
-                    self.current_indexer.update(&batch).await;
+                    // safety: self.current_indexer must have been set by the time the first batch is written.
+                    self.current_indexer.as_mut().unwrap().update(&batch).await;
                     if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size {
-                        self.roll_to_next_file().await;
+                        debug!(
+                            "Finishing current file {}, file size: {}, max file size: {}",
+                            self.current_file,
+                            self.bytes_written.load(Ordering::Relaxed),
+                            opts.max_file_size
+                        );
+                        self.finish_current_file(&mut results, &mut stats).await?;
                     }
                 }
                 Err(e) => {
-                    self.current_indexer.abort().await;
+                    if let Some(indexer) = &mut self.current_indexer {
+                        indexer.abort().await;
+                    }
                     return Err(e);
                 }
             }
         }
 
-        let index_output = self.current_indexer.finish().await;
-
-        if stats.num_rows == 0 {
-            return Ok(smallvec![]);
-        }
-
-        let Some(mut arrow_writer) = self.writer.take() else {
-            // No batch actually written.
-            return Ok(smallvec![]);
-        };
-
-        arrow_writer.flush().await.context(WriteParquetSnafu)?;
-
-        let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
-        let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
-
-        // Safety: num rows > 0 so we must have min/max.
-        let time_range = stats.time_range.unwrap();
-
-        // convert FileMetaData to ParquetMetaData
-        let parquet_metadata = parse_parquet_metadata(file_meta)?;
-
-        let file_id = self.current_file;
+        self.finish_current_file(&mut results, &mut stats).await?;
 
         // object_store.write will make sure all bytes are written or an error is raised.
-        Ok(smallvec![SstInfo {
-            file_id,
-            time_range,
-            file_size,
-            num_rows: stats.num_rows,
-            num_row_groups: parquet_metadata.num_row_groups() as u64,
-            file_metadata: Some(Arc::new(parquet_metadata)),
-            index_metadata: index_output,
-        }])
+        Ok(results)
     }
 
     /// Customizes per-column config according to schema and maybe column cardinality.
@@ -286,6 +294,14 @@ where
                 AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                     .context(WriteParquetSnafu)?;
             self.writer = Some(arrow_writer);
+
+            let index_file_path = self.path_provider.build_index_file_path(self.current_file);
+            let indexer = self
+                .indexer_builder
+                .build(self.current_file, index_file_path)
+                .await;
+            self.current_indexer = Some(indexer);
+
             // safety: self.writer is assigned above
             Ok(self.writer.as_mut().unwrap())
         }

From d410f7fcbad8907cb4c5d36b01dfcc5faa5b1f15 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <mrsatangel@gmail.com>
Date: Mon, 6 Jan 2025 19:52:52 +0800
Subject: [PATCH 4/4]  - **Refactor `RegionFilePathFactory` to
 `RegionFilePathProvider`:** Updated references and implementations in
 `access_layer.rs`, `write_cache.rs`, and related test files to use the new
 struct name.  - **Add `max_file_size` support in compaction:** Introduced
 `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and
 `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`.  -
 **Enhance Parquet writing logic:** Modified `parquet.rs` and
 `parquet/writer.rs` to support optional `max_file_size` and added a test case
 `test_write_multiple_files` to verify writing multiple files based on size
 constraints.

---
 src/mito2/src/access_layer.rs         |  8 ++--
 src/mito2/src/cache/write_cache.rs    |  8 ++--
 src/mito2/src/compaction/compactor.rs |  1 +
 src/mito2/src/compaction/picker.rs    |  6 +++
 src/mito2/src/compaction/twcs.rs      |  2 +
 src/mito2/src/compaction/window.rs    |  1 +
 src/mito2/src/sst/parquet.rs          | 66 +++++++++++++++++++++++++--
 src/mito2/src/sst/parquet/writer.rs   | 17 ++++---
 8 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 51dd7a962a7e..f82c15858b4b 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -135,7 +135,7 @@ impl AccessLayer {
                 .write_and_upload_sst(
                     request,
                     SstUploadRequest {
-                        dest_path_provider: RegionFilePathFactory {
+                        dest_path_provider: RegionFilePathProvider {
                             region_dir: self.region_dir.clone(),
                         },
                         remote_store: self.object_store.clone(),
@@ -161,7 +161,7 @@ impl AccessLayer {
                 self.object_store.clone(),
                 request.metadata,
                 indexer_builder,
-                RegionFilePathFactory {
+                RegionFilePathProvider {
                     region_dir: self.region_dir.clone(),
                 },
             )
@@ -266,11 +266,11 @@ impl FilePathProvider for WriteCachePathProvider {
 
 /// Path provider that builds paths in region storage path.
 #[derive(Clone, Debug)]
-pub(crate) struct RegionFilePathFactory {
+pub(crate) struct RegionFilePathProvider {
     pub(crate) region_dir: String,
 }
 
-impl FilePathProvider for RegionFilePathFactory {
+impl FilePathProvider for RegionFilePathProvider {
     fn build_index_file_path(&self, file_id: FileId) -> String {
         location::index_file_path(&self.region_dir, file_id)
     }
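With the rename in place, resolving concrete SST and index paths for a region only requires the region directory. A brief sketch; the directory string is illustrative:

```rust
use crate::access_layer::{FilePathProvider, RegionFilePathProvider};
use crate::sst::file::FileId;

let provider = RegionFilePathProvider {
    region_dir: "data/greptime/public/1024/1024_0000000000/".to_string(),
};
let file_id = FileId::random();
let sst_path = provider.build_sst_file_path(file_id);
let index_path = provider.build_index_file_path(file_id);
```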
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 0ae00b3c6cf2..a0f068399ff7 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -24,7 +24,7 @@ use object_store::ObjectStore;
 use snafu::ResultExt;
 
 use crate::access_layer::{
-    new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
+    new_fs_cache_store, FilePathProvider, RegionFilePathProvider, SstInfoArray, SstWriteRequest,
     WriteCachePathProvider,
 };
 use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
@@ -328,7 +328,7 @@ impl WriteCache {
 /// Request to write and upload a SST.
 pub struct SstUploadRequest {
     /// Path provider for the destination that SST files in the write cache should be uploaded to.
-    pub dest_path_provider: RegionFilePathFactory,
+    pub dest_path_provider: RegionFilePathProvider,
     /// Remote object store to upload.
     pub remote_store: ObjectStore,
 }
@@ -355,7 +355,7 @@ mod tests {
         // and now just use local file system to mock.
         let mut env = TestEnv::new();
         let mock_store = env.init_object_store_manager();
-        let path_provider = RegionFilePathFactory {
+        let path_provider = RegionFilePathProvider {
             region_dir: "test".to_string(),
         };
 
@@ -488,7 +488,7 @@ mod tests {
             ..Default::default()
         };
         let upload_request = SstUploadRequest {
-            dest_path_provider: RegionFilePathFactory {
+            dest_path_provider: RegionFilePathProvider {
                 region_dir: data_home.clone(),
             },
             remote_store: mock_store.clone(),
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index affbda0f003e..a3929fb9c227 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -283,6 +283,7 @@ impl Compactor for DefaultCompactor {
 
             let write_opts = WriteOptions {
                 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
+                max_file_size: picker_output.max_file_size,
                 ..Default::default()
             };
 
diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs
index 431973c3b662..0ceafefcd861 100644
--- a/src/mito2/src/compaction/picker.rs
+++ b/src/mito2/src/compaction/picker.rs
@@ -45,6 +45,8 @@ pub struct PickerOutput {
     pub outputs: Vec<CompactionOutput>,
     pub expired_ssts: Vec<FileHandle>,
     pub time_window_size: i64,
+    /// Max single output file size in bytes.
+    pub max_file_size: Option<usize>,
 }
 
 /// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput {
     pub outputs: Vec<SerializedCompactionOutput>,
     pub expired_ssts: Vec<FileMeta>,
     pub time_window_size: i64,
+    pub max_file_size: Option<usize>,
 }
 
 impl From<&PickerOutput> for SerializedPickerOutput {
@@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput {
             outputs,
             expired_ssts,
             time_window_size: input.time_window_size,
+            max_file_size: input.max_file_size,
         }
     }
 }
@@ -111,6 +115,7 @@ impl PickerOutput {
             outputs,
             expired_ssts,
             time_window_size: input.time_window_size,
+            max_file_size: input.max_file_size,
         }
     }
 }
@@ -179,6 +184,7 @@ mod tests {
             ],
             expired_ssts: expired_ssts_file_handle.clone(),
             time_window_size: 1000,
+            max_file_size: None,
         };
 
         let picker_output_str =
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 2cf2f730e2cf..dddfb7934905 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -196,10 +196,12 @@ impl Picker for TwcsPicker {
             return None;
         }
 
+        let max_file_size = self.max_output_file_size.map(|v| v as usize);
         Some(PickerOutput {
             outputs,
             expired_ssts,
             time_window_size,
+            max_file_size,
         })
     }
 }
diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs
index f7ad4af893ee..06212cb6d513 100644
--- a/src/mito2/src/compaction/window.rs
+++ b/src/mito2/src/compaction/window.rs
@@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker {
             outputs,
             expired_ssts,
             time_window_size: time_window,
+            max_file_size: None, // todo(hl): we may need to support the `max_file_size` parameter in manual compaction.
         })
     }
 }
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 85b1a789dba1..2df6ee70f863 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -50,7 +50,9 @@ pub struct WriteOptions {
     /// Row group size.
     pub row_group_size: usize,
     /// Max single output file size.
-    pub max_file_size: usize,
+    /// Note: This is not a hard limit as we can only observe the file size when
+    /// the ArrowWriter writes to the underlying writer.
+    pub max_file_size: Option<usize>,
 }
 
 impl Default for WriteOptions {
@@ -58,7 +60,7 @@ impl Default for WriteOptions {
         WriteOptions {
             write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             row_group_size: DEFAULT_ROW_GROUP_SIZE,
-            max_file_size: usize::MAX,
+            max_file_size: None,
         }
     }
 }
@@ -100,8 +102,9 @@ mod tests {
     use tokio_util::compat::FuturesAsyncWriteCompatExt;
 
     use super::*;
-    use crate::access_layer::FilePathProvider;
+    use crate::access_layer::{FilePathProvider, RegionFilePathProvider};
     use crate::cache::{CacheManager, CacheStrategy, PageKey};
+    use crate::read::BatchReader;
     use crate::sst::index::{Indexer, IndexerBuilder};
     use crate::sst::parquet::format::WriteFormat;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
@@ -109,7 +112,8 @@ mod tests {
     use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
+        sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
 
@@ -539,4 +543,58 @@ mod tests {
         )
         .await;
     }
+
+    #[tokio::test]
+    async fn test_write_multiple_files() {
+        common_telemetry::init_default_ut_logging();
+        // create test env
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let metadata = Arc::new(sst_region_metadata());
+        let batches = &[
+            new_batch_by_range(&["a", "d"], 0, 1000),
+            new_batch_by_range(&["b", "f"], 0, 1000),
+            new_batch_by_range(&["b", "h"], 100, 200),
+            new_batch_by_range(&["b", "h"], 200, 300),
+            new_batch_by_range(&["b", "h"], 300, 1000),
+        ];
+        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+
+        let source = new_source(batches);
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            max_file_size: Some(1024 * 16),
+            ..Default::default()
+        };
+
+        let path_provider = RegionFilePathProvider {
+            region_dir: "test".to_string(),
+        };
+        let mut writer = ParquetWriter::new_with_object_store(
+            object_store.clone(),
+            metadata.clone(),
+            NoopIndexBuilder,
+            path_provider,
+        )
+        .await;
+
+        let files = writer.write_all(source, None, &write_opts).await.unwrap();
+        assert_eq!(2, files.len());
+
+        let mut rows_read = 0;
+        for f in &files {
+            let file_handle = sst_file_handle_with_file_id(
+                f.file_id,
+                f.time_range.0.value(),
+                f.time_range.1.value(),
+            );
+            let builder =
+                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
+            let mut reader = builder.build().await.unwrap();
+            while let Some(batch) = reader.next_batch().await.unwrap() {
+                rows_read += batch.num_rows();
+            }
+        }
+        assert_eq!(total_rows, rows_read);
+    }
 }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 55189bc9e7a0..52764e21c85e 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -148,6 +148,13 @@ where
             // At least one row has been written.
             assert!(stats.num_rows > 0);
 
+            debug!(
+                "Finishing current file {}, file size: {}, num rows: {}",
+                self.current_file,
+                self.bytes_written.load(Ordering::Relaxed),
+                stats.num_rows
+            );
+
             // Finish indexer and writer.
             // safety: the writer and the indexer are either both present or both absent.
             let index_output = self.current_indexer.as_mut().unwrap().finish().await;
@@ -201,13 +208,9 @@ where
                     stats.update(&batch);
                     // safety: self.current_indexer must have been set by the time the first batch is written.
                     self.current_indexer.as_mut().unwrap().update(&batch).await;
-                    if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size {
-                        debug!(
-                            "Finishing current file {}, file size: {}, max file size: {}",
-                            self.current_file,
-                            self.bytes_written.load(Ordering::Relaxed),
-                            opts.max_file_size
-                        );
+                    if let Some(max_file_size) = opts.max_file_size
+                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
+                    {
                         self.finish_current_file(&mut results, &mut stats).await?;
                     }
                 }