Skip to content

Bidirectional stream leak #2079

Open
Open
@QuentinPerez

Description

@QuentinPerez

Bug Report

Version

0.13.0

Platform

Darwin Kernel Version 24.1.0 arm64

Description

Once this patch is applied to the repo, you can run cargo run --bin streaming-server and cargo run --bin streaming-client. You'll observe that the bidirectional_streaming_echo function continues to run even after the client drops the response.

diff --git a/examples/src/streaming/client.rs b/examples/src/streaming/client.rs
index 546f244..dc0c369 100644
--- a/examples/src/streaming/client.rs
+++ b/examples/src/streaming/client.rs
@@ -32,19 +32,24 @@ async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
 }
 
 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
-    let in_stream = echo_requests_iter().take(num);
+    let in_stream = echo_requests_iter().take(num).then(|value| async move {
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        value
+    });
 
-    let response = client
+    client
         .bidirectional_streaming_echo(in_stream)
         .await
         .unwrap();
 
-    let mut resp_stream = response.into_inner();
+    tokio::time::sleep(std::time::Duration::from_secs(10)).await; // the server server should be ended
 
-    while let Some(received) = resp_stream.next().await {
-        let received = received.unwrap();
-        println!("\treceived message: `{}`", received.message);
-    }
+    // let mut resp_stream = response.into_inner();
+
+    // while let Some(received) = resp_stream.next().await {
+    //     let received = received.unwrap();
+    //     println!("\treceived message: `{}`", received.message);
+    // }
 }
 
 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
@@ -67,9 +72,9 @@ async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>,
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
 
-    println!("Streaming echo:");
-    streaming_echo(&mut client, 5).await;
-    tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
+    // println!("Streaming echo:");
+    // streaming_echo(&mut client, 5).await;
+    // tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
 
     // Echo stream that sends 17 requests then graceful end that connection
     println!("\r\nBidirectional stream echo:");
@@ -78,8 +83,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
     // graceful client disconnection (above example) on the server side.
-    println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
-    bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
+    // println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
+    // bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
 
     Ok(())
 }

During the tests, I got this error too:

EchoServer::bidirectional_streaming_echo
thread 'tokio-runtime-worker' panicked at examples/src/streaming/server.rs:111:26:
working rx: SendError { .. }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions