Skip to content

Commit

Permalink
fix/avoid-suppress-manual-compaction:
Browse files Browse the repository at this point in the history
 ### Add Support for Manual Compaction Requests

 - **Compaction Logic Enhancements**:
   - Updated `CompactionScheduler` in `compaction.rs` to handle manual compaction requests using `Options::StrictWindow`.
   - Introduced `PendingCompaction` struct to manage pending manual compaction requests.
   - Added logic to reschedule manual compaction requests once the current compaction task is completed.

 - **Testing**:
   - Added `test_manual_compaction_when_compaction_in_progress` to verify the handling of manual compaction requests during ongoing compaction processes.

 These changes enhance the compaction scheduling mechanism by allowing manual compaction requests to be queued and processed efficiently.

(cherry picked from commit bc38ed0)
  • Loading branch information
v0y4g3r committed Jan 19, 2025
1 parent 74d8aaa commit 3baf81f
Showing 1 changed file with 176 additions and 6 deletions.
182 changes: 176 additions & 6 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
Expand Down Expand Up @@ -140,8 +141,20 @@ impl CompactionScheduler {
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
match compact_options {
Options::Regular(_) => {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
}
options @ Options::StrictWindow(_) => {
// Incoming compaction request is manually triggered.
status.set_pending_request(PendingCompaction { options, waiter });
info!(
"Region {} is compacting, manually compaction will be re-scheduled.",
region_id
);
}
}
return Ok(());
}

Expand Down Expand Up @@ -177,6 +190,30 @@ impl CompactionScheduler {
return;
};

if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
let PendingCompaction { options, waiter } = pending_request;

let request = status.new_compaction_request(
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
);

if let Err(e) = self.schedule_compaction_request(request, options).await {
error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
} else {
debug!(
"Successfully scheduled manual compaction for region id: {}",
region_id
);
}
return;
}

// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
self.request_sender.clone(),
Expand Down Expand Up @@ -445,6 +482,8 @@ struct CompactionStatus {
access_layer: AccessLayerRef,
/// Pending waiters for compaction.
waiters: Vec<OutputTx>,
/// Pending compactions that are supposed to run as soon as current compaction task finished.
pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
Expand All @@ -459,6 +498,7 @@ impl CompactionStatus {
version_control,
access_layer,
waiters: Vec::new(),
pending_request: None,
}
}

Expand All @@ -469,9 +509,21 @@ impl CompactionStatus {
}
}

/// Set pending compaction request or replace current value if already exist.
fn set_pending_request(&mut self, pending: PendingCompaction) {
if let Some(prev) = self.pending_request.replace(pending) {
debug!(
"Replace pending compaction options with new request {:?} for region: {}",
prev.options, self.region_id
);
}
}

fn on_failure(mut self, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id:self.region_id }));
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}

Expand Down Expand Up @@ -647,9 +699,20 @@ fn get_expired_ssts(
.collect()
}

/// Pending compaction request that is supposed to run after current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [`Options::StrictWindow`].
    pub(crate) options: compact_request::Options,
    /// Waiter to notify once this pending request is scheduled and completes.
    pub(crate) waiter: OptionOutputTx,
}

#[cfg(test)]
mod tests {
use api::v1::region::StrictWindow;
use tokio::sync::oneshot;

use super::*;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
Expand Down Expand Up @@ -802,7 +865,8 @@ mod tests {
.region_status
.get(&builder.region_id())
.unwrap()
.waiters.is_empty());
.waiters
.is_empty());

// On compaction finished and schedule next compaction.
scheduler
Expand All @@ -811,7 +875,6 @@ mod tests {
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());


// 5 files for next compaction.
apply_edit(
&version_control,
Expand All @@ -837,6 +900,113 @@ mod tests {
assert!(!scheduler
.region_status
.get(&builder.region_id())
.unwrap().waiters.is_empty());
.unwrap()
.waiters
.is_empty());
}

    /// Verifies that a manual (`StrictWindow`) compaction request arriving
    /// while a regular compaction is in progress is stored as a pending
    /// request, and is re-scheduled once the running compaction finishes.
    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        // Register table metadata so the scheduler can resolve schema info
        // for this region during compaction.
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        // Snapshot the current L0 file metas so they can be removed below.
        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for next compaction and removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        // Kick off a regular compaction; this occupies the region's slot.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // A regular request must not leave anything in the pending slot.
        assert!(scheduler
            .region_status
            .get(&region_id)
            .unwrap()
            .pending_request
            .is_none());

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // Current job num should be 1 since compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        // The manual request is parked as a pending request instead.
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished and schedule next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        // The pending manual compaction became the second scheduled job.
        assert_eq!(2, job_scheduler.num_jobs());

        // The pending slot is cleared once the manual request is re-scheduled.
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }
}

0 comments on commit 3baf81f

Please sign in to comment.