diff --git a/crates/containerd-shim-wasm/src/sandbox/instance.rs b/crates/containerd-shim-wasm/src/sandbox/instance.rs index 9a0b5f314..6807ca4bc 100644 --- a/crates/containerd-shim-wasm/src/sandbox/instance.rs +++ b/crates/containerd-shim-wasm/src/sandbox/instance.rs @@ -112,10 +112,34 @@ pub trait Instance { /// This is called after the instance has exited. fn delete(&self) -> Result<(), Error>; /// wait for the instance to exit - /// The sender is used to send the exit code and time back to the caller - /// Ideally this would just be a blocking call with a normal result, however - /// because of how this is called from a thread it causes issues with lifetimes of the trait implementer. - fn wait(&self, send: Sender<(u32, DateTime)>) -> Result<(), Error>; + /// The waiter is used to send the exit code and time back to the caller + fn wait(&self, waiter: &Wait) -> Result<(), Error>; +} + +pub struct Wait { + tx: Sender<(u32, DateTime)>, +} + +impl Wait { + pub fn new(sender: Sender<(u32, DateTime)>) -> Self { + Wait { tx: sender } + } + + pub fn wait_for_exit_code(&self, exit_code: Arc) -> Result<(), Error> { + let sender = self.tx.clone(); + let code = Arc::clone(&exit_code); + thread::spawn(move || { + let (lock, cvar) = &*code; + let mut exit = lock.lock().unwrap(); + while (*exit).is_none() { + exit = cvar.wait(exit).unwrap(); + } + let ec = (*exit).unwrap(); + sender.send(ec).unwrap(); + }); + + Ok(()) + } } /// This is used for the "pause" container with cri and is a no-op instance implementation. @@ -156,19 +180,8 @@ impl Instance for Nop { fn delete(&self) -> Result<(), Error> { Ok(()) } - - fn wait(&self, channel: Sender<(u32, DateTime)>) -> Result<(), Error> { - let code = self.exit_code.clone(); - thread::spawn(move || { - let (lock, cvar) = &*code; - let mut exit = lock.lock().unwrap(); - while (*exit).is_none() { - exit = cvar.wait(exit).unwrap(); - } - let ec = (*exit).unwrap(); - channel.send(ec).unwrap(); - }); - Ok(()) + fn wait(&self, waiter: &Wait) -> Result<(), Error> { + waiter.wait_for_exit_code(self.exit_code.clone()) } } @@ -188,9 +201,10 @@ mod noptests { let (tx, rx) = channel(); let n = nop.clone(); + let waiter = Wait::new(tx); thread::spawn(move || { - n.wait(tx).unwrap(); + n.wait(&waiter).unwrap(); }); nop.kill(SIGKILL as u32)?; @@ -205,9 +219,10 @@ mod noptests { let (tx, rx) = channel(); let n = nop.clone(); + let waiter = Wait::new(tx); thread::spawn(move || { - n.wait(tx).unwrap(); + n.wait(&waiter).unwrap(); }); nop.kill(SIGTERM as u32)?; @@ -222,9 +237,10 @@ mod noptests { let (tx, rx) = channel(); let n = nop.clone(); + let waiter = Wait::new(tx); thread::spawn(move || { - n.wait(tx).unwrap(); + n.wait(&waiter).unwrap(); }); nop.kill(SIGINT as u32)?; diff --git a/crates/containerd-shim-wasm/src/sandbox/shim.rs b/crates/containerd-shim-wasm/src/sandbox/shim.rs index d79b39321..b9d531a78 100644 --- a/crates/containerd-shim-wasm/src/sandbox/shim.rs +++ b/crates/containerd-shim-wasm/src/sandbox/shim.rs @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; -use super::instance::{EngineGetter, Instance, InstanceConfig, Nop}; +use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait}; use super::{oci, Error, SandboxService}; use chrono::{DateTime, Utc}; use containerd_shim::{ @@ -147,11 +147,11 @@ where }) } - fn wait(&self, send: Sender<(u32, DateTime)>) -> Result<()> { + fn wait(&self, waiter: &Wait) -> Result<()> { if self.instance.is_some() { - return self.instance.as_ref().unwrap().wait(send); + return self.instance.as_ref().unwrap().wait(waiter); } - self.base.as_ref().unwrap().wait(send) + self.base.as_ref().unwrap().wait(waiter) } } @@ -990,7 +990,8 @@ where }); let (tx, rx) = channel::<(u32, DateTime)>(); - i.wait(tx)?; + let waiter = Wait::new(tx); + i.wait(&waiter)?; let status = i.status.clone(); @@ -1112,7 +1113,8 @@ where } let (tx, rx) = channel::<(u32, DateTime)>(); - i.wait(tx)?; + let waiter = Wait::new(tx); + i.wait(&waiter)?; let code = rx.recv().unwrap(); debug!("wait done: {:?}", req); diff --git a/crates/containerd-shim-wasmedge/src/instance.rs b/crates/containerd-shim-wasmedge/src/instance.rs index 8a721fd83..05c46ceee 100644 --- a/crates/containerd-shim-wasmedge/src/instance.rs +++ b/crates/containerd-shim-wasmedge/src/instance.rs @@ -2,6 +2,7 @@ use anyhow::Context; use chrono::{DateTime, Utc}; use containerd_shim_wasm::sandbox::error::Error; use containerd_shim_wasm::sandbox::exec; +use containerd_shim_wasm::sandbox::instance::Wait; use containerd_shim_wasm::sandbox::oci; use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig}; use libc::{dup, dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; @@ -10,10 +11,7 @@ use std::fs::OpenOptions; use std::io::ErrorKind; use std::os::unix::io::{IntoRawFd, RawFd}; use std::path::Path; -use std::sync::{ - mpsc::Sender, - {Arc, Condvar, Mutex}, -}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; use wasmedge_sdk::{ @@ -271,19 +269,9 @@ impl Instance for Wasi { Ok(()) } - fn wait(&self, channel: Sender<(u32, DateTime)>) -> Result<(), Error> { + fn wait(&self, waiter: &Wait) -> Result<(), Error> { let code = self.exit_code.clone(); - thread::spawn(move || { - let (lock, cvar) = &*code; - let mut exit = lock.lock().unwrap(); - while (*exit).is_none() { - exit = cvar.wait(exit).unwrap(); - } - let ec = (*exit).unwrap(); - channel.send(ec).unwrap(); - }); - - Ok(()) + waiter.wait_for_exit_code(code) } } @@ -377,8 +365,9 @@ mod wasitest { let w = wasi.clone(); let (tx, rx) = channel(); + let waiter = Wait::new(tx); thread::spawn(move || { - w.wait(tx).unwrap(); + w.wait(&waiter).unwrap(); }); let res = match rx.recv_timeout(Duration::from_secs(10)) { diff --git a/crates/containerd-shim-wasmtime/src/instance.rs b/crates/containerd-shim-wasmtime/src/instance.rs index 83be8a2fb..4c1ed9c2d 100644 --- a/crates/containerd-shim-wasmtime/src/instance.rs +++ b/crates/containerd-shim-wasmtime/src/instance.rs @@ -1,6 +1,5 @@ use std::fs::OpenOptions; use std::path::Path; -use std::sync::mpsc::Sender; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -8,6 +7,7 @@ use anyhow::Context; use chrono::{DateTime, Utc}; use containerd_shim_wasm::sandbox::error::Error; use containerd_shim_wasm::sandbox::exec; +use containerd_shim_wasm::sandbox::instance::Wait; use containerd_shim_wasm::sandbox::oci; use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig}; use log::{debug, error}; @@ -256,19 +256,9 @@ impl Instance for Wasi { Ok(()) } - fn wait(&self, channel: Sender<(u32, DateTime)>) -> Result<(), Error> { + fn wait(&self, waiter: &Wait) -> Result<(), Error> { let code = self.exit_code.clone(); - thread::spawn(move || { - let (lock, cvar) = &*code; - let mut exit = lock.lock().unwrap(); - while (*exit).is_none() { - exit = cvar.wait(exit).unwrap(); - } - let ec = (*exit).unwrap(); - channel.send(ec).unwrap(); - }); - - Ok(()) + waiter.wait_for_exit_code(code) } } @@ -279,6 +269,7 @@ mod wasitest { use std::sync::mpsc::channel; use std::time::Duration; + use containerd_shim_wasm::sandbox::instance::Wait; use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder}; use tempfile::tempdir; @@ -357,8 +348,9 @@ mod wasitest { let w = wasi.clone(); let (tx, rx) = channel(); + let waiter = Wait::new(tx); thread::spawn(move || { - w.wait(tx).unwrap(); + w.wait(&waiter).unwrap(); }); let res = match rx.recv_timeout(Duration::from_secs(10)) {