Bidirectional stream leak #2079

Open
QuentinPerez opened this issue Nov 30, 2024 · 0 comments

QuentinPerez commented Nov 30, 2024

Bug Report

Version

0.13.0

Platform

Darwin Kernel Version 24.1.0 arm64

Description

To reproduce, apply the patch below to the repo, then run cargo run --bin streaming-server and cargo run --bin streaming-client. The server's bidirectional_streaming_echo handler keeps running even after the client has dropped the response.

diff --git a/examples/src/streaming/client.rs b/examples/src/streaming/client.rs
index 546f244..dc0c369 100644
--- a/examples/src/streaming/client.rs
+++ b/examples/src/streaming/client.rs
@@ -32,19 +32,24 @@ async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
 }
 
 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
-    let in_stream = echo_requests_iter().take(num);
+    let in_stream = echo_requests_iter().take(num).then(|value| async move {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        value
+    });
 
-    let response = client
+    client
         .bidirectional_streaming_echo(in_stream)
         .await
         .unwrap();
 
-    let mut resp_stream = response.into_inner();
+    tokio::time::sleep(std::time::Duration::from_secs(10)).await; // the server-side stream should have ended by now
 
-    while let Some(received) = resp_stream.next().await {
-        let received = received.unwrap();
-        println!("\treceived message: `{}`", received.message);
-    }
+    // let mut resp_stream = response.into_inner();
+
+    // while let Some(received) = resp_stream.next().await {
+    //     let received = received.unwrap();
+    //     println!("\treceived message: `{}`", received.message);
+    // }
 }
 
 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
@@ -67,9 +72,9 @@ async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>,
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
 
-    println!("Streaming echo:");
-    streaming_echo(&mut client, 5).await;
-    tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
+    // println!("Streaming echo:");
+    // streaming_echo(&mut client, 5).await;
+    // tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
 
     // Echo stream that sends 17 requests then graceful end that connection
     println!("\r\nBidirectional stream echo:");
@@ -78,8 +83,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
     // graceful client disconnection (above example) on the server side.
-    println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
-    bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
+    // println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
+    // bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
 
     Ok(())
 }

While testing, I also hit this panic on the server:

EchoServer::bidirectional_streaming_echo
thread 'tokio-runtime-worker' panicked at examples/src/streaming/server.rs:111:26:
working rx: SendError { .. }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
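
The panic message above looks like what `expect("working rx")` prints when a channel `send` fails, which happens once the receiving half has been dropped. Below is a minimal sketch of treating that failure as a signal to stop forwarding instead of panicking. It makes some assumptions: it uses `std::sync::mpsc` as a stand-in so that it runs without extra crates (the example server presumably uses a tokio channel, whose async `send` likewise returns an error once the receiver is gone), and `forward_until_closed` is a hypothetical helper name, not part of tonic or the examples.

```rust
use std::sync::mpsc::Sender;

/// Forwards each message to `tx` and returns how many were delivered.
/// Hypothetical helper: stops at the first failed send instead of panicking.
fn forward_until_closed<I>(messages: I, tx: &Sender<String>) -> usize
where
    I: IntoIterator<Item = String>,
{
    let mut delivered = 0;
    for message in messages {
        // `send` returns Err(SendError) once the receiving half is dropped,
        // so a failure here means nobody is listening anymore.
        if tx.send(message).is_err() {
            break;
        }
        delivered += 1;
    }
    delivered
}
```

With a live receiver every message is delivered; after the receiver is dropped the loop exits on the first send and the function returns without panicking. In the server, the same check on the response sender could be used to end the spawned task early, though that alone may not explain why the handler keeps running in the first place.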