Skip to content

Commit 7b5f017

Browse files
committed
[api] use Rc for Tasks in session
1 parent 7eff364 commit 7b5f017

File tree

3 files changed

+52
-48
lines changed

3 files changed

+52
-48
lines changed

src/client/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,27 @@ use std::error::Error;
22
use std::net::SocketAddr;
33

44
use CLIENT_PROTOCOL_VERSION;
5-
use common::wrapped::WrappedRcRefCell;
65
use super::session::Session;
76
use super::communicator::Communicator;
7+
use std::rc::Rc;
88

99
pub struct Client {
10-
comm: WrappedRcRefCell<Communicator>,
10+
comm: Rc<Communicator>,
1111
}
1212

1313
impl Client {
1414
pub fn new(scheduler: SocketAddr) -> Result<Self, Box<Error>> {
15-
let comm = WrappedRcRefCell::wrap(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);
15+
let comm = Rc::new(Communicator::new(scheduler, CLIENT_PROTOCOL_VERSION)?);
1616

1717
Ok(Client { comm })
1818
}
1919

2020
pub fn new_session(&self) -> Result<Session, Box<Error>> {
21-
let session_id = self.comm.get_mut().new_session()?;
21+
let session_id = self.comm.new_session()?;
2222
Ok(Session::new(session_id, self.comm.clone()))
2323
}
2424

2525
pub fn terminate_server(&self) -> Result<(), Box<Error>> {
26-
self.comm.get_mut().terminate_server()
26+
self.comm.terminate_server()
2727
}
2828
}

src/client/communicator.rs

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ use std::error::Error;
55
use common::rpc::new_rpc_system;
66
use capnp_rpc::rpc_twoparty_capnp;
77
use futures::Future;
8-
use std::cell::Ref;
98

109
use super::task::Task;
1110
use common::id::{DataObjectId, TaskId};
1211
use common::convert::{FromCapnp, ToCapnp};
1312
use client::dataobject::DataObject;
13+
use std::cell::RefCell;
14+
use std::cell::RefMut;
1415

1516
pub struct Communicator {
16-
core: Core,
17+
core: RefCell<Core>,
1718
service: ::client_capnp::client_service::Client,
1819
}
1920

@@ -36,19 +37,22 @@ impl Communicator {
3637

3738
let service = core.run(request.send().promise)?.get()?.get_service()?;
3839

39-
Ok(Self { core, service })
40+
Ok(Self {
41+
core: RefCell::new(core),
42+
service,
43+
})
4044
}
4145

42-
pub fn new_session(&mut self) -> Result<i32, Box<Error>> {
43-
let id: i32 = self.core
46+
pub fn new_session(&self) -> Result<i32, Box<Error>> {
47+
let id: i32 = self.comm()
4448
.run(self.service.new_session_request().send().promise)?
4549
.get()?
4650
.get_session_id();
4751

4852
Ok(id)
4953
}
50-
pub fn close_session(&mut self, id: i32) -> Result<(), Box<Error>> {
51-
self.core.run({
54+
pub fn close_session(&self, id: i32) -> Result<(), Box<Error>> {
55+
self.comm().run({
5256
let mut req = self.service.close_session_request();
5357
req.get().set_session_id(id);
5458
req.send().promise
@@ -57,13 +61,18 @@ impl Communicator {
5761
Ok(())
5862
}
5963

60-
pub fn submit<D>(&mut self, tasks: &[Ref<Task>], data_objects: &[D]) -> Result<(), Box<Error>>
64+
pub fn submit<T, D>(&self, tasks: &[T], data_objects: &[D]) -> Result<(), Box<Error>>
6165
where
66+
T: AsRef<Task>,
6267
D: AsRef<DataObject>,
6368
{
6469
let mut req = self.service.submit_request();
6570

66-
to_capnp_list!(req.get(), tasks, init_tasks);
71+
to_capnp_list!(
72+
req.get(),
73+
tasks.iter().map(|t| t.as_ref()).collect::<Vec<&Task>>(),
74+
init_tasks
75+
);
6776
to_capnp_list!(
6877
req.get(),
6978
data_objects
@@ -72,47 +81,47 @@ impl Communicator {
7281
.collect::<Vec<&DataObject>>(),
7382
init_objects
7483
);
75-
self.core.run(req.send().promise)?;
84+
self.comm().run(req.send().promise)?;
7685

7786
Ok(())
7887
}
7988

80-
pub fn unkeep(&mut self, objects: &[DataObjectId]) -> Result<(), Box<Error>> {
89+
pub fn unkeep(&self, objects: &[DataObjectId]) -> Result<(), Box<Error>> {
8190
let mut req = self.service.unkeep_request();
8291
to_capnp_list!(req.get(), objects, init_object_ids);
83-
self.core.run(req.send().promise)?;
92+
self.comm().run(req.send().promise)?;
8493
Ok(())
8594
}
8695

87-
pub fn wait(&mut self, tasks: &[TaskId], objects: &[DataObjectId]) -> Result<(), Box<Error>> {
96+
pub fn wait(&self, tasks: &[TaskId], objects: &[DataObjectId]) -> Result<(), Box<Error>> {
8897
let mut req = self.service.wait_request();
8998
to_capnp_list!(req.get(), tasks, init_task_ids);
9099
to_capnp_list!(req.get(), objects, init_object_ids);
91-
self.core.run(req.send().promise)?;
100+
self.comm().run(req.send().promise)?;
92101
Ok(())
93102
}
94103
pub fn wait_some(
95-
&mut self,
104+
&self,
96105
tasks: &[TaskId],
97106
objects: &[DataObjectId],
98107
) -> Result<(Vec<TaskId>, Vec<DataObjectId>), Box<Error>> {
99108
let mut req = self.service.wait_some_request();
100109
to_capnp_list!(req.get(), tasks, init_task_ids);
101110
to_capnp_list!(req.get(), objects, init_object_ids);
102-
let res = self.core.run(req.send().promise)?;
111+
let res = self.comm().run(req.send().promise)?;
103112

104113
Ok((
105114
from_capnp_list!(res.get()?, get_finished_tasks, TaskId),
106115
from_capnp_list!(res.get()?, get_finished_objects, DataObjectId),
107116
))
108117
}
109118

110-
pub fn fetch(&mut self, object_id: DataObjectId) -> Result<Vec<u8>, Box<Error>> {
119+
pub fn fetch(&self, object_id: DataObjectId) -> Result<Vec<u8>, Box<Error>> {
111120
let mut req = self.service.fetch_request();
112121
object_id.to_capnp(&mut req.get().get_id().unwrap());
113122
req.get().set_size(1024);
114123

115-
let response = self.core.run(req.send().promise)?;
124+
let response = self.comm().run(req.send().promise)?;
116125

117126
let reader = response.get()?;
118127
match reader.get_status().which()? {
@@ -124,9 +133,13 @@ impl Communicator {
124133
}
125134
}
126135

127-
pub fn terminate_server(&mut self) -> Result<(), Box<Error>> {
128-
self.core
136+
pub fn terminate_server(&self) -> Result<(), Box<Error>> {
137+
self.comm()
129138
.run(self.service.terminate_server_request().send().promise)?;
130139
Ok(())
131140
}
141+
142+
fn comm(&self) -> RefMut<Core> {
143+
self.core.borrow_mut()
144+
}
132145
}

src/client/session.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use common::wrapped::WrappedRcRefCell;
2-
31
use super::communicator::Communicator;
42
use client::dataobject::DataObject;
53
use client::task::Task;
@@ -11,22 +9,22 @@ use common::id::TaskId;
119
use common::id::DataObjectId;
1210
use common::id::SId;
1311
use common::DataType;
14-
use std::cell::{Cell, Ref};
12+
use std::cell::Cell;
1513
use std::rc::Rc;
1614

1715
pub type DataObjectPtr = Rc<DataObject>;
18-
pub type TaskPtr = WrappedRcRefCell<Task>;
16+
pub type TaskPtr = Rc<Task>;
1917

2018
pub struct Session {
2119
pub id: i32,
22-
comm: WrappedRcRefCell<Communicator>,
20+
comm: Rc<Communicator>,
2321
tasks: Vec<TaskPtr>,
2422
data_objects: Vec<DataObjectPtr>,
2523
id_counter: i32,
2624
}
2725

2826
impl Session {
29-
pub fn new(id: i32, comm: WrappedRcRefCell<Communicator>) -> Self {
27+
pub fn new(id: i32, comm: Rc<Communicator>) -> Self {
3028
debug!("Session {} created", id);
3129

3230
Session {
@@ -39,13 +37,7 @@ impl Session {
3937
}
4038

4139
pub fn submit(&mut self) -> Result<(), Box<Error>> {
42-
self.comm.get_mut().submit(
43-
&self.tasks
44-
.iter()
45-
.map(|t| t.get())
46-
.collect::<Vec<Ref<Task>>>(),
47-
&self.data_objects,
48-
)?;
40+
self.comm.submit(&self.tasks, &self.data_objects)?;
4941
self.tasks.clear();
5042
self.data_objects.clear();
5143

@@ -54,13 +46,12 @@ impl Session {
5446

5547
pub fn unkeep(&mut self, objects: &[DataObjectPtr]) -> Result<(), Box<Error>> {
5648
self.comm
57-
.get_mut()
5849
.unkeep(&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>())
5950
}
6051

6152
pub fn wait(&mut self, tasks: &[TaskPtr], objects: &[DataObjectPtr]) -> Result<(), Box<Error>> {
62-
self.comm.get_mut().wait(
63-
&tasks.iter().map(|t| t.get().id).collect::<Vec<TaskId>>(),
53+
self.comm.wait(
54+
&tasks.iter().map(|t| t.id).collect::<Vec<TaskId>>(),
6455
&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>(),
6556
)
6657
}
@@ -69,12 +60,12 @@ impl Session {
6960
tasks: &[TaskPtr],
7061
objects: &[DataObjectPtr],
7162
) -> Result<(Vec<TaskPtr>, Vec<DataObjectPtr>), Box<Error>> {
72-
let task_map: HashMap<TaskId, &TaskPtr> = tasks.iter().map(|t| (t.get().id, t)).collect();
63+
let task_map: HashMap<TaskId, &TaskPtr> = tasks.iter().map(|t| (t.id, t)).collect();
7364
let object_map: HashMap<DataObjectId, &DataObjectPtr> =
7465
objects.iter().map(|o| (o.id, o)).collect();
7566

76-
let (task_ids, object_ids) = self.comm.get_mut().wait_some(
77-
&tasks.iter().map(|t| t.get().id).collect::<Vec<TaskId>>(),
67+
let (task_ids, object_ids) = self.comm.wait_some(
68+
&tasks.iter().map(|t| t.id).collect::<Vec<TaskId>>(),
7869
&objects.iter().map(|o| o.id).collect::<Vec<DataObjectId>>(),
7970
)?;
8071

@@ -90,14 +81,14 @@ impl Session {
9081
))
9182
}
9283
pub fn wait_all(&mut self) -> Result<(), Box<Error>> {
93-
self.comm.get_mut().wait(
84+
self.comm.wait(
9485
&vec![TaskId::new(self.id, ::common_capnp::ALL_TASKS_ID)],
9586
&vec![],
9687
)
9788
}
9889

9990
pub fn fetch(&mut self, object: &DataObject) -> Result<Vec<u8>, Box<Error>> {
100-
self.comm.get_mut().fetch(object.id)
91+
self.comm.fetch(object.id)
10192
}
10293

10394
pub fn blob(&mut self, data: Vec<u8>) -> DataObjectPtr {
@@ -154,7 +145,7 @@ impl Session {
154145
attributes,
155146
};
156147

157-
let rc = WrappedRcRefCell::wrap(task);
148+
let rc = Rc::new(task);
158149
self.tasks.push(rc.clone());
159150

160151
rc
@@ -163,7 +154,7 @@ impl Session {
163154

164155
impl Drop for Session {
165156
fn drop(&mut self) {
166-
self.comm.get_mut().close_session(self.id).unwrap();
157+
self.comm.close_session(self.id).unwrap();
167158
debug!("Session {} destroyed", self.id);
168159
}
169160
}

0 commit comments

Comments
 (0)