Skip to content

Commit

Permalink
Improve HttpClient for use from Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 18, 2024
1 parent ae07595 commit 8850ee2
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 59 deletions.
4 changes: 1 addition & 3 deletions nautilus_core/network/benches/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::collections::HashMap;

use nautilus_network::http::InnerHttpClient;
use reqwest::Method;

Expand All @@ -30,7 +28,7 @@ async fn main() {
reqs.push(client.send_request(
Method::GET,
"http://127.0.0.1:3000".to_string(),
HashMap::new(),
None,
None,
None,
));
Expand Down
124 changes: 91 additions & 33 deletions nautilus_core/network/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
use std::{collections::HashMap, hash::Hash, sync::Arc, time::Duration};

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use reqwest::{
header::{HeaderMap, HeaderName},
Method, Response, Url,
};

use crate::ratelimiter::{clock::MonotonicClock, RateLimiter};
use crate::ratelimiter::{clock::MonotonicClock, quota::Quota, RateLimiter};

/// Represents the HTTP methods supported by the `HttpClient`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -70,23 +71,6 @@ pub struct HttpResponse {
pub(crate) body: Bytes,
}

/// A high-performance HTTP client with rate limiting and timeout capabilities.
///
/// This struct is designed to handle HTTP requests efficiently, providing
/// support for rate limiting, timeouts, and custom headers. The client is
/// built on top of `reqwest` and can be used for both synchronous and
/// asynchronous HTTP requests.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network")
)]
pub struct HttpClient {
/// The rate limiter to control the request rate.
pub(crate) rate_limiter: Arc<RateLimiter<String, MonotonicClock>>,
/// The underlying HTTP client used to make requests.
pub(crate) client: InnerHttpClient,
}

/// Represents errors that can occur when using the `HttpClient`.
///
/// This enum provides variants for general HTTP errors and timeout errors,
Expand Down Expand Up @@ -116,6 +100,85 @@ impl From<String> for HttpClientError {
}
}

/// A high-performance HTTP client with rate limiting and timeout capabilities.
///
/// This struct is designed to handle HTTP requests efficiently, providing
/// support for rate limiting, timeouts, and custom headers. The client is
/// built on top of `reqwest` and can be used for both synchronous and
/// asynchronous HTTP requests.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network")
)]
pub struct HttpClient {
/// The rate limiter to control the request rate.
pub(crate) rate_limiter: Arc<RateLimiter<String, MonotonicClock>>,
/// The underlying HTTP client used to make requests.
pub(crate) client: Arc<InnerHttpClient>,

This comment has been minimized.

Copy link
@twitu

twitu Nov 18, 2024

Collaborator

Client does not need an arc wrapper. The hyper implementation already makes it cloneable.

}

impl HttpClient {
/// Creates a new [`HttpClient`] instance.
#[must_use]
pub fn new(
header_keys: Vec<String>,
keyed_quotas: Vec<(String, Quota)>,
default_quota: Option<Quota>,
) -> Self {
let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));
let client = InnerHttpClient {
client: reqwest::Client::new(),
header_keys,
};

Self {
rate_limiter,
client: Arc::new(client),
}
}

/// Send an HTTP request.
///
/// `method`: The HTTP method to call.
/// `url`: The request is sent to this url.
/// `headers`: The header key value pairs in the request.
/// `body`: The bytes sent in the body of request.
/// `keys`: The keys used for rate limiting the request.
///
/// # Example
///
/// When a request is made the URL should be split into all relevant keys within it.
///
/// For request /foo/bar, should pass keys ["foo/bar", "foo"] for rate limiting.
#[allow(clippy::too_many_arguments)]
pub async fn request(
&self,
method: Method,
url: String,
headers: Option<HashMap<String, String>>,
body: Option<Bytes>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
) -> Result<HttpResponse, HttpClientError> {
let client = self.client.clone();
let rate_limiter = self.rate_limiter.clone();

// Check keys for rate limiting quota
let keys = keys.unwrap_or_default();

This comment has been minimized.

Copy link
@twitu

twitu Nov 18, 2024

Collaborator

Key validation can be done without async processing. The request function is already async. Do we need this level of granularity for async tasks?

let tasks = keys.iter().map(|key| rate_limiter.until_key_ready(key));

stream::iter(tasks)
.for_each(|key| async move {
key.await;
})
.await;

client
.send_request(method, url, headers, body, timeout_secs)
.await
}
}

/// A high-performance `HttpClient` for HTTP requests.
///
/// The client is backed by a hyper Client which keeps connections alive and
Expand All @@ -142,10 +205,11 @@ impl InnerHttpClient {
&self,
method: Method,
url: String,
headers: HashMap<String, String>,
body: Option<Vec<u8>>,
headers: Option<HashMap<String, String>>,
body: Option<Bytes>,

This comment has been minimized.

Copy link
@twitu

twitu Nov 18, 2024

Collaborator

Bytes are more useful for large repsonses that must be streamed.

timeout_secs: Option<u64>,
) -> Result<HttpResponse, HttpClientError> {
let headers = headers.unwrap_or_default();
let reqwest_url = Url::parse(url.as_str())
.map_err(|e| HttpClientError::from(format!("URL parse error: {e}")))?;

Expand All @@ -169,7 +233,7 @@ impl InnerHttpClient {

let request = match body {
Some(b) => request_builder
.body(b)
.body(b.to_vec())
.build()
.map_err(HttpClientError::from)?,
None => request_builder.build().map_err(HttpClientError::from)?,
Expand Down Expand Up @@ -277,13 +341,7 @@ mod tests {

let client = InnerHttpClient::default();
let response = client
.send_request(
reqwest::Method::GET,
format!("{url}/get"),
HashMap::new(),
None,
None,
)
.send_request(reqwest::Method::GET, format!("{url}/get"), None, None, None)
.await
.unwrap();

Expand All @@ -301,7 +359,7 @@ mod tests {
.send_request(
reqwest::Method::POST,
format!("{url}/post"),
HashMap::new(),
None,
None,
None,
)
Expand Down Expand Up @@ -329,13 +387,13 @@ mod tests {
);

let body_string = serde_json::to_string(&body).unwrap();
let body_bytes = body_string.into_bytes();
let body_bytes = Bytes::from(body_string.into_bytes());

let response = client
.send_request(
reqwest::Method::POST,
format!("{url}/post"),
HashMap::new(),
None,
Some(body_bytes),
None,
)
Expand All @@ -355,7 +413,7 @@ mod tests {
.send_request(
reqwest::Method::PATCH,
format!("{url}/patch"),
HashMap::new(),
None,
None,
None,
)
Expand All @@ -375,7 +433,7 @@ mod tests {
.send_request(
reqwest::Method::DELETE,
format!("{url}/delete"),
HashMap::new(),
None,
None,
None,
)
Expand Down
33 changes: 10 additions & 23 deletions nautilus_core/network/src/python/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
sync::Arc,
};

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use pyo3::{create_exception, exceptions::PyException, prelude::*, types::PyBytes};

use crate::{
http::{HttpClient, HttpClientError, HttpMethod, HttpResponse, InnerHttpClient},
ratelimiter::{quota::Quota, RateLimiter},
http::{HttpClient, HttpClientError, HttpMethod, HttpResponse},
ratelimiter::quota::Quota,
};

// Python exception class for generic HTTP errors.
Expand Down Expand Up @@ -86,7 +85,7 @@ impl HttpResponse {

#[pymethods]
impl HttpClient {
/// Create a new HttpClient.
/// Creates a new HttpClient.
///
/// `header_keys`: The key value pairs for the given `header_keys` are retained from the responses.
/// `keyed_quota`: A list of string quota pairs that gives quota for specific key values.
Expand Down Expand Up @@ -115,18 +114,7 @@ impl HttpClient {
keyed_quotas: Vec<(String, Quota)>,
default_quota: Option<Quota>,
) -> Self {
let client = reqwest::Client::new();
let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));

let client = InnerHttpClient {
client,
header_keys,
};

Self {
rate_limiter,
client,
}
Self::new(header_keys, keyed_quotas, default_quota)
}

/// Send an HTTP request.
Expand Down Expand Up @@ -155,24 +143,23 @@ impl HttpClient {
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let headers = headers.unwrap_or_default();
let body_vec = body.map(|py_bytes| py_bytes.as_bytes().to_vec());
let keys = keys.unwrap_or_default();
let client = self.client.clone();
let rate_limiter = self.rate_limiter.clone();
let method = method.into();
let keys = keys.unwrap_or_default();
let body = body.map(|py_bytes| Bytes::from(py_bytes.as_bytes().to_vec()));

pyo3_async_runtimes::tokio::future_into_py(py, async move {
// Check keys for rate limiting quota
// TODO: Consolidate rate limiting
let tasks = keys.iter().map(|key| rate_limiter.until_key_ready(key));
stream::iter(tasks)
.for_each(|key| async move {
key.await;
})
.await;
client
.send_request(method, url, headers, body_vec, timeout_secs)
.send_request(method.into(), url, headers, body, timeout_secs)
.await
.map_err(super::super::http::HttpClientError::into_py_err)
.map_err(HttpClientError::into_py_err)
})
}
}

0 comments on commit 8850ee2

Please sign in to comment.