Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(watch): properly shut down persistent tasks #8854

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ impl Run {
Ok(Some((sender, handle)))
}

/// Returns a handle that can be used to stop a run
pub fn stopper(&self) -> RunStopper {
RunStopper {
manager: self.processes.clone(),
}
}

pub async fn run(
&mut self,
experimental_ui_sender: Option<AppSender>,
Expand All @@ -215,6 +222,11 @@ impl Run {
if let Some(subscriber) = self.signal_handler.subscribe() {
let run_cache = self.run_cache.clone();
tokio::spawn(async move {
// Caching is disabled for watch so we don't need to wait on shutting down the
chris-olszewski marked this conversation as resolved.
Show resolved Hide resolved
// cache.
if is_watch {
return;
}
let _guard = subscriber.listen().await;
let spinner = turborepo_ui::start_spinner("...Finishing writing to cache...");
if let Ok((status, closed)) = run_cache.shutdown_cache().await {
Expand Down Expand Up @@ -439,3 +451,14 @@ impl Run {
Ok(exit_code)
}
}

#[derive(Debug, Clone)]
pub struct RunStopper {
manager: ProcessManager,
}

impl RunStopper {
pub async fn stop(&self) {
self.manager.stop().await;
}
}
33 changes: 24 additions & 9 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl ChangedPackages {
pub struct WatchClient {
run: Run,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<JoinHandle<Result<i32, run::Error>>>,
persistent_tasks_handle: Option<PersistentRunHandle>,
connector: DaemonConnector,
base: CommandBase,
telemetry: CommandEventBuilder,
Expand All @@ -56,6 +56,11 @@ pub struct WatchClient {
ui_handle: Option<JoinHandle<Result<(), tui::Error>>>,
}

struct PersistentRunHandle {
stopper: run::RunStopper,
run_task: JoinHandle<Result<i32, run::Error>>,
}

#[derive(Debug, Error, Diagnostic)]
pub enum Error {
#[error("failed to connect to daemon")]
Expand Down Expand Up @@ -304,6 +309,14 @@ impl WatchClient {

self.watched_packages = self.run.get_relevant_packages();

// Clean up currently running persistent tasks
if let Some(PersistentRunHandle { stopper, run_task }) =
self.persistent_tasks_handle.take()
{
// Shut down the tasks for the run
stopper.stop().await;
run_task.abort();
}
if let Some(sender) = &self.ui_sender {
let task_names = self.run.engine.tasks_with_command(&self.run.pkg_dep_graph);
sender
Expand All @@ -312,18 +325,20 @@ impl WatchClient {
}

if self.run.has_persistent_tasks() {
// Abort old run
if let Some(run) = self.persistent_tasks_handle.take() {
run.abort();
}

debug_assert!(
self.persistent_tasks_handle.is_none(),
"persistent handle should be empty before creating a new one"
);
let mut persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
self.persistent_tasks_handle = Some(tokio::spawn(async move {
persistent_run.run(ui_sender, true).await
}));
self.persistent_tasks_handle = Some(PersistentRunHandle {
stopper: persistent_run.stopper(),
run_task: tokio::spawn(
async move { persistent_run.run(ui_sender, true).await },
),
});

// But we still run the regular tasks blocking
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
Expand Down
3 changes: 3 additions & 0 deletions crates/turborepo-ui/src/tui/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl<W> App<W> {
/// If finished, removes from finished and starts again as new task.
#[tracing::instrument(skip(self, output_logs))]
pub fn start_task(&mut self, task: &str, output_logs: OutputLogs) -> Result<(), Error> {
debug!("starting {task}");
// Name of currently highlighted task.
// We will use this after the order switches.
let highlighted_task = self
Expand Down Expand Up @@ -202,6 +203,7 @@ impl<W> App<W> {
/// Errors if given task wasn't a running task
#[tracing::instrument(skip(self, result))]
pub fn finish_task(&mut self, task: &str, result: TaskResult) -> Result<(), Error> {
debug!("finishing task {task}");
// Name of currently highlighted task.
// We will use this after the order switches.
let highlighted_task = self
Expand Down Expand Up @@ -265,6 +267,7 @@ impl<W> App<W> {

#[tracing::instrument(skip(self))]
pub fn update_tasks(&mut self, tasks: Vec<String>) {
debug!("updating task list: {tasks:?}");
// Make sure all tasks have a terminal output
for task in &tasks {
self.tasks
Expand Down
Loading