Skip to content

Commit

Permalink
handle recomputation of cells on read
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jul 15, 2024
1 parent c45b56e commit 0967bee
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 47 deletions.
58 changes: 23 additions & 35 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ use turbo_tasks::{
};

use crate::{
cell::RecomputingCell,
edges_set::{TaskEdge, TaskEdgesSet},
gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY},
output::Output,
task::{Task, DEPENDENCIES_TO_TRACK},
task::{ReadCellError, Task, DEPENDENCIES_TO_TRACK},
task_statistics::TaskStatisticsApi,
};

Expand Down Expand Up @@ -412,23 +411,17 @@ impl Backend for MemoryBackend {
} else {
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index));
self.with_task(task_id, |task| {
match task.access_cell_for_read(index, self.gc_queue.as_ref(), |cell| {
cell.map(|cell| {
cell.read_content(
reader,
move || format!("{task_id} {index}"),
move || format!("reading {} {} from {}", task_id, index, reader),
)
})
}) {
None => Err(anyhow!("Cell doesn't exist (anymore)")),
Some(Ok(content)) => Ok(Ok(content)),
Some(Err(RecomputingCell { listener, schedule })) => {
if schedule {
task.recompute(self, turbo_tasks);
}
Ok(Err(listener))
}
match task.read_cell(
index,
self.gc_queue.as_ref(),
move || format!("reading {} {} from {}", task_id, index, reader),
Some(reader),
self,
turbo_tasks,
) {
Ok(content) => Ok(Ok(content)),
Err(ReadCellError::Recomputing(listener)) => Ok(Err(listener)),
Err(ReadCellError::CellRemoved) => Err(anyhow!("Cell {index:?} doesn't exist")),
}
})
}
Expand All @@ -452,22 +445,17 @@ impl Backend for MemoryBackend {
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Result<Result<CellContent, EventListener>> {
self.with_task(task_id, |task| {
match task.access_cell_for_read(index, self.gc_queue.as_ref(), |cell| {
cell.map(|cell| {
cell.read_content_untracked(
move || format!("{task_id} {index}"),
move || format!("reading {} {} untracked", task_id, index),
)
})
}) {
None => Err(anyhow!("Cell doesn't exist (anymore)")),
Some(Ok(content)) => Ok(Ok(content)),
Some(Err(RecomputingCell { listener, schedule })) => {
if schedule {
task.recompute(self, turbo_tasks);
}
Ok(Err(listener))
}
match task.read_cell(
index,
self.gc_queue.as_ref(),
move || format!("reading {} {} untracked", task_id, index),
None,
self,
turbo_tasks,
) {
Ok(content) => Ok(Ok(content)),
Err(ReadCellError::Recomputing(listener)) => Ok(Err(listener)),
Err(ReadCellError::CellRemoved) => Err(anyhow!("Cell {index:?} doesn't exist")),
}
})
}
Expand Down
85 changes: 73 additions & 12 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::task_local;
use tracing::Span;
use turbo_prehash::PreHashed;
use turbo_tasks::{
backend::{PersistentTaskType, TaskExecutionSpec},
backend::{CellContent, PersistentTaskType, TaskExecutionSpec},
event::{Event, EventListener},
get_invalidator, registry, CellId, Invalidator, RawVc, TaskId, TaskIdSet, TraitTypeId,
TurboTasksBackendApi, ValueTypeId,
Expand All @@ -32,7 +32,7 @@ use crate::{
aggregation_data, handle_new_edge, prepare_aggregation_data, query_root_info,
AggregationDataGuard, PreparedOperation,
},
cell::Cell,
cell::{Cell, RecomputingCell},
edges_set::{TaskEdge, TaskEdgesList, TaskEdgesSet},
gc::{GcQueue, GcTaskState},
output::{Output, OutputContent},
Expand Down Expand Up @@ -448,6 +448,11 @@ pub enum GcResult {
Unloaded,
}

pub enum ReadCellError {
CellRemoved,
Recomputing(EventListener),
}

impl Task {
pub(crate) fn new_persistent(
id: TaskId,
Expand Down Expand Up @@ -1286,26 +1291,82 @@ impl Task {
}
}

/// Access to a cell.
pub(crate) fn access_cell_for_read<T>(
/// Read a cell.
pub(crate) fn read_cell(
&self,
index: CellId,
gc_queue: Option<&GcQueue>,
func: impl FnOnce(Option<&mut Cell>) -> T,
) -> T {
note: impl Fn() -> String + Sync + Send + 'static,
reader: Option<TaskId>,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Result<CellContent, ReadCellError> {
let task_id = self.id;
let mut state = self.full_state_mut();
if let Some(gc_queue) = gc_queue {
let generation = gc_queue.generation();
if state.gc.on_read(generation) {
let _ = gc_queue.task_accessed(self.id);
}
}
let list = state.cells.entry(index.type_id).or_default();
let i = index.index as usize;
if list.len() <= i {
func(None)
} else {
func(Some(&mut list[i]))
match state.state_type {
Done { .. } | InProgress(..) | InProgressDirty { .. } => {
let is_done = matches!(state.state_type, Done { .. });
let list = state.cells.entry(index.type_id).or_default();
let i = index.index as usize;
if list.len() <= i {
if is_done {
return Err(ReadCellError::CellRemoved);
} else {
list.resize_with(i + 1, Default::default);
}
}
let cell = &mut list[i];
let description = move || format!("{task_id} {index}");
let read_result = if let Some(reader) = reader {
cell.read_content(reader, description, note)
} else {
cell.read_content_untracked(description, note)
};
drop(state);
match read_result {
Ok(content) => Ok(content),
Err(RecomputingCell { listener, schedule }) => {
if schedule {
self.recompute(backend, turbo_tasks);
}
Err(ReadCellError::Recomputing(listener))
}
}
}
Dirty {
ref mut outdated_edges,
} => {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let description = self.get_event_description();
let event = Event::new(move || format!("TaskState({})::event", description()));
let listener = event.listen_with_note(note);
state.state_type = Scheduled {
event,
outdated_edges: take(outdated_edges),
clean: false,
};
let change_job = state.aggregation_node.apply_change(
&aggregation_context,
TaskChange {
dirty_tasks_update: vec![(self.id, -1)],
..Default::default()
},
);
drop(state);
turbo_tasks.schedule(self.id);
change_job.apply(&aggregation_context);
aggregation_context.apply_queued_updates();
Err(ReadCellError::Recomputing(listener))
}
Scheduled { ref event, .. } => {
Err(ReadCellError::Recomputing(event.listen_with_note(note)))
}
}
}

Expand Down

0 comments on commit 0967bee

Please sign in to comment.