diff --git a/nautilus_core/network/src/websocket.rs b/nautilus_core/network/src/websocket.rs index 2fc2f37fd6da..fe3fa1ce2549 100644 --- a/nautilus_core/network/src/websocket.rs +++ b/nautilus_core/network/src/websocket.rs @@ -323,6 +323,58 @@ pub struct WebSocketClient { } impl WebSocketClient { + /// Creates a websocket client that returns a stream for reading messages. + pub async fn connect_stream( + url: String, + headers: Vec<(String, String)>, + heartbeat: Option, + heartbeat_msg: Option, + max_reconnection_tries: Option, + keyed_quotas: Vec<(String, Quota)>, + default_quota: Option, + ) -> Result<(MessageReader, Self), Error> { + let (ws_stream, _) = connect_async(url.clone().into_client_request()?).await?; + let (writer, reader) = ws_stream.split(); + let writer = Arc::new(Mutex::new(writer)); + + let disconnect_mode = Arc::new(AtomicBool::new(false)); + let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas)); + + // Create config with minimal no-op Python handler so we incrementally + // move towards a more Rust-native approach. + let config = { + let handler = Python::with_gil(|py| Arc::new(py.None())); + WebSocketConfig { + url, + handler, + headers, + heartbeat, + heartbeat_msg, + ping_handler: None, + max_reconnection_tries, + } + }; + + let inner = WebSocketClientInner::connect_url(config).await?; + let controller_task = Self::spawn_controller_task( + inner, + disconnect_mode.clone(), + None, // no post_reconnection + None, // no post_disconnection + max_reconnection_tries, + ); + + Ok(( + reader, + Self { + rate_limiter, + writer: writer.clone(), + controller_task, + disconnect_mode, + }, + )) + } + /// Creates a websocket client. /// /// Creates an inner client and controller task to reconnect or disconnect