Skip to content

Commit

Permalink
Redo wait() API.
Browse files Browse the repository at this point in the history
The idea is that the shim implementations would need to do less thread
management.

Signed-off-by: Ismo Puustinen <[email protected]>
  • Loading branch information
ipuustin committed Apr 24, 2023
1 parent 8499ffc commit fbbef3a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 68 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ pub trait Instance {
fn start(&self) -> Result<u32, Error>;
/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
/// delete any reference to the instance
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// Wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}
```

Expand Down
78 changes: 50 additions & 28 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,50 @@ pub trait Instance {
fn start(&self) -> Result<u32, Error>;
/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
/// delete any reference to the instance
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
/// Set up waiting for the instance to exit
/// The Wait struct is used to send the exit code and time back to the
/// caller. The recipient is expected to call function
/// set_up_exit_code_wait() implemented by Wait to set up exit code
/// processing. Note that the "wait" function doesn't block, but
/// it sets up the waiting channel.
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}

/// This is used for waiting for the container process to exit and deliver the exit code to the caller.
/// Since the shim needs to provide the caller the process exit code, this struct wraps the required
/// thread setup to make the shims simpler.
pub struct Wait {
tx: Sender<(u32, DateTime<Utc>)>,
}

impl Wait {
/// Create a new Wait struct with the provided sending endpoint of a channel.
pub fn new(sender: Sender<(u32, DateTime<Utc>)>) -> Self {
Wait { tx: sender }
}

/// This is called by the shim to create the thread to wait for the exit
/// code. When the child process exits, the shim will use the ExitCode
/// to signal the exit status to the caller. This function returns so that
/// the wait() function in the shim implementation API would not block.
pub fn set_up_exit_code_wait(&self, exit_code: Arc<ExitCode>) -> Result<(), Error> {
let sender = self.tx.clone();
let code = Arc::clone(&exit_code);
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
sender.send(ec).unwrap();
});

Ok(())
}
}

/// This is used for the "pause" container with cri and is a no-op instance implementation.
Expand Down Expand Up @@ -164,19 +200,8 @@ impl Instance for Nop {
fn delete(&self) -> Result<(), Error> {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
waiter.set_up_exit_code_wait(self.exit_code.clone())
}
}

Expand All @@ -196,10 +221,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGKILL as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -213,10 +237,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGTERM as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -230,10 +253,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGINT as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand Down
14 changes: 8 additions & 6 deletions crates/containerd-shim-wasm/src/sandbox/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;

use super::instance::{EngineGetter, Instance, InstanceConfig, Nop};
use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait};
use super::{oci, Error, SandboxService};
use chrono::{DateTime, Utc};
use containerd_shim::{
Expand Down Expand Up @@ -147,11 +147,11 @@ where
})
}

fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<()> {
fn wait(&self, waiter: &Wait) -> Result<()> {
if self.instance.is_some() {
return self.instance.as_ref().unwrap().wait(send);
return self.instance.as_ref().unwrap().wait(waiter);
}
self.base.as_ref().unwrap().wait(send)
self.base.as_ref().unwrap().wait(waiter)
}
}

Expand Down Expand Up @@ -1008,7 +1008,8 @@ where
});

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let status = i.status.clone();

Expand Down Expand Up @@ -1130,7 +1131,8 @@ where
}

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let code = rx.recv().unwrap();
debug!("wait done: {:?}", req);
Expand Down
19 changes: 5 additions & 14 deletions crates/containerd-shim-wasmedge/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::io::prelude::*;
use std::io::ErrorKind;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::sync::{
mpsc::Sender,
{Arc, Condvar, Mutex},
};
use std::thread;

use anyhow::{bail, Context, Result};
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use libc::{dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use log::{debug, error};
Expand Down Expand Up @@ -302,19 +302,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand Down Expand Up @@ -429,7 +419,8 @@ mod wasitest {
wasi.start()?;

let (tx, rx) = channel();
wasi.wait(tx).unwrap();
let waiter = Wait::new(tx);
wasi.wait(&waiter).unwrap();

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => Ok(res),
Expand Down
24 changes: 7 additions & 17 deletions crates/containerd-shim-wasmtime/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use log::{debug, error};
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Instance for Wasi {
let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr)
.map_err(|e| Error::Others(format!("error setting up module: {}", e)))?;

let mut store = Store::new(&engine, m.0);
let mut store = Store::new(&engine, m.0);

debug!("instantiating instance");
let i = linker
Expand Down Expand Up @@ -262,19 +262,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand All @@ -285,6 +275,7 @@ mod wasitest {
use std::sync::mpsc::channel;
use std::time::Duration;

use containerd_shim_wasm::sandbox::instance::Wait;
use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder};
use tempfile::tempdir;

Expand Down Expand Up @@ -366,9 +357,8 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
thread::spawn(move || {
w.wait(tx).unwrap();
});
let waiter = Wait::new(tx);
w.wait(&waiter).unwrap();

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => res,
Expand Down

0 comments on commit fbbef3a

Please sign in to comment.