Skip to content

Commit

Permalink
Add ability to request and subscribe to databento bbo-1s and bbo-1m q…
Browse files Browse the repository at this point in the history
…uotes
  • Loading branch information
faysou committed Nov 30, 2024
1 parent ff7ecf2 commit 861e188
Show file tree
Hide file tree
Showing 20 changed files with 369 additions and 209 deletions.
38 changes: 34 additions & 4 deletions nautilus_core/adapters/src/databento/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl DatabentoHistoricalClient {
}

#[pyo3(name = "get_range_quotes")]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))]
#[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None, schema=None))]
#[allow(clippy::too_many_arguments)]
fn py_get_range_quotes<'py>(
&self,
Expand All @@ -194,6 +194,7 @@ impl DatabentoHistoricalClient {
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
schema: Option<String>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.inner.clone();

Expand All @@ -202,12 +203,22 @@ impl DatabentoHistoricalClient {
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns().as_u64());
let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?;
let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?;
match dbn_schema {
dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (),
_ => {
return Err(to_pyvalue_err(
"Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m",
))
}
};
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Mbp1)
.schema(dbn_schema)
.limit(limit.and_then(NonZeroU64::new))
.build();

Expand All @@ -226,8 +237,7 @@ impl DatabentoHistoricalClient {
let metadata = decoder.metadata().clone();
let mut result: Vec<QuoteTick> = Vec::new();

while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
let record = dbn::RecordRef::from(msg);
let mut process_record = |record: dbn::RecordRef| -> PyResult<()> {
let instrument_id =
decode_nautilus_instrument_id(&record, &metadata, &publisher_venue_map)
.map_err(to_pyvalue_err)?;
Expand All @@ -244,9 +254,29 @@ impl DatabentoHistoricalClient {
match data {
Some(Data::Quote(quote)) => {
result.push(quote);
Ok(())
}
_ => panic!("Invalid data element not `QuoteTick`, was {data:?}"),
}
};

match dbn_schema {
dbn::Schema::Mbp1 => {
while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
process_record(dbn::RecordRef::from(msg))?;
}
}
dbn::Schema::Bbo1M => {
while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
process_record(dbn::RecordRef::from(msg))?;
}
}
dbn::Schema::Bbo1S => {
while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
process_record(dbn::RecordRef::from(msg))?;
}
}
_ => unreachable!(),
}

Python::with_gil(|py| Ok(result.into_py(py)))
Expand Down
7 changes: 6 additions & 1 deletion nautilus_trader/adapters/_template/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ async def _subscribe_order_book_snapshots(
"method `_subscribe_order_book_snapshots` must be implemented in the subclass",
) # pragma: no cover

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_subscribe_quote_ticks` must be implemented in the subclass",
) # pragma: no cover
Expand Down Expand Up @@ -319,6 +323,7 @@ async def _request_order_book_snapshot(
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
metadata: dict | None = None,
) -> None:
raise NotImplementedError(
"method `_request_quote_tick` must be implemented in the subclass",
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/betfair/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ async def delayed_subscribe(self, delay=0) -> None:
async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping subscribe_instrument, Betfair subscribes as part of orderbook")

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.info("Skipping subscribe_quote_ticks, Betfair subscribes as part of orderbook")

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
Expand Down
15 changes: 10 additions & 5 deletions nautilus_trader/adapters/binance/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,11 @@ async def _order_book_snapshot_then_deltas(self, instrument_id: InstrumentId) ->
LogColor.BLUE,
)

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
await self._ws_client.subscribe_book_ticker(instrument_id.symbol.value)

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
Expand Down Expand Up @@ -695,6 +699,7 @@ async def _request_order_book_snapshot(
instrument_id: InstrumentId,
limit: int,
correlation_id: UUID4,
metadata: dict | None = None,
) -> None:
if limit not in [5, 10, 20, 50, 100, 500, 1000]:
self._log.error(
Expand All @@ -712,10 +717,10 @@ async def _request_order_book_snapshot(

data_type = DataType(
OrderBookDeltas,
metadata={
"instrument_id": instrument_id,
"limit": limit,
},
metadata=(
{"instrument_id": instrument_id, "limit": limit}
| (metadata if metadata else {})
),
)
self._handle_data_response(
data_type=data_type,
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/bybit/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ async def _subscribe_order_book_deltas(
ws_client = self._ws_clients[bybit_symbol.product_type]
await ws_client.subscribe_order_book(bybit_symbol.raw_symbol, depth=depth)

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
bybit_symbol = BybitSymbol(instrument_id.symbol.value)
ws_client = self._ws_clients[bybit_symbol.product_type]

Expand Down
33 changes: 31 additions & 2 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,14 +581,30 @@ async def _subscribe_order_book_snapshots(
except asyncio.CancelledError:
self._log.warning("`_subscribe_order_book_snapshots` was canceled while still pending")

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
try:
await self._ensure_subscribed_for_instrument(instrument_id)

quote_type = None
if metadata is not None:
quote_type = metadata.get("quote_type")

schema = DatabentoSchema.MBP_1.value
if quote_type is None or quote_type == "mpb-1":
schema = DatabentoSchema.MBP_1.value
elif quote_type == "bbo-1s":
schema = DatabentoSchema.BBO_1S.value
elif quote_type == "bbo-1m":
schema = DatabentoSchema.BBO_1M.value

dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
live_client.subscribe(
schema=DatabentoSchema.MBP_1.value,
schema=schema,
symbols=[instrument_id.symbol.value],
)

Expand Down Expand Up @@ -898,11 +914,24 @@ async def _request_quote_ticks(
LogColor.BLUE,
)

quote_type = None
if metadata is not None:
quote_type = metadata.get("quote_type")

schema = None
if quote_type is None or quote_type == "mpb-1":
schema = DatabentoSchema.MBP_1.value
elif quote_type == "bbo-1s":
schema = DatabentoSchema.BBO_1S.value
elif quote_type == "bbo-1m":
schema = DatabentoSchema.BBO_1M.value

pyo3_quotes = await self._http_client.get_range_quotes(
dataset=dataset,
symbols=[instrument_id.symbol.value],
start=start.value,
end=end.value,
schema=schema,
)

quotes = QuoteTick.from_pyo3_list(pyo3_quotes)
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/dydx/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,11 @@ async def _subscribe_order_book_deltas(
if not self._ws_client.has_subscription(subscription):
await self._ws_client.subscribe_order_book(dydx_symbol.raw_symbol)

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
self._log.debug(
f"Subscribing deltas {instrument_id} (quotes are not available)",
LogColor.MAGENTA,
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/interactive_brokers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ async def _subscribe_order_book_snapshots(
"implement the `_subscribe_order_book_snapshots` coroutine", # pragma: no cover
)

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
if not (instrument := self._cache.instrument(instrument_id)):
self._log.error(
f"Cannot subscribe to quotes for {instrument_id}: instrument not found",
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/okx/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ async def _subscribe_order_book_deltas(
# Copy subscribe method for book deltas to book snapshots (same logic)
_subscribe_order_book_snapshots = _subscribe_order_book_deltas

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
if instrument_id in self._tob_client_map:
self._log.warning(
f"Already subscribed to {instrument_id} top-of-book (quotes)",
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/polymarket/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ async def _subscribe_order_book_deltas(

await self._subscribe_asset_book(instrument_id)

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
await self._subscribe_asset_book(instrument_id)

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
Expand Down
6 changes: 5 additions & 1 deletion nautilus_trader/adapters/tardis/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ async def _subscribe_order_book_snapshots(
tardis_data_type = f"{tardis_data_type}_{depth}_0ms"
self._subscribe_stream(instrument_id, tardis_data_type, "order book snapshots")

async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
async def _subscribe_quote_ticks(
self,
instrument_id: InstrumentId,
metadata: dict | None = None,
) -> None:
tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(QuoteTick)
self._subscribe_stream(instrument_id, tardis_data_type, "quotes")

Expand Down
3 changes: 2 additions & 1 deletion nautilus_trader/backtest/data_client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ cdef class BacktestMarketDataClient(MarketDataClient):
self._add_subscription_order_book_snapshots(instrument_id)
# Do nothing else for backtest

cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id):
cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None):
Condition.not_none(instrument_id, "instrument_id")

if not self._cache.instrument(instrument_id):
Expand Down Expand Up @@ -356,6 +356,7 @@ cdef class BacktestMarketDataClient(MarketDataClient):
InstrumentId instrument_id,
int limit,
UUID4 correlation_id,
dict metadata = None,
):
Condition.not_none(instrument_id, "instrument_id")
Condition.not_none(correlation_id, "correlation_id")
Expand Down
7 changes: 4 additions & 3 deletions nautilus_trader/common/actor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ cdef class Actor(Component):
ClientId client_id=*,
bint managed=*,
)
cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*, str quote_type=*)
cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*)
cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*, str quote_type=*)
cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id=*)
cpdef void unsubscribe_data(self, DataType data_type, ClientId client_id=*)
Expand Down Expand Up @@ -216,8 +216,8 @@ cdef class Actor(Component):
datetime end=*,
ClientId client_id=*,
callback=*,
str quote_type=*,
bint update_catalog=*,
str quote_type=*,
)
cpdef UUID4 request_trade_ticks(
self,
Expand Down Expand Up @@ -247,6 +247,7 @@ cdef class Actor(Component):
bint include_external_data=*,
bint update_existing_subscriptions=*,
bint update_catalog=*,
str quote_type=*,
)
cpdef bint is_pending_request(self, UUID4 request_id)
cpdef bint has_pending_requests(self)
Expand Down
Loading

0 comments on commit 861e188

Please sign in to comment.