Skip to content

Commit

Permalink
improve GC
Browse files Browse the repository at this point in the history
GC until memory is reduced by 5%
increase memory limit when needed
  • Loading branch information
sokra committed Jul 24, 2024
1 parent 2e6a26c commit 20ed510
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 50 deletions.
72 changes: 44 additions & 28 deletions crates/turbo-tasks-memory/src/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ const MAX_TASKS_PER_OLD_GENERATION: usize = 200_000;
const PERCENTAGE_TO_COLLECT: usize = 30;
const TASK_BASE_MEMORY_USAGE: usize = 1_000;
const TASK_BASE_COMPUTE_DURATION_IN_MICROS: u64 = 1_000;
pub const PERCENTAGE_TARGET_MEMORY: usize = 88;
pub const PERCENTAGE_IDLE_TARGET_MEMORY: usize = 75;
pub const PERCENTAGE_MIN_TARGET_MEMORY: usize = 70;
pub const PERCENTAGE_MAX_TARGET_MEMORY: usize = 75;
pub const PERCENTAGE_MIN_IDLE_TARGET_MEMORY: usize = 55;
pub const PERCENTAGE_MAX_IDLE_TARGET_MEMORY: usize = 60;
pub const MAX_GC_STEPS: usize = 100;

struct OldGeneration {
tasks: Vec<TaskId>,
Expand All @@ -82,6 +85,7 @@ struct OldGeneration {

#[derive(Default)]
struct ProcessGenerationResult {
old_generations: usize,
priority: Option<GcPriority>,
content_dropped_count: usize,
unloaded_count: usize,
Expand Down Expand Up @@ -224,18 +228,22 @@ impl GcQueue {
&self,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> ProcessGenerationResult {
let old_generation = {
) -> Option<ProcessGenerationResult> {
let old_generation_info = {
let guard = &mut self.generations.lock();
guard.pop_back()
let len = guard.len();
guard.pop_back().map(|g| (g, len))
};
let Some(OldGeneration {
mut tasks,
generation,
}) = old_generation
let Some((
OldGeneration {
mut tasks,
generation,
},
old_generations,
)) = old_generation_info
else {
// No old generation to process
return ProcessGenerationResult::default();
return None;
};
// Check all tasks for the correct generation
let mut indices = Vec::with_capacity(tasks.len());
Expand All @@ -254,7 +262,10 @@ impl GcQueue {

if indices.is_empty() {
// No valid tasks in old generation to process
return ProcessGenerationResult::default();
return Some(ProcessGenerationResult {
old_generations,
..ProcessGenerationResult::default()
});
}

// Sorting based on sort_by_cached_key from std lib
Expand Down Expand Up @@ -329,23 +340,24 @@ impl GcQueue {
});
}

ProcessGenerationResult {
Some(ProcessGenerationResult {
old_generations,
priority: Some(max_priority),
content_dropped_count,
unloaded_count,
already_unloaded_count,
}
})
}

/// Run garbage collection on the queue.
/// Run garbage collection on the queue. Returns true, if some progress has
/// been made. Returns the number of old generations.
pub fn run_gc(
&self,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Option<(GcPriority, usize)> {
) -> Option<usize> {
let span = tracing::trace_span!(
parent: None,
"garbage collection",
"garbage collection step",
priority = Empty,
deactivations_count = Empty,
content_dropped_count = Empty,
Expand All @@ -358,23 +370,27 @@ impl GcQueue {
count: deactivations_count,
} = self.process_deactivations(backend, turbo_tasks);

let ProcessGenerationResult {
if let Some(ProcessGenerationResult {
old_generations,
priority,
content_dropped_count,
unloaded_count,
already_unloaded_count,
} = self.process_old_generation(backend, turbo_tasks);
}) = self.process_old_generation(backend, turbo_tasks)
{
span.record("deactivations_count", deactivations_count);
span.record("content_dropped_count", content_dropped_count);
span.record("unloaded_count", unloaded_count);
span.record("already_unloaded_count", already_unloaded_count);
if let Some(priority) = &priority {
span.record("priority", debug(priority));
} else {
span.record("priority", "");
}

span.record("deactivations_count", deactivations_count);
span.record("content_dropped_count", content_dropped_count);
span.record("unloaded_count", unloaded_count);
span.record("already_unloaded_count", already_unloaded_count);
if let Some(priority) = &priority {
span.record("priority", debug(priority));
Some(old_generations)
} else {
span.record("priority", "");
(deactivations_count > 0).then_some(0)
}

priority.map(|p| (p, content_dropped_count))
}
}
98 changes: 76 additions & 22 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
num::NonZeroU32,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Duration,
Expand All @@ -31,7 +31,10 @@ use turbo_tasks::{

use crate::{
edges_set::{TaskEdge, TaskEdgesSet},
gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY},
gc::{
GcQueue, MAX_GC_STEPS, PERCENTAGE_MAX_IDLE_TARGET_MEMORY, PERCENTAGE_MAX_TARGET_MEMORY,
PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY,
},
output::Output,
task::{ReadCellError, Task, DEPENDENCIES_TO_TRACK},
task_statistics::TaskStatisticsApi,
Expand All @@ -47,7 +50,7 @@ pub struct MemoryBackend {
backend_job_id_factory: IdFactoryWithReuse<BackendJobId>,
task_cache:
DashMap<Arc<PreHashed<PersistentTaskType>>, TaskId, BuildHasherDefault<PassThroughHash>>,
memory_limit: usize,
memory_limit: AtomicUsize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
task_statistics: TaskStatisticsApi,
Expand All @@ -70,7 +73,7 @@ impl MemoryBackend {
(std::thread::available_parallelism().map_or(1, usize::from) * 32)
.next_power_of_two(),
),
memory_limit,
memory_limit: AtomicUsize::new(memory_limit),
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
task_statistics: TaskStatisticsApi::default(),
Expand Down Expand Up @@ -141,27 +144,78 @@ impl MemoryBackend {
) -> bool {
if let Some(gc_queue) = &self.gc_queue {
let mut did_something = false;
loop {
let mem_limit = self.memory_limit;

let usage = turbo_tasks_malloc::TurboMalloc::memory_usage();
let target = if idle {
mem_limit * PERCENTAGE_IDLE_TARGET_MEMORY / 100
let mut remaining_generations = 0;
let mut mem_limit = self.memory_limit.load(Ordering::Relaxed);
let mut span = None;
'outer: loop {
let mut collected_generations = 0;
let (min, max) = if idle {
(
mem_limit * PERCENTAGE_MIN_IDLE_TARGET_MEMORY / 100,
mem_limit * PERCENTAGE_MAX_IDLE_TARGET_MEMORY / 100,
)
} else {
mem_limit * PERCENTAGE_TARGET_MEMORY / 100
(
mem_limit * PERCENTAGE_MIN_TARGET_MEMORY / 100,
mem_limit * PERCENTAGE_MAX_TARGET_MEMORY / 100,
)
};
if usage < target {
return did_something;
}

let collected = gc_queue.run_gc(self, turbo_tasks);

// Collecting less than 100 tasks is not worth it
if !collected.map_or(false, |(_, count)| count > 100) {
return true;
let mut target = max;
let mut counter = 0;
loop {
let usage = turbo_tasks_malloc::TurboMalloc::memory_usage();
if usage < target {
return did_something;
}
target = min;
if span.is_none() {
span =
Some(tracing::trace_span!(parent: None, "garbage collection", usage));
}

let progress = gc_queue.run_gc(self, turbo_tasks);

if progress.is_some() {
did_something = true;
}

if let Some(g) = progress {
remaining_generations = g;
if g > 0 {
collected_generations += 1;
}
}

counter += 1;
if counter > MAX_GC_STEPS
|| collected_generations > remaining_generations
|| progress.is_none()
{
let new_mem_limit = mem_limit * 4 / 3;
if self
.memory_limit
.compare_exchange(
mem_limit,
new_mem_limit,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
println!(
"Ineffective GC, increasing memory limit {} MB -> {} MB",
mem_limit / 1024 / 1024,
new_mem_limit / 1024 / 1024
);
mem_limit = new_mem_limit;
} else {
mem_limit = self.memory_limit.load(Ordering::Relaxed);
}
continue 'outer;
}

did_something = true;
}

did_something = true;
}
}
false
Expand Down

0 comments on commit 20ed510

Please sign in to comment.