Skip to content

Commit

Permalink
box Scheduled task state to save memory
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jul 24, 2024
1 parent 561efca commit 7e73710
Showing 1 changed file with 41 additions and 43 deletions.
84 changes: 41 additions & 43 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl TaskState {
let description2 = description.clone();
Self {
aggregation_node: TaskAggregationNode::new(),
state_type: Scheduled {
state_type: Scheduled(Box::new(ScheduledState {
start_event: Event::new(move || {
format!("TaskState({})::start_event", description())
}),
Expand All @@ -196,7 +196,7 @@ impl TaskState {
}),
outdated_edges: Default::default(),
clean: true,
},
})),
collectibles: Default::default(),
output: Default::default(),
cells: Default::default(),
Expand Down Expand Up @@ -326,8 +326,8 @@ struct InProgressState {
/// true, when the task was invalidated while executing. It will be
/// scheduled again.
stale: bool,
/// Dependencies and children that need to be disconnected once leaving
/// this state.
/// Dependencies and children that need to be disconnected once entering
/// Done.
outdated_edges: TaskEdgesSet,
/// Children that are connected during execution. These children are already
/// removed from `outdated_edges`.
Expand All @@ -336,6 +336,18 @@ struct InProgressState {
outdated_collectibles: MaybeCollectibles,
}

struct ScheduledState {
/// Event is fired when the task is IsProgress.
start_event: Event,
/// Event is fired when the task is Done.
done_event: Event,
/// Dependencies and children that need to be disconnected once entering
/// Done.
outdated_edges: Box<TaskEdgesSet>,
/// true, when the task wasn't changed since the last execution
clean: bool,
}

enum TaskStateType {
/// Ready
///
Expand All @@ -362,15 +374,7 @@ enum TaskStateType {
/// Execution is invalid and scheduled
///
/// on start this will move to InProgress or Dirty depending on active flag
Scheduled {
/// Event is fired when the task is IsProgress.
start_event: Event,
/// Event is fired when the task is Done.
done_event: Event,
outdated_edges: Box<TaskEdgesSet>,
/// true, when the task wasn't changed since the last execution
clean: bool,
},
Scheduled(Box<ScheduledState>),

/// Execution is happening
///
Expand All @@ -396,7 +400,7 @@ impl TaskStateType {
TaskStateType::Dirty { outdated_edges, .. } => {
Either::Right(Either::Right(outdated_edges.children()))
}
TaskStateType::Scheduled { outdated_edges, .. } => {
TaskStateType::Scheduled(box ScheduledState { outdated_edges, .. }) => {
Either::Right(Either::Right(outdated_edges.children()))
}
}
Expand All @@ -420,7 +424,7 @@ impl TaskStateType {
(edges, children)
}
TaskStateType::Dirty { outdated_edges, .. }
| TaskStateType::Scheduled { outdated_edges, .. } => {
| TaskStateType::Scheduled(box ScheduledState { outdated_edges, .. }) => {
let mut edges = *outdated_edges;
let children = edges.drain_children();
(edges, children)
Expand Down Expand Up @@ -718,12 +722,12 @@ impl Task {
// should not start in this state
return None;
}
Scheduled {
Scheduled(box ScheduledState {
ref mut done_event,
ref mut start_event,
ref mut outdated_edges,
clean,
} => {
}) => {
start_event.notify(usize::MAX);
let done_event = done_event.take();
let outdated_edges = *take(outdated_edges);
Expand Down Expand Up @@ -997,12 +1001,12 @@ impl Task {
let description = self.get_event_description();
let start_event =
Event::new(move || format!("TaskState({})::start_event", description()));
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
start_event,
done_event,
outdated_edges: Box::new(outdated_edges),
clean: false,
};
}));
drop(state);
schedule_task = true;
} else {
Expand Down Expand Up @@ -1088,7 +1092,7 @@ impl Task {

if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() {
match state.state_type {
Scheduled { ref mut clean, .. } => {
Scheduled(box ScheduledState { ref mut clean, .. }) => {
*clean = false;

// already scheduled
Expand All @@ -1113,7 +1117,7 @@ impl Task {
);
let description = self.get_event_description();
let description2 = description.clone();
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
done_event: Event::new(move || {
format!("TaskState({})::done_event", description())
}),
Expand All @@ -1122,7 +1126,7 @@ impl Task {
}),
outdated_edges: Box::new(outdated_edges),
clean: false,
};
}));
drop(state);
change_job.apply(&aggregation_context);

Expand Down Expand Up @@ -1210,7 +1214,7 @@ impl Task {
} => {
let description = self.get_event_description();
let description2 = description.clone();
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
start_event: Event::new(move || {
format!("TaskState({})::start_event", description())
}),
Expand All @@ -1219,7 +1223,7 @@ impl Task {
}),
outdated_edges: take(outdated_edges),
clean: false,
};
}));
let change_job = state.aggregation_node.apply_change(
&aggregation_context,
TaskChange {
Expand All @@ -1245,7 +1249,7 @@ impl Task {
);
let description = self.get_event_description();
let description2 = description.clone();
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
start_event: Event::new(move || {
format!("TaskState({})::start_event", description())
}),
Expand All @@ -1254,7 +1258,7 @@ impl Task {
}),
outdated_edges: Box::new(outdated_edges),
clean: true,
};
}));
drop(state);
change_job.apply(&aggregation_context);

Expand All @@ -1277,7 +1281,7 @@ impl Task {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let description = self.get_event_description();
let description2 = description.clone();
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
start_event: Event::new(move || {
format!("TaskState({})::start_event", description())
}),
Expand All @@ -1286,7 +1290,7 @@ impl Task {
}),
outdated_edges: take(outdated_edges),
clean: false,
};
}));
let job = state.aggregation_node.apply_change(
&aggregation_context,
TaskChange {
Expand Down Expand Up @@ -1390,14 +1394,14 @@ impl Task {
let start_event =
Event::new(move || format!("TaskState({})::start_event", description()));
let listener = start_event.listen_with_note(note);
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
start_event,
done_event: Event::new(move || {
format!("TaskState({})::done_event", description2())
}),
outdated_edges: take(outdated_edges),
clean: false,
};
}));
let change_job = state.aggregation_node.apply_change(
&aggregation_context,
TaskChange {
Expand All @@ -1411,9 +1415,9 @@ impl Task {
aggregation_context.apply_queued_updates();
Err(ReadCellError::Recomputing(listener))
}
Scheduled {
Scheduled(box ScheduledState {
ref start_event, ..
} => Err(ReadCellError::Recomputing(
}) => Err(ReadCellError::Recomputing(
start_event.listen_with_note(note),
)),
}
Expand Down Expand Up @@ -1630,14 +1634,14 @@ impl Task {
let done_event =
Event::new(move || format!("TaskState({})::done_event", description()));
let listener = done_event.listen_with_note(note);
state.state_type = Scheduled {
state.state_type = Scheduled(Box::new(ScheduledState {
start_event: Event::new(move || {
format!("TaskState({})::start_event", description2())
}),
done_event,
outdated_edges: take(outdated_edges),
clean: false,
};
}));
let change_job = state.aggregation_node.apply_change(
&aggregation_context,
TaskChange {
Expand All @@ -1649,15 +1653,9 @@ impl Task {
change_job.apply(&aggregation_context);
Ok(Err(listener))
}
Scheduled {
done_event: ref event,
..
}
| InProgress(box InProgressState {
done_event: ref event,
..
}) => {
let listener = event.listen_with_note(note);
Scheduled(box ScheduledState { ref done_event, .. })
| InProgress(box InProgressState { ref done_event, .. }) => {
let listener = done_event.listen_with_note(note);
drop(state);
Ok(Err(listener))
}
Expand Down

0 comments on commit 7e73710

Please sign in to comment.