Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC auto increase memory limit on inefficient GC #8687

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions crates/turbo-tasks-memory/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,24 @@ const MAX_TASKS_PER_OLD_GENERATION: usize = 200_000;
const PERCENTAGE_TO_COLLECT: usize = 30;
const TASK_BASE_MEMORY_USAGE: usize = 1_000;
const TASK_BASE_COMPUTE_DURATION_IN_MICROS: u64 = 1_000;
pub const PERCENTAGE_TARGET_MEMORY: usize = 88;
pub const PERCENTAGE_IDLE_TARGET_MEMORY: usize = 75;
pub const PERCENTAGE_MIN_TARGET_MEMORY: usize = 70;
pub const PERCENTAGE_MAX_TARGET_MEMORY: usize = 75;
pub const PERCENTAGE_MIN_IDLE_TARGET_MEMORY: usize = 55;
pub const PERCENTAGE_MAX_IDLE_TARGET_MEMORY: usize = 60;
pub const MAX_GC_STEPS: usize = 100;

/// A batch of tasks that were last touched in the same GC generation,
/// queued for later collection.
struct OldGeneration {
// Tasks recorded as belonging to this generation; entries may be stale
// (the diff below re-checks each task's current generation before GC).
tasks: Vec<TaskId>,
// The generation counter value this batch was created under.
generation: NonZeroU32,
}

/// Statistics produced by processing one old generation during a GC step.
/// `Default` is derived so the "no valid tasks" path can fill only
/// `old_generations` and zero the counters via `..Default::default()`.
#[derive(Default)]
struct ProcessGenerationResult {
// Number of old generations observed in the queue when this step ran
// (taken from the queue length before popping — TODO confirm whether
// callers expect it to include the popped generation).
old_generations: usize,
// Highest GC priority seen among collected tasks; None when no task
// was actually processed in this step.
priority: Option<GcPriority>,
// Tasks whose cached content was dropped.
content_dropped_count: usize,
// Tasks fully unloaded by this step.
unloaded_count: usize,
// Tasks found already unloaded (or partial) when GC reached them.
already_unloaded_count: usize,
}

struct ProcessDeactivationsResult {
Expand Down Expand Up @@ -222,22 +228,22 @@ impl GcQueue {
&self,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> ProcessGenerationResult {
let old_generation = {
) -> Option<ProcessGenerationResult> {
let old_generation_info = {
let guard = &mut self.generations.lock();
guard.pop_back()
let len = guard.len();
guard.pop_back().map(|g| (g, len))
};
let Some(OldGeneration {
mut tasks,
generation,
}) = old_generation
let Some((
OldGeneration {
mut tasks,
generation,
},
old_generations,
)) = old_generation_info
else {
// No old generation to process
return ProcessGenerationResult {
priority: None,
content_dropped_count: 0,
unloaded_count: 0,
};
return None;
};
// Check all tasks for the correct generation
let mut indices = Vec::with_capacity(tasks.len());
Expand All @@ -256,11 +262,10 @@ impl GcQueue {

if indices.is_empty() {
// No valid tasks in old generation to process
return ProcessGenerationResult {
priority: None,
content_dropped_count: 0,
unloaded_count: 0,
};
return Some(ProcessGenerationResult {
old_generations,
..ProcessGenerationResult::default()
});
}

// Sorting based on sort_by_cached_key from std lib
Expand Down Expand Up @@ -316,6 +321,7 @@ impl GcQueue {
// GC the tasks
let mut content_dropped_count = 0;
let mut unloaded_count = 0;
let mut already_unloaded_count = 0;
for task in tasks[..tasks_to_collect].iter() {
backend.with_task(*task, |task| {
match task.run_gc(generation, self, backend, turbo_tasks) {
Expand All @@ -327,26 +333,31 @@ impl GcQueue {
GcResult::Unloaded => {
unloaded_count += 1;
}
GcResult::AlreadyUnloaded => {
already_unloaded_count += 1;
}
}
});
}

ProcessGenerationResult {
Some(ProcessGenerationResult {
old_generations,
priority: Some(max_priority),
content_dropped_count,
unloaded_count,
}
already_unloaded_count,
})
}

/// Run garbage collection on the queue.
/// Run garbage collection on the queue. Returns `Some(old_generations)` —
/// the number of remaining old generations — when some progress was made,
/// and `None` otherwise.
pub fn run_gc(
&self,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Option<(GcPriority, usize)> {
) -> Option<usize> {
let span = tracing::trace_span!(
parent: None,
"garbage collection",
"garbage collection step",
priority = Empty,
deactivations_count = Empty,
content_dropped_count = Empty,
Expand All @@ -359,21 +370,27 @@ impl GcQueue {
count: deactivations_count,
} = self.process_deactivations(backend, turbo_tasks);

let ProcessGenerationResult {
if let Some(ProcessGenerationResult {
old_generations,
priority,
content_dropped_count,
unloaded_count,
} = self.process_old_generation(backend, turbo_tasks);
already_unloaded_count,
}) = self.process_old_generation(backend, turbo_tasks)
{
span.record("deactivations_count", deactivations_count);
span.record("content_dropped_count", content_dropped_count);
span.record("unloaded_count", unloaded_count);
span.record("already_unloaded_count", already_unloaded_count);
if let Some(priority) = &priority {
span.record("priority", debug(priority));
} else {
span.record("priority", "");
}

span.record("deactivations_count", deactivations_count);
span.record("content_dropped_count", content_dropped_count);
span.record("unloaded_count", unloaded_count);
if let Some(priority) = &priority {
span.record("priority", debug(priority));
Some(old_generations)
} else {
span.record("priority", "");
(deactivations_count > 0).then_some(0)
}

priority.map(|p| (p, content_dropped_count))
}
}
98 changes: 76 additions & 22 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
num::NonZeroU32,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Duration,
Expand All @@ -31,7 +31,10 @@ use turbo_tasks::{

use crate::{
edges_set::{TaskEdge, TaskEdgesSet},
gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY},
gc::{
GcQueue, MAX_GC_STEPS, PERCENTAGE_MAX_IDLE_TARGET_MEMORY, PERCENTAGE_MAX_TARGET_MEMORY,
PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY,
},
output::Output,
task::{ReadCellError, Task, DEPENDENCIES_TO_TRACK},
task_statistics::TaskStatisticsApi,
Expand All @@ -47,7 +50,7 @@ pub struct MemoryBackend {
backend_job_id_factory: IdFactoryWithReuse<BackendJobId>,
task_cache:
DashMap<Arc<PreHashed<PersistentTaskType>>, TaskId, BuildHasherDefault<PassThroughHash>>,
memory_limit: usize,
memory_limit: AtomicUsize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
task_statistics: TaskStatisticsApi,
Expand All @@ -70,7 +73,7 @@ impl MemoryBackend {
(std::thread::available_parallelism().map_or(1, usize::from) * 32)
.next_power_of_two(),
),
memory_limit,
memory_limit: AtomicUsize::new(memory_limit),
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
task_statistics: TaskStatisticsApi::default(),
Expand Down Expand Up @@ -141,27 +144,78 @@ impl MemoryBackend {
) -> bool {
if let Some(gc_queue) = &self.gc_queue {
let mut did_something = false;
loop {
let mem_limit = self.memory_limit;

let usage = turbo_tasks_malloc::TurboMalloc::memory_usage();
let target = if idle {
mem_limit * PERCENTAGE_IDLE_TARGET_MEMORY / 100
let mut remaining_generations = 0;
let mut mem_limit = self.memory_limit.load(Ordering::Relaxed);
let mut span = None;
'outer: loop {
let mut collected_generations = 0;
let (min, max) = if idle {
(
mem_limit * PERCENTAGE_MIN_IDLE_TARGET_MEMORY / 100,
mem_limit * PERCENTAGE_MAX_IDLE_TARGET_MEMORY / 100,
)
} else {
mem_limit * PERCENTAGE_TARGET_MEMORY / 100
(
mem_limit * PERCENTAGE_MIN_TARGET_MEMORY / 100,
mem_limit * PERCENTAGE_MAX_TARGET_MEMORY / 100,
)
};
if usage < target {
return did_something;
}

let collected = gc_queue.run_gc(self, turbo_tasks);

// Collecting less than 100 tasks is not worth it
if !collected.map_or(false, |(_, count)| count > 100) {
return true;
let mut target = max;
let mut counter = 0;
loop {
let usage = turbo_tasks_malloc::TurboMalloc::memory_usage();
if usage < target {
return did_something;
}
target = min;
if span.is_none() {
span =
Some(tracing::trace_span!(parent: None, "garbage collection", usage));
}

let progress = gc_queue.run_gc(self, turbo_tasks);

if progress.is_some() {
did_something = true;
}

if let Some(g) = progress {
remaining_generations = g;
if g > 0 {
collected_generations += 1;
}
}

counter += 1;
if counter > MAX_GC_STEPS
|| collected_generations > remaining_generations
|| progress.is_none()
{
let new_mem_limit = mem_limit * 4 / 3;
if self
.memory_limit
.compare_exchange(
mem_limit,
new_mem_limit,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
println!(
"Ineffective GC, increasing memory limit {} MB -> {} MB",
mem_limit / 1024 / 1024,
new_mem_limit / 1024 / 1024
);
mem_limit = new_mem_limit;
} else {
mem_limit = self.memory_limit.load(Ordering::Relaxed);
}
continue 'outer;
}

did_something = true;
}

did_something = true;
}
}
false
Expand Down
5 changes: 3 additions & 2 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ pub enum GcResult {
/// from the graph and only makes sense when the task isn't currently
/// active.
Unloaded,
AlreadyUnloaded,
}

pub enum ReadCellError {
Expand Down Expand Up @@ -1818,9 +1819,9 @@ impl Task {
}
TaskMetaStateWriteGuard::Partial(mut state) => {
state.aggregation_node.shrink_to_fit();
GcResult::Unloaded
GcResult::AlreadyUnloaded
}
TaskMetaStateWriteGuard::Unloaded(_) => GcResult::Unloaded,
TaskMetaStateWriteGuard::Unloaded(_) => GcResult::AlreadyUnloaded,
TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(),
}
}
Expand Down
Loading