Skip to content

Commit

Permalink
[api] use rain_core in rust client
Browse files Browse the repository at this point in the history
  • Loading branch information
Kobzol committed Jul 7, 2018
1 parent 7b5f017 commit 780ed93
Show file tree
Hide file tree
Showing 36 changed files with 381 additions and 278 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ version = "0.3.0"
[workspace]

members = [
"rain_client",
"rain_client_test",
"rain_core",
"rain_server",
"rain_task",
"rain_task_test",
]

[patch.crates-io]
rain_client = { path = "rain_client" }
rain_core = { path = "rain_core" }
rain_task = { path = "rain_task" }
31 changes: 31 additions & 0 deletions rain_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "rain_client"
version = "0.3.0"

description = "Distributed computational framework for large-scale task-based pipelines. Client library in Rust."
# documentation = "https://docs.rs/rain_task/" # default docs.rs
homepage = "https://github.com/substantic/rain"
repository = "https://github.com/substantic/rain/"
readme = "README.md"
authors = [
"Stanislav Bohm <[email protected]>",
"Tomas Gavenciak <[email protected]>",
"Vojtech Cima <[email protected]>",
]
license = "MIT"

exclude = ["testing/**/*"]

[badges]
travis-ci = { repository = "substantic/rain", branch = "master" }
maintenance = { status = "actively-developed" }

[dependencies]
capnp-rpc = "0.8"
error-chain = "0.11"
futures = "0.1"
log = "0.4"
rain_core = "0.3.0"
tokio-core = "0.1"
serde = "1"
serde_json = "1"
4 changes: 2 additions & 2 deletions src/client/client.rs → rain_client/src/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::error::Error;
use std::net::SocketAddr;

use CLIENT_PROTOCOL_VERSION;
use super::session::Session;
use super::communicator::Communicator;
use super::session::Session;
use rain_core::CLIENT_PROTOCOL_VERSION;
use std::rc::Rc;

pub struct Client {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use tokio_core::reactor::Core;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use std::error::Error;
use common::rpc::new_rpc_system;
use capnp_rpc::rpc_twoparty_capnp;
use futures::Future;
use rain_core::comm::new_rpc_system;
use std::error::Error;
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;

use super::task::Task;
use common::id::{DataObjectId, TaskId};
use common::convert::{FromCapnp, ToCapnp};
use client::dataobject::DataObject;
use std::cell::RefCell;
use std::cell::RefMut;
use rain_core::types::{DataObjectId, TaskId};
use rain_core::utils::{FromCapnp, ToCapnp};
use rain_core::{client_capnp, common_capnp, server_capnp};
use std::cell::{RefCell, RefMut};

pub struct Communicator {
core: RefCell<Core>,
service: ::client_capnp::client_service::Client,
service: client_capnp::client_service::Client,
}

impl Communicator {
Expand All @@ -28,7 +28,7 @@ impl Communicator {
debug!("Connection to server {} established", scheduler);

let mut rpc = Box::new(new_rpc_system(stream, None));
let bootstrap: ::server_capnp::server_bootstrap::Client =
let bootstrap: server_capnp::server_bootstrap::Client =
rpc.bootstrap(rpc_twoparty_capnp::Side::Server);
handle.spawn(rpc.map_err(|err| panic!("RPC error: {}", err)));

Expand Down Expand Up @@ -116,19 +116,28 @@ impl Communicator {
))
}

pub fn fetch(&self, object_id: DataObjectId) -> Result<Vec<u8>, Box<Error>> {
pub fn fetch(&self, object_id: &DataObjectId) -> Result<Vec<u8>, Box<Error>> {
let mut req = self.service.fetch_request();
object_id.to_capnp(&mut req.get().get_id().unwrap());
req.get().set_size(1024);

let response = self.comm().run(req.send().promise)?;

// TODO: handle error states
let reader = response.get()?;
match reader.get_status().which()? {
::common_capnp::fetch_result::status::Ok(()) => {
common_capnp::fetch_result::status::Ok(()) => {
let data = reader.get_data()?;
Ok(Vec::from(data))
}
common_capnp::fetch_result::status::Removed(()) => {
print!("Removed");
Ok(vec![])
}
common_capnp::fetch_result::status::Error(err) => {
print!("Error: {:?}", err.unwrap().get_message());
Ok(vec![])
}
_ => bail!("Non-ok status"),
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use common::Attributes;
use common::id::DataObjectId;
use common::DataType;
use rain_core::types::{DataObjectId, ObjectSpec};
use std::cell::Cell;

pub struct DataObject {
pub id: DataObjectId,
pub label: String,
pub keep: Cell<bool>,
pub data: Option<Vec<u8>>,
pub attributes: Attributes,
pub data_type: DataType,
pub spec: ObjectSpec,
}

impl DataObject {
pub fn keep(&self) {
self.keep.set(true);
}
pub fn id(&self) -> DataObjectId {
self.spec.id
}
}
File renamed without changes.
46 changes: 46 additions & 0 deletions rain_client/src/client/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use super::task::Task;
use client::dataobject::DataObject;
use rain_core::client_capnp;
use rain_core::utils::ToCapnp;
use serde_json;

macro_rules! to_capnp_list {
($builder:expr, $items:expr, $name:ident) => {{
let mut builder = $builder.$name($items.len() as u32);
for (i, obj) in $items.iter().enumerate() {
obj.to_capnp(&mut builder.reborrow().get(i as u32));
}
}};
}
macro_rules! from_capnp_list {
($builder:expr, $items:ident, $obj:ident) => {{
$builder
.$items()?
.iter()
.map(|item| $obj::from_capnp(&item))
.collect()
}};
}

impl<'a> ToCapnp<'a> for Task {
type Builder = client_capnp::task::Builder<'a>;

fn to_capnp(&self, builder: &mut Self::Builder) {
builder.set_spec(&serde_json::to_string(&self.spec).unwrap());
}
}
impl<'a> ToCapnp<'a> for DataObject {
type Builder = client_capnp::data_object::Builder<'a>;

fn to_capnp(&self, builder: &mut Self::Builder) {
builder.set_spec(&serde_json::to_string(&self.spec).unwrap());
builder.set_keep(self.keep.get());

if let &Some(ref data) = &self.data {
builder.set_data(&data);
builder.set_has_data(true);
} else {
builder.set_has_data(false);
}
}
}
79 changes: 43 additions & 36 deletions src/client/session.rs → rain_client/src/client/session.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use super::communicator::Communicator;
use client::dataobject::DataObject;
use client::task::Task;
use rain_core::common_capnp;
use rain_core::types::{DataObjectId, DataType, ObjectSpec, Resources, SId, TaskId, TaskSpec,
TaskSpecInput, UserAttrs};
use serde_json;
use std::cell::Cell;
use std::collections::HashMap;
use std::error::Error;
use client::task::TaskInput;
use common::Attributes;
use common::id::TaskId;
use common::id::DataObjectId;
use common::id::SId;
use common::DataType;
use std::cell::Cell;
use std::rc::Rc;

pub type DataObjectPtr = Rc<DataObject>;
Expand Down Expand Up @@ -45,28 +43,36 @@ impl Session {
}

pub fn unkeep(&mut self, objects: &[DataObjectPtr]) -> Result<(), Box<Error>> {
self.comm
.unkeep(&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>())
self.comm.unkeep(&objects
.iter()
.map(|o| o.id())
.collect::<Vec<DataObjectId>>())
}

pub fn wait(&mut self, tasks: &[TaskPtr], objects: &[DataObjectPtr]) -> Result<(), Box<Error>> {
self.comm.wait(
&tasks.iter().map(|t| t.id).collect::<Vec<TaskId>>(),
&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>(),
&tasks.iter().map(|t| t.id()).collect::<Vec<TaskId>>(),
&objects
.iter()
.map(|o| o.id())
.collect::<Vec<DataObjectId>>(),
)
}
pub fn wait_some(
&mut self,
tasks: &[TaskPtr],
objects: &[DataObjectPtr],
) -> Result<(Vec<TaskPtr>, Vec<DataObjectPtr>), Box<Error>> {
let task_map: HashMap<TaskId, &TaskPtr> = tasks.iter().map(|t| (t.id, t)).collect();
let task_map: HashMap<TaskId, &TaskPtr> = tasks.iter().map(|t| (t.id(), t)).collect();
let object_map: HashMap<DataObjectId, &DataObjectPtr> =
objects.iter().map(|o| (o.id, o)).collect();
objects.iter().map(|o| (o.id(), o)).collect();

let (task_ids, object_ids) = self.comm.wait_some(
&tasks.iter().map(|t| t.id).collect::<Vec<TaskId>>(),
&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>(),
&tasks.iter().map(|t| t.id()).collect::<Vec<TaskId>>(),
&objects
.iter()
.map(|o| o.id())
.collect::<Vec<DataObjectId>>(),
)?;

Ok((
Expand All @@ -82,13 +88,13 @@ impl Session {
}
pub fn wait_all(&mut self) -> Result<(), Box<Error>> {
self.comm.wait(
&vec![TaskId::new(self.id, ::common_capnp::ALL_TASKS_ID)],
&vec![TaskId::new(self.id, common_capnp::ALL_TASKS_ID)],
&vec![],
)
}

pub fn fetch(&mut self, object: &DataObject) -> Result<Vec<u8>, Box<Error>> {
self.comm.fetch(object.id)
pub fn fetch(&mut self, o: &DataObjectPtr) -> Result<Vec<u8>, Box<Error>> {
self.comm.fetch(&o.id())
}

pub fn blob(&mut self, data: Vec<u8>) -> DataObjectPtr {
Expand All @@ -102,13 +108,17 @@ impl Session {
DataObjectId::new(self.id, id)
}
pub(crate) fn create_object(&mut self, label: String, data: Option<Vec<u8>>) -> DataObjectPtr {
let object = DataObject {
let spec = ObjectSpec {
id: self.create_object_id(),
keep: Cell::new(false),
label,
data,
attributes: Attributes::new(),
data_type: DataType::Blob,
content_type: "".to_owned(),
user: UserAttrs::new(),
};
let object = DataObject {
keep: Cell::new(false),
data,
spec,
};
let rc = Rc::new(object);
self.data_objects.push(rc.clone());
Expand All @@ -124,27 +134,24 @@ impl Session {
}
pub fn create_task(
&mut self,
command: String,
inputs: Vec<TaskInput>,
task_type: String,
inputs: Vec<TaskSpecInput>,
outputs: Vec<DataObjectPtr>,
config: HashMap<String, String>,
cpus: i32,
cpus: u32,
) -> TaskPtr {
let mut attributes = Attributes::new();
attributes.set("config", config).unwrap();

let mut resources: HashMap<String, i32> = HashMap::new();
resources.insert("cpus".to_owned(), cpus);
attributes.set("resources", resources).unwrap();

let task = Task {
let spec = TaskSpec {
id: self.create_task_id(),
command,
inputs,
outputs,
attributes,
task_type,
outputs: outputs.iter().map(|o| o.id()).collect(),
config: Some(serde_json::to_value(&config).unwrap()),
resources: Resources { cpus },
user: UserAttrs::new(),
};

let task = Task { spec, outputs };

let rc = Rc::new(task);
self.tasks.push(rc.clone());

Expand Down
Loading

0 comments on commit 780ed93

Please sign in to comment.