Skip to content

Commit

Permalink
[api] use Rc for Tasks and Communicator in session
Browse files Browse the repository at this point in the history
  • Loading branch information
Kobzol committed May 4, 2018
1 parent 261ee87 commit 34833b0
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 48 deletions.
21 changes: 21 additions & 0 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,27 @@ fn init_log() {
fn main() {
init_log();

if std::env::args().collect::<Vec<String>>()[1] == "RAINTEST" {
use client::tasks::CommonTasks;

let cluster = ::start::localcluster::LocalCluster::new_simple(8000, 8001).unwrap();

let client = cluster.create_client().unwrap();
let mut session = client.new_session().unwrap();

let hello = session.blob("Hello ".into());
let world = session.blob("world!".into());
let task = session.concat(&vec![hello, world]);
let output = task.output().unwrap();
output.keep();

session.submit().unwrap();
session.wait_all().unwrap();

let result = String::from_utf8(session.fetch(&output).unwrap()).unwrap();
println!("{}", result);
}

// We do not use clap macro to build parser,
// since it cannot handle "-" in name of long arguments
let args = App::new("Rain")
Expand Down
10 changes: 5 additions & 5 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ use std::error::Error;
use std::net::SocketAddr;

use CLIENT_PROTOCOL_VERSION;
use common::wrapped::WrappedRcRefCell;
use super::session::Session;
use super::communicator::Communicator;
use std::rc::Rc;

pub struct Client {
comm: WrappedRcRefCell<Communicator>,
comm: Rc<Communicator>,
}

impl Client {
pub fn new(scheduler: SocketAddr) -> Result<Self, Box<Error>> {
let comm = WrappedRcRefCell::wrap(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);
let comm = Rc::new(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);

Ok(Client { comm })
}

pub fn new_session(&self) -> Result<Session, Box<Error>> {
let session_id = self.comm.get_mut().new_session()?;
let session_id = self.comm.new_session()?;
Ok(Session::new(session_id, self.comm.clone()))
}

pub fn terminate_server(&self) -> Result<(), Box<Error>> {
self.comm.get_mut().terminate_server()
self.comm.terminate_server()
}
}
53 changes: 33 additions & 20 deletions src/client/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ use std::error::Error;
use common::rpc::new_rpc_system;
use capnp_rpc::rpc_twoparty_capnp;
use futures::Future;
use std::cell::Ref;

use super::task::Task;
use common::id::{DataObjectId, TaskId};
use common::convert::{FromCapnp, ToCapnp};
use client::dataobject::DataObject;
use std::cell::RefCell;
use std::cell::RefMut;

pub struct Communicator {
core: Core,
core: RefCell<Core>,
service: ::client_capnp::client_service::Client,
}

Expand All @@ -36,19 +37,22 @@ impl Communicator {

let service = core.run(request.send().promise)?.get()?.get_service()?;

Ok(Self { core, service })
Ok(Self {
core: RefCell::new(core),
service,
})
}

pub fn new_session(&mut self) -> Result<i32, Box<Error>> {
let id: i32 = self.core
pub fn new_session(&self) -> Result<i32, Box<Error>> {
let id: i32 = self.comm()
.run(self.service.new_session_request().send().promise)?
.get()?
.get_session_id();

Ok(id)
}
pub fn close_session(&mut self, id: i32) -> Result<(), Box<Error>> {
self.core.run({
pub fn close_session(&self, id: i32) -> Result<(), Box<Error>> {
self.comm().run({
let mut req = self.service.close_session_request();
req.get().set_session_id(id);
req.send().promise
Expand All @@ -57,13 +61,18 @@ impl Communicator {
Ok(())
}

pub fn submit<D>(&mut self, tasks: &[Ref<Task>], data_objects: &[D]) -> Result<(), Box<Error>>
pub fn submit<T, D>(&self, tasks: &[T], data_objects: &[D]) -> Result<(), Box<Error>>
where
T: AsRef<Task>,
D: AsRef<DataObject>,
{
let mut req = self.service.submit_request();

to_capnp_list!(req.get(), tasks, init_tasks);
to_capnp_list!(
req.get(),
tasks.iter().map(|t| t.as_ref()).collect::<Vec<&Task>>(),
init_tasks
);
to_capnp_list!(
req.get(),
data_objects
Expand All @@ -72,47 +81,47 @@ impl Communicator {
.collect::<Vec<&DataObject>>(),
init_objects
);
self.core.run(req.send().promise)?;
self.comm().run(req.send().promise)?;

Ok(())
}

pub fn unkeep(&mut self, objects: &[DataObjectId]) -> Result<(), Box<Error>> {
pub fn unkeep(&self, objects: &[DataObjectId]) -> Result<(), Box<Error>> {
let mut req = self.service.unkeep_request();
to_capnp_list!(req.get(), objects, init_object_ids);
self.core.run(req.send().promise)?;
self.comm().run(req.send().promise)?;
Ok(())
}

pub fn wait(&mut self, tasks: &[TaskId], objects: &[DataObjectId]) -> Result<(), Box<Error>> {
pub fn wait(&self, tasks: &[TaskId], objects: &[DataObjectId]) -> Result<(), Box<Error>> {
let mut req = self.service.wait_request();
to_capnp_list!(req.get(), tasks, init_task_ids);
to_capnp_list!(req.get(), objects, init_object_ids);
self.core.run(req.send().promise)?;
self.comm().run(req.send().promise)?;
Ok(())
}
pub fn wait_some(
&mut self,
&self,
tasks: &[TaskId],
objects: &[DataObjectId],
) -> Result<(Vec<TaskId>, Vec<DataObjectId>), Box<Error>> {
let mut req = self.service.wait_some_request();
to_capnp_list!(req.get(), tasks, init_task_ids);
to_capnp_list!(req.get(), objects, init_object_ids);
let res = self.core.run(req.send().promise)?;
let res = self.comm().run(req.send().promise)?;

Ok((
from_capnp_list!(res.get()?, get_finished_tasks, TaskId),
from_capnp_list!(res.get()?, get_finished_objects, DataObjectId),
))
}

pub fn fetch(&mut self, object_id: DataObjectId) -> Result<Vec<u8>, Box<Error>> {
pub fn fetch(&self, object_id: DataObjectId) -> Result<Vec<u8>, Box<Error>> {
let mut req = self.service.fetch_request();
object_id.to_capnp(&mut req.get().get_id().unwrap());
req.get().set_size(1024);

let response = self.core.run(req.send().promise)?;
let response = self.comm().run(req.send().promise)?;

let reader = response.get()?;
match reader.get_status().which()? {
Expand All @@ -124,9 +133,13 @@ impl Communicator {
}
}

pub fn terminate_server(&mut self) -> Result<(), Box<Error>> {
self.core
pub fn terminate_server(&self) -> Result<(), Box<Error>> {
self.comm()
.run(self.service.terminate_server_request().send().promise)?;
Ok(())
}

fn comm(&self) -> RefMut<Core> {
self.core.borrow_mut()
}
}
37 changes: 14 additions & 23 deletions src/client/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use common::wrapped::WrappedRcRefCell;

use super::communicator::Communicator;
use client::dataobject::DataObject;
use client::task::Task;
Expand All @@ -11,22 +9,22 @@ use common::id::TaskId;
use common::id::DataObjectId;
use common::id::SId;
use common::DataType;
use std::cell::{Cell, Ref};
use std::cell::Cell;
use std::rc::Rc;

pub type DataObjectPtr = Rc<DataObject>;
pub type TaskPtr = WrappedRcRefCell<Task>;
pub type TaskPtr = Rc<Task>;

pub struct Session {
pub id: i32,
comm: WrappedRcRefCell<Communicator>,
comm: Rc<Communicator>,
tasks: Vec<TaskPtr>,
data_objects: Vec<DataObjectPtr>,
id_counter: i32,
}

impl Session {
pub fn new(id: i32, comm: WrappedRcRefCell<Communicator>) -> Self {
pub fn new(id: i32, comm: Rc<Communicator>) -> Self {
debug!("Session {} created", id);

Session {
Expand All @@ -39,13 +37,7 @@ impl Session {
}

pub fn submit(&mut self) -> Result<(), Box<Error>> {
self.comm.get_mut().submit(
&self.tasks
.iter()
.map(|t| t.get())
.collect::<Vec<Ref<Task>>>(),
&self.data_objects,
)?;
self.comm.submit(&self.tasks, &self.data_objects)?;
self.tasks.clear();
self.data_objects.clear();

Expand All @@ -54,13 +46,12 @@ impl Session {

pub fn unkeep(&mut self, objects: &[DataObjectPtr]) -> Result<(), Box<Error>> {
self.comm
.get_mut()
.unkeep(&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>())
}

pub fn wait(&mut self, tasks: &[TaskPtr], objects: &[DataObjectPtr]) -> Result<(), Box<Error>> {
self.comm.get_mut().wait(
&tasks.iter().map(|t| t.get().id).collect::<Vec<TaskId>>(),
self.comm.wait(
&tasks.iter().map(|t| t.id).collect::<Vec<TaskId>>(),
&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>(),
)
}
Expand All @@ -69,12 +60,12 @@ impl Session {
tasks: &[TaskPtr],
objects: &[DataObjectPtr],
) -> Result<(Vec<TaskPtr>, Vec<DataObjectPtr>), Box<Error>> {
let task_map: HashMap<TaskId, &TaskPtr> = tasks.iter().map(|t| (t.get().id, t)).collect();
let task_map: HashMap<TaskId, &TaskPtr> = tasks.iter().map(|t| (t.id, t)).collect();
let object_map: HashMap<DataObjectId, &DataObjectPtr> =
objects.iter().map(|o| (o.id, o)).collect();

let (task_ids, object_ids) = self.comm.get_mut().wait_some(
&tasks.iter().map(|t| t.get().id).collect::<Vec<TaskId>>(),
let (task_ids, object_ids) = self.comm.wait_some(
&tasks.iter().map(|t| t.id).collect::<Vec<TaskId>>(),
&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>(),
)?;

Expand All @@ -90,14 +81,14 @@ impl Session {
))
}
pub fn wait_all(&mut self) -> Result<(), Box<Error>> {
self.comm.get_mut().wait(
self.comm.wait(
&vec![TaskId::new(self.id, ::common_capnp::ALL_TASKS_ID)],
&vec![],
)
}

pub fn fetch(&mut self, object: &DataObject) -> Result<Vec<u8>, Box<Error>> {
self.comm.get_mut().fetch(object.id)
self.comm.fetch(object.id)
}

pub fn blob(&mut self, data: Vec<u8>) -> DataObjectPtr {
Expand Down Expand Up @@ -154,7 +145,7 @@ impl Session {
attributes,
};

let rc = WrappedRcRefCell::wrap(task);
let rc = Rc::new(task);
self.tasks.push(rc.clone());

rc
Expand All @@ -163,7 +154,7 @@ impl Session {

impl Drop for Session {
fn drop(&mut self) {
self.comm.get_mut().close_session(self.id).unwrap();
self.comm.close_session(self.id).unwrap();
debug!("Session {} destroyed", self.id);
}
}

0 comments on commit 34833b0

Please sign in to comment.