Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Rust client #47

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ version = "0.3.0"
[workspace]

members = [
"rain_client",
"rain_core",
"rain_server",
"rain_task",
"rain_task_test",
]

[patch.crates-io]
rain_client = { path = "rain_client" }
rain_core = { path = "rain_core" }
rain_task = { path = "rain_task" }
31 changes: 31 additions & 0 deletions rain_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "rain_client"
version = "0.3.0"

description = "Distributed computational framework for large-scale task-based pipelines. Client library in Rust."
# documentation = "https://docs.rs/rain_task/" # default docs.rs
homepage = "https://github.com/substantic/rain"
repository = "https://github.com/substantic/rain/"
readme = "README.md"
authors = [
"Stanislav Bohm <[email protected]>",
"Tomas Gavenciak <[email protected]>",
"Vojtech Cima <[email protected]>",
]
license = "MIT"

exclude = ["testing/**/*"]

[badges]
travis-ci = { repository = "substantic/rain", branch = "master" }
maintenance = { status = "actively-developed" }

[dependencies]
capnp-rpc = "0.8"
error-chain = "0.11"
futures = "0.1"
log = "0.4"
rain_core = "0.3.0"
tokio-core = "0.1"
serde = "1"
serde_json = "1"
73 changes: 73 additions & 0 deletions rain_client/src/client/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::error::Error;
use std::net::SocketAddr;

use super::communicator::Communicator;
use super::session::Session;
use rain_core::CLIENT_PROTOCOL_VERSION;
use std::rc::Rc;

pub struct Client {
comm: Rc<Communicator>,
}

impl Client {
pub fn new(scheduler: SocketAddr) -> Result<Self, Box<Error>> {
let comm = Rc::new(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);

Ok(Client { comm })
}

pub fn new_session(&self) -> Result<Session, Box<Error>> {
let session_id = self.comm.new_session()?;
Ok(Session::new(session_id, self.comm.clone()))
}

pub fn terminate_server(&self) -> Result<(), Box<Error>> {
self.comm.terminate_server()
}
}

#[cfg(test)]
mod tests {
use super::super::localcluster::LocalCluster;
use super::super::tasks::CommonTasks;
use super::Client;
use super::Session;
use std::env;

#[allow(dead_code)]
struct TestContext {
cluster: LocalCluster,
client: Client,
session: Session,
}

fn ctx() -> TestContext {
let rain = env::var("RAIN_BINARY").unwrap();

let cluster = LocalCluster::new(&rain).unwrap();
let client = cluster.create_client().unwrap();
let session = client.new_session().unwrap();

TestContext {
cluster,
client,
session,
}
}

#[test]
fn concat() {
let mut ctx = ctx();
let a = ctx.session.blob(vec![1, 2, 3]);
let b = ctx.session.blob(vec![4, 5, 6]);
let c = ctx.session.concat(&[a, b]);
c.output().keep();
ctx.session.submit().unwrap();
ctx.session.wait(&[c.clone()], &[]).unwrap();
assert_eq!(
ctx.session.fetch(&c.output()).unwrap(),
vec![1, 2, 3, 4, 5, 6]
);
}
}
154 changes: 154 additions & 0 deletions rain_client/src/client/communicator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use capnp_rpc::rpc_twoparty_capnp;
use futures::Future;
use rain_core::comm::new_rpc_system;
use std::error::Error;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;

use super::task::Task;
use client::dataobject::DataObject;
use rain_core::types::{DataObjectId, TaskId};
use rain_core::utils::{FromCapnp, ToCapnp};
use rain_core::{client_capnp, common_capnp, server_capnp};
use std::cell::{RefCell, RefMut};

pub struct Communicator {
core: RefCell<Core>,
service: client_capnp::client_service::Client,
}

impl Communicator {
pub fn new(scheduler: SocketAddr, version: i32) -> Result<Self, Box<Error>> {
let mut core = Core::new()?;
let handle = core.handle();
let stream = core.run(TcpStream::connect(&scheduler, &handle))?;
stream.set_nodelay(true)?;

debug!("Connection to server {} established", scheduler);

let mut rpc = Box::new(new_rpc_system(stream, None));
let bootstrap: server_capnp::server_bootstrap::Client =
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));

let mut request = bootstrap.register_as_client_request();
request.get().set_version(version);

let service = core.run(request.send().promise)?.get()?.get_service()?;

Ok(Self {
core: RefCell::new(core),
service,
})
}

pub fn new_session(&self) -> Result<i32, Box<Error>> {
let id: i32 = self.comm()
.run(self.service.new_session_request().send().promise)?
.get()?
.get_session_id();

Ok(id)
}
pub fn close_session(&self, id: i32) -> Result<(), Box<Error>> {
self.comm().run({
let mut req = self.service.close_session_request();
req.get().set_session_id(id);
req.send().promise
})?;

Ok(())
}

pub fn submit<T, D>(&self, tasks: &[T], data_objects: &[D]) -> Result<(), Box<Error>>
where
T: AsRef<Task>,
D: AsRef<DataObject>,
{
let mut req = self.service.submit_request();

to_capnp_list!(
req.get(),
tasks.iter().map(|t| t.as_ref()).collect::<Vec<&Task>>(),
init_tasks
);
to_capnp_list!(
req.get(),
data_objects
.iter()
.map(|t| t.as_ref())
.collect::<Vec<&DataObject>>(),
init_objects
);
self.comm().run(req.send().promise)?;

Ok(())
}

pub fn unkeep(&self, objects: &[DataObjectId]) -> Result<(), Box<Error>> {
let mut req = self.service.unkeep_request();
to_capnp_list!(req.get(), objects, init_object_ids);
self.comm().run(req.send().promise)?;
Ok(())
}

pub fn wait(&self, tasks: &[TaskId], objects: &[DataObjectId]) -> Result<(), Box<Error>> {
let mut req = self.service.wait_request();
to_capnp_list!(req.get(), tasks, init_task_ids);
to_capnp_list!(req.get(), objects, init_object_ids);
self.comm().run(req.send().promise)?;
Ok(())
}
pub fn wait_some(
&self,
tasks: &[TaskId],
objects: &[DataObjectId],
) -> Result<(Vec<TaskId>, Vec<DataObjectId>), Box<Error>> {
let mut req = self.service.wait_some_request();
to_capnp_list!(req.get(), tasks, init_task_ids);
to_capnp_list!(req.get(), objects, init_object_ids);
let res = self.comm().run(req.send().promise)?;

Ok((
from_capnp_list!(res.get()?, get_finished_tasks, TaskId),
from_capnp_list!(res.get()?, get_finished_objects, DataObjectId),
))
}

pub fn fetch(&self, object_id: &DataObjectId) -> Result<Vec<u8>, Box<Error>> {
let mut req = self.service.fetch_request();
object_id.to_capnp(&mut req.get().get_id().unwrap());
req.get().set_size(1024);

let response = self.comm().run(req.send().promise)?;

// TODO: handle error states
let reader = response.get()?;
match reader.get_status().which()? {
common_capnp::fetch_result::status::Ok(()) => {
let data = reader.get_data()?;
Ok(Vec::from(data))
}
common_capnp::fetch_result::status::Removed(()) => {
print!("Removed");
Ok(vec![])
}
common_capnp::fetch_result::status::Error(err) => {
print!("Error: {:?}", err.unwrap().get_message());
Ok(vec![])
}
_ => bail!("Non-ok status"),
}
}

pub fn terminate_server(&self) -> Result<(), Box<Error>> {
self.comm()
.run(self.service.terminate_server_request().send().promise)?;
Ok(())
}

fn comm(&self) -> RefMut<Core> {
self.core.borrow_mut()
}
}
17 changes: 17 additions & 0 deletions rain_client/src/client/dataobject.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use rain_core::types::{DataObjectId, ObjectSpec};
use std::cell::Cell;

pub struct DataObject {
pub keep: Cell<bool>,
pub data: Option<Vec<u8>>,
pub spec: ObjectSpec,
}

impl DataObject {
pub fn keep(&self) {
self.keep.set(true);
}
pub fn id(&self) -> DataObjectId {
self.spec.id
}
}
48 changes: 48 additions & 0 deletions rain_client/src/client/localcluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use super::client::Client;
use std::error::Error;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::{Command, Stdio};

pub struct LocalCluster {
listen_addr: SocketAddr,
binary: PathBuf,
}

impl LocalCluster {
pub fn new(binary: &str) -> Result<Self, Box<Error>> {
let mut cluster = LocalCluster {
binary: PathBuf::from(binary),
listen_addr: SocketAddr::new("127.0.0.1".parse()?, 7210),
};
cluster.start()?;

Ok(cluster)
}

fn start(&mut self) -> Result<(), Box<Error>> {
Command::new(&self.binary)
.arg("start")
.arg("--listen")
.arg(self.listen_addr.to_string())
.arg("--simple")
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?
.wait()?;

Ok(())
}

pub fn create_client(&self) -> Result<Client, Box<Error>> {
Client::new(self.listen_addr)
}
}

impl Drop for LocalCluster {
#![allow(unused_must_use)]
fn drop(&mut self) {
Client::new(self.listen_addr).unwrap().terminate_server();
}
}
14 changes: 14 additions & 0 deletions rain_client/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub mod client;
pub mod localcluster;
pub mod session;
pub mod tasks;

#[macro_use]
mod rpc;
mod communicator;
mod dataobject;
mod task;

pub use self::client::Client;
pub use self::localcluster::LocalCluster;
pub use self::session::Session;
Loading