From dd21f7827d7af6155ca48a742ab9ee1a64e27196 Mon Sep 17 00:00:00 2001 From: Faysal Aberkane Date: Sat, 30 Nov 2024 21:29:31 +0000 Subject: [PATCH] Add ability to request and subscribe to databento bbo-1s and bbo-1m quotes --- .../src/databento/python/historical.rs | 38 +- nautilus_trader/adapters/_template/data.py | 7 +- nautilus_trader/adapters/betfair/data.py | 6 +- nautilus_trader/adapters/binance/data.py | 15 +- nautilus_trader/adapters/bybit/data.py | 6 +- nautilus_trader/adapters/databento/data.py | 25 +- nautilus_trader/adapters/dydx/data.py | 6 +- .../adapters/interactive_brokers/data.py | 6 +- nautilus_trader/adapters/okx/data.py | 6 +- nautilus_trader/adapters/polymarket/data.py | 6 +- nautilus_trader/adapters/tardis/data.py | 6 +- nautilus_trader/backtest/data_client.pyx | 3 +- nautilus_trader/common/actor.pxd | 7 +- nautilus_trader/common/actor.pyx | 30 +- nautilus_trader/core/nautilus_pyo3.pyi | 2 + nautilus_trader/data/client.pxd | 5 +- nautilus_trader/data/client.pyx | 3 +- nautilus_trader/data/engine.pxd | 15 +- nautilus_trader/data/engine.pyx | 361 ++++++++++-------- nautilus_trader/live/data_client.py | 17 +- 20 files changed, 361 insertions(+), 209 deletions(-) diff --git a/nautilus_core/adapters/src/databento/python/historical.rs b/nautilus_core/adapters/src/databento/python/historical.rs index c84d8389e8a8..fff1be629637 100644 --- a/nautilus_core/adapters/src/databento/python/historical.rs +++ b/nautilus_core/adapters/src/databento/python/historical.rs @@ -183,7 +183,7 @@ impl DatabentoHistoricalClient { } #[pyo3(name = "get_range_quotes")] - #[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))] + #[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None, schema=None))] #[allow(clippy::too_many_arguments)] fn py_get_range_quotes<'py>( &self, @@ -194,6 +194,7 @@ impl DatabentoHistoricalClient { end: Option, limit: Option, price_precision: Option, + schema: Option, ) -> PyResult> { let client = self.inner.clone(); @@ -202,12 +203,22 @@ impl DatabentoHistoricalClient { check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?; let end = end.unwrap_or(self.clock.get_time_ns().as_u64()); let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?; + let schema = schema.unwrap_or_else(|| "mbp-1".to_string()); + let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?; + match dbn_schema { + dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (), + _ => { + return Err(to_pyvalue_err( + "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m", + )) + } + }; let params = GetRangeParams::builder() .dataset(dataset) .date_time_range(time_range) .symbols(symbols) .stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?) - .schema(dbn::Schema::Mbp1) + .schema(dbn_schema) .limit(limit.and_then(NonZeroU64::new)) .build(); @@ -226,8 +237,7 @@ impl DatabentoHistoricalClient { let metadata = decoder.metadata().clone(); let mut result: Vec = Vec::new(); - while let Ok(Some(msg)) = decoder.decode_record::().await { - let record = dbn::RecordRef::from(msg); + let mut process_record = |record: dbn::RecordRef| -> PyResult<()> { let instrument_id = decode_nautilus_instrument_id(&record, &metadata, &publisher_venue_map) .map_err(to_pyvalue_err)?; @@ -244,9 +254,29 @@ impl DatabentoHistoricalClient { match data { Some(Data::Quote(quote)) => { result.push(quote); + Ok(()) } _ => panic!("Invalid data element not `QuoteTick`, was {data:?}"), } + }; + + match dbn_schema { + dbn::Schema::Mbp1 => { + while let Ok(Some(msg)) = decoder.decode_record::().await { + process_record(dbn::RecordRef::from(msg))?; + } + } + dbn::Schema::Bbo1M => { + while let Ok(Some(msg)) = decoder.decode_record::().await { + process_record(dbn::RecordRef::from(msg))?; + } + } + dbn::Schema::Bbo1S => { + while let Ok(Some(msg)) = decoder.decode_record::().await { + process_record(dbn::RecordRef::from(msg))?; + } + } + _ => unreachable!(), } Python::with_gil(|py| Ok(result.into_py(py))) diff --git a/nautilus_trader/adapters/_template/data.py b/nautilus_trader/adapters/_template/data.py index cb43a26d20f3..adb9f6d65bee 100644 --- a/nautilus_trader/adapters/_template/data.py +++ b/nautilus_trader/adapters/_template/data.py @@ -204,7 +204,11 @@ async def _subscribe_order_book_snapshots( "method `_subscribe_order_book_snapshots` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_subscribe_quote_ticks` must be implemented in the subclass", ) # pragma: no cover @@ -319,6 +323,7 @@ async def _request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: raise NotImplementedError( "method `_request_quote_tick` must be implemented in the subclass", diff --git a/nautilus_trader/adapters/betfair/data.py b/nautilus_trader/adapters/betfair/data.py index 956b9ecd7243..aed5c2898a98 100644 --- a/nautilus_trader/adapters/betfair/data.py +++ b/nautilus_trader/adapters/betfair/data.py @@ -235,7 +235,11 @@ async def delayed_subscribe(self, delay=0) -> None: async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: self._log.info("Skipping subscribe_instrument, Betfair subscribes as part of orderbook") - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping subscribe_quote_ticks, Betfair subscribes as part of orderbook") async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: diff --git a/nautilus_trader/adapters/binance/data.py b/nautilus_trader/adapters/binance/data.py index cc3e50748934..9580119db317 100644 --- a/nautilus_trader/adapters/binance/data.py +++ b/nautilus_trader/adapters/binance/data.py @@ -446,7 +446,11 @@ async def _order_book_snapshot_then_deltas(self, instrument_id: InstrumentId) -> LogColor.BLUE, ) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._ws_client.subscribe_book_ticker(instrument_id.symbol.value) async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: @@ -695,6 +699,7 @@ async def _request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: if limit not in [5, 10, 20, 50, 100, 500, 1000]: self._log.error( @@ -712,10 +717,10 @@ async def _request_order_book_snapshot( data_type = DataType( OrderBookDeltas, - metadata={ - "instrument_id": instrument_id, - "limit": limit, - }, + metadata=( + {"instrument_id": instrument_id, "limit": limit} + | (metadata if metadata else {}) + ), ) self._handle_data_response( data_type=data_type, diff --git a/nautilus_trader/adapters/bybit/data.py b/nautilus_trader/adapters/bybit/data.py index 8bff21c6184e..fb0553e16ef4 100644 --- a/nautilus_trader/adapters/bybit/data.py +++ b/nautilus_trader/adapters/bybit/data.py @@ -314,7 +314,11 @@ async def _subscribe_order_book_deltas( ws_client = self._ws_clients[bybit_symbol.product_type] await ws_client.subscribe_order_book(bybit_symbol.raw_symbol, depth=depth) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index aed21be26a75..8bfaeabde9a9 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -581,14 +581,29 @@ async def _subscribe_order_book_snapshots( except asyncio.CancelledError: self._log.warning("`_subscribe_order_book_snapshots` was canceled while still pending") - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: try: await self._ensure_subscribed_for_instrument(instrument_id) + quote_type = None + if metadata is not None: + quote_type = metadata.get("quote_type") + + if quote_type is None or quote_type == "mpb-1": + schema = DatabentoSchema.MBP_1.value + elif quote_type == "bbo-1s": + schema = DatabentoSchema.BBO_1S.value + elif quote_type == "bbo-1m": + schema = DatabentoSchema.BBO_1M.value + dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( - schema=DatabentoSchema.MBP_1.value, + schema=schema, symbols=[instrument_id.symbol.value], ) @@ -898,11 +913,17 @@ async def _request_quote_ticks( LogColor.BLUE, ) + # schema can only be "mpb-1", "bbo-1s" or "bbo-1m", this is tested in rust + schema = None + if metadata is not None: + schema = metadata.get("quote_type") + pyo3_quotes = await self._http_client.get_range_quotes( dataset=dataset, symbols=[instrument_id.symbol.value], start=start.value, end=end.value, + schema=schema, ) quotes = QuoteTick.from_pyo3_list(pyo3_quotes) diff --git a/nautilus_trader/adapters/dydx/data.py b/nautilus_trader/adapters/dydx/data.py index aba0dc25d232..03948f94a9c5 100644 --- a/nautilus_trader/adapters/dydx/data.py +++ b/nautilus_trader/adapters/dydx/data.py @@ -712,7 +712,11 @@ async def _subscribe_order_book_deltas( if not self._ws_client.has_subscription(subscription): await self._ws_client.subscribe_order_book(dydx_symbol.raw_symbol) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.debug( f"Subscribing deltas {instrument_id} (quotes are not available)", LogColor.MAGENTA, diff --git a/nautilus_trader/adapters/interactive_brokers/data.py b/nautilus_trader/adapters/interactive_brokers/data.py index 513cb3c7ab21..00fcad51cd83 100644 --- a/nautilus_trader/adapters/interactive_brokers/data.py +++ b/nautilus_trader/adapters/interactive_brokers/data.py @@ -165,7 +165,11 @@ async def _subscribe_order_book_snapshots( "implement the `_subscribe_order_book_snapshots` coroutine", # pragma: no cover ) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if not (instrument := self._cache.instrument(instrument_id)): self._log.error( f"Cannot subscribe to quotes for {instrument_id}: instrument not found", diff --git a/nautilus_trader/adapters/okx/data.py b/nautilus_trader/adapters/okx/data.py index 7796dd0315a7..b8f522303058 100644 --- a/nautilus_trader/adapters/okx/data.py +++ b/nautilus_trader/adapters/okx/data.py @@ -300,7 +300,11 @@ async def _subscribe_order_book_deltas( # Copy subscribe method for book deltas to book snapshots (same logic) _subscribe_order_book_snapshots = _subscribe_order_book_deltas - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if instrument_id in self._tob_client_map: self._log.warning( f"Already subscribed to {instrument_id} top-of-book (quotes)", diff --git a/nautilus_trader/adapters/polymarket/data.py b/nautilus_trader/adapters/polymarket/data.py index 574574444655..32ff98da0d85 100644 --- a/nautilus_trader/adapters/polymarket/data.py +++ b/nautilus_trader/adapters/polymarket/data.py @@ -240,7 +240,11 @@ async def _subscribe_order_book_deltas( await self._subscribe_asset_book(instrument_id) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._subscribe_asset_book(instrument_id) async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: diff --git a/nautilus_trader/adapters/tardis/data.py b/nautilus_trader/adapters/tardis/data.py index cee8a5ca3727..4ffbe6d8d50f 100644 --- a/nautilus_trader/adapters/tardis/data.py +++ b/nautilus_trader/adapters/tardis/data.py @@ -283,7 +283,11 @@ async def _subscribe_order_book_snapshots( tardis_data_type = f"{tardis_data_type}_{depth}_0ms" self._subscribe_stream(instrument_id, tardis_data_type, "order book snapshots") - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(QuoteTick) self._subscribe_stream(instrument_id, tardis_data_type, "quotes") diff --git a/nautilus_trader/backtest/data_client.pyx b/nautilus_trader/backtest/data_client.pyx index 44260d630706..3ac8163c7875 100644 --- a/nautilus_trader/backtest/data_client.pyx +++ b/nautilus_trader/backtest/data_client.pyx @@ -206,7 +206,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): self._add_subscription_order_book_snapshots(instrument_id) # Do nothing else for backtest - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id): + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") if not self._cache.instrument(instrument_id): @@ -356,6 +356,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): InstrumentId instrument_id, int limit, UUID4 correlation_id, + dict metadata = None, ): Condition.not_none(instrument_id, "instrument_id") Condition.not_none(correlation_id, "correlation_id") diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index a2b08e936bde..0706f38b07bc 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -158,9 +158,9 @@ cdef class Actor(Component): ClientId client_id=*, bint managed=*, ) - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*) + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*, str quote_type=*) cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*) + cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*, str quote_type=*) cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*) cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id=*) cpdef void unsubscribe_data(self, DataType data_type, ClientId client_id=*) @@ -216,8 +216,8 @@ cdef class Actor(Component): datetime end=*, ClientId client_id=*, callback=*, - str quote_type=*, bint update_catalog=*, + str quote_type=*, ) cpdef UUID4 request_trade_ticks( self, @@ -247,6 +247,7 @@ cdef class Actor(Component): bint include_external_data=*, bint update_existing_subscriptions=*, bint update_catalog=*, + str quote_type=*, ) cpdef bint is_pending_request(self, UUID4 request_id) cpdef bint has_pending_requests(self) diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index f02890854455..2ad3b8c25efb 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -1334,7 +1334,7 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id = None, str quote_type = None): """ Subscribe to streaming `QuoteTick` data for the given instrument ID. @@ -1345,6 +1345,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + quote_type : str, default None + The specified quote type applicable to certain client implementations. """ Condition.not_none(instrument_id, "instrument_id") @@ -1360,7 +1362,7 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(QuoteTick, metadata={"instrument_id": instrument_id}), + data_type=DataType(QuoteTick, metadata={"instrument_id": instrument_id, "quote_type": quote_type}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1405,6 +1407,7 @@ cdef class Actor(Component): BarType bar_type, ClientId client_id = None, bint await_partial = False, + str quote_type = None, ): """ Subscribe to streaming `Bar` data for the given bar type. @@ -1419,6 +1422,8 @@ cdef class Actor(Component): await_partial : bool, default False If the bar aggregator should await the arrival of a historical partial bar prior to actively aggregating new bars. + quote_type : str, default None + The specified quote type applicable to certain client implementations. """ Condition.not_none(bar_type, "bar_type") @@ -1432,6 +1437,7 @@ cdef class Actor(Component): cdef dict metadata = { "bar_type": bar_type, "await_partial": await_partial, + "quote_type": quote_type, } cdef Subscribe command = Subscribe( @@ -2171,8 +2177,8 @@ cdef class Actor(Component): datetime end = None, ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, - str quote_type = "", bint update_catalog = False, + str quote_type = None, ): """ Request historical `QuoteTick` data. @@ -2194,10 +2200,10 @@ cdef class Actor(Component): callback : Callable[[UUID4], None], optional The registered callback, to be called with the request ID when the response has completed processing. - quote_type : str, default '' - The specified quote type applicable to certain client implementations. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + quote_type : str, default None + The specified quote type applicable to certain client implementations. Returns ------- @@ -2421,6 +2427,7 @@ cdef class Actor(Component): bint include_external_data = False, bint update_existing_subscriptions = False, bint update_catalog = False, + str quote_type = None, ): """ Request historical aggregated `Bar` data for multiple bar types. @@ -2453,6 +2460,8 @@ cdef class Actor(Component): If True, updates the aggregators of any existing subscription with the queried external data. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + quote_type : str, default None + The specified quote type applicable to certain client implementations. Returns ------- @@ -2494,14 +2503,14 @@ cdef class Actor(Component): return first = bar_types[0] - market_data_type = "" + bars_market_data_type = "" if first.is_composite(): - market_data_type = "bars" + bars_market_data_type = "bars" elif first.spec.price_type == PriceType.LAST: - market_data_type = "trade_ticks" + bars_market_data_type = "trade_ticks" else: - market_data_type = "quote_ticks" + bars_market_data_type = "quote_ticks" cdef UUID4 request_id = UUID4() cdef DataRequest request = DataRequest( @@ -2509,13 +2518,14 @@ cdef class Actor(Component): venue=first.instrument_id.venue, data_type=DataType(Bar, metadata={ "bar_types": tuple(bar_types), - "market_data_type": market_data_type, + "bars_market_data_type": bars_market_data_type, "instrument_id": first.instrument_id, "bar_type": first.composite(), "start": start, "end": end, "include_external_data": include_external_data, "update_existing_subscriptions": update_existing_subscriptions, + "quote_type": quote_type, "update_catalog": update_catalog, }), callback=self._handle_aggregated_bars_response, diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index c392f320bc7b..e67973cd366a 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -4019,6 +4019,8 @@ class DatabentoHistoricalClient: start: int, end: int | None = None, limit: int | None = None, + price_precision: int | None = None, + schema: str | None = None, ) -> list[QuoteTick]: ... async def get_range_trades( self, diff --git a/nautilus_trader/data/client.pxd b/nautilus_trader/data/client.pxd index 7a615f62d649..fc55d4097b90 100644 --- a/nautilus_trader/data/client.pxd +++ b/nautilus_trader/data/client.pxd @@ -86,7 +86,7 @@ cdef class MarketDataClient(DataClient): cpdef void subscribe_instrument(self, InstrumentId instrument_id) cpdef void subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth=*, dict kwargs=*) cpdef void subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth=*, dict kwargs=*) - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id) + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata=*) cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id) cpdef void subscribe_bars(self, BarType bar_type) cpdef void subscribe_instrument_status(self, InstrumentId instrument_id) @@ -140,7 +140,8 @@ cdef class MarketDataClient(DataClient): self, InstrumentId instrument_id, int limit, - UUID4 correlation_id + UUID4 correlation_id, + dict metadata=*, ) cpdef void request_quote_ticks( self, diff --git a/nautilus_trader/data/client.pyx b/nautilus_trader/data/client.pyx index 7b572593b1a0..31571991ed9f 100644 --- a/nautilus_trader/data/client.pyx +++ b/nautilus_trader/data/client.pyx @@ -434,7 +434,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_order_book_snapshots` must be implemented in the subclass") - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id): + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None): """ Subscribe to `QuoteTick` data for the given instrument ID. @@ -826,6 +826,7 @@ cdef class MarketDataClient(DataClient): InstrumentId instrument_id, int limit, UUID4 correlation_id, + dict metadata = None, ): """ Request order book snapshot data. diff --git a/nautilus_trader/data/engine.pxd b/nautilus_trader/data/engine.pxd index 59bfc385861c..f0bcee964a60 100644 --- a/nautilus_trader/data/engine.pxd +++ b/nautilus_trader/data/engine.pxd @@ -13,9 +13,11 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- +from cpython.datetime cimport datetime from libc.stdint cimport uint64_t from nautilus_trader.persistence.catalog import ParquetDataCatalog + from nautilus_trader.cache.cache cimport Cache from nautilus_trader.common.component cimport Component from nautilus_trader.common.component cimport TimeEvent @@ -133,11 +135,11 @@ cdef class DataEngine(Component): cpdef void _handle_subscribe_order_book(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) # noqa cpdef void _setup_order_book(self, MarketDataClient client, InstrumentId instrument_id, dict metadata, bint only_deltas, bint managed) # noqa cpdef void _create_new_book(self, Instrument instrument, BookType book_type) - cpdef void _handle_subscribe_quote_ticks(self, MarketDataClient client, InstrumentId instrument_id) + cpdef void _handle_subscribe_quote_ticks(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) cpdef void _handle_subscribe_synthetic_quote_ticks(self, InstrumentId instrument_id) cpdef void _handle_subscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id) cpdef void _handle_subscribe_synthetic_trade_ticks(self, InstrumentId instrument_id) - cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, bint await_partial) + cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, bint await_partial, dict metadata) cpdef void _handle_subscribe_data(self, DataClient client, DataType data_type) cpdef void _handle_subscribe_instrument_status(self, MarketDataClient client, InstrumentId instrument_id) cpdef void _handle_subscribe_instrument_close(self, MarketDataClient client, InstrumentId instrument_id) @@ -149,6 +151,13 @@ cdef class DataEngine(Component): cpdef void _handle_unsubscribe_bars(self, MarketDataClient client, BarType bar_type) cpdef void _handle_unsubscribe_data(self, DataClient client, DataType data_type) cpdef void _handle_request(self, DataRequest request) + cpdef void _handle_request_instruments(self, DataRequest request, DataClient client, datetime start, datetime end, dict metadata) + cpdef void _handle_request_instrument(self, DataRequest request, DataClient client, InstrumentId instrument_id, datetime start, datetime end, dict metadata) + cpdef void _handle_request_order_book_deltas(self, DataRequest request, DataClient client, dict metadata) + cpdef void _handle_request_quote_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata) + cpdef void _handle_request_trade_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata) + cpdef void _handle_request_bars(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata) + cpdef void _handle_request_data(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now) cpdef void _query_catalog(self, DataRequest request) # -- DATA HANDLERS -------------------------------------------------------------------------------- @@ -185,7 +194,7 @@ cdef class DataEngine(Component): cpdef void _update_order_book(self, Data data) cpdef void _snapshot_order_book(self, TimeEvent snap_event) cpdef void _publish_order_book(self, InstrumentId instrument_id, str topic) - cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type, bint await_partial) + cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type, bint await_partial, dict metadata) cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type) cpdef void _update_synthetics_with_quote(self, list synthetics, QuoteTick update) cpdef void _update_synthetic_with_quote(self, SyntheticInstrument synthetic, QuoteTick update) diff --git a/nautilus_trader/data/engine.pyx b/nautilus_trader/data/engine.pyx index c967aedc8f45..3424955aa6ab 100644 --- a/nautilus_trader/data/engine.pyx +++ b/nautilus_trader/data/engine.pyx @@ -681,6 +681,7 @@ cdef class DataEngine(Component): self._handle_subscribe_quote_ticks( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == TradeTick: self._handle_subscribe_trade_ticks( @@ -692,6 +693,7 @@ cdef class DataEngine(Component): client, command.data_type.metadata.get("bar_type"), command.data_type.metadata.get("await_partial"), + command.data_type.metadata, ) elif command.data_type.type == InstrumentStatus: self._handle_subscribe_instrument_status( @@ -936,6 +938,7 @@ cdef class DataEngine(Component): self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(instrument_id, "instrument_id") if instrument_id.is_synthetic(): @@ -944,7 +947,7 @@ cdef class DataEngine(Component): Condition.not_none(client, "client") if instrument_id not in client.subscribed_quote_ticks(): - client.subscribe_quote_ticks(instrument_id) + client.subscribe_quote_ticks(instrument_id, metadata) cpdef void _handle_subscribe_synthetic_quote_ticks(self, InstrumentId instrument_id): cdef SyntheticInstrument synthetic = self._cache.synthetic(instrument_id) @@ -1017,6 +1020,7 @@ cdef class DataEngine(Component): MarketDataClient client, BarType bar_type, bint await_partial, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(bar_type, "bar_type") @@ -1024,7 +1028,7 @@ cdef class DataEngine(Component): if bar_type.is_internally_aggregated(): # Internal aggregation if bar_type.standard() not in self._bar_aggregators: - self._start_bar_aggregator(client, bar_type, await_partial) + self._start_bar_aggregator(client, bar_type, await_partial, metadata) else: # External aggregation if bar_type.instrument_id.is_synthetic(): @@ -1305,8 +1309,7 @@ cdef class DataEngine(Component): Condition.is_true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") cdef dict[str, object] metadata = request.data_type.metadata - cdef str aggregated_bars_market_data_type = metadata.get("market_data_type", "") - cdef bint update_catalog = metadata.get("update_catalog", False) + cdef str bars_market_data_type = metadata.get("bars_market_data_type", "") cdef datetime now = self._clock.utc_now() cdef datetime start = time_object_to_dt(metadata.get("start")) # Can be None @@ -1316,180 +1319,204 @@ cdef class DataEngine(Component): instrument_id = request.data_type.metadata.get("instrument_id") if instrument_id is None: - if self._catalogs and not update_catalog: - self._query_catalog(request) - return - - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request - - client.request_instruments( - request.data_type.metadata.get("venue"), - request.id, - start, - end, - metadata, - ) + self._handle_request_instruments(request, client, start, end, metadata) else: - last_timestamp, _ = self._catalogs_last_timestamp( - Instrument, - instrument_id, - ) + self._handle_request_instrument(request, client, instrument_id, start, end, metadata) + elif request.data_type.type == OrderBookDeltas: + self._handle_request_order_book_deltas(request, client, metadata) + elif request.data_type.type == QuoteTick or bars_market_data_type == "quote_ticks": + self._handle_request_quote_ticks(request, client, start, end, now, metadata) + elif request.data_type.type == TradeTick or bars_market_data_type == "trade_ticks": + self._handle_request_trade_ticks(request, client, start, end, now, metadata) + elif request.data_type.type == Bar or bars_market_data_type == "bars": + self._handle_request_bars(request, client, start, end, now, metadata) + else: + self._handle_request_data(request, client, start, end, now) - if last_timestamp: - self._query_catalog(request) - return + cpdef void _handle_request_instruments(self, DataRequest request, DataClient client, datetime start, datetime end, dict metadata): + cdef bint update_catalog = metadata.get("update_catalog", False) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request - - client.request_instrument( - instrument_id, - request.id, - start, - end, - metadata, - ) - elif request.data_type.type == OrderBookDeltas: - instrument_id = request.data_type.metadata.get("instrument_id") + if self._catalogs and not update_catalog: + self._query_catalog(request) + return - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request - - client.request_order_book_snapshot( - instrument_id, - request.data_type.metadata.get("limit", 0), - request.id - ) - elif request.data_type.type == QuoteTick or aggregated_bars_market_data_type == "quote_ticks": - instrument_id = request.data_type.metadata.get("instrument_id") + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request + + client.request_instruments( + request.data_type.metadata.get("venue"), + request.id, + start, + end, + metadata, + ) - last_timestamp, _ = self._catalogs_last_timestamp( - QuoteTick, - instrument_id, - ) + cpdef void _handle_request_instrument(self, DataRequest request, DataClient client, InstrumentId instrument_id, datetime start, datetime end, dict metadata): + last_timestamp, _ = self._catalogs_last_timestamp( + Instrument, + instrument_id, + ) - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + if last_timestamp: + self._query_catalog(request) + return - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) - self._query_catalog(request) + client.request_instrument( + instrument_id, + request.id, + start, + end, + metadata, + ) - client_start = max_date(start, last_timestamp) - client.request_quote_ticks( - instrument_id, - request.data_type.metadata.get("limit", 0), - request.id, - client_start, - end, - metadata, - ) - elif request.data_type.type == TradeTick or aggregated_bars_market_data_type == "trade_ticks": - instrument_id = request.data_type.metadata.get("instrument_id") + cpdef void _handle_request_order_book_deltas(self, DataRequest request, DataClient client, dict metadata): + instrument_id = request.data_type.metadata.get("instrument_id") - last_timestamp, _ = self._catalogs_last_timestamp( - TradeTick, - instrument_id, - ) + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + client.request_order_book_snapshot( + instrument_id, + request.data_type.metadata.get("limit", 0), + request.id, + metadata, + ) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + cpdef void _handle_request_quote_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata): + instrument_id = request.data_type.metadata.get("instrument_id") - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) + last_timestamp, _ = self._catalogs_last_timestamp( + QuoteTick, + instrument_id, + ) + + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): self._query_catalog(request) + return - client_start = max_date(start, last_timestamp) - client.request_trade_ticks( - instrument_id, - request.data_type.metadata.get("limit", 0), - request.id, - client_start, - end, - metadata, - ) - elif request.data_type.type == Bar or aggregated_bars_market_data_type == "bars": - bar_type = request.data_type.metadata.get("bar_type") + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - last_timestamp, _ = self._catalogs_last_timestamp( - Bar, - bar_type=bar_type, - ) + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + client_start = max_date(start, last_timestamp) + client.request_quote_ticks( + instrument_id, + request.data_type.metadata.get("limit", 0), + request.id, + client_start, + end, + metadata, + ) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + cpdef void _handle_request_trade_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata): + instrument_id = request.data_type.metadata.get("instrument_id") + + last_timestamp, _ = self._catalogs_last_timestamp( + TradeTick, + instrument_id, + ) - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): self._query_catalog(request) + return - client_start = max_date(start, last_timestamp) - client.request_bars( - bar_type, - request.data_type.metadata.get("limit", 0), - request.id, - client_start, - end, - metadata, - ) - else: - last_timestamp, _ = self._catalogs_last_timestamp( - request.data_type.type, - ) + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + client_start = max_date(start, last_timestamp) + client.request_trade_ticks( + instrument_id, + request.data_type.metadata.get("limit", 0), + request.id, + client_start, + end, + metadata, + ) + + cpdef void _handle_request_bars(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata): + bar_type = request.data_type.metadata.get("bar_type") + + last_timestamp, _ = self._catalogs_last_timestamp( + Bar, + bar_type=bar_type, + ) + + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): + self._query_catalog(request) + return + + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request + + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) + + client_start = max_date(start, last_timestamp) + client.request_bars( + bar_type, + request.data_type.metadata.get("limit", 0), + request.id, + client_start, + end, + metadata, + ) + + cpdef void _handle_request_data(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now): + last_timestamp, _ = self._catalogs_last_timestamp( + request.data_type.type, + ) - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): self._query_catalog(request) + return + + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request + + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) - try: - client.request(request.data_type, request.id) - except NotImplementedError: - self._log.error(f"Cannot handle request: unrecognized data type {request.data_type}") + try: + client.request(request.data_type, request.id) + except NotImplementedError: + self._log.error(f"Cannot handle request: unrecognized data type {request.data_type}") cpdef void _query_catalog(self, DataRequest request): cdef datetime start = request.data_type.metadata.get("start") @@ -1509,8 +1536,7 @@ cdef class DataEngine(Component): ) ts_end = ts_now - # Field defined when using actor.request_aggregated_bars - market_data_type = request.data_type.metadata.get("market_data_type") + bars_market_data_type = request.data_type.metadata.get("bars_market_data_type") data = [] if request.data_type.type == Instrument: @@ -1521,21 +1547,21 @@ cdef class DataEngine(Component): else: for catalog in self._catalogs.values(): data += catalog.instruments(instrument_ids=[str(instrument_id)]) - elif request.data_type.type == QuoteTick or (market_data_type and market_data_type == "quote_ticks"): + elif request.data_type.type == QuoteTick or (bars_market_data_type and bars_market_data_type == "quote_ticks"): for catalog in self._catalogs.values(): data += catalog.quote_ticks( instrument_ids=[str(request.data_type.metadata.get("instrument_id"))], start=ts_start, end=ts_end, ) - elif request.data_type.type == TradeTick or (market_data_type and market_data_type == "trade_ticks"): + elif request.data_type.type == TradeTick or (bars_market_data_type and bars_market_data_type == "trade_ticks"): for catalog in self._catalogs.values(): data += catalog.trade_ticks( instrument_ids=[str(request.data_type.metadata.get("instrument_id"))], start=ts_start, end=ts_end, ) - elif request.data_type.type == Bar or (market_data_type and market_data_type == "bars"): + elif request.data_type.type == Bar or (bars_market_data_type and bars_market_data_type == "bars"): bar_type = request.data_type.metadata.get("bar_type") if bar_type is None: self._log.error("No bar type provided for bars request") @@ -1830,7 +1856,7 @@ cdef class DataEngine(Component): elif response.data_type.type == TradeTick: self._handle_trade_ticks(response.data) elif response.data_type.type == Bar: - if response.data_type.metadata.get("market_data_type"): + if response.data_type.metadata.get("bars_market_data_type"): response.data = self._handle_aggregated_bars(response.data, response.data_type.metadata) else: self._handle_bars(response.data, response.data_type.metadata.get("Partial")) @@ -1953,16 +1979,16 @@ cdef class DataEngine(Component): bars_result = {} if metadata["include_external_data"]: - if metadata["market_data_type"] == "quote_ticks": + if metadata["bars_market_data_type"] == "quote_ticks": self._cache.add_quote_ticks(ticks) result["quote_ticks"] = ticks - elif metadata["market_data_type"] == "trade_ticks": + elif metadata["bars_market_data_type"] == "trade_ticks": self._cache.add_trade_ticks(ticks) result["trade_ticks"] = ticks - elif metadata["market_data_type"] == "bars": + elif metadata["bars_market_data_type"] == "bars": self._cache.add_bars(ticks) - if metadata["market_data_type"] == "bars": + if metadata["bars_market_data_type"] == "bars": bars_result[metadata["bar_type"]] = ticks for bar_type in metadata["bar_types"]: @@ -2011,12 +2037,12 @@ cdef class DataEngine(Component): handler=handler, ) - if metadata["market_data_type"] == "quote_ticks" and not bar_type.is_composite(): + if metadata["bars_market_data_type"] == "quote_ticks" and not bar_type.is_composite(): aggregator.start_batch_update(handler, ticks[0].ts_event) for tick in ticks: aggregator.handle_quote_tick(tick) - elif metadata["market_data_type"] == "trade_ticks" and not bar_type.is_composite(): + elif metadata["bars_market_data_type"] == "trade_ticks" and not bar_type.is_composite(): aggregator.start_batch_update(handler, ticks[0].ts_event) for tick in ticks: @@ -2033,7 +2059,7 @@ cdef class DataEngine(Component): aggregator.stop_batch_update() bars_result[bar_type.standard()] = aggregated_bars - if not metadata["include_external_data"] and metadata["market_data_type"] == "bars": + if not metadata["include_external_data"] and metadata["bars_market_data_type"] == "bars": del bars_result[metadata["bar_type"]] # we need a second final dict as a we can't delete keys in a loop @@ -2104,6 +2130,7 @@ cdef class DataEngine(Component): MarketDataClient client, BarType bar_type, bint await_partial, + dict metadata, ): cdef Instrument instrument = self._cache.instrument(bar_type.instrument_id) if instrument is None: @@ -2164,7 +2191,7 @@ cdef class DataEngine(Component): topic=f"data.bars.{composite_bar_type}", handler=aggregator.handle_bar, ) - self._handle_subscribe_bars(client, composite_bar_type, False) + self._handle_subscribe_bars(client, composite_bar_type, False, metadata) elif bar_type.spec.price_type == PriceType.LAST: self._msgbus.subscribe( topic=f"data.trades" @@ -2182,7 +2209,7 @@ cdef class DataEngine(Component): handler=aggregator.handle_quote_tick, priority=5, ) - self._handle_subscribe_quote_ticks(client, bar_type.instrument_id) + self._handle_subscribe_quote_ticks(client, bar_type.instrument_id, metadata) cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type): cdef aggregator = self._bar_aggregators.get(bar_type.standard()) diff --git a/nautilus_trader/live/data_client.py b/nautilus_trader/live/data_client.py index bd9c3ef99438..f5be6397f19a 100644 --- a/nautilus_trader/live/data_client.py +++ b/nautilus_trader/live/data_client.py @@ -509,10 +509,14 @@ def subscribe_order_book_snapshots( success_color=LogColor.BLUE, ) - def subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + def subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._add_subscription_quote_ticks(instrument_id) self.create_task( - self._subscribe_quote_ticks(instrument_id), + self._subscribe_quote_ticks(instrument_id, metadata), log_msg=f"subscribe: quote_ticks {instrument_id}", success_msg=f"Subscribed {instrument_id} quotes", success_color=LogColor.BLUE, @@ -781,6 +785,7 @@ def request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: limit_str = f" limit={limit}" if limit else "" self._log.info(f"Request {instrument_id} order_book_snapshot{limit_str}", LogColor.BLUE) @@ -789,6 +794,7 @@ def request_order_book_snapshot( instrument_id=instrument_id, limit=limit, correlation_id=correlation_id, + metadata=metadata, ), log_msg=f"request: order_book_snapshot {instrument_id}", ) @@ -843,7 +849,11 @@ async def _subscribe_order_book_snapshots( "implement the `_subscribe_order_book_snapshots` coroutine", # pragma: no cover ) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_quote_ticks` coroutine", # pragma: no cover ) @@ -991,6 +1001,7 @@ async def _request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: raise NotImplementedError( "implement the `_request_order_book_snapshot` coroutine", # pragma: no cover