From 8850ee23c924eaa4184e5842d319a9eca20753e1 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Mon, 18 Nov 2024 17:14:59 +1100 Subject: [PATCH] Improve HttpClient for use from Rust --- nautilus_core/network/benches/test_client.rs | 4 +- nautilus_core/network/src/http.rs | 124 ++++++++++++++----- nautilus_core/network/src/python/http.rs | 33 ++--- 3 files changed, 102 insertions(+), 59 deletions(-) diff --git a/nautilus_core/network/benches/test_client.rs b/nautilus_core/network/benches/test_client.rs index 0245d149f73..142db6a06b8 100644 --- a/nautilus_core/network/benches/test_client.rs +++ b/nautilus_core/network/benches/test_client.rs @@ -13,8 +13,6 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use std::collections::HashMap; - use nautilus_network::http::InnerHttpClient; use reqwest::Method; @@ -30,7 +28,7 @@ async fn main() { reqs.push(client.send_request( Method::GET, "http://127.0.0.1:3000".to_string(), - HashMap::new(), + None, None, None, )); diff --git a/nautilus_core/network/src/http.rs b/nautilus_core/network/src/http.rs index ff5a4b07448..1f110273e62 100644 --- a/nautilus_core/network/src/http.rs +++ b/nautilus_core/network/src/http.rs @@ -18,12 +18,13 @@ use std::{collections::HashMap, hash::Hash, sync::Arc, time::Duration}; use bytes::Bytes; +use futures_util::{stream, StreamExt}; use reqwest::{ header::{HeaderMap, HeaderName}, Method, Response, Url, }; -use crate::ratelimiter::{clock::MonotonicClock, RateLimiter}; +use crate::ratelimiter::{clock::MonotonicClock, quota::Quota, RateLimiter}; /// Represents the HTTP methods supported by the `HttpClient`. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] @@ -70,23 +71,6 @@ pub struct HttpResponse { pub(crate) body: Bytes, } -/// A high-performance HTTP client with rate limiting and timeout capabilities. -/// -/// This struct is designed to handle HTTP requests efficiently, providing -/// support for rate limiting, timeouts, and custom headers. The client is -/// built on top of `reqwest` and can be used for both synchronous and -/// asynchronous HTTP requests. -#[cfg_attr( - feature = "python", - pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network") -)] -pub struct HttpClient { - /// The rate limiter to control the request rate. - pub(crate) rate_limiter: Arc>, - /// The underlying HTTP client used to make requests. - pub(crate) client: InnerHttpClient, -} - /// Represents errors that can occur when using the `HttpClient`. /// /// This enum provides variants for general HTTP errors and timeout errors, @@ -116,6 +100,85 @@ impl From for HttpClientError { } } +/// A high-performance HTTP client with rate limiting and timeout capabilities. +/// +/// This struct is designed to handle HTTP requests efficiently, providing +/// support for rate limiting, timeouts, and custom headers. The client is +/// built on top of `reqwest` and can be used for both synchronous and +/// asynchronous HTTP requests. +#[cfg_attr( + feature = "python", + pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network") +)] +pub struct HttpClient { + /// The rate limiter to control the request rate. + pub(crate) rate_limiter: Arc>, + /// The underlying HTTP client used to make requests. + pub(crate) client: Arc, +} + +impl HttpClient { + /// Creates a new [`HttpClient`] instance. + #[must_use] + pub fn new( + header_keys: Vec, + keyed_quotas: Vec<(String, Quota)>, + default_quota: Option, + ) -> Self { + let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas)); + let client = InnerHttpClient { + client: reqwest::Client::new(), + header_keys, + }; + + Self { + rate_limiter, + client: Arc::new(client), + } + } + + /// Send an HTTP request. + /// + /// `method`: The HTTP method to call. + /// `url`: The request is sent to this url. + /// `headers`: The header key value pairs in the request. + /// `body`: The bytes sent in the body of request. + /// `keys`: The keys used for rate limiting the request. + /// + /// # Example + /// + /// When a request is made the URL should be split into all relevant keys within it. + /// + /// For request /foo/bar, should pass keys ["foo/bar", "foo"] for rate limiting. + #[allow(clippy::too_many_arguments)] + pub async fn request( + &self, + method: Method, + url: String, + headers: Option>, + body: Option, + keys: Option>, + timeout_secs: Option, + ) -> Result { + let client = self.client.clone(); + let rate_limiter = self.rate_limiter.clone(); + + // Check keys for rate limiting quota + let keys = keys.unwrap_or_default(); + let tasks = keys.iter().map(|key| rate_limiter.until_key_ready(key)); + + stream::iter(tasks) + .for_each(|key| async move { + key.await; + }) + .await; + + client + .send_request(method, url, headers, body, timeout_secs) + .await + } +} + /// A high-performance `HttpClient` for HTTP requests. /// /// The client is backed by a hyper Client which keeps connections alive and @@ -142,10 +205,11 @@ impl InnerHttpClient { &self, method: Method, url: String, - headers: HashMap, - body: Option>, + headers: Option>, + body: Option, timeout_secs: Option, ) -> Result { + let headers = headers.unwrap_or_default(); let reqwest_url = Url::parse(url.as_str()) .map_err(|e| HttpClientError::from(format!("URL parse error: {e}")))?; @@ -169,7 +233,7 @@ impl InnerHttpClient { let request = match body { Some(b) => request_builder - .body(b) + .body(b.to_vec()) .build() .map_err(HttpClientError::from)?, None => request_builder.build().map_err(HttpClientError::from)?, @@ -277,13 +341,7 @@ mod tests { let client = InnerHttpClient::default(); let response = client - .send_request( - reqwest::Method::GET, - format!("{url}/get"), - HashMap::new(), - None, - None, - ) + .send_request(reqwest::Method::GET, format!("{url}/get"), None, None, None) .await .unwrap(); @@ -301,7 +359,7 @@ mod tests { .send_request( reqwest::Method::POST, format!("{url}/post"), - HashMap::new(), + None, None, None, ) @@ -329,13 +387,13 @@ mod tests { ); let body_string = serde_json::to_string(&body).unwrap(); - let body_bytes = body_string.into_bytes(); + let body_bytes = Bytes::from(body_string.into_bytes()); let response = client .send_request( reqwest::Method::POST, format!("{url}/post"), - HashMap::new(), + None, Some(body_bytes), None, ) @@ -355,7 +413,7 @@ mod tests { .send_request( reqwest::Method::PATCH, format!("{url}/patch"), - HashMap::new(), + None, None, None, ) @@ -375,7 +433,7 @@ mod tests { .send_request( reqwest::Method::DELETE, format!("{url}/delete"), - HashMap::new(), + None, None, None, ) diff --git a/nautilus_core/network/src/python/http.rs b/nautilus_core/network/src/python/http.rs index cf1f23ee5a0..7c37865dd72 100644 --- a/nautilus_core/network/src/python/http.rs +++ b/nautilus_core/network/src/python/http.rs @@ -16,7 +16,6 @@ use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, - sync::Arc, }; use bytes::Bytes; @@ -24,8 +23,8 @@ use futures_util::{stream, StreamExt}; use pyo3::{create_exception, exceptions::PyException, prelude::*, types::PyBytes}; use crate::{ - http::{HttpClient, HttpClientError, HttpMethod, HttpResponse, InnerHttpClient}, - ratelimiter::{quota::Quota, RateLimiter}, + http::{HttpClient, HttpClientError, HttpMethod, HttpResponse}, + ratelimiter::quota::Quota, }; // Python exception class for generic HTTP errors. @@ -86,7 +85,7 @@ impl HttpResponse { #[pymethods] impl HttpClient { - /// Create a new HttpClient. + /// Creates a new HttpClient. /// /// `header_keys`: The key value pairs for the given `header_keys` are retained from the responses. /// `keyed_quota`: A list of string quota pairs that gives quota for specific key values. @@ -115,18 +114,7 @@ impl HttpClient { keyed_quotas: Vec<(String, Quota)>, default_quota: Option, ) -> Self { - let client = reqwest::Client::new(); - let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas)); - - let client = InnerHttpClient { - client, - header_keys, - }; - - Self { - rate_limiter, - client, - } + Self::new(header_keys, keyed_quotas, default_quota) } /// Send an HTTP request. @@ -155,14 +143,13 @@ impl HttpClient { timeout_secs: Option, py: Python<'py>, ) -> PyResult> { - let headers = headers.unwrap_or_default(); - let body_vec = body.map(|py_bytes| py_bytes.as_bytes().to_vec()); - let keys = keys.unwrap_or_default(); let client = self.client.clone(); let rate_limiter = self.rate_limiter.clone(); - let method = method.into(); + let keys = keys.unwrap_or_default(); + let body = body.map(|py_bytes| Bytes::from(py_bytes.as_bytes().to_vec())); + pyo3_async_runtimes::tokio::future_into_py(py, async move { - // Check keys for rate limiting quota + // TODO: Consolidate rate limiting let tasks = keys.iter().map(|key| rate_limiter.until_key_ready(key)); stream::iter(tasks) .for_each(|key| async move { @@ -170,9 +157,9 @@ impl HttpClient { }) .await; client - .send_request(method, url, headers, body_vec, timeout_secs) + .send_request(method.into(), url, headers, body, timeout_secs) .await - .map_err(super::super::http::HttpClientError::into_py_err) + .map_err(HttpClientError::into_py_err) }) } }