Skip to content

Commit

Permalink
Merge pull request #88 from ipuustin/wait-api
Browse files Browse the repository at this point in the history
Redo wait() API.
  • Loading branch information
cpuguy83 committed Apr 24, 2023
2 parents 8499ffc + 85d1466 commit 74c08ce
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 69 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ pub trait Instance {
fn start(&self) -> Result<u32, Error>;
/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
/// delete any reference to the instance
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Wait for the instance to exit
/// The waiter is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}
```

Expand Down
78 changes: 50 additions & 28 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,50 @@ pub trait Instance {
fn start(&self) -> Result<u32, Error>;
/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
/// delete any reference to the instance
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
/// Set up waiting for the instance to exit
/// The Wait struct is used to send the exit code and time back to the
/// caller. The recipient is expected to call function
/// set_up_exit_code_wait() implemented by Wait to set up exit code
/// processing. Note that the "wait" function doesn't block, but
/// it sets up the waiting channel.
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}

/// This is used for waiting for the container process to exit and deliver the exit code to the caller.
/// Since the shim needs to provide the caller the process exit code, this struct wraps the required
/// thread setup to make the shims simpler.
pub struct Wait {
tx: Sender<(u32, DateTime<Utc>)>,
}

impl Wait {
/// Create a new Wait struct with the provided sending endpoint of a channel.
pub fn new(sender: Sender<(u32, DateTime<Utc>)>) -> Self {
Wait { tx: sender }
}

/// This is called by the shim to create the thread to wait for the exit
/// code. When the child process exits, the shim will use the ExitCode
/// to signal the exit status to the caller. This function returns so that
/// the wait() function in the shim implementation API would not block.
pub fn set_up_exit_code_wait(&self, exit_code: Arc<ExitCode>) -> Result<(), Error> {
let sender = self.tx.clone();
let code = Arc::clone(&exit_code);
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
sender.send(ec).unwrap();
});

Ok(())
}
}

/// This is used for the "pause" container with cri and is a no-op instance implementation.
Expand Down Expand Up @@ -164,19 +200,8 @@ impl Instance for Nop {
fn delete(&self) -> Result<(), Error> {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
waiter.set_up_exit_code_wait(self.exit_code.clone())
}
}

Expand All @@ -196,10 +221,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGKILL as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -213,10 +237,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGTERM as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -230,10 +253,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGINT as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand Down
14 changes: 8 additions & 6 deletions crates/containerd-shim-wasm/src/sandbox/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;

use super::instance::{EngineGetter, Instance, InstanceConfig, Nop};
use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait};
use super::{oci, Error, SandboxService};
use chrono::{DateTime, Utc};
use containerd_shim::{
Expand Down Expand Up @@ -147,11 +147,11 @@ where
})
}

fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<()> {
fn wait(&self, waiter: &Wait) -> Result<()> {
if self.instance.is_some() {
return self.instance.as_ref().unwrap().wait(send);
return self.instance.as_ref().unwrap().wait(waiter);
}
self.base.as_ref().unwrap().wait(send)
self.base.as_ref().unwrap().wait(waiter)
}
}

Expand Down Expand Up @@ -1008,7 +1008,8 @@ where
});

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let status = i.status.clone();

Expand Down Expand Up @@ -1130,7 +1131,8 @@ where
}

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let code = rx.recv().unwrap();
debug!("wait done: {:?}", req);
Expand Down
19 changes: 5 additions & 14 deletions crates/containerd-shim-wasmedge/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::io::prelude::*;
use std::io::ErrorKind;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::sync::{
mpsc::Sender,
{Arc, Condvar, Mutex},
};
use std::thread;

use anyhow::{bail, Context, Result};
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use libc::{dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use log::{debug, error};
Expand Down Expand Up @@ -302,19 +302,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand Down Expand Up @@ -429,7 +419,8 @@ mod wasitest {
wasi.start()?;

let (tx, rx) = channel();
wasi.wait(tx).unwrap();
let waiter = Wait::new(tx);
wasi.wait(&waiter).unwrap();

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => Ok(res),
Expand Down
24 changes: 7 additions & 17 deletions crates/containerd-shim-wasmtime/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use log::{debug, error};
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Instance for Wasi {
let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr)
.map_err(|e| Error::Others(format!("error setting up module: {}", e)))?;

let mut store = Store::new(&engine, m.0);
let mut store = Store::new(&engine, m.0);

debug!("instantiating instance");
let i = linker
Expand Down Expand Up @@ -262,19 +262,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand All @@ -285,6 +275,7 @@ mod wasitest {
use std::sync::mpsc::channel;
use std::time::Duration;

use containerd_shim_wasm::sandbox::instance::Wait;
use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder};
use tempfile::tempdir;

Expand Down Expand Up @@ -366,9 +357,8 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
thread::spawn(move || {
w.wait(tx).unwrap();
});
let waiter = Wait::new(tx);
w.wait(&waiter).unwrap();

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => res,
Expand Down

0 comments on commit 74c08ce

Please sign in to comment.