Open
Description
Bug Report
Version
0.13.0
Platform
Darwin Kernel Version 24.1.0 arm64
Description
Once this patch is applied to the repo, you can run cargo run --bin streaming-server
and cargo run --bin streaming-client
. You'll observe that the bidirectional_streaming_echo
function continues to run even after the client drops the response.
diff --git a/examples/src/streaming/client.rs b/examples/src/streaming/client.rs
index 546f244..dc0c369 100644
--- a/examples/src/streaming/client.rs
+++ b/examples/src/streaming/client.rs
@@ -32,19 +32,24 @@ async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
}
async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
- let in_stream = echo_requests_iter().take(num);
+ let in_stream = echo_requests_iter().take(num).then(|value| async move {
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ value
+ });
- let response = client
+ client
.bidirectional_streaming_echo(in_stream)
.await
.unwrap();
- let mut resp_stream = response.into_inner();
+ tokio::time::sleep(std::time::Duration::from_secs(10)).await; // the server server should be ended
- while let Some(received) = resp_stream.next().await {
- let received = received.unwrap();
- println!("\treceived message: `{}`", received.message);
- }
+ // let mut resp_stream = response.into_inner();
+
+ // while let Some(received) = resp_stream.next().await {
+ // let received = received.unwrap();
+ // println!("\treceived message: `{}`", received.message);
+ // }
}
async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
@@ -67,9 +72,9 @@ async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>,
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
- println!("Streaming echo:");
- streaming_echo(&mut client, 5).await;
- tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
+ // println!("Streaming echo:");
+ // streaming_echo(&mut client, 5).await;
+ // tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
// Echo stream that sends 17 requests then graceful end that connection
println!("\r\nBidirectional stream echo:");
@@ -78,8 +83,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Echo stream that sends up to `usize::MAX` requests. One request each 2s.
// Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
// graceful client disconnection (above example) on the server side.
- println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
- bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
+ // println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
+ // bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
Ok(())
}
During the tests, I got this error too:
EchoServer::bidirectional_streaming_echo
thread 'tokio-runtime-worker' panicked at examples/src/streaming/server.rs:111:26:
working rx: SendError { .. }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Metadata
Metadata
Assignees
Labels
No labels