Skip to content

Commit

Permalink
Drop excessive cells after task reexecution
Browse files Browse the repository at this point in the history
When cells become unused after recomputation of a task, drop them.
  • Loading branch information
sokra committed Jul 15, 2024
1 parent c454e35 commit 64b1e46
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 28 deletions.
44 changes: 26 additions & 18 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
time::Duration,
};

use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use dashmap::{mapref::entry::Entry, DashMap};
use rustc_hash::FxHasher;
Expand All @@ -26,7 +26,7 @@ use turbo_tasks::{
},
event::EventListener,
util::{IdFactory, NoMoveVec},
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused,
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, ValueTypeId,
};

use crate::{
Expand Down Expand Up @@ -325,6 +325,7 @@ impl Backend for MemoryBackend {
task_id: TaskId,
duration: Duration,
memory_usage: usize,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> bool {
Expand All @@ -340,6 +341,7 @@ impl Backend for MemoryBackend {
duration,
memory_usage,
generation,
cell_counters,
stateful,
self,
turbo_tasks,
Expand Down Expand Up @@ -410,15 +412,18 @@ impl Backend for MemoryBackend {
} else {
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index));
self.with_task(task_id, |task| {
match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| {
cell.read_content(
reader,
move || format!("{task_id} {index}"),
move || format!("reading {} {} from {}", task_id, index, reader),
)
match task.access_cell_for_read(index, self.gc_queue.as_ref(), |cell| {
cell.map(|cell| {
cell.read_content(
reader,
move || format!("{task_id} {index}"),
move || format!("reading {} {} from {}", task_id, index, reader),
)
})
}) {
Ok(content) => Ok(Ok(content)),
Err(RecomputingCell { listener, schedule }) => {
None => Err(anyhow!("Cell doesn't exist (anymore)")),
Some(Ok(content)) => Ok(Ok(content)),
Some(Err(RecomputingCell { listener, schedule })) => {
if schedule {
task.recompute(self, turbo_tasks);
}
Expand Down Expand Up @@ -447,14 +452,17 @@ impl Backend for MemoryBackend {
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Result<Result<CellContent, EventListener>> {
self.with_task(task_id, |task| {
match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, _| {
cell.read_content_untracked(
move || format!("{task_id}"),
move || format!("reading {} {} untracked", task_id, index),
)
match task.access_cell_for_read(index, self.gc_queue.as_ref(), |cell| {
cell.map(|cell| {
cell.read_content_untracked(
move || format!("{task_id} {index}"),
move || format!("reading {} {} untracked", task_id, index),
)
})
}) {
Ok(content) => Ok(Ok(content)),
Err(RecomputingCell { listener, schedule }) => {
None => Err(anyhow!("Cell doesn't exist (anymore)")),
Some(Ok(content)) => Ok(Ok(content)),
Some(Err(RecomputingCell { listener, schedule })) => {
if schedule {
task.recompute(self, turbo_tasks);
}
Expand Down Expand Up @@ -507,7 +515,7 @@ impl Backend for MemoryBackend {
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) {
self.with_task(task, |task| {
task.with_cell_mut(index, self.gc_queue.as_ref(), |cell, clean| {
task.access_cell_for_write(index, |cell, clean| {
cell.assign(content, clean, turbo_tasks)
})
})
Expand Down
5 changes: 4 additions & 1 deletion crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
collections::{BinaryHeap, HashMap},
fmt::Debug,
future::Future,
hash::BuildHasherDefault,
mem::{replace, take},
pin::Pin,
sync::{
Expand All @@ -16,6 +17,7 @@ use anyhow::{anyhow, Result};
use auto_hash_map::{AutoMap, AutoSet};
use concurrent_queue::ConcurrentQueue;
use dashmap::{mapref::entry::Entry, DashMap, DashSet};
use rustc_hash::FxHasher;
use turbo_tasks::{
backend::{
Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec,
Expand All @@ -27,7 +29,7 @@ use turbo_tasks::{
PersistedGraphApi, ReadTaskState, TaskCell, TaskData,
},
util::{IdFactory, NoMoveVec, SharedError},
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused,
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, ValueTypeId,
};

type RootTaskFn =
Expand Down Expand Up @@ -1154,6 +1156,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
task: TaskId,
duration: Duration,
_memory_usage: usize,
_cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
_stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackendWithPersistedGraph<P>>,
) -> bool {
Expand Down
39 changes: 33 additions & 6 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,14 +627,14 @@ impl Task {
match dep {
TaskEdge::Output(task) => {
backend.with_task(task, |task| {
task.with_output_mut_if_available(|output| {
task.access_output_for_removing_dependents(|output| {
output.dependent_tasks.remove(&reader);
});
});
}
TaskEdge::Cell(task, index) => {
backend.with_task(task, |task| {
task.with_cell_mut_if_available(index, |cell| {
task.access_cell_for_removing_dependents(index, |cell| {
cell.remove_dependent_task(reader);
});
});
Expand Down Expand Up @@ -897,6 +897,7 @@ impl Task {
duration: Duration,
memory_usage: usize,
generation: NonZeroU32,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
Expand All @@ -906,13 +907,20 @@ impl Task {
{
let mut change_job = None;
let mut remove_job = None;
let mut drained_cells = SmallVec::<[Cell; 8]>::new();
let mut dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take());
{
let mut state = self.full_state_mut();

state
.gc
.execution_completed(duration, memory_usage, generation);
for (value_type, cells) in state.cells.iter_mut() {
let counter = cell_counters.get(value_type).copied().unwrap_or_default();
if counter != cells.len() as u32 {
drained_cells.extend(cells.drain(counter as usize..));
}
}
match state.state_type {
InProgress(box InProgressState {
ref mut event,
Expand Down Expand Up @@ -1000,6 +1008,9 @@ impl Task {
if !dependencies.is_empty() {
self.clear_dependencies(dependencies, backend, turbo_tasks);
}
for cell in drained_cells {
cell.gc_drop(turbo_tasks);
}
change_job.apply(&aggregation_context);
remove_job.apply(&aggregation_context);
}
Expand Down Expand Up @@ -1264,7 +1275,7 @@ impl Task {
}

/// Access to the output cell.
pub(crate) fn with_output_mut_if_available<T>(
pub(crate) fn access_output_for_removing_dependents<T>(
&self,
func: impl FnOnce(&mut Output) -> T,
) -> Option<T> {
Expand All @@ -1276,11 +1287,11 @@ impl Task {
}

/// Access to a cell.
pub(crate) fn with_cell_mut<T>(
pub(crate) fn access_cell_for_read<T>(
&self,
index: CellId,
gc_queue: Option<&GcQueue>,
func: impl FnOnce(&mut Cell, bool) -> T,
func: impl FnOnce(Option<&mut Cell>) -> T,
) -> T {
let mut state = self.full_state_mut();
if let Some(gc_queue) = gc_queue {
Expand All @@ -1289,6 +1300,22 @@ impl Task {
let _ = gc_queue.task_accessed(self.id);
}
}
let list = state.cells.entry(index.type_id).or_default();
let i = index.index as usize;
if list.len() <= i {
func(None)
} else {
func(Some(&mut list[i]))
}
}

/// Access to a cell.
pub(crate) fn access_cell_for_write<T>(
&self,
index: CellId,
func: impl FnOnce(&mut Cell, bool) -> T,
) -> T {
let mut state = self.full_state_mut();
let clean = match state.state_type {
InProgress(box InProgressState { clean, .. }) => clean,
_ => false,
Expand All @@ -1302,7 +1329,7 @@ impl Task {
}

/// Access to a cell.
pub(crate) fn with_cell_mut_if_available<T>(
pub(crate) fn access_cell_for_removing_dependents<T>(
&self,
index: CellId,
func: impl FnOnce(&mut Cell) -> T,
Expand Down
8 changes: 5 additions & 3 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{
any::Any,
borrow::Cow,
fmt,
fmt::{Debug, Display, Write},
fmt::{self, Debug, Display, Write},
future::Future,
hash::BuildHasherDefault,
mem::take,
pin::Pin,
sync::Arc,
Expand All @@ -12,14 +12,15 @@ use std::{

use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use rustc_hash::FxHasher;
use serde::{Deserialize, Serialize};
use tracing::Span;

pub use crate::id::BackendJobId;
use crate::{
event::EventListener, manager::TurboTasksBackendApi, raw_vc::CellId, registry,
ConcreteTaskInput, FunctionId, RawVc, ReadRef, SharedReference, TaskId, TaskIdProvider,
TaskIdSet, TraitRef, TraitTypeId, VcValueTrait, VcValueType,
TaskIdSet, TraitRef, TraitTypeId, ValueTypeId, VcValueTrait, VcValueType,
};

pub enum TaskType {
Expand Down Expand Up @@ -237,6 +238,7 @@ pub trait Backend: Sync + Send {
task: TaskId,
duration: Duration,
memory_usage: usize,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool;
Expand Down
3 changes: 3 additions & 0 deletions crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,13 @@ impl<B: Backend + 'static> TurboTasks<B> {
});
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
let cell_counters =
CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut()));
this.backend.task_execution_completed(
task_id,
duration,
memory_usage,
cell_counters,
stateful,
&*this,
)
Expand Down

0 comments on commit 64b1e46

Please sign in to comment.