Skip to content

Commit

Permalink
Refactor cell reading to handle removed cells (vercel/turborepo#8819)
Browse files Browse the repository at this point in the history
### Description

Refactor cell reading to handle removed cells

handle recomputation of cells on read

use start event instead of done event when waiting for task reading

That's how the Cell state changes:


![image](https://github.com/user-attachments/assets/7e9ff129-1169-433f-a0be-dae90af014e2)


### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->
  • Loading branch information
sokra authored Jul 24, 2024
1 parent 38a88ff commit 43c8422
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 232 deletions.
270 changes: 101 additions & 169 deletions crates/turbo-tasks-memory/src/cell.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use std::{
fmt::Debug,
mem::{replace, take},
};
use std::{fmt::Debug, mem::replace};

use auto_hash_map::AutoSet;
use turbo_tasks::{
backend::CellContent,
event::{Event, EventListener},
Expand All @@ -13,71 +9,66 @@ use turbo_tasks::{
use crate::MemoryBackend;

#[derive(Default, Debug)]
pub(crate) enum Cell {
/// No content has been set yet, or it was removed for memory pressure
/// reasons.
pub(crate) struct Cell {
dependent_tasks: TaskIdSet,
state: CellState,
}

#[derive(Default, Debug)]
pub(crate) enum CellState {
/// No content has been set yet, or
/// it was removed for memory pressure reasons, or
/// cell is no longer used (It was assigned once and then no longer used
/// after recomputation).
///
/// Assigning a value will transition to the Value state.
/// Reading this cell will transition to the Recomputing state.
/// Reading this cell will,
/// - transition to the Computing state if the task is in progress
/// - return an error if the task is already done.
#[default]
Empty,
/// The content has been removed for memory pressure reasons, but the
/// tracking is still active. Any update will invalidate dependent tasks.
/// Assigning a value will transition to the Value state.
/// Reading this cell will transition to the Recomputing state.
TrackedValueless { dependent_tasks: TaskIdSet },
/// Reading this cell will transition to the Computing state.
TrackedValueless,
/// Someone wanted to read the content and it was not available. The content
/// is now being recomputed.
/// is now being computed.
/// Assigning a value will transition to the Value state.
Recomputing {
dependent_tasks: TaskIdSet,
/// When the task ends this transitions to the Empty state if not assigned.
Computing {
/// The event that will be triggered when transitioning to another
/// state.
event: Event,
},
/// The content was set only once and is tracked.
/// GC operation will transition to the TrackedValueless state.
Value {
dependent_tasks: TaskIdSet,
content: CellContent,
},
Value { content: CellContent },
}

#[derive(Debug)]
pub struct RecomputingCell {
pub listener: EventListener,
pub schedule: bool,
pub enum ReadContentError {
Computing {
listener: EventListener,
schedule: bool,
},
Unused,
}

impl Cell {
/// Removes a task from the list of dependent tasks.
pub fn remove_dependent_task(&mut self, task: TaskId) {
match self {
Cell::Empty => {}
Cell::Value {
dependent_tasks, ..
}
| Cell::TrackedValueless {
dependent_tasks, ..
}
| Cell::Recomputing {
dependent_tasks, ..
} => {
dependent_tasks.remove(&task);
}
}
self.dependent_tasks.remove(&task);
}

/// Switch the cell to the Computing state.
fn recompute(
fn compute(
&mut self,
dependent_tasks: TaskIdSet,
description: impl Fn() -> String + Sync + Send + 'static,
note: impl Fn() -> String + Sync + Send + 'static,
) -> EventListener {
let event = Event::new(move || (description)() + " -> Cell::Recomputing::event");
let event = Event::new(move || (description)() + " -> CellState::Computing::event");
let listener = event.listen_with_note(note);
*self = Cell::Recomputing {
event,
dependent_tasks,
};
self.state = CellState::Computing { event };
listener
}

Expand All @@ -87,20 +78,24 @@ impl Cell {
pub fn read_content(
&mut self,
reader: TaskId,
task_done: bool,
description: impl Fn() -> String + Sync + Send + 'static,
note: impl Fn() -> String + Sync + Send + 'static,
) -> Result<CellContent, RecomputingCell> {
if let Cell::Value {
content,
dependent_tasks,
..
} = self
{
dependent_tasks.insert(reader);
return Ok(content.clone());
) -> Result<CellContent, ReadContentError> {
match &self.state {
CellState::Value { content } => {
self.dependent_tasks.insert(reader);
Ok(content.clone())
}
CellState::Empty if task_done => {
self.dependent_tasks.insert(reader);
Err(ReadContentError::Unused)
}
_ => {
// Same behavior for all other states, so we reuse the same code.
self.read_content_untracked(task_done, description, note)
}
}
// Same behavior for all other states, so we reuse the same code.
self.read_content_untracked(description, note)
}

/// Read the content of the cell when available. Does not register the reader
Expand All @@ -111,35 +106,37 @@ impl Cell {
/// track dependencies, so using it could break cache invalidation.
pub fn read_content_untracked(
&mut self,
task_done: bool,
description: impl Fn() -> String + Sync + Send + 'static,
note: impl Fn() -> String + Sync + Send + 'static,
) -> Result<CellContent, RecomputingCell> {
match self {
Cell::Empty => {
let listener = self.recompute(AutoSet::default(), description, note);
Err(RecomputingCell {
listener,
schedule: true,
})
) -> Result<CellContent, ReadContentError> {
match &self.state {
CellState::Value { content } => Ok(content.clone()),
CellState::Empty => {
if task_done {
Err(ReadContentError::Unused)
} else {
let listener = self.compute(description, note);
Err(ReadContentError::Computing {
listener,
schedule: true,
})
}
}
Cell::Recomputing { event, .. } => {
CellState::Computing { event } => {
let listener = event.listen_with_note(note);
Err(RecomputingCell {
Err(ReadContentError::Computing {
listener,
schedule: false,
})
}
&mut Cell::TrackedValueless {
ref mut dependent_tasks,
} => {
let dependent_tasks = take(dependent_tasks);
let listener = self.recompute(dependent_tasks, description, note);
Err(RecomputingCell {
CellState::TrackedValueless => {
let listener = self.compute(description, note);
Err(ReadContentError::Computing {
listener,
schedule: true,
})
}
Cell::Value { content, .. } => Ok(content.clone()),
}
}

Expand All @@ -150,11 +147,11 @@ impl Cell {
/// INVALIDATION: Be careful with this, it will not track
/// dependencies, so using it could break cache invalidation.
pub fn read_own_content_untracked(&self) -> CellContent {
match self {
Cell::Empty | Cell::Recomputing { .. } | Cell::TrackedValueless { .. } => {
match &self.state {
CellState::Empty | CellState::Computing { .. } | CellState::TrackedValueless => {
CellContent(None)
}
Cell::Value { content, .. } => content.clone(),
CellState::Value { content } => content.clone(),
}
}

Expand All @@ -168,104 +165,56 @@ impl Cell {
clean: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) {
match self {
Cell::Empty => {
*self = Cell::Value {
content,
dependent_tasks: AutoSet::default(),
};
}
&mut Cell::Recomputing {
ref mut event,
ref mut dependent_tasks,
} => {
match &self.state {
CellState::Empty => {}
CellState::Computing { event } => {
event.notify(usize::MAX);
if clean {
// We can assume that the task is deterministic and produces the same content
// again. No need to notify dependent tasks.
*self = Cell::Value {
content,
dependent_tasks: take(dependent_tasks),
};
} else {
// Assigning to a cell will invalidate all dependent tasks as the content might
// have changed.
if !dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(dependent_tasks);
}
*self = Cell::Value {
content,
dependent_tasks: AutoSet::default(),
};
self.state = CellState::Value { content };
return;
}
}
&mut Cell::TrackedValueless {
ref mut dependent_tasks,
} => {
CellState::TrackedValueless => {
if clean {
// We can assume that the task is deterministic and produces the same content
// again. No need to notify dependent tasks.
*self = Cell::Value {
content,
dependent_tasks: take(dependent_tasks),
};
} else {
// Assigning to a cell will invalidate all dependent tasks as the content might
// have changed.
if !dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(dependent_tasks);
}
*self = Cell::Value {
content,
dependent_tasks: AutoSet::default(),
};
self.state = CellState::Value { content };
return;
}
}
Cell::Value {
content: ref mut cell_content,
dependent_tasks,
CellState::Value {
content: cell_content,
} => {
if content != *cell_content {
if !dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(dependent_tasks);
dependent_tasks.clear();
}
*cell_content = content;
if content == *cell_content {
return;
}
}
}
self.state = CellState::Value { content };
// Assigning to a cell will invalidate all dependent tasks as the content might
// have changed.
if !self.dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks);
self.dependent_tasks.clear();
}
}

/// Reduces memory needs to the minimum.
pub fn shrink_to_fit(&mut self) {
match self {
Cell::Empty => {}
Cell::TrackedValueless {
dependent_tasks, ..
}
| Cell::Recomputing {
dependent_tasks, ..
}
| Cell::Value {
dependent_tasks, ..
} => {
dependent_tasks.shrink_to_fit();
}
}
self.dependent_tasks.shrink_to_fit();
}

/// Takes the content out of the cell. Make sure to drop the content outside
/// of the task state lock.
#[must_use]
pub fn gc_content(&mut self) -> Option<CellContent> {
match self {
Cell::Empty | Cell::Recomputing { .. } | Cell::TrackedValueless { .. } => None,
Cell::Value {
dependent_tasks, ..
} => {
let dependent_tasks = take(dependent_tasks);
let Cell::Value { content, .. } =
replace(self, Cell::TrackedValueless { dependent_tasks })
match self.state {
CellState::Empty | CellState::Computing { .. } | CellState::TrackedValueless => None,
CellState::Value { .. } => {
let CellState::Value { content, .. } =
replace(&mut self.state, CellState::TrackedValueless)
else {
unreachable!()
};
Expand All @@ -276,28 +225,11 @@ impl Cell {

/// Drops the cell after GC. Will notify all dependent tasks and events.
pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>) {
match self {
Cell::Empty => {}
Cell::Recomputing {
event,
dependent_tasks,
..
} => {
event.notify(usize::MAX);
if !dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(&dependent_tasks);
}
}
Cell::TrackedValueless {
dependent_tasks, ..
}
| Cell::Value {
dependent_tasks, ..
} => {
if !dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(&dependent_tasks);
}
}
if !self.dependent_tasks.is_empty() {
turbo_tasks.schedule_notify_tasks_set(&self.dependent_tasks);
}
if let CellState::Computing { event } = self.state {
event.notify(usize::MAX);
}
}
}
Loading

0 comments on commit 43c8422

Please sign in to comment.