diff --git a/crates/turbopack-cli/src/main.rs b/crates/turbopack-cli/src/main.rs index bb956284d37d3..0129194f88bd0 100644 --- a/crates/turbopack-cli/src/main.rs +++ b/crates/turbopack-cli/src/main.rs @@ -6,9 +6,10 @@ use std::path::Path; use anyhow::{Context, Result}; use clap::Parser; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; +use turbo_tasks_malloc::TurboMalloc; use turbopack_cli::{arguments::Arguments, register}; use turbopack_trace_utils::{ - exit::ExitGuard, + exit::ExitHandler, raw_trace::RawTraceLayer, trace_writer::TraceWriter, tracing_presets::{ @@ -17,16 +18,27 @@ use turbopack_trace_utils::{ }; #[global_allocator] -static ALLOC: turbo_tasks_malloc::TurboMalloc = turbo_tasks_malloc::TurboMalloc; +static ALLOC: TurboMalloc = TurboMalloc; fn main() { - use turbo_tasks_malloc::TurboMalloc; - let args = Arguments::parse(); - let trace = std::env::var("TURBOPACK_TRACING").ok(); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .on_thread_stop(|| { + TurboMalloc::thread_stop(); + }) + .build() + .unwrap() + .block_on(main_inner(args)) + .unwrap(); +} - let _guard = if let Some(mut trace) = trace { +async fn main_inner(args: Arguments) -> Result<()> { + let exit_handler = ExitHandler::listen(); + + let trace = std::env::var("TURBOPACK_TRACING").ok(); + if let Some(mut trace) = trace { // Trace presets match trace.as_str() { "overview" => { @@ -57,27 +69,12 @@ fn main() { let (trace_writer, guard) = TraceWriter::new(trace_writer); let subscriber = subscriber.with(RawTraceLayer::new(trace_writer)); - let guard = ExitGuard::new(guard).unwrap(); + exit_handler + .on_exit(async move { tokio::task::spawn_blocking(|| drop(guard)).await.unwrap() }); subscriber.init(); + } - Some(guard) - } else { - None - }; - - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .on_thread_stop(|| { - TurboMalloc::thread_stop(); - }) - .build() - .unwrap() - .block_on(main_inner(args)) - .unwrap(); -} - -async fn main_inner(args: Arguments) -> Result<()> { register(); match args { diff --git a/crates/turbopack-trace-utils/Cargo.toml b/crates/turbopack-trace-utils/Cargo.toml index 072fa9a60bfb4..d0c0a11f276c2 100644 --- a/crates/turbopack-trace-utils/Cargo.toml +++ b/crates/turbopack-trace-utils/Cargo.toml @@ -17,7 +17,7 @@ crossbeam-channel = { workspace = true } once_cell = { workspace = true } postcard = { workspace = true, features = ["alloc", "use-std"] } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true, features = ["signal", "rt"] } +tokio = { workspace = true, features = ["macros", "signal", "sync", "rt"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } turbo-tasks-malloc = { workspace = true } diff --git a/crates/turbopack-trace-utils/src/exit.rs b/crates/turbopack-trace-utils/src/exit.rs index 90ab9d78bb1e9..f8177e48624ae 100644 --- a/crates/turbopack-trace-utils/src/exit.rs +++ b/crates/turbopack-trace-utils/src/exit.rs @@ -1,6 +1,11 @@ -use std::sync::{Arc, Mutex}; +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex, OnceLock}, +}; use anyhow::Result; +use tokio::{select, sync::mpsc, task::JoinSet}; /// A guard for the exit handler. When dropped, the exit guard will be dropped. /// It might also be dropped on Ctrl-C. @@ -27,3 +32,181 @@ impl ExitGuard { Ok(ExitGuard(guard)) } } + +type BoxExitFuture = Pin + Send + 'static>>; + +/// The singular global ExitHandler. This is primarily used to ensure +/// `ExitHandler::listen` is only called once. +/// +/// The global handler is intentionally not exposed, so that APIs that depend on +/// exit behavior are required to take the `ExitHandler`. This ensures that the +/// `ExitHandler` is configured before these APIs are run, and that these +/// consumers can be used with a callback (e.g. a mock) instead. +static GLOBAL_EXIT_HANDLER: OnceLock> = OnceLock::new(); + +pub struct ExitHandler { + tx: mpsc::UnboundedSender, +} + +impl ExitHandler { + /// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the + /// process with exit code `0` after running any futures scheduled with + /// [`ExitHandler::on_exit`]. + /// + /// As this uses global process signals, this must only be called once, and + /// will panic if called multiple times. Use this when you own the + /// process (e.g. `turbopack-cli`). + /// + /// If you don't own the process (e.g. you're called as a library, such as + /// in `next-swc`), use [`ExitHandler::new_trigger`] instead. + /// + /// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the + /// future. + pub fn listen() -> &'static Arc { + let (handler, receiver) = Self::new_receiver(); + if GLOBAL_EXIT_HANDLER.set(handler).is_err() { + panic!("ExitHandler::listen must only be called once"); + } + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to set ctrl_c handler"); + receiver.run_exit_handler().await; + std::process::exit(0); + }); + GLOBAL_EXIT_HANDLER.get().expect("value is set") + } + + /// Creates an [`ExitHandler`] that can be manually controlled with an + /// [`ExitReceiver`]. + /// + /// This does not actually exit the process or listen for any signals. If + /// you'd like that behavior, use [`ExitHandler::listen`]. + /// + /// Because this API has no global side-effects and can be called many times + /// within the same process, it is possible to use it to provide a mock + /// [`ExitHandler`] inside unit tests. + pub fn new_receiver() -> (Arc, ExitReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + (Arc::new(ExitHandler { tx }), ExitReceiver { rx }) + } + + /// Register this given [`Future`] to run upon process exit. + /// + /// As there are many ways for a process be killed that are outside of a + /// process's own control (e.g. `SIGKILL` or `SIGSEGV`), this API is + /// provided on a best-effort basis. + pub fn on_exit(&self, fut: impl Future + Send + 'static) { + // realistically, this error case can only happen with the `new_receiver` API. + self.tx + .send(Box::pin(fut)) + .expect("cannot send future after process exit"); + } +} + +/// Provides a way to run futures scheduled with an [`ExitHandler`]. +pub struct ExitReceiver { + rx: mpsc::UnboundedReceiver, +} + +impl ExitReceiver { + /// Call this when the process exits to run the futures scheduled via + /// [`ExitHandler::on_exit`]. + /// + /// As this is intended to be used in a library context, this does not exit + /// the process. It is expected that the process will not exit until + /// this async method finishes executing. + /// + /// Additional work can be scheduled using [`ExitHandler::on_exit`] even + /// while this is running, and it will execute before this function + /// finishes. Work attempted to be scheduled after this finishes will panic. + pub async fn run_exit_handler(mut self) { + let mut set = JoinSet::new(); + while let Ok(fut) = self.rx.try_recv() { + set.spawn(fut); + } + loop { + select! { + biased; + Some(fut) = self.rx.recv() => { + set.spawn(fut); + }, + val = set.join_next() => { + match val { + Some(Ok(())) => {} + Some(Err(_)) => panic!("ExitHandler future panicked!"), + None => return, + } + }, + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, + }, + }; + + use super::ExitHandler; + + #[tokio::test] + async fn test_on_exit() { + let (handler, receiver) = ExitHandler::new_receiver(); + + let called = Arc::new(AtomicBool::new(false)); + handler.on_exit({ + let called = Arc::clone(&called); + async move { + tokio::task::yield_now().await; + called.store(true, Ordering::SeqCst); + } + }); + + receiver.run_exit_handler().await; + assert_eq!(called.load(Ordering::SeqCst), true); + } + + #[tokio::test] + async fn test_queue_while_exiting() { + let (handler, receiver) = ExitHandler::new_receiver(); + let call_count = Arc::new(AtomicU32::new(0)); + + type BoxExitFuture = Pin + Send + 'static>>; + + // this struct is needed to construct the recursive closure type + #[derive(Clone)] + struct GetFut { + handler: Arc, + call_count: Arc, + } + + impl GetFut { + fn get(self) -> BoxExitFuture { + Box::pin(async move { + tokio::task::yield_now().await; + if self.call_count.fetch_add(1, Ordering::SeqCst) < 99 { + // queue more work while the exit handler is running + Arc::clone(&self.handler).on_exit(self.get()) + } + }) + } + } + + handler.on_exit( + GetFut { + handler: Arc::clone(&handler), + call_count: Arc::clone(&call_count), + } + .get(), + ); + receiver.run_exit_handler().await; + assert_eq!(call_count.load(Ordering::SeqCst), 100); + } +}