Skip to content

Commit

Permalink
Redo wait() API.
Browse files Browse the repository at this point in the history
The idea is that the shim implementations would need to do less thread
management.

Signed-off-by: Ismo Puustinen <[email protected]>
  • Loading branch information
ipuustin committed Mar 14, 2023
1 parent 67c6be5 commit c580629
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 57 deletions.
56 changes: 36 additions & 20 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,34 @@ pub trait Instance {
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
/// The waiter is used to send the exit code and time back to the caller
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}

pub struct Wait {
tx: Sender<(u32, DateTime<Utc>)>,
}

impl Wait {
pub fn new(sender: Sender<(u32, DateTime<Utc>)>) -> Self {
Wait { tx: sender }
}

pub fn wait_for_exit_code(&self, exit_code: Arc<ExitCode>) -> Result<(), Error> {
let sender = self.tx.clone();
let code = Arc::clone(&exit_code);
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
sender.send(ec).unwrap();
});

Ok(())
}
}

/// This is used for the "pause" container with cri and is a no-op instance implementation.
Expand Down Expand Up @@ -156,19 +180,8 @@ impl Instance for Nop {
fn delete(&self) -> Result<(), Error> {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
waiter.wait_for_exit_code(self.exit_code.clone())
}
}

Expand All @@ -188,9 +201,10 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
n.wait(&waiter).unwrap();
});

nop.kill(SIGKILL as u32)?;
Expand All @@ -205,9 +219,10 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
n.wait(&waiter).unwrap();
});

nop.kill(SIGTERM as u32)?;
Expand All @@ -222,9 +237,10 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
n.wait(&waiter).unwrap();
});

nop.kill(SIGINT as u32)?;
Expand Down
14 changes: 8 additions & 6 deletions crates/containerd-shim-wasm/src/sandbox/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;

use super::instance::{EngineGetter, Instance, InstanceConfig, Nop};
use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait};
use super::{oci, Error, SandboxService};
use chrono::{DateTime, Utc};
use containerd_shim::{
Expand Down Expand Up @@ -147,11 +147,11 @@ where
})
}

fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<()> {
fn wait(&self, waiter: &Wait) -> Result<()> {
if self.instance.is_some() {
return self.instance.as_ref().unwrap().wait(send);
return self.instance.as_ref().unwrap().wait(waiter);
}
self.base.as_ref().unwrap().wait(send)
self.base.as_ref().unwrap().wait(waiter)
}
}

Expand Down Expand Up @@ -990,7 +990,8 @@ where
});

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let status = i.status.clone();

Expand Down Expand Up @@ -1112,7 +1113,8 @@ where
}

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let code = rx.recv().unwrap();
debug!("wait done: {:?}", req);
Expand Down
23 changes: 6 additions & 17 deletions crates/containerd-shim-wasmedge/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use libc::{dup, dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
Expand All @@ -10,10 +11,7 @@ use std::fs::OpenOptions;
use std::io::ErrorKind;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::path::Path;
use std::sync::{
mpsc::Sender,
{Arc, Condvar, Mutex},
};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use wasmedge_sdk::{
Expand Down Expand Up @@ -271,19 +269,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.wait_for_exit_code(code)
}
}

Expand Down Expand Up @@ -377,8 +365,9 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
let waiter = Wait::new(tx);
thread::spawn(move || {
w.wait(tx).unwrap();
w.wait(&waiter).unwrap();
});

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Expand Down
20 changes: 6 additions & 14 deletions crates/containerd-shim-wasmtime/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use log::{debug, error};
Expand Down Expand Up @@ -256,19 +256,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.wait_for_exit_code(code)
}
}

Expand All @@ -279,6 +269,7 @@ mod wasitest {
use std::sync::mpsc::channel;
use std::time::Duration;

use containerd_shim_wasm::sandbox::instance::Wait;
use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder};
use tempfile::tempdir;

Expand Down Expand Up @@ -357,8 +348,9 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
let waiter = Wait::new(tx);
thread::spawn(move || {
w.wait(tx).unwrap();
w.wait(&waiter).unwrap();
});

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Expand Down

0 comments on commit c580629

Please sign in to comment.