Refactor IPC API implementation #116
base: master
Conversation
This was the last remaining thing on my todo list to handle properly. This is actually the kind of implementation I was looking for, but didn't get around to yet. I would prefer a subscription object instead, but yeah, this is overall looking quite good!
I love where this concept is going. This implementation looks good. Thanks for the contribution!
P.S. The build is failing because of the Python bindings. You can ignore the build failure.
rust/driver-ipc/src/client.rs (outdated)
```rust
let client = client.clone();
let command_tx = command_tx.clone();
let notify = notify.clone();
task::spawn(async move { receive_command(&client, command_tx, &notify).await });
```
I know I didn't have this in the original code, but I think we should handle the case where this errors out. Basically, what I'm envisioning here is that the read errors out, but it's then confusing to the library user why they aren't receiving any messages (e.g. on their event stream).

The senders should be fine, since the caller will receive errors if they try to send. But since we still hold onto a broadcast sender, the subscribers won't receive a `None` when the stream should be closed.

I'm thinking that we should completely drop the broadcast sender on error to solve this.

Maybe sometime we can handle re-connections after the server disconnects, but we don't need to think about that right now (it's not like the client can't just recreate the driver instance anyways).
I'm still thinking about whether we should add a field, method, callback, or something somewhere to indicate the connection status of the client. It would probably make it easier to know what's going on.

Edit: hmm, maybe something like `on_*` kinds of callbacks: `on_connect`, `on_disconnect`, etc. What do you think? If you agree, which callbacks do you think would be suitable? At the moment, I feel an `on_disconnect` is all that's needed.

(You don't have to implement anything I said in this comment in this PR, I'm just thinking about what the best design would be.)
One option is to wrap the event that is sent to receivers into a `Result`. So `command_tx` would have type `Sender<Result<ClientCommand, SomeErrorType>>`, and `receive_events()` would return `impl Stream<Item = Result<EventCommand, SomeErrorType>>`.
But on that note, we should maybe rethink the error system to return specialized errors instead of the general `IpcError`, since it does not convey what can actually happen. For example, `request_state()` will never return a `QueryNotFound` error, but the user must still provide a valid path for it.
I compiled a table of which errors each method can actually encounter. I will work further on this list.
Error sets

Client

| Method | Errors | Meaning |
|---|---|---|
| `Client::connect()`, `Client::connect_to()` | | |
| `send_command()` | | |
| `Client::notify()`, `Client::remove()`, `Client::remove_all()` | | |
| `receive_command()` | | |
| `Client::request_state()` | | |
| `Client::receive_events()` | | |
| `Client::persist()` | | |
These are error types I would propose.

```rust
pub mod error {
    use thiserror::Error;

    use super::*;

    /// Error returned from [Client::connect] and [Client::connect_to].
    #[derive(Debug, Error)]
    pub enum ConnectionError {
        #[error("Failed to open pipe: {0}")]
        Failed(io::Error),
    }

    /// Error returned from [send_command].
    #[derive(Debug, Error)]
    pub(super) enum SendCommandError {
        #[error("Failed to encode message: {0}")]
        Encode(serde_json::Error),
        #[error("Failed to send message: {0}")]
        PipeBroken(io::Error),
    }

    /// Error returned from [Client::notify], [Client::remove] and
    /// [Client::remove_all].
    #[derive(Debug, Error)]
    pub enum SendError {
        #[error("Failed to send message: {0}")]
        PipeBroken(io::Error),
    }

    /// Error returned from [Client::request_state].
    #[derive(Debug, Error)]
    pub enum RequestError {
        #[error("Failed to send message (pipe broken): {0}")]
        Send(io::Error),
        #[error("Failed to receive message (pipe broken): {0}")]
        Receive(io::Error),
        #[error("Did not get a response in time ({0:?})")]
        Timeout(Duration),
    }

    // TODO: Client::receive_events()

    /// Error returned from [Client::persist].
    #[derive(Debug, Error)]
    pub enum PersistError {
        // Maybe a bit too verbose?
        #[error("Failed to open registry key: {0}")]
        Open(io::Error),
        #[error("Failed to set registry value: {0}")]
        Set(io::Error),
        #[error("Failed to serialize monitors: {0}")]
        Serialize(serde_json::Error),
    }
}
```
One problem: we cannot easily distribute errors from `send_command()`, since `io::Error` is not `Clone`.
> One option is to wrap the event that is sent to receivers into a `Result`. So `command_tx` would have type `Sender<Result<ClientCommand, SomeErrorType>>`, and `receive_events()` would return `impl Stream<Item = Result<EventCommand, SomeErrorType>>`.
I agree with this. It would allow us to send an error to all receivers once the pipe breaks (or another error occurs), and we could easily resume `receive_command` again if the client asks to reconnect (or resend an error if that reconnection fails). The implementation also isn't difficult, and it's clear what has happened, since the error was propagated to clients.
> One problem: we cannot easily distribute errors from `send_command()`, since `io::Error` is not `Clone`.
Looks like we can work around this by wrapping it in an `Rc` or `Arc` at least (this was the suggested solution when I searched for it).
> But on that note, we should maybe rethink the error system to return specialized errors instead of the general `IpcError`, since it does not convey what can actually happen. For example, `request_state()` will never return a `QueryNotFound` error, but the user must still provide a valid path for it.
I also think we should separate these into more specific errors according to the error conditions that can actually occur (much less superfluous error handling for the user).
Unless you want to handle this part, I'm fine adding this to my list of tasks to do.
…nders are dropped when receiver task finishes, so that every receiver gets closed.
I was thinking about the possibility of easily reverting the driver's state to the persistent state. There could be a method
That's a good suggestion. What do you think about having the method fetch and convert the persistent state to
In any case, there could be a method on the

Then, there can be a method on the

But
That sounds perfect. I'll add that after this PR is merged.
I have implemented this now. One thing to note: there is no good way to cancel the subscription from the callback itself now. Doing this will involve creating some shared state into which the subscription is moved after subscribing.

Example:

```rust
let shared_sub = Arc::new(Mutex::new(None::<EventsSubscription>));

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        println!("{:?}", event);
        // ...
        shared_sub.lock().unwrap().as_mut().unwrap().cancel();
    }
});

*shared_sub.lock().unwrap() = Some(sub);
```

Also: what should happen if the callback panics? Currently the task would just stop without notice. Maybe
I don't feel it matters much here if the callback panics. It does print a message, and the task exiting on panic is expected (similar to what would happen if a thread panicked), though a note could be added if there's anything we want the user to be clear about. Do you feel it would be useful to catch the unwind, mark the panic, resume the unwind, and notify on cancel? Perhaps it might be better to just catch it and re-raise it on the main thread instead; that way it becomes a lot clearer, since everything crashes instead of silently failing.

I just did some testing and found that the
This is weird, because all the unit tests pass, and they do check whether the callback is run or not. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? You can try
So far from what I've been able to gather, the test

```rust
let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        panic!();
    }
});
```
Yes, this is correct. This makes sense, because when the callback panics, the receiver used to cancel the callback is dropped and closed. If you now call

But when introducing a print statement, I can confirm that it is run.
Give me a moment to track this original problem down. I'll reply back when I have more context. On a side note though, I do agree that it might be best to handle panics somehow to make it more transparent. You asked earlier:
We could just use
Edit: Since the earlier steps were a bit roundabout, use this instead. This triggers it just the same. (The async version works fine.)

```rust
use driver_ipc::sync::DriverClient;

fn main() {
    let driver = DriverClient::new().unwrap();

    driver.add_event_receiver(|event| {
        // this does not print
        println!("{event:?}");
    });

    // go and send some events on the driver
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("shutting down");
}
```

Edit 2: Okay, I think we got it now. Not exactly surprising (obviously, given the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done; otherwise the callback will stop firing. Not binding the value to something will also cause a drop before the end of scope. Though obvious, it's an easy gotcha people may accidentally hit. Everything looking good! 👍🏻
This is not the behavior I expected nor wanted. I tracked the issue down and fixed it. If the subscription gets dropped, the callback will still run. This behavior is in line with the behavior of
The last remaining unresolved comment is basically already taken care of, isn't it?
As far as this goes, I can take care of handling the error and a callback notification system of sorts / letting the user know they got disconnected.
This is a non-issue unrelated to this PR (not sure if it matters to bother handling this, but even if I do, it's unrelated to this PR)
Iirc, this is already fixed after you changed the

So I think the previous one can be marked as resolved, and we can merge this if you feel it's ready. (If you still have some API input about the design of it, your thoughts are welcome, but I don't see this last one as a blocker.)
```rust
/// Connect to driver on pipe with specified name.
///
/// `name` is ONLY the {name} portion of \\.\pipe\{name}.
pub fn connect_to(name: &str) -> Result<Self> {
```
The two client methods are a little misleading, because even though you can create a named pipe using sync code, it still requires the reactor to be running. In pure sync code it would result in the following error, making the sync signature a little misleading (something could be said about Tokio making it sync too, but oh well):

> there is no reactor running, must be called from the context of a Tokio 1.x runtime

It would probably be best to have both of these be `async` to reduce the possibility that anyone would run into the error by accident (even though anyone using `Client` is already expected to be using async, there's still a remote possibility).

Or, failing that, adding this fact to the doc comment should also be good enough.

Alternative: putting it in the doc comment for `Client` itself is also fine. That may actually be preferable.

If you don't want to bother with this one, leave me a comment stating such and mark it as solved, and I'll add it to my list of things to do.

Edit: P.S. You can ignore this if you want. It's just a doc change anyway. I can do it myself too.
This is my go at the IPC API.

The driver events are now expressed by a `Stream`. This also allows for multiple listeners.

Using the sync API, one can now add many receiver callbacks. Termination is now expressed using a boolean return value. Maybe a subscription object is better?

(Changes are not fully tested)