Skip to content

Commit

Permalink
GC auto increase memory limit on inefficient GC (vercel/turborepo#8687)
Browse files Browse the repository at this point in the history
### Description

When GC is inefficient and can't collect enough memory, increase the
provided memory limit automatically.
We would rather consume more memory than requested than crash or hang
the compilation due to GC.
  • Loading branch information
sokra authored Jul 25, 2024
1 parent 77774a2 commit 89f6f3b
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 59 deletions.
87 changes: 52 additions & 35 deletions crates/turbo-tasks-memory/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,24 @@ const MAX_TASKS_PER_OLD_GENERATION: usize = 200_000;
const PERCENTAGE_TO_COLLECT: usize = 30;
const TASK_BASE_MEMORY_USAGE: usize = 1_000;
const TASK_BASE_COMPUTE_DURATION_IN_MICROS: u64 = 1_000;
pub const PERCENTAGE_TARGET_MEMORY: usize = 88;
pub const PERCENTAGE_IDLE_TARGET_MEMORY: usize = 75;
pub const PERCENTAGE_MIN_TARGET_MEMORY: usize = 70;
pub const PERCENTAGE_MAX_TARGET_MEMORY: usize = 75;
pub const PERCENTAGE_MIN_IDLE_TARGET_MEMORY: usize = 55;
pub const PERCENTAGE_MAX_IDLE_TARGET_MEMORY: usize = 60;
pub const MAX_GC_STEPS: usize = 100;

/// A batch of tasks that have aged out into an "old generation" and are
/// candidates for the next garbage-collection pass.
struct OldGeneration {
// Tasks queued for GC consideration in this generation.
tasks: Vec<TaskId>,
// Generation counter recorded when this batch was queued; the GC pass
// checks tasks against it so tasks moved to a newer generation since
// then can be skipped (see the "Check all tasks for the correct
// generation" step in `process_old_generation`).
generation: NonZeroU32,
}

/// Statistics produced by one `process_old_generation` pass.
#[derive(Default)]
struct ProcessGenerationResult {
// Number of old generations observed in the queue when a generation was
// dequeued for this pass (queue length taken before the pop — so it
// appears to include the dequeued one; TODO confirm intended semantics).
old_generations: usize,
// Highest GC priority seen among collected tasks; `None` when no task
// was actually collected in this pass.
priority: Option<GcPriority>,
// Tasks whose cached content was dropped (`GcResult::ContentDropped`).
content_dropped_count: usize,
// Tasks that were unloaded by this pass (`GcResult::Unloaded`).
unloaded_count: usize,
// Tasks found already unloaded/partial (`GcResult::AlreadyUnloaded`).
already_unloaded_count: usize,
}

struct ProcessDeactivationsResult {
Expand Down Expand Up @@ -222,22 +228,22 @@ impl GcQueue {
&self,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> ProcessGenerationResult {
let old_generation = {
) -> Option<ProcessGenerationResult> {
let old_generation_info = {
let guard = &mut self.generations.lock();
guard.pop_back()
let len = guard.len();
guard.pop_back().map(|g| (g, len))
};
let Some(OldGeneration {
mut tasks,
generation,
}) = old_generation
let Some((
OldGeneration {
mut tasks,
generation,
},
old_generations,
)) = old_generation_info
else {
// No old generation to process
return ProcessGenerationResult {
priority: None,
content_dropped_count: 0,
unloaded_count: 0,
};
return None;
};
// Check all tasks for the correct generation
let mut indices = Vec::with_capacity(tasks.len());
Expand All @@ -256,11 +262,10 @@ impl GcQueue {

if indices.is_empty() {
// No valid tasks in old generation to process
return ProcessGenerationResult {
priority: None,
content_dropped_count: 0,
unloaded_count: 0,
};
return Some(ProcessGenerationResult {
old_generations,
..ProcessGenerationResult::default()
});
}

// Sorting based on sort_by_cached_key from std lib
Expand Down Expand Up @@ -316,6 +321,7 @@ impl GcQueue {
// GC the tasks
let mut content_dropped_count = 0;
let mut unloaded_count = 0;
let mut already_unloaded_count = 0;
for task in tasks[..tasks_to_collect].iter() {
backend.with_task(*task, |task| {
match task.run_gc(generation, self, backend, turbo_tasks) {
Expand All @@ -327,26 +333,31 @@ impl GcQueue {
GcResult::Unloaded => {
unloaded_count += 1;
}
GcResult::AlreadyUnloaded => {
already_unloaded_count += 1;
}
}
});
}

ProcessGenerationResult {
Some(ProcessGenerationResult {
old_generations,
priority: Some(max_priority),
content_dropped_count,
unloaded_count,
}
already_unloaded_count,
})
}

/// Run garbage collection on the queue.
/// Run garbage collection on the queue. Returns `Some` with the number of
/// old generations when progress was made, or `None` if there was nothing
/// to collect.
pub fn run_gc(
&self,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Option<(GcPriority, usize)> {
) -> Option<usize> {
let span = tracing::trace_span!(
parent: None,
"garbage collection",
"garbage collection step",
priority = Empty,
deactivations_count = Empty,
content_dropped_count = Empty,
Expand All @@ -359,21 +370,27 @@ impl GcQueue {
count: deactivations_count,
} = self.process_deactivations(backend, turbo_tasks);

let ProcessGenerationResult {
if let Some(ProcessGenerationResult {
old_generations,
priority,
content_dropped_count,
unloaded_count,
} = self.process_old_generation(backend, turbo_tasks);
already_unloaded_count,
}) = self.process_old_generation(backend, turbo_tasks)
{
span.record("deactivations_count", deactivations_count);
span.record("content_dropped_count", content_dropped_count);
span.record("unloaded_count", unloaded_count);
span.record("already_unloaded_count", already_unloaded_count);
if let Some(priority) = &priority {
span.record("priority", debug(priority));
} else {
span.record("priority", "");
}

span.record("deactivations_count", deactivations_count);
span.record("content_dropped_count", content_dropped_count);
span.record("unloaded_count", unloaded_count);
if let Some(priority) = &priority {
span.record("priority", debug(priority));
Some(old_generations)
} else {
span.record("priority", "");
(deactivations_count > 0).then_some(0)
}

priority.map(|p| (p, content_dropped_count))
}
}
98 changes: 76 additions & 22 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
num::NonZeroU32,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Duration,
Expand All @@ -31,7 +31,10 @@ use turbo_tasks::{

use crate::{
edges_set::{TaskEdge, TaskEdgesSet},
gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY},
gc::{
GcQueue, MAX_GC_STEPS, PERCENTAGE_MAX_IDLE_TARGET_MEMORY, PERCENTAGE_MAX_TARGET_MEMORY,
PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY,
},
output::Output,
task::{ReadCellError, Task, DEPENDENCIES_TO_TRACK},
task_statistics::TaskStatisticsApi,
Expand All @@ -47,7 +50,7 @@ pub struct MemoryBackend {
backend_job_id_factory: IdFactoryWithReuse<BackendJobId>,
task_cache:
DashMap<Arc<PreHashed<PersistentTaskType>>, TaskId, BuildHasherDefault<PassThroughHash>>,
memory_limit: usize,
memory_limit: AtomicUsize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
task_statistics: TaskStatisticsApi,
Expand All @@ -70,7 +73,7 @@ impl MemoryBackend {
(std::thread::available_parallelism().map_or(1, usize::from) * 32)
.next_power_of_two(),
),
memory_limit,
memory_limit: AtomicUsize::new(memory_limit),
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
task_statistics: TaskStatisticsApi::default(),
Expand Down Expand Up @@ -141,27 +144,78 @@ impl MemoryBackend {
) -> bool {
if let Some(gc_queue) = &self.gc_queue {
let mut did_something = false;
loop {
let mem_limit = self.memory_limit;

let usage = turbo_tasks_malloc::TurboMalloc::memory_usage();
let target = if idle {
mem_limit * PERCENTAGE_IDLE_TARGET_MEMORY / 100
let mut remaining_generations = 0;
let mut mem_limit = self.memory_limit.load(Ordering::Relaxed);
let mut span = None;
'outer: loop {
let mut collected_generations = 0;
let (min, max) = if idle {
(
mem_limit * PERCENTAGE_MIN_IDLE_TARGET_MEMORY / 100,
mem_limit * PERCENTAGE_MAX_IDLE_TARGET_MEMORY / 100,
)
} else {
mem_limit * PERCENTAGE_TARGET_MEMORY / 100
(
mem_limit * PERCENTAGE_MIN_TARGET_MEMORY / 100,
mem_limit * PERCENTAGE_MAX_TARGET_MEMORY / 100,
)
};
if usage < target {
return did_something;
}

let collected = gc_queue.run_gc(self, turbo_tasks);

// Collecting less than 100 tasks is not worth it
if !collected.map_or(false, |(_, count)| count > 100) {
return true;
let mut target = max;
let mut counter = 0;
loop {
let usage = turbo_tasks_malloc::TurboMalloc::memory_usage();
if usage < target {
return did_something;
}
target = min;
if span.is_none() {
span =
Some(tracing::trace_span!(parent: None, "garbage collection", usage));
}

let progress = gc_queue.run_gc(self, turbo_tasks);

if progress.is_some() {
did_something = true;
}

if let Some(g) = progress {
remaining_generations = g;
if g > 0 {
collected_generations += 1;
}
}

counter += 1;
if counter > MAX_GC_STEPS
|| collected_generations > remaining_generations
|| progress.is_none()
{
let new_mem_limit = mem_limit * 4 / 3;
if self
.memory_limit
.compare_exchange(
mem_limit,
new_mem_limit,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
println!(
"Ineffective GC, increasing memory limit {} MB -> {} MB",
mem_limit / 1024 / 1024,
new_mem_limit / 1024 / 1024
);
mem_limit = new_mem_limit;
} else {
mem_limit = self.memory_limit.load(Ordering::Relaxed);
}
continue 'outer;
}

did_something = true;
}

did_something = true;
}
}
false
Expand Down
5 changes: 3 additions & 2 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ pub enum GcResult {
/// from the graph and only makes sense when the task isn't currently
/// active.
Unloaded,
AlreadyUnloaded,
}

pub enum ReadCellError {
Expand Down Expand Up @@ -1818,9 +1819,9 @@ impl Task {
}
TaskMetaStateWriteGuard::Partial(mut state) => {
state.aggregation_node.shrink_to_fit();
GcResult::Unloaded
GcResult::AlreadyUnloaded
}
TaskMetaStateWriteGuard::Unloaded(_) => GcResult::Unloaded,
TaskMetaStateWriteGuard::Unloaded(_) => GcResult::AlreadyUnloaded,
TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(),
}
}
Expand Down

0 comments on commit 89f6f3b

Please sign in to comment.