diff --git a/crates/turbo-tasks-memory/src/aggregation/mod.rs b/crates/turbo-tasks-memory/src/aggregation/mod.rs index 154031527de5c..bfbd8da7b657f 100644 --- a/crates/turbo-tasks-memory/src/aggregation/mod.rs +++ b/crates/turbo-tasks-memory/src/aggregation/mod.rs @@ -81,7 +81,7 @@ impl AggregationNode { AggregationNode::Leaf { aggregation_number, .. } => *aggregation_number as u32, - AggregationNode::Aggegating(aggegating) => aggegating.aggregation_number, + AggregationNode::Aggegating(aggregating) => aggregating.aggregation_number, } } @@ -92,21 +92,21 @@ impl AggregationNode { fn uppers(&self) -> &CountHashSet { match self { AggregationNode::Leaf { uppers, .. } => uppers, - AggregationNode::Aggegating(aggegating) => &aggegating.uppers, + AggregationNode::Aggegating(aggregating) => &aggregating.uppers, } } fn uppers_mut(&mut self) -> &mut CountHashSet { match self { AggregationNode::Leaf { uppers, .. } => uppers, - AggregationNode::Aggegating(aggegating) => &mut aggegating.uppers, + AggregationNode::Aggegating(aggregating) => &mut aggregating.uppers, } } fn followers(&self) -> Option<&CountHashSet> { match self { AggregationNode::Leaf { .. } => None, - AggregationNode::Aggegating(aggegating) => Some(&aggegating.followers), + AggregationNode::Aggegating(aggregating) => Some(&aggregating.followers), } } } diff --git a/crates/turbo-tasks-memory/src/cell.rs b/crates/turbo-tasks-memory/src/cell.rs index a23086da20305..11b756f84217c 100644 --- a/crates/turbo-tasks-memory/src/cell.rs +++ b/crates/turbo-tasks-memory/src/cell.rs @@ -158,9 +158,14 @@ impl Cell { } } + /// Assigns a new content to the cell. Will notify dependent tasks if the + /// content has changed. + /// If clean = true, the task inputs weren't changes since the last + /// execution and can be assumed to produce the same content again. pub fn assign( &mut self, content: CellContent, + clean: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) { match self { @@ -175,31 +180,46 @@ impl Cell { ref mut dependent_tasks, } => { event.notify(usize::MAX); - // Assigning to a cell will invalidate all dependent tasks as the content might - // have changed. - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); + if clean { + // We can assume that the task is deterministic and produces the same content + // again. No need to notify dependent tasks. + *self = Cell::Value { + content, + dependent_tasks: take(dependent_tasks), + }; + } else { + // Assigning to a cell will invalidate all dependent tasks as the content might + // have changed. + if !dependent_tasks.is_empty() { + turbo_tasks.schedule_notify_tasks_set(dependent_tasks); + } + *self = Cell::Value { + content, + dependent_tasks: AutoSet::default(), + }; } - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; } &mut Cell::TrackedValueless { ref mut dependent_tasks, } => { - // Assigning to a cell will invalidate all dependent tasks as the content might - // have changed. - // TODO this leads to flagging task unnecessarily dirty when a GC'ed task is - // recomputed. We need to use the notification of changed cells for the current - // task to check if it's valid to skip the invalidation here - if !dependent_tasks.is_empty() { - turbo_tasks.schedule_notify_tasks_set(dependent_tasks); + if clean { + // We can assume that the task is deterministic and produces the same content + // again. No need to notify dependent tasks. + *self = Cell::Value { + content, + dependent_tasks: take(dependent_tasks), + }; + } else { + // Assigning to a cell will invalidate all dependent tasks as the content might + // have changed. + if !dependent_tasks.is_empty() { + turbo_tasks.schedule_notify_tasks_set(dependent_tasks); + } + *self = Cell::Value { + content, + dependent_tasks: AutoSet::default(), + }; } - *self = Cell::Value { - content, - dependent_tasks: AutoSet::default(), - }; } Cell::Value { content: ref mut cell_content, diff --git a/crates/turbo-tasks-memory/src/gc.rs b/crates/turbo-tasks-memory/src/gc.rs index b0253fa0e8110..96c7be4800cd9 100644 --- a/crates/turbo-tasks-memory/src/gc.rs +++ b/crates/turbo-tasks-memory/src/gc.rs @@ -3,16 +3,18 @@ use std::{ collections::VecDeque, fmt::Debug, mem::take, + num::NonZeroU32, sync::atomic::{AtomicU32, AtomicUsize, Ordering}, time::Duration, }; use concurrent_queue::ConcurrentQueue; +use dashmap::DashSet; use parking_lot::Mutex; use tracing::field::{debug, Empty}; -use turbo_tasks::TaskId; +use turbo_tasks::{TaskId, TurboTasksBackendApi}; -use crate::MemoryBackend; +use crate::{task::GcResult, MemoryBackend}; /// The priority of a task for garbage collection. /// Any action will shrink the internal memory structures of the task in a @@ -30,7 +32,7 @@ pub struct GcPriority { pub struct GcTaskState { pub priority: GcPriority, /// The generation where the task was last accessed. - pub generation: u32, + pub generation: Option, } impl GcTaskState { @@ -38,9 +40,9 @@ impl GcTaskState { &mut self, duration: Duration, memory_usage: usize, - generation: u32, + generation: NonZeroU32, ) { - self.generation = generation; + self.generation = Some(generation); self.priority = GcPriority { memory_per_time: ((memory_usage + TASK_BASE_MEMORY_USAGE) as u64 / (duration.as_micros() as u64 + TASK_BASE_COMPUTE_DURATION_IN_MICROS)) @@ -49,16 +51,22 @@ impl GcTaskState { }; } - pub(crate) fn on_read(&mut self, generation: u32) -> bool { - if self.generation < generation { - self.generation = generation; - true + pub(crate) fn on_read(&mut self, generation: NonZeroU32) -> bool { + if let Some(old_generation) = self.generation { + if old_generation < generation { + self.generation = Some(generation); + true + } else { + false + } } else { - false + self.generation = Some(generation); + true } } } +const MAX_DEACTIVATIONS: usize = 100_000; const TASKS_PER_NEW_GENERATION: usize = 100_000; const MAX_TASKS_PER_OLD_GENERATION: usize = 200_000; const PERCENTAGE_TO_COLLECT: usize = 30; @@ -69,11 +77,16 @@ pub const PERCENTAGE_IDLE_TARGET_MEMORY: usize = 75; struct OldGeneration { tasks: Vec, - generation: u32, + generation: NonZeroU32, } struct ProcessGenerationResult { priority: Option, + content_dropped_count: usize, + unloaded_count: usize, +} + +struct ProcessDeactivationsResult { count: usize, } @@ -88,34 +101,66 @@ pub struct GcQueue { /// Tasks from old generations. The oldest generation will be garbage /// collected next. generations: Mutex>, + /// Tasks that have become inactive. Processing them should ensure them for + /// GC, if they are not already ensured and put all child tasks into the + /// activation_queue + deactivation_queue: ConcurrentQueue, + /// Tasks that are active and not enqueued in the deactivation queue. + // TODO Could be a bit field with locks, an array of atomics or an AMQF. + active_tasks: DashSet, } impl GcQueue { pub fn new() -> Self { Self { - generation: AtomicU32::new(0), + // SAFETY: Starting at 1 to produce NonZeroU32s + generation: AtomicU32::new(1), incoming_tasks: ConcurrentQueue::unbounded(), incoming_tasks_count: AtomicUsize::new(0), generations: Mutex::new(VecDeque::with_capacity(128)), + deactivation_queue: ConcurrentQueue::unbounded(), + active_tasks: DashSet::new(), } } /// Get the current generation number. - pub fn generation(&self) -> u32 { - self.generation.load(Ordering::Relaxed) + pub fn generation(&self) -> NonZeroU32 { + // SAFETY: We are sure that the generation is not 0, since we start at 1. + unsafe { NonZeroU32::new_unchecked(self.generation.load(Ordering::Relaxed)) } } /// Notify the GC queue that a task has been executed. - pub fn task_executed(&self, task: TaskId) -> u32 { + #[must_use] + pub fn task_executed(&self, task: TaskId) -> NonZeroU32 { self.add_task(task) } /// Notify the GC queue that a task has been accessed. - pub fn task_accessed(&self, task: TaskId) -> u32 { + #[must_use] + pub fn task_accessed(&self, task: TaskId) -> NonZeroU32 { self.add_task(task) } - fn add_task(&self, task: TaskId) -> u32 { + /// Notify the GC queue that a task should be enqueue for GC because it is + /// inactive. + #[must_use] + pub fn task_inactive(&self, task: TaskId) -> NonZeroU32 { + self.add_task(task) + } + + /// Notify the GC queue that a task was active during GC + pub fn task_gc_active(&self, task: TaskId) { + self.active_tasks.insert(task); + } + + /// Notify the GC queue that a task might be inactive now. + pub fn task_potentially_no_longer_active(&self, task: TaskId) { + if self.active_tasks.remove(&task).is_some() { + let _ = self.deactivation_queue.push(task); + } + } + + fn add_task(&self, task: TaskId) -> NonZeroU32 { let _ = self.incoming_tasks.push(task); if self.incoming_tasks_count.fetch_add(1, Ordering::Acquire) % TASKS_PER_NEW_GENERATION == TASKS_PER_NEW_GENERATION - 1 @@ -123,7 +168,10 @@ impl GcQueue { self.incoming_tasks_count .fetch_sub(TASKS_PER_NEW_GENERATION, Ordering::Release); // We are selected to move TASKS_PER_NEW_GENERATION tasks into a generation - let gen = self.generation.fetch_add(1, Ordering::Relaxed); + let gen = unsafe { + // SAFETY: We are sure that the generation is not 0, since we start at 1. + NonZeroU32::new_unchecked(self.generation.fetch_add(1, Ordering::Relaxed)) + }; let mut tasks = Vec::with_capacity(TASKS_PER_NEW_GENERATION); for _ in 0..TASKS_PER_NEW_GENERATION { match self.incoming_tasks.pop() { @@ -143,11 +191,38 @@ impl GcQueue { }); gen } else { - self.generation.load(Ordering::Relaxed) + self.generation() + } + } + + fn process_deactivations( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> ProcessDeactivationsResult { + let mut i = 0; + loop { + let Ok(id) = self.deactivation_queue.pop() else { + break; + }; + backend.with_task(id, |task| { + if !task.potentially_become_inactive(self, backend, turbo_tasks) { + self.active_tasks.insert(id); + } + }); + i += 1; + if i > MAX_DEACTIVATIONS { + break; + } } + ProcessDeactivationsResult { count: i } } - fn process_old_generation(&self, backend: &MemoryBackend) -> ProcessGenerationResult { + fn process_old_generation( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> ProcessGenerationResult { let old_generation = { let guard = &mut self.generations.lock(); guard.pop_back() @@ -160,7 +235,8 @@ impl GcQueue { // No old generation to process return ProcessGenerationResult { priority: None, - count: 0, + content_dropped_count: 0, + unloaded_count: 0, }; }; // Check all tasks for the correct generation @@ -169,8 +245,10 @@ impl GcQueue { for (i, task) in tasks.iter().enumerate() { backend.with_task(*task, |task| { if let Some(state) = task.gc_state() { - if state.generation <= generation { - indices.push((Reverse(state.priority), i as u32)); + if let Some(gen) = state.generation { + if gen <= generation { + indices.push((Reverse(state.priority), i as u32)); + } } } }); @@ -180,7 +258,8 @@ impl GcQueue { // No valid tasks in old generation to process return ProcessGenerationResult { priority: None, - count: 0, + content_dropped_count: 0, + unloaded_count: 0, }; } @@ -235,35 +314,66 @@ impl GcQueue { } // GC the tasks - let mut count = 0; + let mut content_dropped_count = 0; + let mut unloaded_count = 0; for task in tasks[..tasks_to_collect].iter() { backend.with_task(*task, |task| { - if task.run_gc(generation) { - count += 1; + match task.run_gc(generation, self, backend, turbo_tasks) { + GcResult::NotPossible => {} + GcResult::Stale => {} + GcResult::ContentDropped => { + content_dropped_count += 1; + } + GcResult::Unloaded => { + unloaded_count += 1; + } } }); } ProcessGenerationResult { priority: Some(max_priority), - count, + content_dropped_count, + unloaded_count, } } /// Run garbage collection on the queue. - pub fn run_gc(&self, backend: &MemoryBackend) -> Option<(GcPriority, usize)> { - let span = - tracing::trace_span!("garbage collection", priority = Empty, count = Empty).entered(); + pub fn run_gc( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> Option<(GcPriority, usize)> { + let span = tracing::trace_span!( + parent: None, + "garbage collection", + priority = Empty, + deactivations_count = Empty, + content_dropped_count = Empty, + unloaded_count = Empty, + already_unloaded_count = Empty + ) + .entered(); + + let ProcessDeactivationsResult { + count: deactivations_count, + } = self.process_deactivations(backend, turbo_tasks); - let ProcessGenerationResult { priority, count } = self.process_old_generation(backend); + let ProcessGenerationResult { + priority, + content_dropped_count, + unloaded_count, + } = self.process_old_generation(backend, turbo_tasks); - span.record("count", count); + span.record("deactivations_count", deactivations_count); + span.record("content_dropped_count", content_dropped_count); + span.record("unloaded_count", unloaded_count); if let Some(priority) = &priority { span.record("priority", debug(priority)); } else { span.record("priority", ""); } - priority.map(|p| (p, count)) + priority.map(|p| (p, content_dropped_count)) } } diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 64fe7cb9182e8..cc2f973988ac9 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -3,6 +3,7 @@ use std::{ cell::RefCell, future::Future, hash::{BuildHasher, BuildHasherDefault, Hash}, + num::NonZeroU32, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -137,7 +138,7 @@ impl MemoryBackend { pub fn run_gc( &self, idle: bool, - _turbo_tasks: &dyn TurboTasksBackendApi, + turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { if let Some(gc_queue) = &self.gc_queue { let mut did_something = false; @@ -154,7 +155,7 @@ impl MemoryBackend { return did_something; } - let collected = gc_queue.run_gc(self); + let collected = gc_queue.run_gc(self, turbo_tasks); // Collecting less than 100 tasks is not worth it if !collected.map_or(false, |(_, count)| count > 100) { @@ -330,21 +331,28 @@ impl Backend for MemoryBackend { let generation = if let Some(gc_queue) = &self.gc_queue { gc_queue.generation() } else { - 0 + // SAFETY: 1 is not zero + unsafe { NonZeroU32::new_unchecked(1) } }; - let reexecute = self.with_task(task_id, |task| { - task.execution_completed( - duration, - memory_usage, - generation, - stateful, - self, - turbo_tasks, + let (reexecute, once_task) = self.with_task(task_id, |task| { + ( + task.execution_completed( + duration, + memory_usage, + generation, + stateful, + self, + turbo_tasks, + ), + task.is_once(), ) }); if !reexecute { if let Some(gc_queue) = &self.gc_queue { - gc_queue.task_executed(task_id); + let _ = gc_queue.task_executed(task_id); + if once_task { + gc_queue.task_potentially_no_longer_active(task_id); + } self.run_gc(false, turbo_tasks); } } @@ -402,7 +410,7 @@ impl Backend for MemoryBackend { } else { Task::add_dependency_to_current(TaskEdge::Cell(task_id, index)); self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell| { + match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { cell.read_content( reader, move || format!("{task_id} {index}"), @@ -439,7 +447,7 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell| { + match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { cell.read_content_untracked( move || format!("{task_id}"), move || format!("reading {} {} untracked", task_id, index), @@ -499,8 +507,8 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { - task.with_cell_mut(index, self.gc_queue.as_ref(), |cell| { - cell.assign(content, turbo_tasks) + task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, clean| { + cell.assign(content, clean, turbo_tasks) }) }) } diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 0920f0687429f..4d470de14d71c 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -5,6 +5,7 @@ use std::{ future::Future, hash::{BuildHasherDefault, Hash}, mem::{replace, take}, + num::NonZeroU32, pin::Pin, sync::{atomic::AtomicU32, Arc}, time::Duration, @@ -187,6 +188,7 @@ impl TaskState { state_type: Scheduled { event: Event::new(move || format!("TaskState({})::event", description())), outdated_edges: Default::default(), + clean: true, }, collectibles: Default::default(), output: Default::default(), @@ -310,6 +312,8 @@ impl MaybeCollectibles { struct InProgressState { event: Event, count_as_finished: bool, + /// true, when the task wasn't changed since the last execution + clean: bool, /// Dependencies and children that need to be disconnected once leaving /// this state outdated_edges: TaskEdgesSet, @@ -346,6 +350,8 @@ enum TaskStateType { Scheduled { event: Event, outdated_edges: Box, + /// true, when the task wasn't changed since the last execution + clean: bool, }, /// Execution is happening @@ -427,6 +433,21 @@ use self::{ }, }; +pub enum GcResult { + /// The task is not allowed to GC, e. g. due to it being non-pure or having + /// state. + NotPossible, + /// The task was rescheduled for GC and must not be GC'ed now but at a later + /// time. + Stale, + /// Dropped the content of task cells to save memory. + ContentDropped, + /// Unloaded the task completely to save memory. This disconnects the task + /// from the graph and only makes sense when the task isn't currently + /// active. + Unloaded, +} + impl Task { pub(crate) fn new_persistent( id: TaskId, @@ -481,6 +502,14 @@ impl Task { } } + pub(crate) fn is_once(&self) -> bool { + match &self.ty { + TaskType::Persistent { .. } => false, + TaskType::Root(_) => false, + TaskType::Once(_) => true, + } + } + pub(crate) fn set_root( id: TaskId, backend: &MemoryBackend, @@ -669,6 +698,7 @@ impl Task { Scheduled { ref mut event, ref mut outdated_edges, + clean, } => { let event = event.take(); let outdated_edges = *take(outdated_edges); @@ -676,6 +706,7 @@ impl Task { state.state_type = InProgress(Box::new(InProgressState { event, count_as_finished: false, + clean, outdated_edges, outdated_collectibles, new_children: Default::default(), @@ -865,7 +896,7 @@ impl Task { &self, duration: Duration, memory_usage: usize, - generation: u32, + generation: NonZeroU32, stateful: bool, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, @@ -889,6 +920,7 @@ impl Task { ref mut outdated_edges, ref mut outdated_collectibles, ref mut new_children, + clean: _, }) => { let event = event.take(); let mut outdated_edges = take(outdated_edges); @@ -953,6 +985,7 @@ impl Task { state.state_type = Scheduled { event, outdated_edges: Box::new(outdated_edges), + clean: false, }; schedule_task = true; } @@ -983,15 +1016,6 @@ impl Task { &self, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, - ) { - self.make_dirty_internal(false, backend, turbo_tasks); - } - - fn make_dirty_internal( - &self, - force_schedule: bool, - backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, ) { if let TaskType::Once(_) = self.ty { // once task won't become dirty @@ -999,45 +1023,20 @@ impl Task { } let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); - let should_schedule = force_schedule - || query_root_info(&aggregation_context, ActiveQuery::default(), self.id); + let should_schedule = + query_root_info(&aggregation_context, ActiveQuery::default(), self.id); - let state = if force_schedule { - TaskMetaStateWriteGuard::Full(self.full_state_mut()) - } else { - self.state_mut() - }; - if let TaskMetaStateWriteGuard::Full(mut state) = state { + if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { match state.state_type { - Scheduled { .. } | InProgressDirty { .. } => { - // already dirty + Scheduled { ref mut clean, .. } => { + *clean = false; + + // already scheduled drop(state); } - Dirty { - ref mut outdated_edges, - } => { - if force_schedule { - let description = self.get_event_description(); - state.state_type = Scheduled { - event: Event::new(move || { - format!("TaskState({})::event", description()) - }), - outdated_edges: take(outdated_edges), - }; - let change_job = state.aggregation_node.apply_change( - &aggregation_context, - TaskChange { - dirty_tasks_update: vec![(self.id, -1)], - ..Default::default() - }, - ); - drop(state); - change_job.apply(&aggregation_context); - turbo_tasks.schedule(self.id); - } else { - // already dirty - drop(state); - } + Dirty { .. } | InProgressDirty { .. } => { + // already dirty + drop(state); } Done { ref mut edges, .. } => { let outdated_edges = take(edges).into_set(); @@ -1058,6 +1057,7 @@ impl Task { format!("TaskState({})::event", description()) }), outdated_edges: Box::new(outdated_edges), + clean: false, }; drop(state); change_job.apply(&aggregation_context); @@ -1090,6 +1090,7 @@ impl Task { ref mut outdated_edges, ref mut outdated_collectibles, ref mut new_children, + clean: _, }) => { let event = event.take(); let mut outdated_edges = take(outdated_edges); @@ -1138,6 +1139,76 @@ impl Task { aggregation_context.apply_queued_updates(); } + /// Called when the task need to be recomputed because a gc'ed cell was + /// read. + pub(crate) fn recompute( + &self, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + let _span = tracing::trace_span!("turbo_tasks::recompute", id = *self.id).entered(); + + // Events that lead to recomputation of non-pure task must not happen + assert!(self.is_pure()); + + let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); + let mut state = self.full_state_mut(); + match state.state_type { + Scheduled { .. } => { + // already scheduled + drop(state); + } + InProgressDirty { .. } | InProgress(..) => { + // already in progress + drop(state); + } + Dirty { + ref mut outdated_edges, + } => { + let description = self.get_event_description(); + state.state_type = Scheduled { + event: Event::new(move || format!("TaskState({})::event", description())), + outdated_edges: take(outdated_edges), + clean: false, + }; + let change_job = state.aggregation_node.apply_change( + &aggregation_context, + TaskChange { + dirty_tasks_update: vec![(self.id, -1)], + ..Default::default() + }, + ); + drop(state); + change_job.apply(&aggregation_context); + turbo_tasks.schedule(self.id); + } + Done { ref mut edges, .. } => { + let outdated_edges = take(edges).into_set(); + // add to dirty lists and potentially schedule + let description = self.get_event_description(); + let change_job = state.aggregation_node.apply_change( + &aggregation_context, + TaskChange { + unfinished: 1, + #[cfg(feature = "track_unfinished")] + unfinished_tasks_update: vec![(self.id, 1)], + ..Default::default() + }, + ); + state.state_type = Scheduled { + event: Event::new(move || format!("TaskState({})::event", description())), + outdated_edges: Box::new(outdated_edges), + clean: true, + }; + drop(state); + change_job.apply(&aggregation_context); + + turbo_tasks.schedule(self.id); + } + } + aggregation_context.apply_queued_updates(); + } + pub(crate) fn schedule_when_dirty_from_aggregation( &self, backend: &MemoryBackend, @@ -1153,6 +1224,7 @@ impl Task { state.state_type = Scheduled { event: Event::new(move || format!("TaskState({})::event", description())), outdated_edges: take(outdated_edges), + clean: false, }; let job = state.aggregation_node.apply_change( &aggregation_context, @@ -1191,17 +1263,6 @@ impl Task { self.make_dirty(backend, turbo_tasks) } - /// Called when the task need to be recomputed because a gc'ed cell was - /// read. - pub(crate) fn recompute( - &self, - backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, - ) { - let _span = tracing::trace_span!("turbo_tasks::recompute", id = *self.id).entered(); - self.make_dirty_internal(true, backend, turbo_tasks) - } - /// Access to the output cell. pub(crate) fn with_output_mut_if_available( &self, @@ -1219,21 +1280,25 @@ impl Task { &self, index: CellId, gc_queue: Option<&GcQueue>, - func: impl FnOnce(&mut Cell) -> T, + func: impl FnOnce(&mut Cell, bool) -> T, ) -> T { let mut state = self.full_state_mut(); if let Some(gc_queue) = gc_queue { let generation = gc_queue.generation(); if state.gc.on_read(generation) { - gc_queue.task_accessed(self.id); + let _ = gc_queue.task_accessed(self.id); } } + let clean = match state.state_type { + InProgress(box InProgressState { clean, .. }) => clean, + _ => false, + }; let list = state.cells.entry(index.type_id).or_default(); let i = index.index as usize; if list.len() <= i { list.resize_with(i + 1, Default::default); } - func(&mut list[i]) + func(&mut list[i], clean) } /// Access to a cell. @@ -1262,6 +1327,30 @@ impl Task { } } + /// Checks if the task is inactive. Returns false if it's still active. + pub(crate) fn potentially_become_inactive( + &self, + gc_queue: &GcQueue, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> bool { + let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); + let active = query_root_info(&aggregation_context, ActiveQuery::default(), self.id); + if active { + return false; + } + if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { + if state.gc.generation.is_none() { + let generation = gc_queue.task_inactive(self.id); + state.gc.generation = Some(generation); + } + for child in state.state_type.children() { + gc_queue.task_potentially_no_longer_active(child); + } + } + true + } + pub fn is_pending(&self) -> bool { if let TaskMetaStateReadGuard::Full(state) = self.state() { !matches!(state.state_type, TaskStateType::Done { .. }) @@ -1425,6 +1514,7 @@ impl Task { state.state_type = Scheduled { event, outdated_edges: take(outdated_edges), + clean: false, }; let change_job = state.aggregation_node.apply_change( &aggregation_context, @@ -1506,57 +1596,80 @@ impl Task { aggregation_context.apply_queued_updates(); } - pub(crate) fn run_gc(&self, generation: u32) -> bool { + pub(crate) fn run_gc( + &self, + generation: NonZeroU32, + gc_queue: &GcQueue, + backend: &MemoryBackend, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> GcResult { if !self.is_pure() { - return false; + return GcResult::NotPossible; } - let mut cells_to_drop = Vec::new(); + let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); + let active = query_root_info(&aggregation_context, ActiveQuery::default(), self.id); match self.state_mut() { TaskMetaStateWriteGuard::Full(mut state) => { - if state.gc.generation > generation { - return false; + if let Some(old_gen) = state.gc.generation { + if old_gen > generation { + return GcResult::Stale; + } + } else { + return GcResult::Stale; } + state.gc.generation = None; - match &mut state.state_type { - TaskStateType::Done { stateful, edges: _ } => { - if *stateful { - return false; + if active { + let mut cells_to_drop = Vec::new(); + + match &mut state.state_type { + TaskStateType::Done { stateful, edges: _ } => { + if *stateful { + return GcResult::NotPossible; + } + } + TaskStateType::Dirty { .. } => {} + _ => { + // GC can't run in this state. We will reschedule it when the execution + // completes. + return GcResult::NotPossible; } } - TaskStateType::Dirty { .. } => {} - _ => { - // GC can't run in this state. We will reschedule it when the execution - // completes. - return false; - } - } - // shrinking memory and dropping cells - state.aggregation_node.shrink_to_fit(); - state.output.dependent_tasks.shrink_to_fit(); - state.cells.shrink_to_fit(); - for cells in state.cells.values_mut() { - cells.shrink_to_fit(); - for cell in cells.iter_mut() { - cells_to_drop.extend(cell.gc_content()); - cell.shrink_to_fit(); + // shrinking memory and dropping cells + state.aggregation_node.shrink_to_fit(); + state.output.dependent_tasks.shrink_to_fit(); + state.cells.shrink_to_fit(); + for cells in state.cells.values_mut() { + cells.shrink_to_fit(); + for cell in cells.iter_mut() { + cells_to_drop.extend(cell.gc_content()); + cell.shrink_to_fit(); + } } - } - drop(state); + drop(state); - // Dropping cells outside of the lock - drop(cells_to_drop); + gc_queue.task_gc_active(self.id); - true + // Dropping cells outside of the lock + drop(cells_to_drop); + + GcResult::ContentDropped + } else { + // Task is inactive, unload task + self.unload(state, backend, turbo_tasks); + GcResult::Unloaded + } } TaskMetaStateWriteGuard::Partial(mut state) => { state.aggregation_node.shrink_to_fit(); - false + GcResult::Unloaded } - _ => false, + TaskMetaStateWriteGuard::Unloaded(_) => GcResult::Unloaded, + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } @@ -1568,8 +1681,6 @@ impl Task { } } - // TODO not used yet, but planned - #[allow(dead_code)] fn unload( &self, mut full_state: FullTaskWriteGuard<'_>, @@ -1646,6 +1757,8 @@ impl Task { None }; + aggregation_node.shrink_to_fit(); + // TODO aggregation_node let unset = false;