Skip to content

Commit

Permalink
Add price_precision option for DatabentoDataLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 28, 2024
1 parent c66aa95 commit 2ac333c
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 85 deletions.
1 change: 1 addition & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Released on TBD (UTC).
- Added `max_reconnection_tries` to data client config for dYdX (#2066), thanks @davidsblom
- Added wallet subscription for Bybit (#2076), thanks @sunlei
- Added docs clarity on loading historical bars (#2078), thanks @dodofarm
- Added `price_precision` optional parameter for `DatabentoDataLoader` methods
- Improved `Cache` behavior when adding more recent quotes, trades, or bars (now adds to cache)

### Internal Improvements
Expand Down
52 changes: 35 additions & 17 deletions nautilus_core/adapters/src/databento/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
include_trades: bool,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
where
Expand All @@ -211,7 +212,7 @@ impl DatabentoDataLoader {
let metadata = decoder.metadata().clone();
let mut dbn_stream = decoder.decode_stream::<T>();

let price_precision = Currency::USD().precision; // Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);

Ok(std::iter::from_fn(move || {
if let Err(e) = dbn_stream.advance() {
Expand Down Expand Up @@ -256,8 +257,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<OrderBookDelta>> {
self.read_records::<dbn::MboMsg>(filepath, instrument_id, false)?
self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((Some(item1), _)) => {
if let Data::Delta(delta) = item1 {
Expand All @@ -276,8 +278,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<OrderBookDepth10>> {
self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, false)?
self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((Some(item1), _)) => {
if let Data::Depth10(depth) = item1 {
Expand All @@ -296,8 +299,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<QuoteTick>> {
self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, false)?
self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((Some(item1), _)) => {
if let Data::Quote(quote) = item1 {
Expand All @@ -316,8 +320,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<QuoteTick>> {
self.read_records::<dbn::BboMsg>(filepath, instrument_id, false)?
self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((Some(item1), _)) => {
if let Data::Quote(quote) = item1 {
Expand All @@ -336,8 +341,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<TradeTick>> {
self.read_records::<dbn::TbboMsg>(filepath, instrument_id, false)?
self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((_, maybe_item2)) => {
if let Some(Data::Trade(trade)) = maybe_item2 {
Expand All @@ -355,8 +361,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<TradeTick>> {
self.read_records::<dbn::TradeMsg>(filepath, instrument_id, false)?
self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((Some(item1), _)) => {
if let Data::Trade(trade) = item1 {
Expand All @@ -375,8 +382,9 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<Vec<Bar>> {
self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, false)?
self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, price_precision, false)?
.filter_map(|result| match result {
Ok((Some(item1), _)) => {
if let Data::Bar(bar) = item1 {
Expand Down Expand Up @@ -435,6 +443,7 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
where
T: dbn::Record + dbn::HasRType + 'static,
Expand All @@ -443,7 +452,7 @@ impl DatabentoDataLoader {
let metadata = decoder.metadata().clone();
let mut dbn_stream = decoder.decode_stream::<T>();

let price_precision = Currency::USD().precision; // Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);

Ok(std::iter::from_fn(move || {
if let Err(e) = dbn_stream.advance() {
Expand Down Expand Up @@ -484,6 +493,7 @@ impl DatabentoDataLoader {
&self,
filepath: &Path,
instrument_id: Option<InstrumentId>,
price_precision: Option<u8>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
where
T: dbn::Record + dbn::HasRType + 'static,
Expand All @@ -492,7 +502,7 @@ impl DatabentoDataLoader {
let metadata = decoder.metadata().clone();
let mut dbn_stream = decoder.decode_stream::<T>();

let price_precision = Currency::USD().precision; // Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);

Ok(std::iter::from_fn(move || {
if let Err(e) = dbn_stream.advance() {
Expand Down Expand Up @@ -571,7 +581,7 @@ mod tests {
let instrument_id = InstrumentId::from("ESM4.GLBX");

let deltas = loader
.load_order_book_deltas(&path, Some(instrument_id))
.load_order_book_deltas(&path, Some(instrument_id), None)
.unwrap();

assert_eq!(deltas.len(), 2);
Expand All @@ -584,7 +594,7 @@ mod tests {
let instrument_id = InstrumentId::from("ESM4.GLBX");

let depths = loader
.load_order_book_depth10(&path, Some(instrument_id))
.load_order_book_depth10(&path, Some(instrument_id), None)
.unwrap();

assert_eq!(depths.len(), 2);
Expand All @@ -596,7 +606,9 @@ mod tests {
let loader = data_loader();
let instrument_id = InstrumentId::from("ESM4.GLBX");

let quotes = loader.load_quotes(&path, Some(instrument_id)).unwrap();
let quotes = loader
.load_quotes(&path, Some(instrument_id), None)
.unwrap();

assert_eq!(quotes.len(), 2);
}
Expand All @@ -608,7 +620,9 @@ mod tests {
let loader = data_loader();
let instrument_id = InstrumentId::from("ESM4.GLBX");

let quotes = loader.load_bbo_quotes(&path, Some(instrument_id)).unwrap();
let quotes = loader
.load_bbo_quotes(&path, Some(instrument_id), None)
.unwrap();

assert_eq!(quotes.len(), 2);
}
Expand All @@ -619,7 +633,9 @@ mod tests {
let loader = data_loader();
let instrument_id = InstrumentId::from("ESM4.GLBX");

let _trades = loader.load_tbbo_trades(&path, Some(instrument_id)).unwrap();
let _trades = loader
.load_tbbo_trades(&path, Some(instrument_id), None)
.unwrap();

// assert_eq!(trades.len(), 2); TODO: No records?
}
Expand All @@ -630,7 +646,9 @@ mod tests {
let loader = data_loader();

let instrument_id = InstrumentId::from("ESM4.GLBX");
let trades = loader.load_trades(&path, Some(instrument_id)).unwrap();
let trades = loader
.load_trades(&path, Some(instrument_id), None)
.unwrap();

assert_eq!(trades.len(), 2);
}
Expand All @@ -644,7 +662,7 @@ mod tests {
let loader = data_loader();

let instrument_id = InstrumentId::from("ESM4.GLBX");
let bars = loader.load_bars(&path, Some(instrument_id)).unwrap();
let bars = loader.load_bars(&path, Some(instrument_id), None).unwrap();

assert_eq!(bars.len(), 2);
}
Expand Down
27 changes: 17 additions & 10 deletions nautilus_core/adapters/src/databento/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ impl DatabentoHistoricalClient {
}

#[pyo3(name = "get_range_quotes")]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None))]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))]
#[allow(clippy::too_many_arguments)]
fn py_get_range_quotes<'py>(
&self,
py: Python<'py>,
Expand All @@ -192,6 +193,7 @@ impl DatabentoHistoricalClient {
start: u64,
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -209,7 +211,7 @@ impl DatabentoHistoricalClient {
.limit(limit.and_then(NonZeroU64::new))
.build();

let price_precision = Currency::USD().precision; // TODO: Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);
let publisher_venue_map = self.publisher_venue_map.clone();
let ts_init = self.clock.get_time_ns();

Expand Down Expand Up @@ -252,7 +254,8 @@ impl DatabentoHistoricalClient {
}

#[pyo3(name = "get_range_trades")]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None))]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))]
#[allow(clippy::too_many_arguments)]
fn py_get_range_trades<'py>(
&self,
py: Python<'py>,
Expand All @@ -261,6 +264,7 @@ impl DatabentoHistoricalClient {
start: u64,
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -278,7 +282,7 @@ impl DatabentoHistoricalClient {
.limit(limit.and_then(NonZeroU64::new))
.build();

let price_precision = Currency::USD().precision; // TODO: Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);
let publisher_venue_map = self.publisher_venue_map.clone();
let ts_init = self.clock.get_time_ns();

Expand Down Expand Up @@ -322,7 +326,7 @@ impl DatabentoHistoricalClient {

#[pyo3(name = "get_range_bars")]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (dataset, symbols, aggregation, start, end=None, limit=None))]
#[pyo3(signature = (dataset, symbols, aggregation, start, end=None, limit=None, price_precision=None))]
fn py_get_range_bars<'py>(
&self,
py: Python<'py>,
Expand All @@ -332,6 +336,7 @@ impl DatabentoHistoricalClient {
start: u64,
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -356,7 +361,7 @@ impl DatabentoHistoricalClient {
.limit(limit.and_then(NonZeroU64::new))
.build();

let price_precision = Currency::USD().precision; // TODO: Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);
let publisher_venue_map = self.publisher_venue_map.clone();
let ts_init = self.clock.get_time_ns();

Expand Down Expand Up @@ -400,7 +405,7 @@ impl DatabentoHistoricalClient {

#[pyo3(name = "get_range_imbalance")]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None))]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))]
fn py_get_range_imbalance<'py>(
&self,
py: Python<'py>,
Expand All @@ -409,6 +414,7 @@ impl DatabentoHistoricalClient {
start: u64,
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -426,7 +432,7 @@ impl DatabentoHistoricalClient {
.limit(limit.and_then(NonZeroU64::new))
.build();

let price_precision = Currency::USD().precision; // TODO: Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);
let publisher_venue_map = self.publisher_venue_map.clone();
let ts_init = self.clock.get_time_ns();

Expand Down Expand Up @@ -459,7 +465,7 @@ impl DatabentoHistoricalClient {

#[pyo3(name = "get_range_statistics")]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None))]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))]
fn py_get_range_statistics<'py>(
&self,
py: Python<'py>,
Expand All @@ -468,6 +474,7 @@ impl DatabentoHistoricalClient {
start: u64,
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -485,7 +492,7 @@ impl DatabentoHistoricalClient {
.limit(limit.and_then(NonZeroU64::new))
.build();

let price_precision = Currency::USD().precision; // TODO: Hard-coded for now
let price_precision = price_precision.unwrap_or(Currency::USD().precision);
let publisher_venue_map = self.publisher_venue_map.clone();
let ts_init = self.clock.get_time_ns();

Expand Down
Loading

0 comments on commit 2ac333c

Please sign in to comment.