Skip to content

Commit

Permalink
Redo wait() API.
Browse files Browse the repository at this point in the history
The idea is that the shim implementations would need to do less thread
management.

Signed-off-by: Ismo Puustinen <[email protected]>
  • Loading branch information
ipuustin committed Mar 24, 2023
1 parent 110c097 commit a10c554
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 58 deletions.
69 changes: 48 additions & 21 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,46 @@ pub trait Instance {
/// delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
/// Wait for the instance to exit.
/// The Wait struct is used to send the exit code and time back to the
/// caller. The recipient is expected to call function
/// set_up_exit_code_wait() implemented by Wait to set up exit code
/// processing.
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}

/// This is used for waiting for the container process to exit and deliver the exit code to the caller.
/// Since the shim needs to provide the caller the process exit code, this struct wraps the required
/// thread setup to make the shims simpler.
pub struct Wait {
tx: Sender<(u32, DateTime<Utc>)>,
}

impl Wait {
/// Create a new Wait struct with the provided sending endpoint of a channel.
pub fn new(sender: Sender<(u32, DateTime<Utc>)>) -> Self {
Wait { tx: sender }
}

/// This is called by the shim to create the thread to wait for the exit
/// code. When the child process exits, the shim will use the ExitCode
/// to signal the exit status to the caller. This function returns so that
/// the wait() function in the shim implementation API would not block.
pub fn set_up_exit_code_wait(&self, exit_code: Arc<ExitCode>) -> Result<(), Error> {
let sender = self.tx.clone();
let code = Arc::clone(&exit_code);
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
sender.send(ec).unwrap();
});

Ok(())
}
}

/// This is used for the "pause" container with cri and is a no-op instance implementation.
Expand Down Expand Up @@ -156,19 +191,8 @@ impl Instance for Nop {
fn delete(&self) -> Result<(), Error> {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
waiter.set_up_exit_code_wait(self.exit_code.clone())
}
}

Expand All @@ -188,9 +212,10 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
n.wait(&waiter).unwrap();
});

nop.kill(SIGKILL as u32)?;
Expand All @@ -205,9 +230,10 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
n.wait(&waiter).unwrap();
});

nop.kill(SIGTERM as u32)?;
Expand All @@ -222,9 +248,10 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
n.wait(&waiter).unwrap();
});

nop.kill(SIGINT as u32)?;
Expand Down
14 changes: 8 additions & 6 deletions crates/containerd-shim-wasm/src/sandbox/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;

use super::instance::{EngineGetter, Instance, InstanceConfig, Nop};
use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait};
use super::{oci, Error, SandboxService};
use chrono::{DateTime, Utc};
use containerd_shim::{
Expand Down Expand Up @@ -147,11 +147,11 @@ where
})
}

fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<()> {
fn wait(&self, waiter: &Wait) -> Result<()> {
if self.instance.is_some() {
return self.instance.as_ref().unwrap().wait(send);
return self.instance.as_ref().unwrap().wait(waiter);
}
self.base.as_ref().unwrap().wait(send)
self.base.as_ref().unwrap().wait(waiter)
}
}

Expand Down Expand Up @@ -990,7 +990,8 @@ where
});

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let status = i.status.clone();

Expand Down Expand Up @@ -1112,7 +1113,8 @@ where
}

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let code = rx.recv().unwrap();
debug!("wait done: {:?}", req);
Expand Down
23 changes: 6 additions & 17 deletions crates/containerd-shim-wasmedge/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use libc::{dup, dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
Expand All @@ -10,10 +11,7 @@ use std::fs::OpenOptions;
use std::io::ErrorKind;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::path::Path;
use std::sync::{
mpsc::Sender,
{Arc, Condvar, Mutex},
};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use wasmedge_sdk::{
Expand Down Expand Up @@ -271,19 +269,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand Down Expand Up @@ -377,8 +365,9 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
let waiter = Wait::new(tx);
thread::spawn(move || {
w.wait(tx).unwrap();
w.wait(&waiter).unwrap();
});

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Expand Down
20 changes: 6 additions & 14 deletions crates/containerd-shim-wasmtime/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use log::{debug, error};
Expand Down Expand Up @@ -256,19 +256,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand All @@ -279,6 +269,7 @@ mod wasitest {
use std::sync::mpsc::channel;
use std::time::Duration;

use containerd_shim_wasm::sandbox::instance::Wait;
use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder};
use tempfile::tempdir;

Expand Down Expand Up @@ -357,8 +348,9 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
let waiter = Wait::new(tx);
thread::spawn(move || {
w.wait(tx).unwrap();
w.wait(&waiter).unwrap();
});

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Expand Down

0 comments on commit a10c554

Please sign in to comment.