Skip to content

Commit

Permalink
fix(watch): properly shut down persistent tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-olszewski committed Jul 26, 2024
1 parent 2422858 commit c9512fa
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 9 deletions.
23 changes: 23 additions & 0 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ impl Run {
Ok(Some((sender, handle)))
}

/// Returns a handle that can be used to stop a run
pub fn stopper(&self) -> RunStopper {
RunStopper {
manager: self.processes.clone(),
}
}

pub async fn run(
&mut self,
experimental_ui_sender: Option<AppSender>,
Expand All @@ -215,6 +222,11 @@ impl Run {
if let Some(subscriber) = self.signal_handler.subscribe() {
let run_cache = self.run_cache.clone();
tokio::spawn(async move {
// Caching is disabled for watch so we don't need to wait on shutting down the
// cache.
if is_watch {
return;
}
let _guard = subscriber.listen().await;
let spinner = turborepo_ui::start_spinner("...Finishing writing to cache...");
if let Ok((status, closed)) = run_cache.shutdown_cache().await {
Expand Down Expand Up @@ -439,3 +451,14 @@ impl Run {
Ok(exit_code)
}
}

#[derive(Debug, Clone)]
pub struct RunStopper {
manager: ProcessManager,
}

impl RunStopper {
pub async fn stop(&self) {
self.manager.stop().await;
}
}
33 changes: 24 additions & 9 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl ChangedPackages {
pub struct WatchClient {
run: Run,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<JoinHandle<Result<i32, run::Error>>>,
persistent_tasks_handle: Option<PersistentRunHandle>,
connector: DaemonConnector,
base: CommandBase,
telemetry: CommandEventBuilder,
Expand All @@ -56,6 +56,11 @@ pub struct WatchClient {
ui_handle: Option<JoinHandle<Result<(), tui::Error>>>,
}

struct PersistentRunHandle {
stopper: run::RunStopper,
run_task: JoinHandle<Result<i32, run::Error>>,
}

#[derive(Debug, Error, Diagnostic)]
pub enum Error {
#[error("failed to connect to daemon")]
Expand Down Expand Up @@ -304,6 +309,14 @@ impl WatchClient {

self.watched_packages = self.run.get_relevant_packages();

// Clean up currently running persistent tasks
if let Some(PersistentRunHandle { stopper, run_task }) =
self.persistent_tasks_handle.take()
{
// Shut down the tasks for the run
stopper.stop().await;
run_task.abort();
}
if let Some(sender) = &self.ui_sender {
let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph);
sender
Expand All @@ -312,18 +325,20 @@ impl WatchClient {
}

if self.run.has_persistent_tasks() {
// Abort old run
if let Some(run) = self.persistent_tasks_handle.take() {
run.abort();
}

debug_assert!(
self.persistent_tasks_handle.is_none(),
"persistent handle should be empty before creating a new one"
);
let mut persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
self.persistent_tasks_handle = Some(tokio::spawn(async move {
persistent_run.run(ui_sender, true).await
}));
self.persistent_tasks_handle = Some(PersistentRunHandle {
stopper: persistent_run.stopper(),
run_task: tokio::spawn(
async move { persistent_run.run(ui_sender, true).await },
),
});

// But we still run the regular tasks blocking
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
Expand Down
3 changes: 3 additions & 0 deletions crates/turborepo-ui/src/tui/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl<W> App<W> {
/// If finished, removes from finished and starts again as new task.
#[tracing::instrument(skip(self, output_logs))]
pub fn start_task(&mut self, task: &str, output_logs: OutputLogs) -> Result<(), Error> {
debug!("starting {task}");
// Name of currently highlighted task.
// We will use this after the order switches.
let highlighted_task = self
Expand Down Expand Up @@ -202,6 +203,7 @@ impl<W> App<W> {
/// Errors if given task wasn't a running task
#[tracing::instrument(skip(self, result))]
pub fn finish_task(&mut self, task: &str, result: TaskResult) -> Result<(), Error> {
debug!("finishing task {task}");
// Name of currently highlighted task.
// We will use this after the order switches.
let highlighted_task = self
Expand Down Expand Up @@ -265,6 +267,7 @@ impl<W> App<W> {

#[tracing::instrument(skip(self))]
pub fn update_tasks(&mut self, tasks: Vec<String>) {
debug!("updating task list: {tasks:?}");
// Make sure all tasks have a terminal output
for task in &tasks {
self.tasks
Expand Down

0 comments on commit c9512fa

Please sign in to comment.