Skip to content

Commit

Permalink
Refine HttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 18, 2024
1 parent 8850ee2 commit 8b901f0
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 35 deletions.
1 change: 1 addition & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ None

### Internal Improvements
- Improve live engines error logging (will now log all exceptions rather than just `RuntimeError`)
- Refined `HttpClient` for use directly from Rust
- Upgraded `datafusion` crate to v43.0.0 (#2056), thanks @twitu

### Breaking Changes
Expand Down
35 changes: 12 additions & 23 deletions nautilus_core/network/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::{collections::HashMap, hash::Hash, sync::Arc, time::Duration};

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use reqwest::{
header::{HeaderMap, HeaderName},
Method, Response, Url,
Expand Down Expand Up @@ -111,10 +110,10 @@ impl From<String> for HttpClientError {
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network")
)]
pub struct HttpClient {
/// The underlying HTTP client used to make requests.
pub(crate) client: InnerHttpClient,
/// The rate limiter to control the request rate.
pub(crate) rate_limiter: Arc<RateLimiter<String, MonotonicClock>>,
/// The underlying HTTP client used to make requests.
pub(crate) client: Arc<InnerHttpClient>,
}

impl HttpClient {
Expand All @@ -125,15 +124,15 @@ impl HttpClient {
keyed_quotas: Vec<(String, Quota)>,
default_quota: Option<Quota>,
) -> Self {
let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));
let client = InnerHttpClient {
client: reqwest::Client::new(),
header_keys,
header_keys: Arc::new(header_keys),
};
let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));

Self {
client,
rate_limiter,
client: Arc::new(client),
}
}

Expand All @@ -156,24 +155,14 @@ impl HttpClient {
method: Method,
url: String,
headers: Option<HashMap<String, String>>,
body: Option<Bytes>,
body: Option<Vec<u8>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
) -> Result<HttpResponse, HttpClientError> {
let client = self.client.clone();
let rate_limiter = self.rate_limiter.clone();

// Check keys for rate limiting quota
let keys = keys.unwrap_or_default();
let tasks = keys.iter().map(|key| rate_limiter.until_key_ready(key));

stream::iter(tasks)
.for_each(|key| async move {
key.await;
})
.await;

client
rate_limiter.await_keys_ready(keys).await;
self.client
.send_request(method, url, headers, body, timeout_secs)
.await
}
Expand All @@ -190,7 +179,7 @@ impl HttpClient {
#[derive(Clone)]
pub struct InnerHttpClient {
pub(crate) client: reqwest::Client,
pub(crate) header_keys: Vec<String>,
pub(crate) header_keys: Arc<Vec<String>>,
}

impl InnerHttpClient {
Expand All @@ -206,7 +195,7 @@ impl InnerHttpClient {
method: Method,
url: String,
headers: Option<HashMap<String, String>>,
body: Option<Bytes>,
body: Option<Vec<u8>>,
timeout_secs: Option<u64>,
) -> Result<HttpResponse, HttpClientError> {
let headers = headers.unwrap_or_default();
Expand All @@ -233,7 +222,7 @@ impl InnerHttpClient {

let request = match body {
Some(b) => request_builder
.body(b.to_vec())
.body(b)
.build()
.map_err(HttpClientError::from)?,
None => request_builder.build().map_err(HttpClientError::from)?,
Expand Down Expand Up @@ -387,7 +376,7 @@ mod tests {
);

let body_string = serde_json::to_string(&body).unwrap();
let body_bytes = Bytes::from(body_string.into_bytes());
let body_bytes = body_string.into_bytes();

let response = client
.send_request(
Expand Down
15 changes: 3 additions & 12 deletions nautilus_core/network/src/python/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use std::{
};

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use pyo3::{create_exception, exceptions::PyException, prelude::*, types::PyBytes};
use pyo3::{create_exception, exceptions::PyException, prelude::*};

use crate::{
http::{HttpClient, HttpClientError, HttpMethod, HttpResponse},
Expand Down Expand Up @@ -138,24 +137,16 @@ impl HttpClient {
method: HttpMethod,
url: String,
headers: Option<HashMap<String, String>>,
body: Option<Bound<'py, PyBytes>>,
body: Option<Vec<u8>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.client.clone();
let rate_limiter = self.rate_limiter.clone();
let keys = keys.unwrap_or_default();
let body = body.map(|py_bytes| Bytes::from(py_bytes.as_bytes().to_vec()));

pyo3_async_runtimes::tokio::future_into_py(py, async move {
// TODO: Consolidate rate limiting
let tasks = keys.iter().map(|key| rate_limiter.until_key_ready(key));
stream::iter(tasks)
.for_each(|key| async move {
key.await;
})
.await;
rate_limiter.await_keys_ready(keys).await;
client
.send_request(method.into(), url, headers, body, timeout_secs)
.await
Expand Down
12 changes: 12 additions & 0 deletions nautilus_core/network/src/ratelimiter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
};

use dashmap::DashMap;
use futures_util::StreamExt;
use tokio::time::sleep;

use self::{
Expand Down Expand Up @@ -191,6 +192,17 @@ where
}
}
}

pub async fn await_keys_ready(&self, keys: Option<Vec<K>>) {
let keys = keys.unwrap_or_default();
let tasks = keys.iter().map(|key| self.until_key_ready(key));

futures::stream::iter(tasks)
.for_each_concurrent(None, |key_future| async move {
key_future.await;
})
.await;
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 8b901f0

Please sign in to comment.