Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(mito): Allow creating multiple files in ParquetWriter #5291

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next commit
- **Refactored SST File Handling**:
   - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths.
   - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
   - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
   - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.

 - **Enhanced Indexer Management**:
   - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation.
   - Updated `ParquetWriter` to handle multiple indexers and file IDs.
   - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.

 - **Removed Redundant File ID Handling**:
   - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
   - Updated related logic to dynamically generate file IDs where necessary.
   - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.

 - **Test Adjustments**:
   - Updated tests to align with new path and indexer management.
   - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
   - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.
v0y4g3r committed Jan 5, 2025
commit 958258e8fbdf3510dfebc60ed5806568002997e7
94 changes: 72 additions & 22 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
@@ -17,10 +17,12 @@ use std::sync::Arc;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::ObjectStore;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use store_api::storage::{RegionId, SequenceNumber};

use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
@@ -30,13 +32,15 @@ use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};

pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;

/// A layer to access SST files under the same directory.
pub struct AccessLayer {
@@ -121,11 +125,8 @@ impl AccessLayer {
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();

let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
@@ -134,8 +135,9 @@ impl AccessLayer {
.write_and_upload_sst(
request,
SstUploadRequest {
upload_path: file_path,
index_upload_path: index_file_path,
dest_path_provider: RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
remote_store: self.object_store.clone(),
},
write_opts,
@@ -144,36 +146,41 @@ impl AccessLayer {
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let indexer = IndexerBuilder {
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
bloom_filter_index_config: request.bloom_filter_index_config,
}
.build()
.await;
};
let mut writer = ParquetWriter::new_with_object_store(
self.object_store.clone(),
file_path,
request.metadata,
indexer,
);
indexer_builder,
RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
)
.await;
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
if !sst_info.is_empty() {
for sst in &sst_info {
if let Some(parquet_metadata) = &sst.file_metadata {
cache_manager.put_parquet_meta_data(
region_id,
sst.file_id,
parquet_metadata.clone(),
)
}
}
}

@@ -191,7 +198,6 @@ pub(crate) enum OperationType {
/// Contents to build an SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
@@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> {

Ok(())
}

/// Path provider for SST file and index file.
///
/// Implementations map a [`FileId`] to the string path of the corresponding
/// SST file or index file. The `Send + Sync` supertraits require every
/// implementation to be safe to share across threads.
pub trait FilePathProvider: Send + Sync {
/// Creates index file path of given file id.
///
/// Returns the path as an owned `String`.
fn build_index_file_path(&self, file_id: FileId) -> String;

/// Creates SST file path of given file id.
///
/// Returns the path as an owned `String`.
fn build_sst_file_path(&self, file_id: FileId) -> String;
}

/// Path provider that builds paths in local write cache.
///
/// Paths are obtained from `file_cache`, using cache keys built from
/// `region_id` and the requested file id.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
/// Region id placed into every cache key this provider builds.
pub(crate) region_id: RegionId,
/// File cache whose `cache_file_path` produces the returned paths.
pub(crate) file_cache: FileCacheRef,
}

impl FilePathProvider for WriteCachePathProvider {
    /// Returns the file-cache path for the index file of `file_id`.
    ///
    /// The cache key combines this provider's region id, the file id and
    /// `FileType::Puffin`.
    fn build_index_file_path(&self, file_id: FileId) -> String {
        self.file_cache
            .cache_file_path(IndexKey::new(self.region_id, file_id, FileType::Puffin))
    }

    /// Returns the file-cache path for the SST file of `file_id`.
    ///
    /// The cache key combines this provider's region id, the file id and
    /// `FileType::Parquet`.
    fn build_sst_file_path(&self, file_id: FileId) -> String {
        self.file_cache
            .cache_file_path(IndexKey::new(self.region_id, file_id, FileType::Parquet))
    }
}

/// Path provider that builds paths in region storage path.
///
/// Both SST and index file paths are derived from `region_dir` through the
/// helpers in the `location` module.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
/// Region directory passed to the `location` path helpers.
pub(crate) region_dir: String,
}

impl FilePathProvider for RegionFilePathFactory {
    /// Returns the index file path for `file_id`, as computed by
    /// `location::index_file_path` from this factory's region directory.
    fn build_index_file_path(&self, file_id: FileId) -> String {
        let Self { region_dir } = self;
        location::index_file_path(region_dir, file_id)
    }

    /// Returns the SST file path for `file_id`, as computed by
    /// `location::sst_file_path` from this factory's region directory.
    fn build_sst_file_path(&self, file_id: FileId) -> String {
        let Self { region_dir } = self;
        location::sst_file_path(region_dir, file_id)
    }
}
Loading