Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redo wait() API. #88

Merged
merged 1 commit into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ pub trait Instance {
fn start(&self) -> Result<u32, Error>;
/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
/// delete any reference to the instance
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Wait for the instance to exit
/// The waiter is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}
```

Expand Down
78 changes: 50 additions & 28 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,50 @@ pub trait Instance {
fn start(&self) -> Result<u32, Error>;
/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
/// delete any reference to the instance
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// wait for the instance to exit
/// The sender is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>;
/// Set up waiting for the instance to exit
/// The Wait struct is used to send the exit code and time back to the
/// caller. The recipient is expected to call function
/// set_up_exit_code_wait() implemented by Wait to set up exit code
/// processing. Note that the "wait" function doesn't block, but
/// it sets up the waiting channel.
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
}

/// This is used for waiting for the container process to exit and deliver the exit code to the caller.
/// Since the shim needs to provide the caller the process exit code, this struct wraps the required
/// thread setup to make the shims simpler.
pub struct Wait {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add docs for all these public type/functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added docs and also tried to think of a better name for the function.

tx: Sender<(u32, DateTime<Utc>)>,
}

impl Wait {
/// Create a new Wait struct with the provided sending endpoint of a channel.
pub fn new(sender: Sender<(u32, DateTime<Utc>)>) -> Self {
Wait { tx: sender }
}

/// This is called by the shim to create the thread to wait for the exit
/// code. When the child process exits, the shim will use the ExitCode
/// to signal the exit status to the caller. This function returns so that
/// the wait() function in the shim implementation API would not block.
pub fn set_up_exit_code_wait(&self, exit_code: Arc<ExitCode>) -> Result<(), Error> {
let sender = self.tx.clone();
let code = Arc::clone(&exit_code);
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
sender.send(ec).unwrap();
});

Ok(())
}
}

/// This is used for the "pause" container with cri and is a no-op instance implementation.
Expand Down Expand Up @@ -164,19 +200,8 @@ impl Instance for Nop {
fn delete(&self) -> Result<(), Error> {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
waiter.set_up_exit_code_wait(self.exit_code.clone())
}
}

Expand All @@ -196,10 +221,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGKILL as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -213,10 +237,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGTERM as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -230,10 +253,9 @@ mod noptests {
let (tx, rx) = channel();

let n = nop.clone();
let waiter = Wait::new(tx);

thread::spawn(move || {
n.wait(tx).unwrap();
});
n.wait(&waiter).unwrap();

nop.kill(SIGINT as u32)?;
let ec = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand Down
14 changes: 8 additions & 6 deletions crates/containerd-shim-wasm/src/sandbox/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;

use super::instance::{EngineGetter, Instance, InstanceConfig, Nop};
use super::instance::{EngineGetter, Instance, InstanceConfig, Nop, Wait};
use super::{oci, Error, SandboxService};
use chrono::{DateTime, Utc};
use containerd_shim::{
Expand Down Expand Up @@ -147,11 +147,11 @@ where
})
}

fn wait(&self, send: Sender<(u32, DateTime<Utc>)>) -> Result<()> {
fn wait(&self, waiter: &Wait) -> Result<()> {
if self.instance.is_some() {
return self.instance.as_ref().unwrap().wait(send);
return self.instance.as_ref().unwrap().wait(waiter);
}
self.base.as_ref().unwrap().wait(send)
self.base.as_ref().unwrap().wait(waiter)
}
}

Expand Down Expand Up @@ -1008,7 +1008,8 @@ where
});

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let status = i.status.clone();

Expand Down Expand Up @@ -1130,7 +1131,8 @@ where
}

let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let waiter = Wait::new(tx);
i.wait(&waiter)?;

let code = rx.recv().unwrap();
debug!("wait done: {:?}", req);
Expand Down
19 changes: 5 additions & 14 deletions crates/containerd-shim-wasmedge/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::io::prelude::*;
use std::io::ErrorKind;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::sync::{
mpsc::Sender,
{Arc, Condvar, Mutex},
};
use std::thread;

use anyhow::{bail, Context, Result};
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use libc::{dup2, SIGINT, SIGKILL, STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use log::{debug, error};
Expand Down Expand Up @@ -302,19 +302,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand Down Expand Up @@ -429,7 +419,8 @@ mod wasitest {
wasi.start()?;

let (tx, rx) = channel();
wasi.wait(tx).unwrap();
let waiter = Wait::new(tx);
wasi.wait(&waiter).unwrap();

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => Ok(res),
Expand Down
24 changes: 7 additions & 17 deletions crates/containerd-shim-wasmtime/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fs::OpenOptions;
use std::path::Path;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use anyhow::Context;
use chrono::{DateTime, Utc};
use containerd_shim_wasm::sandbox::error::Error;
use containerd_shim_wasm::sandbox::exec;
use containerd_shim_wasm::sandbox::instance::Wait;
use containerd_shim_wasm::sandbox::oci;
use containerd_shim_wasm::sandbox::{EngineGetter, Instance, InstanceConfig};
use log::{debug, error};
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Instance for Wasi {
let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr)
.map_err(|e| Error::Others(format!("error setting up module: {}", e)))?;

let mut store = Store::new(&engine, m.0);
let mut store = Store::new(&engine, m.0);

debug!("instantiating instance");
let i = linker
Expand Down Expand Up @@ -262,19 +262,9 @@ impl Instance for Wasi {
Ok(())
}

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
fn wait(&self, waiter: &Wait) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});

Ok(())
waiter.set_up_exit_code_wait(code)
}
}

Expand All @@ -285,6 +275,7 @@ mod wasitest {
use std::sync::mpsc::channel;
use std::time::Duration;

use containerd_shim_wasm::sandbox::instance::Wait;
use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder};
use tempfile::tempdir;

Expand Down Expand Up @@ -366,9 +357,8 @@ mod wasitest {

let w = wasi.clone();
let (tx, rx) = channel();
thread::spawn(move || {
w.wait(tx).unwrap();
});
let waiter = Wait::new(tx);
w.wait(&waiter).unwrap();

let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => res,
Expand Down