Skip to content

Commit

Permalink
First revision of ExitHandler API
Browse files Browse the repository at this point in the history
Proposed alternative to ExitGuard API that allows async work, and is
clearer about only allowing a single listener. Allows potentially adding
additional callbacks after initialization.
  • Loading branch information
bgw committed Jun 9, 2024
1 parent a43455d commit 4d96e72
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
2 changes: 1 addition & 1 deletion crates/turbopack-trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ crossbeam-channel = { workspace = true }
once_cell = { workspace = true }
postcard = { workspace = true, features = ["alloc", "use-std"] }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["signal", "rt"] }
tokio = { workspace = true, features = ["macros", "signal", "sync", "rt"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
turbo-tasks-malloc = { workspace = true }
83 changes: 82 additions & 1 deletion crates/turbopack-trace-utils/src/exit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::sync::{Arc, Mutex};
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex, OnceLock},
};

use anyhow::Result;
use tokio::{select, sync::mpsc, task::JoinSet};

/// A guard for the exit handler. When dropped, the exit guard will be dropped.
/// It might also be dropped on Ctrl-C.
Expand All @@ -27,3 +32,79 @@ impl<T: Send + 'static> ExitGuard<T> {
Ok(ExitGuard(guard))
}
}

type ExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// The singular global ExitHandler. This is primarily used to ensure
/// `ExitHandler::listen` is only called once.
///
/// The global handler is intentionally not exposed, so that APIs that depend on
/// exit behavior are required to take the `ExitHandler`. This ensures that the
/// `ExitHandler` is configured before these APIs are run, and that these
/// consumers can be used with a mock instead.
static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();

pub struct ExitHandler {
tx: mpsc::UnboundedSender<ExitFuture>,
}

impl ExitHandler {
pub fn listen() -> &'static Arc<ExitHandler> {
let (tx, mut rx) = mpsc::unbounded_channel();
let handler = Arc::new(ExitHandler { tx });
if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
panic!("ExitHandler::listen must only be called once");
}
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to set ctrl_c handler");
run_all_futures(&mut rx).await;
std::process::exit(0);
});
GLOBAL_EXIT_HANDLER.get().expect("value is set")
}

pub fn mock() -> (Arc<ExitHandler>, ExitHandlerMocks) {
let (tx, rx) = mpsc::unbounded_channel();
(Arc::new(ExitHandler { tx }), ExitHandlerMocks { rx })
}

pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
// realistically, this error case can only happen with a mock
self.tx
.send(Box::pin(fut))
.expect("cannot send future after process exit");
}
}

pub struct ExitHandlerMocks {
rx: mpsc::UnboundedReceiver<ExitFuture>,
}

impl ExitHandlerMocks {
pub async fn mock_exit(mut self) {
run_all_futures(&mut self.rx).await
}
}

pub async fn run_all_futures(rx: &mut mpsc::UnboundedReceiver<ExitFuture>) {
let mut set = JoinSet::new();
while let Ok(fut) = rx.try_recv() {
set.spawn(fut);
}
loop {
select! {
Some(fut) = rx.recv() => {
set.spawn(fut);
},
val = set.join_next() => {
match val {
Some(Ok(())) => {}
Some(Err(_)) => panic!("ExitHandler future panicked!"),
None => return,
}
},
}
}
}

0 comments on commit 4d96e72

Please sign in to comment.