From c9512fafbd1ac3eda3d2cbc0e5aa8b449231ea1e Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 26 Jul 2024 12:26:25 -0500 Subject: [PATCH] fix(watch): properly shut down persistent tasks --- crates/turborepo-lib/src/run/mod.rs | 23 +++++++++++++++++++ crates/turborepo-lib/src/run/watch.rs | 33 +++++++++++++++++++-------- crates/turborepo-ui/src/tui/app.rs | 3 +++ 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 16cf9731c5d60..0997a047713a3 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -207,6 +207,13 @@ impl Run { Ok(Some((sender, handle))) } + /// Returns a handle that can be used to stop a run + pub fn stopper(&self) -> RunStopper { + RunStopper { + manager: self.processes.clone(), + } + } + pub async fn run( &mut self, experimental_ui_sender: Option, @@ -215,6 +222,11 @@ impl Run { if let Some(subscriber) = self.signal_handler.subscribe() { let run_cache = self.run_cache.clone(); tokio::spawn(async move { + // Caching is disabled for watch so we don't need to wait on shutting down the + // cache. + if is_watch { + return; + } let _guard = subscriber.listen().await; let spinner = turborepo_ui::start_spinner("...Finishing writing to cache..."); if let Ok((status, closed)) = run_cache.shutdown_cache().await { @@ -439,3 +451,14 @@ impl Run { Ok(exit_code) } } + +#[derive(Debug, Clone)] +pub struct RunStopper { + manager: ProcessManager, +} + +impl RunStopper { + pub async fn stop(&self) { + self.manager.stop().await; + } +} diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index eecfbf8a79dd1..466bdd5e48e24 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -47,7 +47,7 @@ impl ChangedPackages { pub struct WatchClient { run: Run, watched_packages: HashSet, - persistent_tasks_handle: Option>>, + persistent_tasks_handle: Option, connector: DaemonConnector, base: CommandBase, telemetry: CommandEventBuilder, @@ -56,6 +56,11 @@ pub struct WatchClient { ui_handle: Option>>, } +struct PersistentRunHandle { + stopper: run::RunStopper, + run_task: JoinHandle>, +} + #[derive(Debug, Error, Diagnostic)] pub enum Error { #[error("failed to connect to daemon")] @@ -304,6 +309,14 @@ impl WatchClient { self.watched_packages = self.run.get_relevant_packages(); + // Clean up currently running persistent tasks + if let Some(PersistentRunHandle { stopper, run_task }) = + self.persistent_tasks_handle.take() + { + // Shut down the tasks for the run + stopper.stop().await; + run_task.abort(); + } if let Some(sender) = &self.ui_sender { let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph); sender @@ -312,18 +325,20 @@ impl WatchClient { } if self.run.has_persistent_tasks() { - // Abort old run - if let Some(run) = self.persistent_tasks_handle.take() { - run.abort(); - } - + debug_assert!( + self.persistent_tasks_handle.is_none(), + "persistent handle should be empty before creating a new one" + ); let mut persistent_run = self.run.create_run_for_persistent_tasks(); let ui_sender = self.ui_sender.clone(); // If we have persistent tasks, we run them on a separate thread // since persistent tasks don't finish - self.persistent_tasks_handle = Some(tokio::spawn(async move { - persistent_run.run(ui_sender, true).await - })); + self.persistent_tasks_handle = Some(PersistentRunHandle { + stopper: persistent_run.stopper(), + run_task: tokio::spawn( + async move { persistent_run.run(ui_sender, true).await }, + ), + }); // But we still run the regular tasks blocking let mut non_persistent_run = self.run.create_run_without_persistent_tasks(); diff --git a/crates/turborepo-ui/src/tui/app.rs b/crates/turborepo-ui/src/tui/app.rs index bf8f008855011..d841475c07a21 100644 --- a/crates/turborepo-ui/src/tui/app.rs +++ b/crates/turborepo-ui/src/tui/app.rs @@ -139,6 +139,7 @@ impl App { /// If finished, removes from finished and starts again as new task. #[tracing::instrument(skip(self, output_logs))] pub fn start_task(&mut self, task: &str, output_logs: OutputLogs) -> Result<(), Error> { + debug!("starting {task}"); // Name of currently highlighted task. // We will use this after the order switches. let highlighted_task = self @@ -202,6 +203,7 @@ impl App { /// Errors if given task wasn't a running task #[tracing::instrument(skip(self, result))] pub fn finish_task(&mut self, task: &str, result: TaskResult) -> Result<(), Error> { + debug!("finishing task {task}"); // Name of currently highlighted task. // We will use this after the order switches. let highlighted_task = self @@ -265,6 +267,7 @@ impl App { #[tracing::instrument(skip(self))] pub fn update_tasks(&mut self, tasks: Vec) { + debug!("updating task list: {tasks:?}"); // Make sure all tasks have a terminal output for task in &tasks { self.tasks