Skip to content

Commit

Permalink
Add new ExitHandler API as an alternative to ExitGuard (vercel/turbor…
Browse files Browse the repository at this point in the history
…epo#8547)

## Why?

I need a better system for handling process exit, so that cache hit statistics can be flushed before the process terminates (vercel/turborepo#8286).

## What?

- Supports letting another thing listen for <kbd>ctrl</kbd> + <kbd>c</kbd>. In the case of next-server, we need to let node.js own and configure these signal handlers.
- Supports setting up a <kbd>ctrl</kbd> + <kbd>c</kbd> handler for you, but now explicitly panics if you try to set up multiple global <kbd>ctrl</kbd> + <kbd>c</kbd> listeners.
- Allows async work to happen during cleanup. This wasn't possible using `Drop` because `Drop` isn't async.
- Allows cleanup work to be scheduled after application initialization, potentially making the API a bit more flexible.

## Testing Instructions

```
cargo test -p turbopack-trace-utils
```

Add some `println!()` logging to the end of the `on_exit` future in turbopack-cli.

```
TURBOPACK_TRACING=overview cargo run -p turbopack-cli -- dev
```

Hit `ctrl+c` and see that my `println!()` runs, so I know the tracing flush finishes.
  • Loading branch information
bgw authored Jun 24, 2024
1 parent e51f491 commit a3dae53
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 26 deletions.
45 changes: 21 additions & 24 deletions crates/turbopack-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use std::path::Path;
use anyhow::{Context, Result};
use clap::Parser;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry};
use turbo_tasks_malloc::TurboMalloc;
use turbopack_cli::{arguments::Arguments, register};
use turbopack_trace_utils::{
exit::ExitGuard,
exit::ExitHandler,
raw_trace::RawTraceLayer,
trace_writer::TraceWriter,
tracing_presets::{
Expand All @@ -17,16 +18,27 @@ use turbopack_trace_utils::{
};

#[global_allocator]
static ALLOC: turbo_tasks_malloc::TurboMalloc = turbo_tasks_malloc::TurboMalloc;
static ALLOC: TurboMalloc = TurboMalloc;

fn main() {
use turbo_tasks_malloc::TurboMalloc;

let args = Arguments::parse();

let trace = std::env::var("TURBOPACK_TRACING").ok();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_stop(|| {
TurboMalloc::thread_stop();
})
.build()
.unwrap()
.block_on(main_inner(args))
.unwrap();
}

let _guard = if let Some(mut trace) = trace {
async fn main_inner(args: Arguments) -> Result<()> {
let exit_handler = ExitHandler::listen();

let trace = std::env::var("TURBOPACK_TRACING").ok();
if let Some(mut trace) = trace {
// Trace presets
match trace.as_str() {
"overview" => {
Expand Down Expand Up @@ -57,27 +69,12 @@ fn main() {
let (trace_writer, guard) = TraceWriter::new(trace_writer);
let subscriber = subscriber.with(RawTraceLayer::new(trace_writer));

let guard = ExitGuard::new(guard).unwrap();
exit_handler
.on_exit(async move { tokio::task::spawn_blocking(|| drop(guard)).await.unwrap() });

subscriber.init();
}

Some(guard)
} else {
None
};

tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_stop(|| {
TurboMalloc::thread_stop();
})
.build()
.unwrap()
.block_on(main_inner(args))
.unwrap();
}

async fn main_inner(args: Arguments) -> Result<()> {
register();

match args {
Expand Down
2 changes: 1 addition & 1 deletion crates/turbopack-trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ crossbeam-channel = { workspace = true }
once_cell = { workspace = true }
postcard = { workspace = true, features = ["alloc", "use-std"] }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["signal", "rt"] }
tokio = { workspace = true, features = ["macros", "signal", "sync", "rt"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
turbo-tasks-malloc = { workspace = true }
185 changes: 184 additions & 1 deletion crates/turbopack-trace-utils/src/exit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::sync::{Arc, Mutex};
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex, OnceLock},
};

use anyhow::Result;
use tokio::{select, sync::mpsc, task::JoinSet};

/// A guard for the exit handler. When dropped, the exit guard will be dropped.
/// It might also be dropped on Ctrl-C.
Expand All @@ -27,3 +32,181 @@ impl<T: Send + 'static> ExitGuard<T> {
Ok(ExitGuard(guard))
}
}

/// A heap-allocated, pinned future with no output that is `Send + 'static`.
///
/// This is the type-erased form in which exit callbacks travel through the
/// channel held by `ExitHandler` and `ExitReceiver`.
type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// The singular global ExitHandler. This is primarily used to ensure
/// `ExitHandler::listen` is only called once.
///
/// The global handler is intentionally not exposed, so that APIs that depend on
/// exit behavior are required to take the `ExitHandler`. This ensures that the
/// `ExitHandler` is configured before these APIs are run, and that these
/// consumers can be used with a callback (e.g. a mock) instead.
///
/// The cell is populated by `ExitHandler::listen`; a second attempt to set it
/// fails, which `listen` turns into a panic.
static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();

/// A handle for scheduling futures that should run when the process exits.
///
/// Futures passed to `ExitHandler::on_exit` are boxed and sent over an
/// unbounded channel; the paired `ExitReceiver` is what actually runs them.
pub struct ExitHandler {
/// Sending half of the channel that carries the scheduled exit futures.
tx: mpsc::UnboundedSender<BoxExitFuture>,
}

impl ExitHandler {
    /// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the
    /// process with exit code `0` after running any futures scheduled with
    /// [`ExitHandler::on_exit`].
    ///
    /// As this uses global process signals, this must only be called once, and
    /// will panic if called multiple times. Use this when you own the
    /// process (e.g. `turbopack-cli`).
    ///
    /// If you don't own the process (e.g. you're called as a library, such as
    /// in `next-swc`), use [`ExitHandler::new_receiver`] instead.
    ///
    /// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the
    /// future.
    ///
    /// # Panics
    ///
    /// Panics if `listen` has already been called in this process.
    pub fn listen() -> &'static Arc<ExitHandler> {
        let (handler, receiver) = Self::new_receiver();
        // `set` fails if a previous `listen` call already stored a handler.
        if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
            panic!("ExitHandler::listen must only be called once");
        }
        // NOTE(review): `tokio::spawn` is expected to need an active Tokio
        // runtime, so callers presumably must invoke `listen` from inside one —
        // confirm against the tokio documentation.
        tokio::spawn(async move {
            tokio::signal::ctrl_c()
                .await
                .expect("failed to set ctrl_c handler");
            // Run every scheduled exit future before terminating the process.
            receiver.run_exit_handler().await;
            std::process::exit(0);
        });
        GLOBAL_EXIT_HANDLER.get().expect("value is set")
    }

    /// Creates an [`ExitHandler`] that can be manually controlled with an
    /// [`ExitReceiver`].
    ///
    /// This does not actually exit the process or listen for any signals. If
    /// you'd like that behavior, use [`ExitHandler::listen`].
    ///
    /// Because this API has no global side-effects and can be called many times
    /// within the same process, it is possible to use it to provide a mock
    /// [`ExitHandler`] inside unit tests.
    pub fn new_receiver() -> (Arc<ExitHandler>, ExitReceiver) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Arc::new(ExitHandler { tx }), ExitReceiver { rx })
    }

    /// Register this given [`Future`] to run upon process exit.
    ///
    /// As there are many ways for a process to be killed that are outside of a
    /// process's own control (e.g. `SIGKILL` or `SIGSEGV`), this API is
    /// provided on a best-effort basis.
    ///
    /// # Panics
    ///
    /// Panics if the future cannot be sent to the paired [`ExitReceiver`],
    /// e.g. because [`ExitReceiver::run_exit_handler`] has already finished.
    pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
        // realistically, this error case can only happen with the `new_receiver` API.
        self.tx
            .send(Box::pin(fut))
            .expect("cannot send future after process exit");
    }
}

/// Provides a way to run futures scheduled with an [`ExitHandler`].
///
/// Created together with its [`ExitHandler`] by `ExitHandler::new_receiver`.
pub struct ExitReceiver {
/// Receiving half of the channel that carries the scheduled exit futures.
rx: mpsc::UnboundedReceiver<BoxExitFuture>,
}

impl ExitReceiver {
    /// Runs every future that was scheduled through [`ExitHandler::on_exit`].
    /// Call this once the process is about to exit.
    ///
    /// This method is meant for library contexts and therefore never exits
    /// the process itself; the caller is expected to keep the process alive
    /// until the returned future completes.
    ///
    /// Futures may keep being scheduled with [`ExitHandler::on_exit`] while
    /// this is running; they are picked up and run before this method
    /// returns. Scheduling work after this method has returned panics.
    ///
    /// # Panics
    ///
    /// Panics if joining any of the spawned exit futures reports an error.
    pub async fn run_exit_handler(mut self) {
        let mut tasks = JoinSet::new();

        // Start everything that is already sitting in the queue.
        while let Ok(queued) = self.rx.try_recv() {
            tasks.spawn(queued);
        }

        loop {
            select! {
                // Always look for newly queued work before joining tasks.
                biased;
                Some(queued) = self.rx.recv() => {
                    tasks.spawn(queued);
                },
                joined = tasks.join_next() => {
                    // `None` means the set is empty: nothing is left to wait for.
                    let Some(outcome) = joined else { return };
                    if outcome.is_err() {
                        panic!("ExitHandler future panicked!");
                    }
                },
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, AtomicU32, Ordering},
        Arc,
    };

    use super::{BoxExitFuture, ExitHandler};

    /// A future registered with `on_exit` has run by the time
    /// `run_exit_handler` returns.
    #[tokio::test]
    async fn test_on_exit() {
        let (handler, receiver) = ExitHandler::new_receiver();

        let called = Arc::new(AtomicBool::new(false));
        handler.on_exit({
            let called = Arc::clone(&called);
            async move {
                // Set the flag only after an await point.
                tokio::task::yield_now().await;
                called.store(true, Ordering::SeqCst);
            }
        });

        receiver.run_exit_handler().await;
        assert!(called.load(Ordering::SeqCst));
    }

    /// Futures that schedule further futures while the exit handler is
    /// already running are all executed before `run_exit_handler` returns.
    #[tokio::test]
    async fn test_queue_while_exiting() {
        let (handler, receiver) = ExitHandler::new_receiver();
        let call_count = Arc::new(AtomicU32::new(0));

        // this struct is needed to construct the recursive closure type
        #[derive(Clone)]
        struct GetFut {
            handler: Arc<ExitHandler>,
            call_count: Arc<AtomicU32>,
        }

        impl GetFut {
            /// Builds a future that bumps the counter and, for the first 99
            /// invocations, schedules another copy of itself.
            fn get(self) -> BoxExitFuture {
                Box::pin(async move {
                    tokio::task::yield_now().await;
                    if self.call_count.fetch_add(1, Ordering::SeqCst) < 99 {
                        // queue more work while the exit handler is running;
                        // the handler is cloned first because `self.get()`
                        // consumes `self`
                        Arc::clone(&self.handler).on_exit(self.get())
                    }
                })
            }
        }

        handler.on_exit(
            GetFut {
                handler: Arc::clone(&handler),
                call_count: Arc::clone(&call_count),
            }
            .get(),
        );
        receiver.run_exit_handler().await;
        assert_eq!(call_count.load(Ordering::SeqCst), 100);
    }
}

0 comments on commit a3dae53

Please sign in to comment.