Refactor IPC API implementation #116

Draft: wants to merge 16 commits into master
Conversation

@Dampfwalze commented Apr 13, 2024

This is my take on the IPC API.

The driver events are now exposed as a Stream. This also allows for multiple listeners.

Using the sync API, one can now add many receiver callbacks. Termination is expressed using a boolean return value:

driver.add_event_receiver(|cmd| {
    // ...
    // return false to stop receiving further events
    if finished {
        false
    } else {
        true
    }
});

Maybe a subscription object is better?

let subscription = driver.add_event_receiver(|cmd| { ... });
...
subscription.cancel();

(Changes are not fully tested)

@MolotovCherry (Owner) commented Apr 13, 2024

This was the last remaining thing on my todo list to handle properly. This is actually the kind of implementation I was looking for, but didn't get around to yet.

I would prefer a subscription object instead, but yeah, this is overall looking quite good!

@MolotovCherry (Owner) left a review:

I love where this concept is going. This implementation looks good. Thanks for the contribution!

P.S. The build is failing because of the Python bindings. You can ignore the build failure.

Resolved review threads on rust/driver-ipc/src/driver_client.rs and rust/driver-ipc/src/client.rs.
let client = client.clone();
let command_tx = command_tx.clone();
let notify = notify.clone();
task::spawn(async move { receive_command(&client, command_tx, &notify).await });
@MolotovCherry (Owner) commented:

I know I didn't have this in the original code, but I think we should handle the case where this errors out. Basically, what I'm envisioning here is that the read errors out, and it's then confusing to the library user why they aren't receiving any messages (e.g. on their event stream).

The senders should be fine since the caller will receive errors if they try to send, but since we still hold onto a broadcast sender, the subscribers won't receive a None when the stream should be closed.

I'm thinking that we should completely drop the broadcast sender on error to solve this.

Maybe sometime we can handle re-connections after the server disconnected, but we don't need to think about that right now (it's not like the client can't just recreate the driver instance anyways).
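A minimal sketch of that idea, assuming a hypothetical receive loop: the mpsc channel stands in for the pipe reads, and ClientCommand is a placeholder, not the crate's actual code.

use tokio::sync::{broadcast, mpsc};

// Stand-in for the real command type.
#[derive(Clone, Debug)]
struct ClientCommand(String);

// Sketch: on a read error, drop the broadcast sender so every
// subscriber's stream terminates instead of silently going quiet.
async fn receive_command(
    mut reads: mpsc::Receiver<std::io::Result<ClientCommand>>,
    command_tx: broadcast::Sender<ClientCommand>,
) {
    // Own the sender in an Option so it can be dropped explicitly.
    let mut tx = Some(command_tx);
    while let Some(read) = reads.recv().await {
        match read {
            Ok(cmd) => {
                if let Some(tx) = &tx {
                    let _ = tx.send(cmd); // no subscribers is fine
                }
            }
            Err(_) => {
                // All subscribers now observe their stream closing.
                tx = None;
                break;
            }
        }
    }
}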

@MolotovCherry (Owner) commented:

I'm still wondering whether we should add a field, method, callback, or something similar somewhere to indicate the connection status of the client. It would probably make it easier to know what's going on.

Edit: hmm, maybe something like on_* style callbacks: on_connect, on_disconnect, etc. What do you think? If you agree, which callbacks do you think would be suitable? At the moment, I feel an on_disconnect is all that's needed.

(You don't have to implement anything I said in this comment on this PR, I'm just thinking of what the best design would be.)

@Dampfwalze (Author) commented:

One option is to wrap the event that is sent to receivers in a Result. So command_tx would have the type Sender<Result<ClientCommand, SomeErrorType>>, and receive_events() would return impl Stream<Item = Result<EventCommand, SomeErrorType>>.

But on that note, we should maybe rethink the error system to return specialized errors instead of the general IpcError, since it does not convey what can actually happen. For example, request_state() will never return the QueryNotFound error, but the user must still provide a valid code path for it.
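A rough sketch of that shape (SomeErrorType and the command types are placeholders from this comment, not the crate's real definitions):

use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt};

// Placeholder types for the sketch.
#[derive(Clone, Debug)]
enum ClientCommand {
    Event(EventCommand),
    Reply(String), // stand-in for other command kinds
}
#[derive(Clone, Debug)]
struct EventCommand;
#[derive(Clone, Debug)]
struct SomeErrorType;

struct Client {
    command_tx: broadcast::Sender<Result<ClientCommand, SomeErrorType>>,
}

impl Client {
    // Each subscriber gets its own stream; a broadcast error value
    // (e.g. "pipe broken") reaches every one of them.
    fn receive_events(&self) -> impl Stream<Item = Result<EventCommand, SomeErrorType>> {
        BroadcastStream::new(self.command_tx.subscribe()).filter_map(|msg| match msg {
            Ok(Ok(ClientCommand::Event(e))) => Some(Ok(e)),
            Ok(Ok(_)) => None, // ignore non-event commands
            Ok(Err(e)) => Some(Err(e)),
            Err(_lagged) => None, // skip lag notifications in this sketch
        })
    }
}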

@Dampfwalze (Author) commented:

I compiled a table of which errors each method can actually encounter. I will work further on this list.

Error sets

Client

| Method | Error | Meaning |
| --- | --- | --- |
| Client::connect(), Client::connect_to() | io::Error | Connection failed |
| send_command() | serde_json::Error | Encode failed |
| send_command() | io::Error | Pipe broken |
| Client::notify(), Client::remove(), Client::remove_all() | serde_json::Error | Cannot happen |
| Client::notify(), Client::remove(), Client::remove_all() | io::Error | Pipe broken |
| receive_command() | io::Error | Pipe broken |
| receive_command() | SendError | Client closed (we can silently stop) |
| receive_command() | serde_json::Error | Server not working correctly (skipped) |
| Client::request_state() | serde_json::Error | Cannot happen |
| Client::request_state() | io::Error | Pipe broken |
| Client::request_state() | RecvError::Closed | receive_command() errored (pipe broken) |
| Client::request_state() | Timeout | Server not responding in time |
| Client::receive_events() | io::Error from receive_command() (not implemented) | Pipe broken |
| Client::persist() | io::Error | Failed to interact with registry |
| Client::persist() | serde_json::Error | Encode failed |

@Dampfwalze (Author) commented:

These are error types I would propose.

pub mod error {
    use thiserror::Error;
    use super::*;

    /// Error returned from [Client::connect] and [Client::connect_to].
    #[derive(Debug, Error)]
    pub enum ConnectionError {
        #[error("Failed to open pipe: {0}")]
        Failed(io::Error),
    }

    /// Error returned from [send_command]
    #[derive(Debug, Error)]
    pub(super) enum SendCommandError {
        #[error("Failed to encode message: {0}")]
        Encode(serde_json::Error),
        #[error("Failed to send message: {0}")]
        PipeBroken(io::Error),
    }

    /// Error returned from [Client::notify], [Client::remove] and
    /// [Client::remove_all].
    #[derive(Debug, Error)]
    pub enum SendError {
        #[error("Failed to send message: {0}")]
        PipeBroken(io::Error),
    }

    /// Error returned from [Client::request_state].
    #[derive(Debug, Error)]
    pub enum RequestError {
        #[error("Failed to send message (pipe broken): {0}")]
        Send(io::Error),
        #[error("Failed to receive message (pipe broken): {0}")]
        Receive(io::Error),
        #[error("Did not get a response in time ({0:?})")]
        Timeout(Duration),
    }

    // TODO: Client::receive_events()

    /// Error returned from [Client::persist].
    #[derive(Debug, Error)]
    pub enum PersistError {
        // Maybe a bit too verbose?
        #[error("Failed to open registry key: {0}")]
        Open(io::Error),
        #[error("Failed to set registry value: {0}")]
        Set(io::Error),
        #[error("Failed to serialize monitors: {0}")]
        Serialize(serde_json::Error),
    }
}

@Dampfwalze (Author) commented:

One problem: We cannot easily distribute errors from send_command(), since io::Error is not Clone.

@MolotovCherry (Owner) commented:

> One option is to wrap the event that is sent to receivers in a Result. So command_tx would have the type Sender<Result<ClientCommand, SomeErrorType>>, and receive_events() would return impl Stream<Item = Result<EventCommand, SomeErrorType>>.

I agree with this. It would allow us to send an error to all receivers once the pipe breaks (or another error occurs), and we could easily resume receive_command again if the client asks to reconnect (or resend an error if that reconnection fails). The implementation also isn't difficult, and it's clear what happened, since the error is propagated to clients.

> One problem: We cannot easily distribute errors from send_command(), since io::Error is not Clone.

Looks like we can work around this by wrapping it in an Rc or Arc at least (this was the suggested solution when I searched for it).
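For instance, a sketch along the lines of the error enums proposed above (Arc rather than Rc, since the error has to cross tasks):

use std::io;
use std::sync::Arc;
use thiserror::Error;

// io::Error is not Clone, but Arc<io::Error> is, so one read error
// can be cloned out to every broadcast receiver.
#[derive(Debug, Clone, Error)]
pub enum ReceiveError {
    #[error("Failed to receive message (pipe broken): {0}")]
    PipeBroken(Arc<io::Error>),
}

Broadcasting would then be tx.send(Err(ReceiveError::PipeBroken(Arc::new(e)))), and each subscriber receives its own clone.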

> But on that note, we should maybe rethink the error system to return specialized errors instead of the general IpcError, since it does not convey what can actually happen. For example, request_state() will never return the QueryNotFound error, but the user must still provide a valid code path for it.

I also think we should separate these into more specific errors according to the error conditions that can actually happen (far fewer superfluous variants for the user to handle).


Unless you want to handle this part, I'm fine adding this to my list of tasks to do.

Resolved review thread on rust/driver-ipc/src/sync/client.rs.
@Dampfwalze (Author) commented:

I was thinking about the possibility to easily revert the driver's state to the persistent state. There could be a method Client::revert() or Client::revert_to_persistent() that loads back the saved state from the registry.

@MolotovCherry (Owner) commented:

> I was thinking about the possibility to easily revert the driver's state to the persistent state. There could be a method Client::revert() or Client::revert_to_persistent() that loads back the saved state from the registry.

That's a good suggestion.

What do you think about having the method fetch and convert the persistent state to Vec<Monitor> and letting the caller do what they want with it? We could also keep the one that reverts the driver in one call, if it's still desirable.

@Dampfwalze (Author) commented:

> What do you think about having the method fetch and convert the persistent state to Vec<Monitor> and letting the caller do what they want with it? We could also keep the one that reverts the driver in one call, if it's still desirable.

In any case, there could be a method on the Client that deserializes the persistent state: Client::read_persistent() -> Vec<Monitor> (also forwarded in DriverClient?).

Then, there could be a method on the DriverClient that reads this state and writes it into its own state: DriverClient::load_persistent().

But Client::revert_to_persistent() is still good to have (also forwarded in DriverClient).
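Putting those pieces together, the surface might look like this. These are hypothetical signatures only; PersistError is borrowed from the error proposal above, and nothing here is final:

// Hypothetical signatures for the proposed persistent-state API.
impl Client {
    /// Read and deserialize the persistent state from the registry.
    pub fn read_persistent(&self) -> Result<Vec<Monitor>, PersistError> {
        todo!()
    }

    /// Load the saved state from the registry back into the driver.
    pub fn revert_to_persistent(&mut self) -> Result<(), PersistError> {
        todo!()
    }
}

impl DriverClient {
    /// Forwarded from `Client`.
    pub fn read_persistent(&self) -> Result<Vec<Monitor>, PersistError> {
        todo!()
    }

    /// Read the persistent state and replace this client's own state with it.
    pub fn load_persistent(&mut self) -> Result<(), PersistError> {
        todo!()
    }
}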

@MolotovCherry (Owner) commented:

> In any case, there could be a method on the Client that deserializes the persistent state: Client::read_persistent() -> Vec<Monitor> (also forwarded in DriverClient?).
>
> Then, there could be a method on the DriverClient that reads this state and writes it into its own state: DriverClient::load_persistent().
>
> But Client::revert_to_persistent() is still good to have (also forwarded in DriverClient).

That sounds perfect. I'll add that after this PR is merged.

MolotovCherry mentioned this pull request on Apr 14, 2024.
@Dampfwalze (Author) commented Apr 14, 2024

> I would prefer a subscription object instead

I have implemented this now.

One thing to note: there is no good way to cancel the subscription from the callback itself now. Doing this involves creating some shared state that the subscription is moved into after subscribing, e.g. Arc<Mutex<Option<EventsSubscription>>>.

Example:

use std::sync::{Arc, Mutex};

let shared_sub = Arc::new(Mutex::new(None::<EventsSubscription>));

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        println!("{:?}", event);
        // ...
        // Panics if an event arrives before the subscription is stored below.
        shared_sub.lock().unwrap().as_mut().unwrap().cancel();
    }
});

*shared_sub.lock().unwrap() = Some(sub);

Also: What should happen if the callback panics? Currently the task would just stop without notice. Maybe EventSubscription::cancel() could return a Result? What would the Error type be then?

@MolotovCherry (Owner) commented Apr 14, 2024

> I have implemented this now.
>
> One thing to note: there is no good way to cancel the subscription from the callback itself now. [...]
>
> Also: What should happen if the callback panics? Currently the task would just stop without notice. Maybe EventSubscription::cancel() could return a Result? What would the Error type be then?

I don't feel it matters much here if the callback panics. It does print a message, and the task exiting on panic is expected (similar to what would happen if a thread panicked). Though a note could be added if there's anything we want the user to be clear about. Do you feel it would be useful to catch the unwind, mark the panic, resume the unwind, and notify on cancel?

Perhaps it might be better to just catch it and re-raise it on the main thread instead. This way it becomes a lot clearer since everything crashes, instead of silently failing.

I just did some testing and found that the cb isn't getting executed. The streams are receiving as expected; it seems to stop working at RUNTIME.spawn in EventsSubscription. A block_on does execute it all the way (just a test to see if that was the issue), but spawn seems not to.

@Dampfwalze (Author) commented Apr 14, 2024

> I just did some testing and found that the cb isn't getting executed. The streams are receiving as expected; it seems to stop working at RUNTIME.spawn in EventsSubscription. A block_on does execute it all the way (just a test to see if that was the issue), but spawn seems not to.

This is weird, because all unit tests pass and they do check whether the callback is run or not. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? RUNTIME.spawn() does not immediately start execution, it just sends the task off to be scheduled.

You can try tokio::task::yield_now().await after calling add_event_receiver() to give execution back to tokio.
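For example, in an async test (a hypothetical snippet, not code from this PR):

// Inside a #[tokio::test] or other async context:
let _sub = client.add_event_receiver(|event| {
    println!("{event:?}");
});

// Give the freshly spawned receiver task a chance to run before
// the test continues.
tokio::task::yield_now().await;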

@MolotovCherry (Owner) commented Apr 14, 2024

> This is weird, because all unit tests pass and they do check whether the callback is run or not. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? RUNTIME.spawn() does not immediately start execution, it just sends the task off to be scheduled.
>
> You can try tokio::task::yield_now().await after calling add_event_receiver() to give execution back to tokio.

So far, from what I've been able to gather, the test event_receiver_cancel_from_cb() still passes after a panic is introduced into the callback. The first one (event_receiver()) properly fails when doing the same thing, so it applies only to this test. Hmm:

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        panic!();
    }
});

@Dampfwalze (Author) commented Apr 14, 2024

> So far, from what I've been able to gather, the test event_receiver_cancel_from_cb() still passes after a panic is introduced into the callback. The first one (event_receiver()) properly fails when doing the same thing, so it applies only to this test. Hmm.

Yes, this is correct. This makes sense, because when the callback panics, the receiver used to cancel the callback is dropped and closed. If you now call cancel(), it returns false, which is actually the expected behaviour, because the callback was already "canceled".

But when introducing a print statement, I can confirm that it is run.

@MolotovCherry (Owner) commented Apr 14, 2024

Give me a moment to track this original problem down. I'll reply back when I have more context.

On a side note though, I do agree that it might be best to handle panics somehow to make it more transparent. You asked earlier:

> Maybe EventSubscription::cancel() could return a Result? What would the Error type be then?

We could just use std::thread::Result, which is the same type that catch_unwind returns. I'm leaning more towards not panicking the whole program, so cancel seems like as good a place as any to return it, I guess. It shouldn't be too much of a requirement that the cb is UnwindSafe, right? If there's a nicer way, I'd prefer that. I'd still like to avoid catch_unwind if possible.
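A sketch of that variant: the task stores the panic payload via catch_unwind (AssertUnwindSafe sidesteps the UnwindSafe bound question), and cancel() surfaces it. All names and the field layout are assumptions for illustration, not the PR's actual code:

use std::any::Any;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex};

type PanicPayload = Box<dyn Any + Send + 'static>;

// Hypothetical handle: the spawned task stores a panic payload here
// if the callback unwinds.
pub struct EventsSubscription {
    panic_slot: Arc<Mutex<Option<PanicPayload>>>,
    // ... plus whatever is used to signal cancellation
}

impl EventsSubscription {
    /// Ok(()) if the callback never panicked, Err(payload) otherwise;
    /// std::thread::Result<()> is exactly this shape.
    pub fn cancel(&mut self) -> std::thread::Result<()> {
        // ... signal the task to stop here ...
        match self.panic_slot.lock().unwrap().take() {
            Some(payload) => Err(payload),
            None => Ok(()),
        }
    }
}

// Inside the spawned task, each callback invocation would be wrapped:
fn invoke<F: FnMut()>(slot: &Mutex<Option<PanicPayload>>, cb: &mut F) -> bool {
    match catch_unwind(AssertUnwindSafe(|| cb())) {
        Ok(()) => true,
        Err(payload) => {
            *slot.lock().unwrap() = Some(payload);
            false // stop delivering events after a panic
        }
    }
}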

@MolotovCherry (Owner) commented Apr 14, 2024

If you'd like to test what I was talking about earlier, here's how (below). (Please see Edit 2)

Edit: Since the previous steps were a bit roundabout, use this instead. This triggers it just the same. (The async version works fine.)

use driver_ipc::sync::DriverClient;

fn main() {
    let driver = DriverClient::new().unwrap();
    driver.add_event_receiver(|event| {
        // this does not print
        println!("{event:?}");
    });

    // go and send some events on the driver
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("shutting down");
}

Edit 2: Okay, I think we got it now. Not exactly surprising (given the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done, otherwise the callback will stop firing. Not binding the value to something will also cause a drop before the end of scope. Though obvious, it's an easy gotcha people may accidentally hit.

Everything looking good! 👍🏻

@Dampfwalze (Author) commented:

> Edit 2: Okay, I think we got it now. Not exactly surprising (given the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done, otherwise the callback will stop firing. Not binding the value to something will also cause a drop before the end of scope. Though obvious, it's an easy gotcha people may accidentally hit.

This is not the behavior I expected or wanted. I tracked the issue down and fixed it. If the subscription gets dropped, the callback will still run. This behavior is in line with thread::spawn or task::spawn: when you drop the JoinHandle, the thread/task is detached.
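Illustrated with hypothetical usage:

// Dropping the handle now detaches, mirroring task::spawn's JoinHandle:
{
    let sub = client.add_event_receiver(|event| println!("{event:?}"));
    drop(sub); // the callback keeps firing
}

// Stopping delivery requires an explicit cancel:
let mut sub = client.add_event_receiver(|event| println!("{event:?}"));
sub.cancel();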

@MolotovCherry (Owner) commented Apr 14, 2024

The last remaining unresolved comment is basically already taken care of, isn't it?

> Basically, what I'm envisioning here is that the read errors out, and it's then confusing to the library user why they aren't receiving any messages (e.g. on their event stream).

As for this, I can take care of handling the error and adding a callback notification system of sorts, letting the user know they got disconnected.

> Maybe sometime we can handle re-connections after the server disconnected, but we don't need to think about that right now (it's not like the client can't just recreate the driver instance anyways).

This is a non-issue unrelated to this PR. (Not sure if it matters to bother handling this, but even if I do, it's unrelated to this PR.)

> The senders should be fine since the caller will receive errors if they try to send, but since we still hold onto a broadcast sender, the subscribers won't receive a None when the stream should be closed.
>
> I'm thinking that we should completely drop the broadcast sender on error to solve this.

Iirc, this is already fixed after you changed the Client to hold a receiver instead.


So I think the previous one can be marked as resolved, and we can merge this if you feel it's ready. (If you still have some API input about the design of it, your thoughts are welcome, but I don't see this last one as a blocker.)

/// Connect to driver on pipe with specified name.
///
/// `name` is ONLY the {name} portion of \\.\pipe\{name}.
pub fn connect_to(name: &str) -> Result<Self> {
@MolotovCherry (Owner) commented on this snippet:

The two client methods are a little misleading, because even though you can create a named pipe using sync code, it still requires the reactor to be running. In pure sync code it would result in the following panic, making the sync signature a little misleading (something could be said about tokio making it sync too, but oh well):

there is no reactor running, must be called from the context of a Tokio 1.x runtime

It would probably be best to have both of these be async to reduce the possibility anyone would run into the error by accident (even though anyone using Client is already expected to be using async, there's still a remote possibility).

Failing that, adding this fact to the doc comment should also be good enough.

Alternative: putting it in the doc comment for the Client itself is also fine. That may actually be preferable.

If you don't want to bother with this one, leave me a comment saying so and mark it as resolved, and I'll add it to my list of things to do.

Edit: P.S. You can ignore this if you want. It's just a doc change anyways. I can do it myself too.
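For reference, the note could read something like this (the wording is only a suggestion; the panic message is tokio's):

/// Note: although `connect` and `connect_to` are synchronous, they
/// still require a tokio 1.x reactor to be running; calling them
/// outside a runtime context panics with "there is no reactor
/// running, must be called from the context of a Tokio 1.x runtime".
pub struct Client { /* fields elided */ }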
