From 85d14669aef54118de1eb86fa0439a53e5f891da Mon Sep 17 00:00:00 2001 From: Ismo Puustinen Date: Fri, 10 Mar 2023 14:54:05 +0200 Subject: [PATCH] Redo wait() API. The idea is that the shim implementations would need to do less thread management. Signed-off-by: Ismo Puustinen --- README.md | 8 +- .../src/sandbox/instance.rs | 78 ++++++++++++------- .../containerd-shim-wasm/src/sandbox/shim.rs | 14 ++-- .../containerd-shim-wasmedge/src/instance.rs | 19 ++--- .../containerd-shim-wasmtime/src/instance.rs | 24 ++---- 5 files changed, 74 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index 28aa70101..1871a1798 100644 --- a/README.md +++ b/README.md @@ -37,14 +37,14 @@ pub trait Instance { fn start(&self) -> Result; /// Send a signal to the instance fn kill(&self, signal: u32) -> Result<(), Error>; - /// delete any reference to the instance + /// Delete any reference to the instance /// This is called after the instance has exited. fn delete(&self) -> Result<(), Error>; - /// wait for the instance to exit - /// The sender is used to send the exit code and time back to the caller + /// Wait for the instance to exit + /// The waiter is used to send the exit code and time back to the caller /// Ideally this would just be a blocking call with a normal result, however /// because of how this is called from a thread it causes issues with lifetimes of the trait implementer. - fn wait(&self, send: Sender<(u32, DateTime)>) -> Result<(), Error>; + fn wait(&self, waiter: &Wait) -> Result<(), Error>; } ``` diff --git a/crates/containerd-shim-wasm/src/sandbox/instance.rs b/crates/containerd-shim-wasm/src/sandbox/instance.rs index 435e246fc..e4e8a1a21 100644 --- a/crates/containerd-shim-wasm/src/sandbox/instance.rs +++ b/crates/containerd-shim-wasm/src/sandbox/instance.rs @@ -116,14 +116,50 @@ pub trait Instance { fn start(&self) -> Result; /// Send a signal to the instance fn kill(&self, signal: u32) -> Result<(), Error>; - /// delete any reference to the instance + /// Delete any reference to the instance /// This is called after the instance has exited. fn delete(&self) -> Result<(), Error>; - /// wait for the instance to exit - /// The sender is used to send the exit code and time back to the caller - /// Ideally this would just be a blocking call with a normal result, however - /// because of how this is called from a thread it causes issues with lifetimes of the trait implementer. - fn wait(&self, send: Sender<(u32, DateTime)>) -> Result<(), Error>; + /// Set up waiting for the instance to exit + /// The Wait struct is used to send the exit code and time back to the + /// caller. The recipient is expected to call function + /// set_up_exit_code_wait() implemented by Wait to set up exit code + /// processing. Note that the "wait" function doesn't block, but + /// it sets up the waiting channel. + fn wait(&self, waiter: &Wait) -> Result<(), Error>; +} + +/// This is used for waiting for the container process to exit and deliver the exit code to the caller. +/// Since the shim needs to provide the caller the process exit code, this struct wraps the required +/// thread setup to make the shims simpler. +pub struct Wait { + tx: Sender<(u32, DateTime)>, +} + +impl Wait { + /// Create a new Wait struct with the provided sending endpoint of a channel. + pub fn new(sender: Sender<(u32, DateTime)>) -> Self { + Wait { tx: sender } + } + + /// This is called by the shim to create the thread to wait for the exit + /// code. When the child process exits, the shim will use the ExitCode + /// to signal the exit status to the caller. This function returns so that + /// the wait() function in the shim implementation API would not block. + pub fn set_up_exit_code_wait(&self, exit_code: Arc) -> Result<(), Error> { + let sender = self.tx.clone(); + let code = Arc::clone(&exit_code); + thread::spawn(move || { + let (lock, cvar) = &*code; + let mut exit = lock.lock().unwrap(); + while (*exit).is_none() { + exit = cvar.wait(exit).unwrap(); + } + let ec = (*exit).unwrap(); + sender.send(ec).unwrap(); + }); + + Ok(()) + } } /// This is used for the "pause" container with cri and is a no-op instance implementation. @@ -164,19 +200,8 @@ impl Instance for Nop { fn delete(&self) -> Result<(), Error> { Ok(()) } - - fn wait(&self, channel: Sender<(u32, DateTime)>) -> Result<(), Error> { - let code = self.exit_code.clone(); - thread::spawn(move || { - let (lock, cvar) = &*code; - let mut exit = lock.lock().unwrap(); - while (*exit).is_none() { - exit = cvar.wait(exit).unwrap(); - } - let ec = (*exit).unwrap(); - channel.send(ec).unwrap(); - }); - Ok(()) + fn wait(&self, waiter: &Wait) -> Result<(), Error> { + waiter.set_up_exit_code_wait(self.exit_code.clone()) } } @@ -196,10 +221,9 @@ mod noptests { let (tx, rx) = channel(); let n = nop.clone(); + let waiter = Wait::new(tx); - thread::spawn(move || { - n.wait(tx).unwrap(); - }); + n.wait(&waiter).unwrap(); nop.kill(SIGKILL as u32)?; let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap(); @@ -213,10 +237,9 @@ mod noptests { let (tx, rx) = channel(); let n = nop.clone(); + let waiter = Wait::new(tx); - thread::spawn(move || { - n.wait(tx).unwrap(); - }); + n.wait(&waiter).unwrap(); nop.kill(SIGTERM as u32)?; let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap(); @@ -230,10 +253,9 @@ mod noptests { let (tx, rx) = channel(); let n = nop.clone(); + let waiter = Wait::new(tx); - thread::spawn(move || { - n.wait(tx).unwrap(); - }); + n.wait(&waiter).unwrap(); nop.kill(SIGINT as u32)?; let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap(); diff --git a/crates/containerd-shim-wasm/src/sandbox/shim.rs b/crates/containerd-shim-wasm/src/sandbox/shim.rs index fa52a37b5..ff7d9d80f 100644 --- a/crates/containerd-shim-wasm/src/sandbox/shim.rs +++ b/crates/containerd-shim-wasm/src/sandbox/shim.rs @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; -use super::instance::{EngineGetter, Instance, InstanceConfig, Nop}; +use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait}; use super::{oci, Error, SandboxService}; use chrono::{DateTime, Utc}; use containerd_shim::{ @@ -147,11 +147,11 @@ where }) } - fn wait(&self, send: Sender<(u32, DateTime)>) -> Result<()> { + fn wait(&self, waiter: &Wait) -> Result<()> { if self.instance.is_some() { - return self.instance.as_ref().unwrap().wait(send); + return self.instance.as_ref().unwrap().wait(waiter); } - self.base.as_ref().unwrap().wait(send) + self.base.as_ref().unwrap().wait(waiter) } } @@ -1008,7 +1008,8 @@ where }); let (tx, rx) = channel::<(u32, DateTime)>(); - i.wait(tx)?; + let waiter = Wait::new(tx); + i.wait(&waiter)?; let status = i.status.clone(); @@ -1130,7 +1131,8 @@ where } let (tx, rx) = channel::<(u32, DateTime)>(); - i.wait(tx)?; + let waiter = Wait::new(tx); + i.wait(&waiter)?; let code = rx.recv().unwrap(); debug!("wait done: {:?}", req); diff --git a/crates/containerd-shim-wasmedge/src/instance.rs b/crates/containerd-shim-wasmedge/src/instance.rs index 2a9c2dd1a..79a56a7ac 100644 --- a/crates/containerd-shim-wasmedge/src/instance.rs +++ b/crates/containerd-shim-wasmedge/src/instance.rs @@ -3,7 +3,6 @@ use std::io::prelude::*; use std::io::ErrorKind; use std::os::unix::io::{IntoRawFd, RawFd}; use std::sync::{ - mpsc::Sender, {Arc, Condvar, Mutex}, }; use std::thread; @@ -11,6 +10,7 @@ use std::thread; use anyhow::{bail, Context, Result}; use chrono::{DateTime, Utc}; use containerd_shim_wasm::sandbox::error::Error; +use containerd_shim_wasm::sandbox::instance::Wait; use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig}; use libc::{dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; use log::{debug, error}; @@ -302,19 +302,9 @@ impl Instance for Wasi { Ok(()) } - fn wait(&self, channel: Sender<(u32, DateTime)>) -> Result<(), Error> { + fn wait(&self, waiter: &Wait) -> Result<(), Error> { let code = self.exit_code.clone(); - thread::spawn(move || { - let (lock, cvar) = &*code; - let mut exit = lock.lock().unwrap(); - while (*exit).is_none() { - exit = cvar.wait(exit).unwrap(); - } - let ec = (*exit).unwrap(); - channel.send(ec).unwrap(); - }); - - Ok(()) + waiter.set_up_exit_code_wait(code) } } @@ -429,7 +419,8 @@ mod wasitest { wasi.start()?; let (tx, rx) = channel(); - wasi.wait(tx).unwrap(); + let waiter = Wait::new(tx); + wasi.wait(&waiter).unwrap(); let res = match rx.recv_timeout(Duration::from_secs(10)) { Ok(res) => Ok(res), diff --git a/crates/containerd-shim-wasmtime/src/instance.rs b/crates/containerd-shim-wasmtime/src/instance.rs index 4cf74f07c..03f1bb7ae 100644 --- a/crates/containerd-shim-wasmtime/src/instance.rs +++ b/crates/containerd-shim-wasmtime/src/instance.rs @@ -1,6 +1,5 @@ use std::fs::OpenOptions; use std::path::Path; -use std::sync::mpsc::Sender; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -8,6 +7,7 @@ use anyhow::Context; use chrono::{DateTime, Utc}; use containerd_shim_wasm::sandbox::error::Error; use containerd_shim_wasm::sandbox::exec; +use containerd_shim_wasm::sandbox::instance::Wait; use containerd_shim_wasm::sandbox::oci; use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig}; use log::{debug, error}; @@ -174,7 +174,7 @@ impl Instance for Wasi { let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr) .map_err(|e| Error::Others(format!("error setting up module: {}", e)))?; - let mut store = Store::new(&engine, m.0); + let mut store = Store::new(&engine, m.0); debug!("instantiating instance"); let i = linker @@ -262,19 +262,9 @@ impl Instance for Wasi { Ok(()) } - fn wait(&self, channel: Sender<(u32, DateTime)>) -> Result<(), Error> { + fn wait(&self, waiter: &Wait) -> Result<(), Error> { let code = self.exit_code.clone(); - thread::spawn(move || { - let (lock, cvar) = &*code; - let mut exit = lock.lock().unwrap(); - while (*exit).is_none() { - exit = cvar.wait(exit).unwrap(); - } - let ec = (*exit).unwrap(); - channel.send(ec).unwrap(); - }); - - Ok(()) + waiter.set_up_exit_code_wait(code) } } @@ -285,6 +275,7 @@ mod wasitest { use std::sync::mpsc::channel; use std::time::Duration; + use containerd_shim_wasm::sandbox::instance::Wait; use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder}; use tempfile::tempdir; @@ -366,9 +357,8 @@ mod wasitest { let w = wasi.clone(); let (tx, rx) = channel(); - thread::spawn(move || { - w.wait(tx).unwrap(); - }); + let waiter = Wait::new(tx); + w.wait(&waiter).unwrap(); let res = match rx.recv_timeout(Duration::from_secs(10)) { Ok(res) => res,