From 64b1e46c15ceb03081b6327b6573dd7c956fb1e6 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 17 May 2024 17:33:25 +0200 Subject: [PATCH] Drop excessive cells after task reexecution When cells become unused after recomputation of a task, drop them. --- .../turbo-tasks-memory/src/memory_backend.rs | 44 +++++++++++-------- .../src/memory_backend_with_pg.rs | 5 ++- crates/turbo-tasks-memory/src/task.rs | 39 +++++++++++++--- crates/turbo-tasks/src/backend.rs | 8 ++-- crates/turbo-tasks/src/manager.rs | 3 ++ 5 files changed, 71 insertions(+), 28 deletions(-) diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index cc2f973988ac9b..17b24c97ef6039 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use auto_hash_map::AutoMap; use dashmap::{mapref::entry::Entry, DashMap}; use rustc_hash::FxHasher; @@ -26,7 +26,7 @@ use turbo_tasks::{ }, event::EventListener, util::{IdFactory, NoMoveVec}, - CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, + CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, ValueTypeId, }; use crate::{ @@ -325,6 +325,7 @@ impl Backend for MemoryBackend { task_id: TaskId, duration: Duration, memory_usage: usize, + cell_counters: AutoMap, 8>, stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { @@ -340,6 +341,7 @@ impl Backend for MemoryBackend { duration, memory_usage, generation, + cell_counters, stateful, self, turbo_tasks, @@ -410,15 +412,18 @@ impl Backend for MemoryBackend { } else { Task::add_dependency_to_current(TaskEdge::Cell(task_id, index)); self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { - cell.read_content( - reader, - move || format!("{task_id} {index}"), - move || format!("reading {} {} from {}", task_id, index, reader), - ) + match task.access_cell_for_read(index, self.gc_queue.as_ref(), |cell| { + cell.map(|cell| { + cell.read_content( + reader, + move || format!("{task_id} {index}"), + move || format!("reading {} {} from {}", task_id, index, reader), + ) + }) }) { - Ok(content) => Ok(Ok(content)), - Err(RecomputingCell { listener, schedule }) => { + None => Err(anyhow!("Cell doesn't exist (anymore)")), + Some(Ok(content)) => Ok(Ok(content)), + Some(Err(RecomputingCell { listener, schedule })) => { if schedule { task.recompute(self, turbo_tasks); } @@ -447,14 +452,17 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.with_task(task_id, |task| { - match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| { - cell.read_content_untracked( - move || format!("{task_id}"), - move || format!("reading {} {} untracked", task_id, index), - ) + match task.access_cell_for_read(index, self.gc_queue.as_ref(), |cell| { + cell.map(|cell| { + cell.read_content_untracked( + move || format!("{task_id} {index}"), + move || format!("reading {} {} untracked", task_id, index), + ) + }) }) { - Ok(content) => Ok(Ok(content)), - Err(RecomputingCell { listener, schedule }) => { + None => Err(anyhow!("Cell doesn't exist (anymore)")), + Some(Ok(content)) => Ok(Ok(content)), + Some(Err(RecomputingCell { listener, schedule })) => { if schedule { task.recompute(self, turbo_tasks); } @@ -507,7 +515,7 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { - task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, clean| { + task.access_cell_for_write(index, |cell, clean| { cell.assign(content, clean, turbo_tasks) }) }) diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index a8fa8104b8af2d..dfe6cea27c1597 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -3,6 +3,7 @@ use std::{ collections::{BinaryHeap, HashMap}, fmt::Debug, future::Future, + hash::BuildHasherDefault, mem::{replace, take}, pin::Pin, sync::{ @@ -16,6 +17,7 @@ use anyhow::{anyhow, Result}; use auto_hash_map::{AutoMap, AutoSet}; use concurrent_queue::ConcurrentQueue; use dashmap::{mapref::entry::Entry, DashMap, DashSet}; +use rustc_hash::FxHasher; use turbo_tasks::{ backend::{ Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec, @@ -27,7 +29,7 @@ use turbo_tasks::{ PersistedGraphApi, ReadTaskState, TaskCell, TaskData, }, util::{IdFactory, NoMoveVec, SharedError}, - CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, + CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, ValueTypeId, }; type RootTaskFn = @@ -1154,6 +1156,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ task: TaskId, duration: Duration, _memory_usage: usize, + _cell_counters: AutoMap, 8>, _stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi>, ) -> bool { diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 4d470de14d71ce..d0864598a4ef2d 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -627,14 +627,14 @@ impl Task { match dep { TaskEdge::Output(task) => { backend.with_task(task, |task| { - task.with_output_mut_if_available(|output| { + task.access_output_for_removing_dependents(|output| { output.dependent_tasks.remove(&reader); }); }); } TaskEdge::Cell(task, index) => { backend.with_task(task, |task| { - task.with_cell_mut_if_available(index, |cell| { + task.access_cell_for_removing_dependents(index, |cell| { cell.remove_dependent_task(reader); }); }); @@ -897,6 +897,7 @@ impl Task { duration: Duration, memory_usage: usize, generation: NonZeroU32, + cell_counters: AutoMap, 8>, stateful: bool, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, @@ -906,6 +907,7 @@ impl Task { { let mut change_job = None; let mut remove_job = None; + let mut drained_cells = SmallVec::<[Cell; 8]>::new(); let mut dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take()); { let mut state = self.full_state_mut(); @@ -913,6 +915,12 @@ impl Task { state .gc .execution_completed(duration, memory_usage, generation); + for (value_type, cells) in state.cells.iter_mut() { + let counter = cell_counters.get(value_type).copied().unwrap_or_default(); + if counter != cells.len() as u32 { + drained_cells.extend(cells.drain(counter as usize..)); + } + } match state.state_type { InProgress(box InProgressState { ref mut event, @@ -1000,6 +1008,9 @@ impl Task { if !dependencies.is_empty() { self.clear_dependencies(dependencies, backend, turbo_tasks); } + for cell in drained_cells { + cell.gc_drop(turbo_tasks); + } change_job.apply(&aggregation_context); remove_job.apply(&aggregation_context); } @@ -1264,7 +1275,7 @@ impl Task { } /// Access to the output cell. - pub(crate) fn with_output_mut_if_available( + pub(crate) fn access_output_for_removing_dependents( &self, func: impl FnOnce(&mut Output) -> T, ) -> Option { @@ -1276,11 +1287,11 @@ impl Task { } /// Access to a cell. - pub(crate) fn with_cell_mut( + pub(crate) fn access_cell_for_read( &self, index: CellId, gc_queue: Option<&GcQueue>, - func: impl FnOnce(&mut Cell, bool) -> T, + func: impl FnOnce(Option<&mut Cell>) -> T, ) -> T { let mut state = self.full_state_mut(); if let Some(gc_queue) = gc_queue { @@ -1289,6 +1300,22 @@ impl Task { let _ = gc_queue.task_accessed(self.id); } } + let list = state.cells.entry(index.type_id).or_default(); + let i = index.index as usize; + if list.len() <= i { + func(None) + } else { + func(Some(&mut list[i])) + } + } + + /// Access to a cell. + pub(crate) fn access_cell_for_write( + &self, + index: CellId, + func: impl FnOnce(&mut Cell, bool) -> T, + ) -> T { + let mut state = self.full_state_mut(); let clean = match state.state_type { InProgress(box InProgressState { clean, .. }) => clean, _ => false, @@ -1302,7 +1329,7 @@ impl Task { } /// Access to a cell. - pub(crate) fn with_cell_mut_if_available( + pub(crate) fn access_cell_for_removing_dependents( &self, index: CellId, func: impl FnOnce(&mut Cell) -> T, diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index 1040b718121a38..461e1ea4e4da9b 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -1,9 +1,9 @@ use std::{ any::Any, borrow::Cow, - fmt, - fmt::{Debug, Display, Write}, + fmt::{self, Debug, Display, Write}, future::Future, + hash::BuildHasherDefault, mem::take, pin::Pin, sync::Arc, @@ -12,6 +12,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; use auto_hash_map::AutoMap; +use rustc_hash::FxHasher; use serde::{Deserialize, Serialize}; use tracing::Span; @@ -19,7 +20,7 @@ pub use crate::id::BackendJobId; use crate::{ event::EventListener, manager::TurboTasksBackendApi, raw_vc::CellId, registry, ConcreteTaskInput, FunctionId, RawVc, ReadRef, SharedReference, TaskId, TaskIdProvider, - TaskIdSet, TraitRef, TraitTypeId, VcValueTrait, VcValueType, + TaskIdSet, TraitRef, TraitTypeId, ValueTypeId, VcValueTrait, VcValueType, }; pub enum TaskType { @@ -237,6 +238,7 @@ pub trait Backend: Sync + Send { task: TaskId, duration: Duration, memory_usage: usize, + cell_counters: AutoMap, 8>, stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool; diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index 87fded4ccd50ca..7ba4b6cc2ab726 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -458,10 +458,13 @@ impl TurboTasks { }); this.backend.task_execution_result(task_id, result, &*this); let stateful = this.finish_current_task_state(); + let cell_counters = + CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut())); this.backend.task_execution_completed( task_id, duration, memory_usage, + cell_counters, stateful, &*this, )