Skip to content

Commit

Permalink
Add ability to request and subscribe to databento bbo-1s and bbo-1m q…
Browse files Browse the repository at this point in the history
…uotes
  • Loading branch information
faysou committed Dec 3, 2024
1 parent 02571f4 commit 8d06834
Show file tree
Hide file tree
Showing 20 changed files with 1,133 additions and 518 deletions.
38 changes: 34 additions & 4 deletions nautilus_core/adapters/src/databento/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl DatabentoHistoricalClient {
}

#[pyo3(name = "get_range_quotes")]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None, schema=None))]
#[allow(clippy::too_many_arguments)]
fn py_get_range_quotes<'py>(
&self,
Expand All @@ -194,6 +194,7 @@ impl DatabentoHistoricalClient {
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
schema: Option<String>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -202,12 +203,22 @@ impl DatabentoHistoricalClient {
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?;
match dbn_schema {
dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
_ => {
return Err(to_pyvalue_err(
"Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m",
))
}
};
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Mbp1)
.schema(dbn_schema)
.limit(limit.and_then(NonZeroU64::new))
.build();

Expand All @@ -226,8 +237,7 @@ impl DatabentoHistoricalClient {
let metadata = decoder.metadata().clone();
let mut result: Vec<QuoteTick> = Vec::new();

while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
let record = dbn::RecordRef::from(msg);
let mut process_record = |record: dbn::RecordRef| -> PyResult<()> {
let instrument_id =
decode_nautilus_instrument_id(&record, &metadata, &publisher_venue_map)
.map_err(to_pyvalue_err)?;
Expand All @@ -244,9 +254,29 @@ impl DatabentoHistoricalClient {
match data {
Some(Data::Quote(quote)) => {
result.push(quote);
Ok(())
}
_ => panic!("Invalid data element not `QuoteTick`, was {data:?}"),
}
};

match dbn_schema {
dbn::Schema::Mbp1 => {
while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
process_record(dbn::RecordRef::from(msg))?;
}
}
dbn::Schema::Bbo1M => {
while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
process_record(dbn::RecordRef::from(msg))?;
}
}
dbn::Schema::Bbo1S => {
while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
process_record(dbn::RecordRef::from(msg))?;
}
}
_ => panic!("Invalid schema {dbn_schema}"),
}

Python::with_gil(|py| Ok(result.into_py(py)))
Expand Down
85 changes: 67 additions & 18 deletions nautilus_trader/adapters/_template/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,16 @@ async def _subscribe(self, data_type: DataType) -> None:
"method `_subscribe` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_instruments(self) -> None:
async def _subscribe_instruments(self, metadata: dict | None = None) -> None:
raise NotImplementedError(
"method `_subscribe_instruments` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
async def _subscribe_instrument(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_instrument` must be implemented in the subclass",
) # pragma: no cover
Expand All @@ -187,7 +191,7 @@ async def _subscribe_order_book_deltas(
instrument_id: InstrumentId,
book_type: BookType,
depth: int | None = None,
kwargs: dict | None = None,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_order_book_deltas` must be implemented in the subclass",
Expand All @@ -198,33 +202,49 @@ async def _subscribe_order_book_snapshots(
instrument_id: InstrumentId,
book_type: BookType,
depth: int | None = None,
kwargs: dict | None = None,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_order_book_snapshots` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_quote_ticks` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_trade_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_trade_ticks` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_bars(self, bar_type: BarType) -> None:
async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None:
raise NotImplementedError(
"method `_subscribe_bars` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None:
async def _subscribe_instrument_status(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_instrument_status` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_instrument_close(self, instrument_id: InstrumentId) -> None:
async def _subscribe_instrument_close(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_instrument_close` must be implemented in the subclass",
) # pragma: no cover
Expand All @@ -234,47 +254,75 @@ async def _unsubscribe(self, data_type: DataType) -> None:
"method `_unsubscribe` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_instruments(self) -> None:
async def _unsubscribe_instruments(self, metadata: dict | None = None) -> None:
raise NotImplementedError(
"method `_unsubscribe_instruments` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_instrument(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_instrument` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_order_book_deltas(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_order_book_deltas` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_order_book_snapshots(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_order_book_snapshots` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_quote_tick` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_trade_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_trade_ticks` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_bars(self, bar_type: BarType) -> None:
async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None:
raise NotImplementedError(
"method `_unsubscribe_bars` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_instrument_status(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_instrument_status(
self,
instrument_id: InstrumentId,
params: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_instrument_status` must be implemented in the subclass",
) # pragma: no cover

async def _unsubscribe_instrument_close(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_instrument_close(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_unsubscribe_instrument_close` must be implemented in the subclass",
) # pragma: no cover
Expand Down Expand Up @@ -319,6 +367,7 @@ async def _request_order_book_snapshot(
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_request_quote_tick` must be implemented in the subclass",
Expand Down
64 changes: 52 additions & 12 deletions nautilus_trader/adapters/betfair/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def _subscribe_order_book_deltas(
instrument_id: InstrumentId,
book_type: BookType,
depth: int | None = None,
kwargs: dict | None = None,
metadata: dict | None = None,
) -> None:
PyCondition.not_none(instrument_id, "instrument_id")

Expand Down Expand Up @@ -232,43 +232,83 @@ async def delayed_subscribe(self, delay=0) -> None:
await self._stream.send_subscription_message(market_ids=list(self._subscribed_market_ids))
self._log.info(f"Added market_ids {self._subscribed_market_ids} for <OrderBook> data")

async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
async def _subscribe_instrument(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping subscribe_instrument, Betfair subscribes as part of orderbook")

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping subscribe_quote_ticks, Betfair subscribes as part of orderbook")

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_trade_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping subscribe_trade_ticks, Betfair subscribes as part of orderbook")

async def _subscribe_instruments(self) -> None:
async def _subscribe_instruments(self, metadata: dict | None = None) -> None:
for instrument in self._instrument_provider.list_all():
self._handle_data(instrument)

async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None:
async def _subscribe_instrument_status(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
pass # Subscribed as part of orderbook

async def _subscribe_instrument_close(self, instrument_id: InstrumentId) -> None:
async def _subscribe_instrument_close(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
pass # Subscribed as part of orderbook

async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_order_book_snapshots(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
# TODO - this could be done by removing the market from self.__subscribed_market_ids and resending the
# subscription message - when we have a use case

self._log.warning("Betfair does not support unsubscribing from instruments")

async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_order_book_deltas(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
# TODO - this could be done by removing the market from self.__subscribed_market_ids and resending the
# subscription message - when we have a use case
self._log.warning("Betfair does not support unsubscribing from instruments")

async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_instrument(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping unsubscribe_instrument, not applicable for Betfair")

async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping unsubscribe_quote_ticks, not applicable for Betfair")

async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
async def _unsubscribe_trade_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping unsubscribe_trade_ticks, not applicable for Betfair")

# -- STREAMS ----------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 8d06834

Please sign in to comment.