Skip to content

Commit

Permalink
Add connect_stream for WebSocketClient
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 20, 2024
1 parent b312e12 commit 35cf1a9
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions nautilus_core/network/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,58 @@ pub struct WebSocketClient {
}

impl WebSocketClient {
/// Creates a websocket client that returns a stream for reading messages.
pub async fn connect_stream(
url: String,
headers: Vec<(String, String)>,
heartbeat: Option<u64>,
heartbeat_msg: Option<String>,
max_reconnection_tries: Option<u64>,
keyed_quotas: Vec<(String, Quota)>,
default_quota: Option<Quota>,
) -> Result<(MessageReader, Self), Error> {
let (ws_stream, _) = connect_async(url.clone().into_client_request()?).await?;
let (writer, reader) = ws_stream.split();
let writer = Arc::new(Mutex::new(writer));

let disconnect_mode = Arc::new(AtomicBool::new(false));
let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));

// Create config with minimal no-op Python handler so we incrementally
// move towards a more Rust-native approach.
let config = {
let handler = Python::with_gil(|py| Arc::new(py.None()));
WebSocketConfig {
url,
handler,
headers,
heartbeat,
heartbeat_msg,
ping_handler: None,
max_reconnection_tries,
}
};

let inner = WebSocketClientInner::connect_url(config).await?;
let controller_task = Self::spawn_controller_task(
inner,
disconnect_mode.clone(),
None, // no post_reconnection
None, // no post_disconnection
max_reconnection_tries,
);

Ok((
reader,
Self {
rate_limiter,
writer: writer.clone(),
controller_task,
disconnect_mode,
},
))
}

/// Creates a websocket client.
///
/// Creates an inner client and controller task to reconnect or disconnect
Expand Down

0 comments on commit 35cf1a9

Please sign in to comment.