From 4d96e7249e1b7f984a2ebd9cea372d63da657390 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Sat, 8 Jun 2024 21:05:42 -0700 Subject: [PATCH] First revision of ExitHandler API Proposed alternative to ExitGuard API that allows async work, and is clearer about only allowing a single listener. Allows potentially adding additional callbacks after initialization. --- crates/turbopack-trace-utils/Cargo.toml | 2 +- crates/turbopack-trace-utils/src/exit.rs | 83 +++++++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/crates/turbopack-trace-utils/Cargo.toml b/crates/turbopack-trace-utils/Cargo.toml index 072fa9a60bfb4..d0c0a11f276c2 100644 --- a/crates/turbopack-trace-utils/Cargo.toml +++ b/crates/turbopack-trace-utils/Cargo.toml @@ -17,7 +17,7 @@ crossbeam-channel = { workspace = true } once_cell = { workspace = true } postcard = { workspace = true, features = ["alloc", "use-std"] } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true, features = ["signal", "rt"] } +tokio = { workspace = true, features = ["macros", "signal", "sync", "rt"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } turbo-tasks-malloc = { workspace = true } diff --git a/crates/turbopack-trace-utils/src/exit.rs b/crates/turbopack-trace-utils/src/exit.rs index 90ab9d78bb1e9..8e07b81c8e056 100644 --- a/crates/turbopack-trace-utils/src/exit.rs +++ b/crates/turbopack-trace-utils/src/exit.rs @@ -1,6 +1,11 @@ -use std::sync::{Arc, Mutex}; +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex, OnceLock}, +}; use anyhow::Result; +use tokio::{select, sync::mpsc, task::JoinSet}; /// A guard for the exit handler. When dropped, the exit guard will be dropped. /// It might also be dropped on Ctrl-C. @@ -27,3 +32,79 @@ impl ExitGuard { Ok(ExitGuard(guard)) } } + +type ExitFuture = Pin + Send + 'static>>; + +/// The singular global ExitHandler. This is primarily used to ensure +/// `ExitHandler::listen` is only called once. +/// +/// The global handler is intentionally not exposed, so that APIs that depend on +/// exit behavior are required to take the `ExitHandler`. This ensures that the +/// `ExitHandler` is configured before these APIs are run, and that these +/// consumers can be used with a mock instead. +static GLOBAL_EXIT_HANDLER: OnceLock> = OnceLock::new(); + +pub struct ExitHandler { + tx: mpsc::UnboundedSender, +} + +impl ExitHandler { + pub fn listen() -> &'static Arc { + let (tx, mut rx) = mpsc::unbounded_channel(); + let handler = Arc::new(ExitHandler { tx }); + if GLOBAL_EXIT_HANDLER.set(handler).is_err() { + panic!("ExitHandler::listen must only be called once"); + } + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to set ctrl_c handler"); + run_all_futures(&mut rx).await; + std::process::exit(0); + }); + GLOBAL_EXIT_HANDLER.get().expect("value is set") + } + + pub fn mock() -> (Arc, ExitHandlerMocks) { + let (tx, rx) = mpsc::unbounded_channel(); + (Arc::new(ExitHandler { tx }), ExitHandlerMocks { rx }) + } + + pub fn on_exit(&self, fut: impl Future + Send + 'static) { + // realistically, this error case can only happen with a mock + self.tx + .send(Box::pin(fut)) + .expect("cannot send future after process exit"); + } +} + +pub struct ExitHandlerMocks { + rx: mpsc::UnboundedReceiver, +} + +impl ExitHandlerMocks { + pub async fn mock_exit(mut self) { + run_all_futures(&mut self.rx).await + } +} + +pub async fn run_all_futures(rx: &mut mpsc::UnboundedReceiver) { + let mut set = JoinSet::new(); + while let Ok(fut) = rx.try_recv() { + set.spawn(fut); + } + loop { + select! { + Some(fut) = rx.recv() => { + set.spawn(fut); + }, + val = set.join_next() => { + match val { + Some(Ok(())) => {} + Some(Err(_)) => panic!("ExitHandler future panicked!"), + None => return, + } + }, + } + } +}