From 0f2ca5074fd3427f323912d5bc88d2e9aa3c3bf6 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:36:39 +0800 Subject: [PATCH] fix: avoid suppress manual compaction (#5399) * fix/avoid-suppress-manual-compaction: **Refactor Compaction Logic** - Removed `PendingCompaction` struct and integrated its functionality directly into `CompactionStatus` in `compaction.rs`. - Simplified waiter management by consolidating waiter handling logic into `CompactionStatus`. - Updated `CompactionRequest` creation to directly handle waiters without intermediate structures. - Adjusted test cases in `compaction.rs` to align with the new waiter management approach. (cherry picked from commit 87e2d1c2cc9bd82c02991d22e429bef25c5ee348) * fix/avoid-suppress-manual-compaction: ### Add Support for Manual Compaction Requests - **Compaction Logic Enhancements**: - Updated `CompactionScheduler` in `compaction.rs` to handle manual compaction requests using `Options::StrictWindow`. - Introduced `PendingCompaction` struct to manage pending manual compaction requests. - Added logic to reschedule manual compaction requests once the current compaction task is completed. - **Testing**: - Added `test_manual_compaction_when_compaction_in_progress` to verify the handling of manual compaction requests during ongoing compaction processes. These changes enhance the compaction scheduling mechanism by allowing manual compaction requests to be queued and processed efficiently. (cherry picked from commit bc38ed0f2f8ba2c4690e0d0e251aeb2acce308ca) * chore: fix conflicts * fix/avoid-suppress-manual-compaction: ### Add Error Handling for Manual Compaction Override - **`compaction.rs`**: Enhanced the `set_pending_request` method to handle manual compaction overrides by sending an error to the waiter if a previous request exists. 
- **`error.rs`**: Introduced a new error variant `ManualCompactionOverride` to represent manual compaction being overridden, and mapped it to the `Cancelled` status code. * fix: format * fix/avoid-suppress-manual-compaction: **Add Error Handling for Pending Compaction Requests** - Enhanced error handling in `compaction.rs` by adding logic to handle errors for pending compaction requests. - Introduced a mechanism to send errors using `waiter.send` when a pending compaction request fails, ensuring proper error propagation and context with `CompactRegionSnafu`. * fix/avoid-suppress-manual-compaction: **Fix Typo and Simplify Code Logic in `compaction.rs`** - Corrected a typo in the license comment from "langucage" to "language". - Simplified the logic for handling `pending_compaction` in `CompactionStatus` by removing unnecessary pattern matching and directly accessing `waiter`. * fix: typo --- src/mito2/src/compaction.rs | 296 ++++++++++++++++++++++++++++-------- src/mito2/src/error.rs | 5 + 2 files changed, 234 insertions(+), 67 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 6f9e5c0261ff..1ea42a91b8ad 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use std::time::Instant; use api::v1::region::compact_request; +use api::v1::region::compact_request::Options; use common_base::Plugins; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, error, info, warn}; @@ -50,9 +51,9 @@ use crate::compaction::picker::{new_picker, CompactionTask}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ - CompactRegionSnafu, Error, GetSchemaMetadataSnafu, RegionClosedSnafu, RegionDroppedSnafu, - RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, - TimeoutSnafu, + CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, + RegionClosedSnafu, 
RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result, + TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; use crate::read::projection::ProjectionMapper; @@ -93,13 +94,6 @@ impl CompactionRequest { pub(crate) fn region_id(&self) -> RegionId { self.current_version.metadata.region_id } - - /// Push waiter to the request. - pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) { - if let Some(waiter) = waiter.take_inner() { - self.waiters.push(waiter); - } - } } /// Compaction scheduler tracks and manages compaction tasks. @@ -150,8 +144,24 @@ impl CompactionScheduler { max_parallelism: usize, ) -> Result<()> { if let Some(status) = self.region_status.get_mut(&region_id) { - // Region is compacting. Add the waiter to pending list. - status.merge_waiter(waiter); + match compact_options { + Options::Regular(_) => { + // Region is compacting. Add the waiter to pending list. + status.merge_waiter(waiter); + } + options @ Options::StrictWindow(_) => { + // Incoming compaction request is manually triggered. 
+ status.set_pending_request(PendingCompaction { + options, + waiter, + max_parallelism, + }); + info!( + "Region {} is compacting, manually compaction will be re-scheduled.", + region_id + ); + } + } return Ok(()); } @@ -188,6 +198,35 @@ impl CompactionScheduler { return; }; + if let Some(pending_request) = std::mem::take(&mut status.pending_request) { + let PendingCompaction { + options, + waiter, + max_parallelism, + } = pending_request; + + let request = status.new_compaction_request( + self.request_sender.clone(), + waiter, + self.engine_config.clone(), + self.cache_manager.clone(), + manifest_ctx, + self.listener.clone(), + schema_metadata_manager, + max_parallelism, + ); + + if let Err(e) = self.schedule_compaction_request(request, options).await { + error!(e; "Failed to continue pending manual compaction for region id: {}", region_id); + } else { + debug!( + "Successfully scheduled manual compaction for region id: {}", + region_id + ); + } + return; + } + // We should always try to compact the region until picker returns None. let request = status.new_compaction_request( self.request_sender.clone(), @@ -424,27 +463,6 @@ impl Drop for CompactionScheduler { } } -/// Pending compaction tasks. -struct PendingCompaction { - waiters: Vec, -} - -impl PendingCompaction { - /// Push waiter to the request. - fn push_waiter(&mut self, mut waiter: OptionOutputTx) { - if let Some(waiter) = waiter.take_inner() { - self.waiters.push(waiter); - } - } - - /// Send compaction error to waiter. - fn on_failure(&mut self, region_id: RegionId, err: Arc) { - for waiter in self.waiters.drain(..) { - waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id })); - } - } -} - /// Finds TTL of table by first examine table options then database options. async fn find_ttl( table_id: TableId, @@ -478,10 +496,10 @@ struct CompactionStatus { version_control: VersionControlRef, /// Access layer of the region. access_layer: AccessLayerRef, - /// Compaction pending to schedule. 
- /// - /// For simplicity, we merge all pending compaction requests into one. - pending_compaction: Option, + /// Pending waiters for compaction. + waiters: Vec, + /// Pending compactions that are supposed to run as soon as current compaction task finished. + pending_request: Option, } impl CompactionStatus { @@ -495,23 +513,44 @@ impl CompactionStatus { region_id, version_control, access_layer, - pending_compaction: None, + waiters: Vec::new(), + pending_request: None, + } + } + + /// Merge the waiter to the pending compaction. + fn merge_waiter(&mut self, mut waiter: OptionOutputTx) { + if let Some(waiter) = waiter.take_inner() { + self.waiters.push(waiter); } } - /// Merge the watier to the pending compaction. - fn merge_waiter(&mut self, waiter: OptionOutputTx) { - let pending = self - .pending_compaction - .get_or_insert_with(|| PendingCompaction { - waiters: Vec::new(), - }); - pending.push_waiter(waiter); + /// Set pending compaction request or replace current value if already exist. + fn set_pending_request(&mut self, pending: PendingCompaction) { + if let Some(mut prev) = self.pending_request.replace(pending) { + debug!( + "Replace pending compaction options with new request {:?} for region: {}", + prev.options, self.region_id + ); + if let Some(waiter) = prev.waiter.take_inner() { + waiter.send(ManualCompactionOverrideSnafu.fail()); + } + } } - fn on_failure(self, err: Arc) { - if let Some(mut pending) = self.pending_compaction { - pending.on_failure(self.region_id, err.clone()); + fn on_failure(mut self, err: Arc) { + for waiter in self.waiters.drain(..) 
{ + waiter.send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } + + if let Some(pending_compaction) = self.pending_request { + pending_compaction + .waiter + .send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); } } @@ -522,7 +561,7 @@ impl CompactionStatus { fn new_compaction_request( &mut self, request_sender: Sender, - waiter: OptionOutputTx, + mut waiter: OptionOutputTx, engine_config: Arc, cache_manager: CacheManagerRef, manifest_ctx: &ManifestContextRef, @@ -532,26 +571,26 @@ impl CompactionStatus { ) -> CompactionRequest { let current_version = CompactionVersion::from(self.version_control.current().version); let start_time = Instant::now(); - let mut req = CompactionRequest { + let mut waiters = Vec::with_capacity(self.waiters.len() + 1); + waiters.extend(std::mem::take(&mut self.waiters)); + + if let Some(waiter) = waiter.take_inner() { + waiters.push(waiter); + } + + CompactionRequest { engine_config, current_version, access_layer: self.access_layer.clone(), request_sender: request_sender.clone(), - waiters: Vec::new(), + waiters, start_time, cache_manager, manifest_ctx: manifest_ctx.clone(), listener, schema_metadata_manager, max_parallelism, - }; - - if let Some(pending) = self.pending_compaction.take() { - req.waiters = pending.waiters; } - req.push_waiter(waiter); - - req } } @@ -689,8 +728,20 @@ fn get_expired_ssts( .collect() } +/// Pending compaction request that is supposed to run after current task is finished, +/// typically used for manual compactions. +struct PendingCompaction { + /// Compaction options. Currently, it can only be [StrictWindow]. + pub(crate) options: compact_request::Options, + /// Waiters of pending requests. + pub(crate) waiter: OptionOutputTx, + /// Max parallelism for pending compaction. 
+ pub(crate) max_parallelism: usize, +} + #[cfg(test)] mod tests { + use api::v1::region::StrictWindow; use tokio::sync::oneshot; use super::*; @@ -763,6 +814,7 @@ mod tests { #[tokio::test] async fn test_schedule_on_finished() { + common_telemetry::init_default_ut_logging(); let job_scheduler = Arc::new(VecScheduler::default()); let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); let (tx, _rx) = mpsc::channel(4); @@ -828,13 +880,14 @@ mod tests { purger.clone(), ); // The task is pending. + let (tx, _rx) = oneshot::channel(); scheduler .schedule_compaction( region_id, compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - OptionOutputTx::none(), + OptionOutputTx::new(Some(OutputTx::new(tx))), &manifest_ctx, schema_metadata_manager.clone(), 1, @@ -843,12 +896,12 @@ mod tests { .unwrap(); assert_eq!(1, scheduler.region_status.len()); assert_eq!(1, job_scheduler.num_jobs()); - assert!(scheduler + assert!(!scheduler .region_status .get(&builder.region_id()) .unwrap() - .pending_compaction - .is_some()); + .waiters + .is_empty()); // On compaction finished and schedule next compaction. scheduler @@ -856,6 +909,7 @@ mod tests { .await; assert_eq!(1, scheduler.region_status.len()); assert_eq!(2, job_scheduler.num_jobs()); + // 5 files for next compaction. apply_edit( &version_control, @@ -863,6 +917,7 @@ mod tests { &[], purger.clone(), ); + let (tx, _rx) = oneshot::channel(); // The task is pending. 
scheduler .schedule_compaction( @@ -870,7 +925,7 @@ mod tests { compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - OptionOutputTx::none(), + OptionOutputTx::new(Some(OutputTx::new(tx))), &manifest_ctx, schema_metadata_manager, 1, @@ -878,11 +933,118 @@ mod tests { .await .unwrap(); assert_eq!(2, job_scheduler.num_jobs()); - assert!(scheduler + assert!(!scheduler .region_status .get(&builder.region_id()) .unwrap() - .pending_compaction - .is_some()); + .waiters + .is_empty()); + } + + #[tokio::test] + async fn test_manual_compaction_when_compaction_in_progress() { + common_telemetry::init_default_ut_logging(); + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let purger = builder.file_purger(); + let region_id = builder.region_id(); + + let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); + schema_metadata_manager + .register_region_table_info( + builder.region_id().table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + kv_backend, + ) + .await; + + // 5 files to compact. + let end = 1000 * 1000; + let version_control = Arc::new( + builder + .push_l0_file(0, end) + .push_l0_file(10, end) + .push_l0_file(50, end) + .push_l0_file(80, end) + .push_l0_file(90, end) + .build(), + ); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + + let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0] + .files + .values() + .map(|file| file.meta_ref().clone()) + .collect(); + + // 5 files for next compaction and removes old files. 
+ apply_edit( + &version_control, + &[(0, end), (20, end), (40, end), (60, end), (80, end)], + &file_metas, + purger.clone(), + ); + + scheduler + .schedule_compaction( + region_id, + compact_request::Options::Regular(Default::default()), + &version_control, + &env.access_layer, + OptionOutputTx::none(), + &manifest_ctx, + schema_metadata_manager.clone(), + 1, + ) + .await + .unwrap(); + // Should schedule 1 compaction. + assert_eq!(1, scheduler.region_status.len()); + assert_eq!(1, job_scheduler.num_jobs()); + assert!(scheduler + .region_status + .get(&region_id) + .unwrap() + .pending_request + .is_none()); + + // Schedule another manual compaction. + let (tx, _rx) = oneshot::channel(); + scheduler + .schedule_compaction( + region_id, + compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }), + &version_control, + &env.access_layer, + OptionOutputTx::new(Some(OutputTx::new(tx))), + &manifest_ctx, + schema_metadata_manager.clone(), + 1, + ) + .await + .unwrap(); + assert_eq!(1, scheduler.region_status.len()); + // Current job num should be 1 since compaction is in progress. + assert_eq!(1, job_scheduler.num_jobs()); + let status = scheduler.region_status.get(&builder.region_id()).unwrap(); + assert!(status.pending_request.is_some()); + + // On compaction finished and schedule next compaction. 
+ scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone()) + .await; + assert_eq!(1, scheduler.region_status.len()); + assert_eq!(2, job_scheduler.num_jobs()); + + let status = scheduler.region_status.get(&builder.region_id()).unwrap(); + assert!(status.pending_request.is_none()); } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 9f55c45804c5..235ed4ca0b88 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -939,6 +939,9 @@ pub enum Error { column: String, default_value: String, }, + + #[snafu(display("Manual compaction is override by following operations."))] + ManualCompactionOverride {}, } pub type Result = std::result::Result; @@ -1082,6 +1085,8 @@ impl ErrorExt for Error { PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => { source.status_code() } + + ManualCompactionOverride {} => StatusCode::Cancelled, } }