From 6799d59cdb4568c3a194cea6ec0aa3dd4284868a Mon Sep 17 00:00:00 2001 From: Faysal Aberkane Date: Tue, 3 Dec 2024 11:02:46 +0000 Subject: [PATCH] Add ability to request and subscribe to databento bbo-1s and bbo-1m quotes --- .../src/databento/python/historical.rs | 38 +- nautilus_trader/adapters/_template/data.py | 85 +++- nautilus_trader/adapters/betfair/data.py | 64 ++- nautilus_trader/adapters/binance/data.py | 87 +++- nautilus_trader/adapters/bybit/data.py | 42 +- nautilus_trader/adapters/databento/data.py | 110 ++++- nautilus_trader/adapters/dydx/data.py | 38 +- .../adapters/interactive_brokers/data.py | 85 +++- nautilus_trader/adapters/okx/data.py | 36 +- nautilus_trader/adapters/polymarket/data.py | 42 +- nautilus_trader/adapters/tardis/data.py | 44 +- nautilus_trader/backtest/data_client.pyx | 37 +- nautilus_trader/common/actor.pxd | 42 +- nautilus_trader/common/actor.pyx | 151 ++++-- nautilus_trader/core/nautilus_pyo3.pyi | 2 + nautilus_trader/data/client.pxd | 39 +- nautilus_trader/data/client.pyx | 37 +- nautilus_trader/data/engine.pxd | 35 +- nautilus_trader/data/engine.pyx | 429 ++++++++++-------- nautilus_trader/live/data_client.py | 208 ++++++--- 20 files changed, 1133 insertions(+), 518 deletions(-) diff --git a/nautilus_core/adapters/src/databento/python/historical.rs b/nautilus_core/adapters/src/databento/python/historical.rs index c84d8389e8a8..fff1be629637 100644 --- a/nautilus_core/adapters/src/databento/python/historical.rs +++ b/nautilus_core/adapters/src/databento/python/historical.rs @@ -183,7 +183,7 @@ impl DatabentoHistoricalClient { } #[pyo3(name = "get_range_quotes")] - #[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None))] + #[pyo3(signature = (dataset, symbols, start, end=None, limit=None, price_precision=None, schema=None))] #[allow(clippy::too_many_arguments)] fn py_get_range_quotes<'py>( &self, @@ -194,6 +194,7 @@ impl DatabentoHistoricalClient { end: Option, limit: Option, price_precision: Option, + schema: Option, ) -> PyResult> { let client = self.inner.clone(); @@ -202,12 +203,22 @@ impl DatabentoHistoricalClient { check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?; let end = end.unwrap_or(self.clock.get_time_ns().as_u64()); let time_range = get_date_time_range(start.into(), end.into()).map_err(to_pyvalue_err)?; + let schema = schema.unwrap_or_else(|| "mbp-1".to_string()); + let dbn_schema = dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?; + match dbn_schema { + dbn::Schema::Mbp1 | dbn::Schema::Bbo1S | dbn::Schema::Bbo1M => (), + _ => { + return Err(to_pyvalue_err( + "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m", + )) + } + }; let params = GetRangeParams::builder() .dataset(dataset) .date_time_range(time_range) .symbols(symbols) .stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?) - .schema(dbn::Schema::Mbp1) + .schema(dbn_schema) .limit(limit.and_then(NonZeroU64::new)) .build(); @@ -226,8 +237,7 @@ impl DatabentoHistoricalClient { let metadata = decoder.metadata().clone(); let mut result: Vec = Vec::new(); - while let Ok(Some(msg)) = decoder.decode_record::().await { - let record = dbn::RecordRef::from(msg); + let mut process_record = |record: dbn::RecordRef| -> PyResult<()> { let instrument_id = decode_nautilus_instrument_id(&record, &metadata, &publisher_venue_map) .map_err(to_pyvalue_err)?; @@ -244,9 +254,29 @@ impl DatabentoHistoricalClient { match data { Some(Data::Quote(quote)) => { result.push(quote); + Ok(()) } _ => panic!("Invalid data element not `QuoteTick`, was {data:?}"), } + }; + + match dbn_schema { + dbn::Schema::Mbp1 => { + while let Ok(Some(msg)) = decoder.decode_record::().await { + process_record(dbn::RecordRef::from(msg))?; + } + } + dbn::Schema::Bbo1M => { + while let Ok(Some(msg)) = decoder.decode_record::().await { + process_record(dbn::RecordRef::from(msg))?; + } + } + dbn::Schema::Bbo1S => { + while let Ok(Some(msg)) = decoder.decode_record::().await { + process_record(dbn::RecordRef::from(msg))?; + } + } + _ => unreachable!(), } Python::with_gil(|py| Ok(result.into_py(py))) diff --git a/nautilus_trader/adapters/_template/data.py b/nautilus_trader/adapters/_template/data.py index cb43a26d20f3..62cb4da3d9df 100644 --- a/nautilus_trader/adapters/_template/data.py +++ b/nautilus_trader/adapters/_template/data.py @@ -172,12 +172,16 @@ async def _subscribe(self, data_type: DataType) -> None: "method `_subscribe` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_instruments(self) -> None: + async def _subscribe_instruments(self, metadata: dict | None = None) -> None: raise NotImplementedError( "method `_subscribe_instruments` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_subscribe_instrument` must be implemented in the subclass", ) # pragma: no cover @@ -187,7 +191,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: raise NotImplementedError( "method `_subscribe_order_book_deltas` must be implemented in the subclass", @@ -198,33 +202,49 @@ async def _subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: raise NotImplementedError( "method `_subscribe_order_book_snapshots` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_subscribe_quote_ticks` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_subscribe_trade_ticks` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: raise NotImplementedError( "method `_subscribe_bars` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_subscribe_instrument_status` must be implemented in the subclass", ) # pragma: no cover - async def _subscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_subscribe_instrument_close` must be implemented in the subclass", ) # pragma: no cover @@ -234,47 +254,75 @@ async def _unsubscribe(self, data_type: DataType) -> None: "method `_unsubscribe` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_instruments(self) -> None: + async def _unsubscribe_instruments(self, metadata: dict | None = None) -> None: raise NotImplementedError( "method `_unsubscribe_instruments` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_instrument` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_order_book_deltas` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_order_book_snapshots` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_quote_tick` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_trade_ticks` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: raise NotImplementedError( "method `_unsubscribe_bars` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_status( + self, + instrument_id: InstrumentId, + params: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_instrument_status` must be implemented in the subclass", ) # pragma: no cover - async def _unsubscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( "method `_unsubscribe_instrument_close` must be implemented in the subclass", ) # pragma: no cover @@ -319,6 +367,7 @@ async def _request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: raise NotImplementedError( "method `_request_quote_tick` must be implemented in the subclass", diff --git a/nautilus_trader/adapters/betfair/data.py b/nautilus_trader/adapters/betfair/data.py index 956b9ecd7243..cc7dc89a78ac 100644 --- a/nautilus_trader/adapters/betfair/data.py +++ b/nautilus_trader/adapters/betfair/data.py @@ -191,7 +191,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: PyCondition.not_none(instrument_id, "instrument_id") @@ -232,43 +232,83 @@ async def delayed_subscribe(self, delay=0) -> None: await self._stream.send_subscription_message(market_ids=list(self._subscribed_market_ids)) self._log.info(f"Added market_ids {self._subscribed_market_ids} for data") - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping subscribe_instrument, Betfair subscribes as part of orderbook") - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping subscribe_quote_ticks, Betfair subscribes as part of orderbook") - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping subscribe_trade_ticks, Betfair subscribes as part of orderbook") - async def _subscribe_instruments(self) -> None: + async def _subscribe_instruments(self, metadata: dict | None = None) -> None: for instrument in self._instrument_provider.list_all(): self._handle_data(instrument) - async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Subscribed as part of orderbook - async def _subscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Subscribed as part of orderbook - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: # TODO - this could be done by removing the market from self.__subscribed_market_ids and resending the # subscription message - when we have a use case self._log.warning("Betfair does not support unsubscribing from instruments") - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: # TODO - this could be done by removing the market from self.__subscribed_market_ids and resending the # subscription message - when we have a use case self._log.warning("Betfair does not support unsubscribing from instruments") - async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping unsubscribe_instrument, not applicable for Betfair") - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping unsubscribe_quote_ticks, not applicable for Betfair") - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.info("Skipping unsubscribe_trade_ticks, not applicable for Betfair") # -- STREAMS ---------------------------------------------------------------------------------- diff --git a/nautilus_trader/adapters/binance/data.py b/nautilus_trader/adapters/binance/data.py index cc3e50748934..802524e3a5fd 100644 --- a/nautilus_trader/adapters/binance/data.py +++ b/nautilus_trader/adapters/binance/data.py @@ -320,10 +320,14 @@ async def _unsubscribe(self, data_type: DataType) -> None: f"Cannot unsubscribe from {data_type.type} (not implemented)", ) - async def _subscribe_instruments(self) -> None: + async def _subscribe_instruments(self, metadata: dict | None = None) -> None: pass # Do nothing further - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Do nothing further async def _subscribe_order_book_deltas( @@ -331,11 +335,16 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: update_speed = None - if kwargs is not None: - update_speed = kwargs.get("update_speed") + + if metadata is not None: + params = metadata.get("params") + + if params is not None: + update_speed = params.get("update_speed") + await self._subscribe_order_book( instrument_id=instrument_id, book_type=book_type, @@ -348,11 +357,16 @@ async def _subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: update_speed = None - if kwargs is not None: - update_speed = kwargs.get("update_speed") + + if metadata is not None: + params = metadata.get("params") + + if params is not None: + update_speed = params.get("update_speed") + await self._subscribe_order_book( instrument_id=instrument_id, book_type=book_type, @@ -446,16 +460,24 @@ async def _order_book_snapshot_then_deltas(self, instrument_id: InstrumentId) -> LogColor.BLUE, ) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._ws_client.subscribe_book_ticker(instrument_id.symbol.value) - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if self._use_agg_trade_ticks: await self._ws_client.subscribe_agg_trades(instrument_id.symbol.value) else: await self._ws_client.subscribe_trades(instrument_id.symbol.value) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: PyCondition.is_true( bar_type.is_externally_aggregated(), "aggregation_source is not EXTERNAL", @@ -486,25 +508,45 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: interval=interval.value, ) - async def _unsubscribe_instruments(self) -> None: + async def _unsubscribe_instruments(self, metadata: dict | None = None) -> None: pass # Do nothing further - async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Do nothing further - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # TODO: Unsubscribe from Binance if no other subscriptions - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # TODO: Unsubscribe from Binance if no other subscriptions - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._ws_client.unsubscribe_book_ticker(instrument_id.symbol.value) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._ws_client.unsubscribe_trades(instrument_id.symbol.value) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: if not bar_type.spec.is_time_aggregated(): self._log.error( f"Cannot unsubscribe from {bar_type}: only time bars are aggregated by Binance", @@ -695,6 +737,7 @@ async def _request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: if limit not in [5, 10, 20, 50, 100, 500, 1000]: self._log.error( @@ -712,10 +755,10 @@ async def _request_order_book_snapshot( data_type = DataType( OrderBookDeltas, - metadata={ - "instrument_id": instrument_id, - "limit": limit, - }, + metadata=( + {"instrument_id": instrument_id, "limit": limit} + | (metadata if metadata else {}) + ), ) self._handle_data_response( data_type=data_type, diff --git a/nautilus_trader/adapters/bybit/data.py b/nautilus_trader/adapters/bybit/data.py index 8bff21c6184e..051e49fc40cd 100644 --- a/nautilus_trader/adapters/bybit/data.py +++ b/nautilus_trader/adapters/bybit/data.py @@ -257,7 +257,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: if book_type == BookType.L3_MBO: self._log.error( @@ -314,7 +314,11 @@ async def _subscribe_order_book_deltas( ws_client = self._ws_clients[bybit_symbol.product_type] await ws_client.subscribe_order_book(bybit_symbol.raw_symbol, depth=depth) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] @@ -329,12 +333,16 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: else: await ws_client.subscribe_tickers(bybit_symbol.raw_symbol) - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] await ws_client.subscribe_trades(bybit_symbol.raw_symbol) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: bybit_symbol = BybitSymbol(bar_type.instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] interval_str = get_interval_from_bar_type(bar_type) @@ -342,19 +350,31 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: self._topic_bar_type[topic] = bar_type await ws_client.subscribe_klines(bybit_symbol.raw_symbol, interval_str) - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] depth = self._depths.get(instrument_id, 1) await ws_client.unsubscribe_order_book(bybit_symbol.raw_symbol, depth=depth) - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] depth = self._depths.get(instrument_id, 1) await ws_client.unsubscribe_order_book(bybit_symbol.raw_symbol, depth=depth) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] if instrument_id in self._tob_quotes: @@ -362,12 +382,16 @@ async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: else: await ws_client.unsubscribe_tickers(bybit_symbol.raw_symbol) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: bybit_symbol = BybitSymbol(instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] await ws_client.unsubscribe_trades(bybit_symbol.raw_symbol) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: bybit_symbol = BybitSymbol(bar_type.instrument_id.symbol.value) ws_client = self._ws_clients[bybit_symbol.product_type] interval_str = get_interval_from_bar_type(bar_type) diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index aed21be26a75..0ecaded9a45d 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -356,7 +356,7 @@ def subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, ) -> None: if book_type != BookType.L3_MBO: raise NotImplementedError @@ -366,7 +366,7 @@ def subscribe_order_book_deltas( instrument_id=instrument_id, book_type=book_type, depth=depth, - kwargs=kwargs, + metadata=metadata, ), log_msg=f"subscribe: order_book_deltas {instrument_id}", actions=lambda: self._add_subscription_order_book_deltas(instrument_id), @@ -412,11 +412,15 @@ async def _subscribe_statistics(self, data_type: DataType) -> None: except asyncio.CancelledError: self._log.warning("`_subscribe_imbalance` was canceled while still pending") - async def _subscribe_instruments(self) -> None: + async def _subscribe_instruments(self, metadata: dict | None = None) -> None: # Replace method in child class, for exchange specific data types. raise NotImplementedError("Cannot subscribe to all instruments (not currently supported).") - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: try: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) @@ -464,7 +468,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: try: if book_type != BookType.L3_MBO: @@ -555,7 +559,7 @@ async def _subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: try: await self._ensure_subscribed_for_instrument(instrument_id) @@ -581,14 +585,33 @@ async def _subscribe_order_book_snapshots( except asyncio.CancelledError: self._log.warning("`_subscribe_order_book_snapshots` was canceled while still pending") - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: try: await self._ensure_subscribed_for_instrument(instrument_id) + schema = None + + if metadata is not None: + params = metadata.get("params") + + if params is not None: + schema = params.get("schema") + + # allowed schema values: mbp-1, bbo-1s, bbo-1m + if schema is None or schema not in [ + DatabentoSchema.MBP_1.value, + DatabentoSchema.BBO_1S.value, + DatabentoSchema.BBO_1M.value, + ]: + schema = DatabentoSchema.MBP_1.value dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) live_client = self._get_live_client(dataset) live_client.subscribe( - schema=DatabentoSchema.MBP_1.value, + schema=schema, symbols=[instrument_id.symbol.value], ) @@ -599,7 +622,11 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: except asyncio.CancelledError: self._log.warning("`_subscribe_quote_ticks` was canceled while still pending") - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: try: if instrument_id in self._trade_tick_subscriptions: return # Already subscribed (this will save on data costs) @@ -616,7 +643,7 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: except asyncio.CancelledError: self._log.warning("`_subscribe_trade_ticks` was canceled while still pending") - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: try: dataset: Dataset = self._loader.get_dataset_for_venue(bar_type.instrument_id.venue) @@ -635,7 +662,11 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: except asyncio.CancelledError: self._log.warning("`_subscribe_bars` was canceled while still pending") - async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: try: dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue) @@ -653,48 +684,72 @@ async def _unsubscribe(self, data_type: DataType) -> None: f"Cannot unsubscribe from {data_type}, unsubscribing not supported by Databento.", ) - async def _unsubscribe_instruments(self) -> None: + async def _unsubscribe_instruments(self, params: dict | None = None) -> None: raise NotImplementedError( "Cannot unsubscribe from all instruments, unsubscribing not supported by Databento.", ) - async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument( + self, + instrument_id: InstrumentId, + params: dict | None = None, + ) -> None: raise NotImplementedError( f"Cannot unsubscribe from {instrument_id} instrument, " "unsubscribing not supported by Databento.", ) - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + params: dict | None = None, + ) -> None: raise NotImplementedError( f"Cannot unsubscribe from {instrument_id} order book deltas, " "unsubscribing not supported by Databento.", ) - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + params: dict | None = None, + ) -> None: raise NotImplementedError( f"Cannot unsubscribe from {instrument_id} order book snapshots, " "unsubscribing not supported by Databento.", ) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + params: dict | None = None, + ) -> None: raise NotImplementedError( f"Cannot unsubscribe from {instrument_id} quotes, " "unsubscribing not supported by Databento.", ) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + params: dict | None = None, + ) -> None: raise NotImplementedError( f"Cannot unsubscribe from {instrument_id} trades, " "unsubscribing not supported by Databento.", ) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, params: dict | None = None) -> None: raise NotImplementedError( f"Cannot unsubscribe from {bar_type} bars, " "unsubscribing not supported by Databento.", ) - async def _unsubscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( f"Cannot unsubscribe from {instrument_id} instrument status, " "unsubscribing not supported by Databento.", @@ -898,11 +953,28 @@ async def _request_quote_ticks( LogColor.BLUE, ) + schema = None + + if metadata is not None: + params = metadata.get("params") + + if params is not None: + schema = params.get("schema") + + # allowed schema values: mbp-1, bbo-1s, bbo-1m + if schema is None or schema not in [ + DatabentoSchema.MBP_1.value, + DatabentoSchema.BBO_1S.value, + DatabentoSchema.BBO_1M.value, + ]: + schema = DatabentoSchema.MBP_1.value + pyo3_quotes = await self._http_client.get_range_quotes( dataset=dataset, symbols=[instrument_id.symbol.value], start=start.value, end=end.value, + schema=schema, ) quotes = QuoteTick.from_pyo3_list(pyo3_quotes) diff --git a/nautilus_trader/adapters/dydx/data.py b/nautilus_trader/adapters/dydx/data.py index aba0dc25d232..3a24ce2ba8fc 100644 --- a/nautilus_trader/adapters/dydx/data.py +++ b/nautilus_trader/adapters/dydx/data.py @@ -17,7 +17,7 @@ """ import asyncio -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import msgspec import pandas as pd @@ -683,7 +683,11 @@ def _handle_markets_subscribed(self, raw: bytes) -> None: except Exception as e: self._log.error(f"Failed to parse market channel data: {raw.decode()} with error {e}") - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: dydx_symbol = DYDXSymbol(instrument_id.symbol.value) await self._ws_client.subscribe_trades(dydx_symbol.raw_symbol) @@ -692,7 +696,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: if book_type in (BookType.L1_MBP, BookType.L3_MBO): self._log.error( @@ -712,7 +716,11 @@ async def _subscribe_order_book_deltas( if not self._ws_client.has_subscription(subscription): await self._ws_client.subscribe_order_book(dydx_symbol.raw_symbol) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.debug( f"Subscribing deltas {instrument_id} (quotes are not available)", LogColor.MAGENTA, @@ -729,7 +737,7 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: book_type=book_type, ) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: self._log.info(f"Subscribe to {bar_type} bars") dydx_symbol = DYDXSymbol(bar_type.instrument_id.symbol.value) candles_resolution = get_interval_from_bar_type(bar_type) @@ -737,11 +745,19 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: self._topic_bar_type[topic] = bar_type await self._ws_client.subscribe_klines(dydx_symbol.raw_symbol, candles_resolution) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: dydx_symbol = DYDXSymbol(instrument_id.symbol.value) await self._ws_client.unsubscribe_trades(dydx_symbol.raw_symbol) - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: dydx_symbol = DYDXSymbol(instrument_id.symbol.value) # Check if the websocket client is subscribed. @@ -753,7 +769,11 @@ async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> N if self._ws_client.has_subscription(subscription): await self._ws_client.unsubscribe_order_book(dydx_symbol.raw_symbol) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: dydx_symbol = DYDXSymbol(instrument_id.symbol.value) # Check if the websocket client is subscribed. @@ -762,7 +782,7 @@ async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: if self._ws_client.has_subscription(subscription): await self._unsubscribe_order_book_deltas(instrument_id=instrument_id) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: dydx_symbol = DYDXSymbol(bar_type.instrument_id.symbol.value) candles_resolution = get_interval_from_bar_type(bar_type) await self._ws_client.unsubscribe_klines(dydx_symbol.raw_symbol, candles_resolution) diff --git a/nautilus_trader/adapters/interactive_brokers/data.py b/nautilus_trader/adapters/interactive_brokers/data.py index 513cb3c7ab21..015ef51fc7b2 100644 --- a/nautilus_trader/adapters/interactive_brokers/data.py +++ b/nautilus_trader/adapters/interactive_brokers/data.py @@ -15,7 +15,6 @@ import asyncio from operator import attrgetter -from typing import Any import pandas as pd @@ -133,12 +132,16 @@ async def _subscribe(self, data_type: DataType) -> None: "implement the `_subscribe` coroutine", # pragma: no cover ) - async def _subscribe_instruments(self) -> None: + async def _subscribe_instruments(self, metadata: dict | None = None) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_instruments` coroutine", # pragma: no cover ) - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_instrument` coroutine", # pragma: no cover ) @@ -148,7 +151,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_order_book_deltas` coroutine", # pragma: no cover @@ -159,13 +162,17 @@ async def _subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_order_book_snapshots` coroutine", # pragma: no cover ) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if not (instrument := self._cache.instrument(instrument_id)): self._log.error( f"Cannot subscribe to quotes for {instrument_id}: instrument not found", @@ -179,7 +186,11 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: ignore_size=self._ignore_quote_tick_size_updates, ) - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if not (instrument := self._cache.instrument(instrument_id)): self._log.error( f"Cannot subscribe to trades for {instrument_id}: instrument not found", @@ -199,7 +210,7 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: ignore_size=self._ignore_quote_tick_size_updates, ) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: if not (instrument := self._cache.instrument(bar_type.instrument_id)): self._log.error(f"Cannot subscribe to {bar_type} bars: instrument not found") return @@ -218,10 +229,18 @@ async def _subscribe_bars(self, bar_type: BarType) -> None: handle_revised_bars=self._handle_revised_bars, ) - async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Subscribed as part of orderbook - async def _subscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Subscribed as part of orderbook async def _unsubscribe(self, data_type: DataType) -> None: @@ -229,42 +248,70 @@ async def _unsubscribe(self, data_type: DataType) -> None: "implement the `_unsubscribe` coroutine", # pragma: no cover ) - async def _unsubscribe_instruments(self) -> None: + async def _unsubscribe_instruments(self, metadata: dict | None = None) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_instruments` coroutine", # pragma: no cover ) - async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_instrument` coroutine", # pragma: no cover ) - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_order_book_deltas` coroutine", # pragma: no cover ) - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_order_book_snapshots` coroutine", # pragma: no cover ) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._client.unsubscribe_ticks(instrument_id, "BidAsk") - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._client.unsubscribe_ticks(instrument_id, "AllLast") - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: if bar_type.spec.timedelta == 5: await self._client.unsubscribe_realtime_bars(bar_type) else: await self._client.unsubscribe_historical_bars(bar_type) - async def _unsubscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Subscribed as part of orderbook - async def _unsubscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: pass # Subscribed as part of orderbook async def _request(self, data_type: DataType, correlation_id: UUID4) -> None: diff --git a/nautilus_trader/adapters/okx/data.py b/nautilus_trader/adapters/okx/data.py index 7796dd0315a7..d7966389f553 100644 --- a/nautilus_trader/adapters/okx/data.py +++ b/nautilus_trader/adapters/okx/data.py @@ -244,7 +244,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: if book_type == BookType.L3_MBO: self._log.error( @@ -300,7 +300,11 @@ async def _subscribe_order_book_deltas( # Copy subscribe method for book deltas to book snapshots (same logic) _subscribe_order_book_snapshots = _subscribe_order_book_deltas - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if instrument_id in self._tob_client_map: self._log.warning( f"Already subscribed to {instrument_id} top-of-book (quotes)", @@ -318,7 +322,11 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: self._tob_client_map[instrument_id] = ws_client await ws_client.subscribe_order_book(okx_symbol.raw_symbol, 1) - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: if instrument_id in self._trades_client_map: self._log.warning( f"Already subscribed to {instrument_id} trades", @@ -330,14 +338,18 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: self._trades_client_map[instrument_id] = ws_client await ws_client.subscribe_trades(okx_symbol.raw_symbol) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: PyCondition.is_true( bar_type.is_externally_aggregated(), "aggregation_source is not EXTERNAL", ) self._log.error("OKX bar subscriptions are not yet implemented") - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.debug( f"Unsubscribing {instrument_id} from order book deltas/snapshots", LogColor.MAGENTA, @@ -353,7 +365,11 @@ async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> N # Copy unsubscribe method for book deltas to book snapshots (same logic) _unsubscribe_order_book_snapshots = _unsubscribe_order_book_deltas - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.debug( f"Unsubscribing {instrument_id} from quotes (top-of-book)", LogColor.MAGENTA, @@ -366,7 +382,11 @@ async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: break self._tob_client_map.pop(instrument_id, None) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.debug(f"Unsubscribing {instrument_id} from trades", LogColor.MAGENTA) okx_symbol = OKXSymbol(instrument_id.symbol.value) @@ -376,7 +396,7 @@ async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: break self._tob_client_map.pop(instrument_id, None) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: self._log.error("OKX bar subscriptions are not yet implemented") return diff --git a/nautilus_trader/adapters/polymarket/data.py b/nautilus_trader/adapters/polymarket/data.py index 574574444655..4a234fd6a337 100644 --- a/nautilus_trader/adapters/polymarket/data.py +++ b/nautilus_trader/adapters/polymarket/data.py @@ -224,7 +224,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: if book_type == BookType.L3_MBO: self._log.error( @@ -240,38 +240,62 @@ async def _subscribe_order_book_deltas( await self._subscribe_asset_book(instrument_id) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._subscribe_asset_book(instrument_id) - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: await self._subscribe_asset_book(instrument_id) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: self._log.error( f"Cannot subscribe to {bar_type} bars: not implemented for Polymarket", ) - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.error( f"Cannot unsubscribe from {instrument_id} order book deltas: unsubscribing not supported by Polymarket", ) - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.error( f"Cannot unsubscribe from {instrument_id} order book snapshots: unsubscribing not supported by Polymarket", ) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.error( f"Cannot unsubscribe from {instrument_id} quotes: unsubscribing not supported by Polymarket", ) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._log.error( f"Cannot unsubscribe from {instrument_id} trades: unsubscribing not supported by Polymarket", ) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: self._log.error( f"Cannot unsubscribe from {bar_type} bars: not implemented for Polymarket", ) diff --git a/nautilus_trader/adapters/tardis/data.py b/nautilus_trader/adapters/tardis/data.py index cee8a5ca3727..f78f9314f153 100644 --- a/nautilus_trader/adapters/tardis/data.py +++ b/nautilus_trader/adapters/tardis/data.py @@ -251,7 +251,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: if book_type == BookType.L3_MBO: self._log.error( @@ -269,7 +269,7 @@ async def _subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict | None = None, + metadata: dict | None = None, ) -> None: if book_type == BookType.L3_MBO: self._log.error( @@ -283,39 +283,63 @@ async def _subscribe_order_book_snapshots( tardis_data_type = f"{tardis_data_type}_{depth}_0ms" self._subscribe_stream(instrument_id, tardis_data_type, "order book snapshots") - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(QuoteTick) self._subscribe_stream(instrument_id, tardis_data_type, "quotes") - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(TradeTick) self._subscribe_stream(instrument_id, tardis_data_type, "trades") - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: tardis_data_type = convert_nautilus_bar_type_to_tardis_data_type(bar_type) self._subscribe_stream(bar_type.instrument_id, tardis_data_type, "bars") - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(OrderBookDelta) ws_client_key = get_ws_client_key(instrument_id, tardis_data_type) self._dispose_websocket_client_by_key(ws_client_key) - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(OrderBookDepth10) ws_client_key = get_ws_client_key(instrument_id, tardis_data_type) self._dispose_websocket_client_by_key(ws_client_key) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(QuoteTick) ws_client_key = get_ws_client_key(instrument_id, tardis_data_type) self._dispose_websocket_client_by_key(ws_client_key) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: tardis_data_type = convert_nautilus_data_type_to_tardis_data_type(TradeTick) ws_client_key = get_ws_client_key(instrument_id, tardis_data_type) self._dispose_websocket_client_by_key(ws_client_key) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: tardis_data_type = convert_nautilus_bar_type_to_tardis_data_type(bar_type) ws_client_key = get_ws_client_key(bar_type.instrument_id, tardis_data_type) self._dispose_websocket_client_by_key(ws_client_key) diff --git a/nautilus_trader/backtest/data_client.pyx b/nautilus_trader/backtest/data_client.pyx index 44260d630706..5e574edf27f5 100644 --- a/nautilus_trader/backtest/data_client.pyx +++ b/nautilus_trader/backtest/data_client.pyx @@ -150,13 +150,13 @@ cdef class BacktestMarketDataClient(MarketDataClient): # -- SUBSCRIPTIONS -------------------------------------------------------------------------------- - cpdef void subscribe_instruments(self): + cpdef void subscribe_instruments(self, dict metadata = None): cdef Instrument instrument for instrument in self._cache.instruments(Venue(self.id.value)): self.subscribe_instrument(instrument.id) # Do nothing else for backtest - cpdef void subscribe_instrument(self, InstrumentId instrument_id): + cpdef void subscribe_instrument(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") if not self._cache.instrument(instrument_id): @@ -173,7 +173,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): InstrumentId instrument_id, BookType book_type, int depth = 0, - dict kwargs = None, + dict metadata = None, ): Condition.not_none(instrument_id, "instrument_id") @@ -192,7 +192,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): InstrumentId instrument_id, BookType book_type, int depth = 0, - dict kwargs = None, + dict metadata = None, ): Condition.not_none(instrument_id, "instrument_id") @@ -206,7 +206,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): self._add_subscription_order_book_snapshots(instrument_id) # Do nothing else for backtest - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id): + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") if not self._cache.instrument(instrument_id): @@ -219,7 +219,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): self._add_subscription_quote_ticks(instrument_id) # Do nothing else for backtest - cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id): + cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") if not self._cache.instrument(instrument_id): @@ -232,7 +232,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): self._add_subscription_trade_ticks(instrument_id) # Do nothing else for backtest - cpdef void subscribe_bars(self, BarType bar_type): + cpdef void subscribe_bars(self, BarType bar_type, dict metadata = None): Condition.not_none(bar_type, "bar_type") if not self._cache.instrument(bar_type.instrument_id): @@ -245,65 +245,65 @@ cdef class BacktestMarketDataClient(MarketDataClient): self._add_subscription_bars(bar_type) # Do nothing else for backtest - cpdef void subscribe_instrument_status(self, InstrumentId instrument_id): + cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._add_subscription_instrument_status(instrument_id) # Do nothing else for backtest - cpdef void subscribe_instrument_close(self, InstrumentId instrument_id): + cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._add_subscription_instrument_close(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_instruments(self): + cpdef void unsubscribe_instruments(self, dict metadata = None): self._subscriptions_instrument.clear() # Do nothing else for backtest - cpdef void unsubscribe_instrument(self, InstrumentId instrument_id): + cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_instrument(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id): + cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_order_book_deltas(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_order_book_snapshots(self, InstrumentId instrument_id): + cpdef void unsubscribe_order_book_snapshots(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_order_book_snapshots(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id): + cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_quote_ticks(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id): + cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_trade_ticks(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_bars(self, BarType bar_type): + cpdef void unsubscribe_bars(self, BarType bar_type, dict metadata = None): Condition.not_none(bar_type, "bar_type") self._remove_subscription_bars(bar_type) # Do nothing else for backtest - cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id): + cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_instrument_status(instrument_id) # Do nothing else for backtest - cpdef void unsubscribe_instrument_close(self, InstrumentId instrument_id): + cpdef void unsubscribe_instrument_close(self, InstrumentId instrument_id, dict metadata = None): Condition.not_none(instrument_id, "instrument_id") self._remove_subscription_instrument_close(instrument_id) @@ -356,6 +356,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): InstrumentId instrument_id, int limit, UUID4 correlation_id, + dict metadata = None, ): Condition.not_none(instrument_id, "instrument_id") Condition.not_none(correlation_id, "correlation_id") diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index a2b08e936bde..165a7fcd0e56 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -136,17 +136,17 @@ cdef class Actor(Component): # -- SUBSCRIPTIONS -------------------------------------------------------------------------------- cpdef void subscribe_data(self, DataType data_type, ClientId client_id=*) - cpdef void subscribe_instruments(self, Venue venue, ClientId client_id=*) - cpdef void subscribe_instrument(self, InstrumentId instrument_id, ClientId client_id=*) + cpdef void subscribe_instruments(self, Venue venue, ClientId client_id=*, dict params=*) + cpdef void subscribe_instrument(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) cpdef void subscribe_order_book_deltas( self, InstrumentId instrument_id, BookType book_type=*, int depth=*, - dict kwargs=*, ClientId client_id=*, bint managed=*, bint pyo3_conversion=*, + dict params=*, ) cpdef void subscribe_order_book_at_interval( self, @@ -154,24 +154,24 @@ cdef class Actor(Component): BookType book_type=*, int depth=*, int interval_ms=*, - dict kwargs=*, ClientId client_id=*, bint managed=*, + dict params=*, ) - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*) - cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id=*) + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void subscribe_bars(self, BarType bar_type, ClientId client_id=*, bint await_partial=*, dict params=*) + cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) cpdef void unsubscribe_data(self, DataType data_type, ClientId client_id=*) - cpdef void unsubscribe_instruments(self, Venue venue, ClientId client_id=*) - cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void unsubscribe_order_book_at_interval(self, InstrumentId instrument_id, int interval_ms=*, ClientId client_id=*) - cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*) - cpdef void unsubscribe_bars(self, BarType bar_type, ClientId client_id=*) - cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*) + cpdef void unsubscribe_instruments(self, Venue venue, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_order_book_at_interval(self, InstrumentId instrument_id, int interval_ms=*, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_bars(self, BarType bar_type, ClientId client_id=*, dict params=*) + cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id=*, dict params=*) cpdef void publish_data(self, DataType data_type, Data data) cpdef void publish_signal(self, str name, value, uint64_t ts_event=*) cpdef void subscribe_signal(self, str name=*) @@ -192,6 +192,7 @@ cdef class Actor(Component): ClientId client_id=*, callback=*, bint update_catalog=*, + dict params=*, ) cpdef UUID4 request_instruments( self, @@ -201,6 +202,7 @@ cdef class Actor(Component): ClientId client_id=*, callback=*, bint update_catalog=*, + dict params=*, ) cpdef UUID4 request_order_book_snapshot( self, @@ -208,6 +210,7 @@ cdef class Actor(Component): int limit, ClientId client_id=*, callback=*, + dict params=*, ) cpdef UUID4 request_quote_ticks( self, @@ -216,8 +219,8 @@ cdef class Actor(Component): datetime end=*, ClientId client_id=*, callback=*, - str quote_type=*, bint update_catalog=*, + dict params=*, ) cpdef UUID4 request_trade_ticks( self, @@ -227,6 +230,7 @@ cdef class Actor(Component): ClientId client_id=*, callback=*, bint update_catalog=*, + dict params=*, ) cpdef UUID4 request_bars( self, @@ -236,6 +240,7 @@ cdef class Actor(Component): ClientId client_id=*, callback=*, bint update_catalog=*, + dict params=*, ) cpdef UUID4 request_aggregated_bars( self, @@ -247,6 +252,7 @@ cdef class Actor(Component): bint include_external_data=*, bint update_existing_subscriptions=*, bint update_catalog=*, + dict params=*, ) cpdef bint is_pending_request(self, UUID4 request_id) cpdef bint has_pending_requests(self) diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index f02890854455..cf01c995951a 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -1123,7 +1123,7 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void subscribe_instruments(self, Venue venue, ClientId client_id = None): + cpdef void subscribe_instruments(self, Venue venue, ClientId client_id = None, dict params = None): """ Subscribe to update `Instrument` data for the given venue. @@ -1134,6 +1134,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(venue, "venue") @@ -1147,14 +1149,14 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=venue, - data_type=DataType(Instrument), + data_type=DataType(Instrument, metadata={"params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) self._send_data_cmd(command) - cpdef void subscribe_instrument(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void subscribe_instrument(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Subscribe to update `Instrument` data for the given instrument ID. @@ -1165,6 +1167,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1180,7 +1184,7 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(Instrument, metadata={"instrument_id": instrument_id}), + data_type=DataType(Instrument, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1192,10 +1196,10 @@ cdef class Actor(Component): InstrumentId instrument_id, BookType book_type=BookType.L2_MBP, int depth = 0, - dict kwargs = None, ClientId client_id = None, bint managed = True, bint pyo3_conversion = False, + dict params = None, ): """ Subscribe to the order book data stream, being a snapshot then deltas @@ -1209,8 +1213,6 @@ cdef class Actor(Component): The order book type. depth : int, optional The maximum depth for the order book. A depth of 0 is maximum depth. - kwargs : dict, optional - The keyword arguments for exchange specific parameters. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. @@ -1219,6 +1221,8 @@ cdef class Actor(Component): pyo3_conversion : bool, default False If received deltas should be converted to `nautilus_pyo3.OrderBookDeltas` prior to being passed to the `on_order_book_deltas` handler. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1241,8 +1245,8 @@ cdef class Actor(Component): "instrument_id": instrument_id, "book_type": book_type, "depth": depth, - "kwargs": kwargs, "managed": managed, + "params": params, }), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), @@ -1256,15 +1260,15 @@ cdef class Actor(Component): BookType book_type=BookType.L2_MBP, int depth = 0, int interval_ms = 1000, - dict kwargs = None, ClientId client_id = None, bint managed = True, + dict params = None, ): """ Subscribe to an `OrderBook` at a specified interval for the given instrument ID. The `DataEngine` will only maintain one order book for each instrument. - Because of this - the level, depth and kwargs for the stream will be set + Because of this - the level, depth and params for the stream will be set as per the last subscription request (this will also affect all subscribers). Parameters @@ -1277,13 +1281,13 @@ cdef class Actor(Component): The maximum depth for the order book. A depth of 0 is maximum depth. interval_ms : int The order book snapshot interval in milliseconds (must be positive). - kwargs : dict, optional - The keyword arguments for exchange specific parameters. client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. managed : bool, default True If an order book should be managed by the data engine based on the subscribed feed. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Raises ------ @@ -1325,8 +1329,8 @@ cdef class Actor(Component): "book_type": book_type, "depth": depth, "interval_ms": interval_ms, - "kwargs": kwargs, "managed": managed, + "params": params, }), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), @@ -1334,7 +1338,7 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Subscribe to streaming `QuoteTick` data for the given instrument ID. @@ -1345,6 +1349,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1360,14 +1366,14 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(QuoteTick, metadata={"instrument_id": instrument_id}), + data_type=DataType(QuoteTick, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) self._send_data_cmd(command) - cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Subscribe to streaming `TradeTick` data for the given instrument ID. @@ -1378,6 +1384,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1393,7 +1401,7 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(TradeTick, metadata={"instrument_id": instrument_id}), + data_type=DataType(TradeTick, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1405,6 +1413,7 @@ cdef class Actor(Component): BarType bar_type, ClientId client_id = None, bint await_partial = False, + dict params = None, ): """ Subscribe to streaming `Bar` data for the given bar type. @@ -1419,6 +1428,8 @@ cdef class Actor(Component): await_partial : bool, default False If the bar aggregator should await the arrival of a historical partial bar prior to actively aggregating new bars. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(bar_type, "bar_type") @@ -1432,6 +1443,7 @@ cdef class Actor(Component): cdef dict metadata = { "bar_type": bar_type, "await_partial": await_partial, + "params": params, } cdef Subscribe command = Subscribe( @@ -1444,7 +1456,7 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Subscribe to status updates for the given instrument ID. @@ -1455,6 +1467,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1468,7 +1482,7 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(InstrumentStatus, metadata={"instrument_id": instrument_id}), + data_type=DataType(InstrumentStatus, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1476,7 +1490,7 @@ cdef class Actor(Component): self._send_data_cmd(command) self._log.info(f"Subscribed to {instrument_id} InstrumentStatus") - cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Subscribe to close updates for the given instrument ID. @@ -1487,6 +1501,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1500,7 +1516,7 @@ cdef class Actor(Component): cdef Subscribe command = Subscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(InstrumentClose, metadata={"instrument_id": instrument_id}), + data_type=DataType(InstrumentClose, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1518,6 +1534,8 @@ cdef class Actor(Component): client_id : ClientId, optional The data client ID. If supplied then an `Unsubscribe` command will be sent to the data client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(data_type, "data_type") @@ -1541,7 +1559,7 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void unsubscribe_instruments(self, Venue venue, ClientId client_id = None): + cpdef void unsubscribe_instruments(self, Venue venue, ClientId client_id = None, dict params = None): """ Unsubscribe from update `Instrument` data for the given venue. @@ -1552,6 +1570,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(venue, "venue") @@ -1565,14 +1585,14 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=venue, - data_type=DataType(Instrument), + data_type=DataType(Instrument, metadata={"params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) self._send_data_cmd(command) - cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Unsubscribe from update `Instrument` data for the given instrument ID. @@ -1583,6 +1603,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1598,14 +1620,14 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(Instrument, metadata={"instrument_id": instrument_id}), + data_type=DataType(Instrument, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) self._send_data_cmd(command) - cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Unsubscribe the order book deltas stream for the given instrument ID. @@ -1616,6 +1638,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1631,7 +1655,7 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(OrderBookDelta, metadata={"instrument_id": instrument_id}), + data_type=DataType(OrderBookDelta, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1643,6 +1667,7 @@ cdef class Actor(Component): InstrumentId instrument_id, int interval_ms = 1000, ClientId client_id = None, + dict params = None, ): """ Unsubscribe from an `OrderBook` at a specified interval for the given instrument ID. @@ -1658,6 +1683,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1677,6 +1704,7 @@ cdef class Actor(Component): data_type=DataType(OrderBook, metadata={ "instrument_id": instrument_id, "interval_ms": interval_ms, + "params": params, }), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), @@ -1684,7 +1712,7 @@ cdef class Actor(Component): self._send_data_cmd(command) - cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Unsubscribe from streaming `QuoteTick` data for the given instrument ID. @@ -1695,6 +1723,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1710,14 +1740,14 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(QuoteTick, metadata={"instrument_id": instrument_id}), + data_type=DataType(QuoteTick, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) self._send_data_cmd(command) - cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Unsubscribe from streaming `TradeTick` data for the given instrument ID. @@ -1728,6 +1758,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1743,14 +1775,14 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(TradeTick, metadata={"instrument_id": instrument_id}), + data_type=DataType(TradeTick, metadata={"instrument_id": instrument_id, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) self._send_data_cmd(command) - cpdef void unsubscribe_bars(self, BarType bar_type, ClientId client_id = None): + cpdef void unsubscribe_bars(self, BarType bar_type, ClientId client_id = None, dict params = None): """ Unsubscribe from streaming `Bar` data for the given bar type. @@ -1761,6 +1793,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue in the instrument ID. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(bar_type, "bar_type") @@ -1776,7 +1810,7 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=bar_type.instrument_id.venue, - data_type=DataType(Bar, metadata={"bar_type": standard_bar_type}), + data_type=DataType(Bar, metadata={"bar_type": standard_bar_type, "params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1784,7 +1818,7 @@ cdef class Actor(Component): self._send_data_cmd(command) self._log.info(f"Unsubscribed from {standard_bar_type} bar data") - cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id = None): + cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, ClientId client_id = None, dict params = None): """ Unsubscribe to status updates of the given venue. @@ -1795,6 +1829,8 @@ cdef class Actor(Component): client_id : ClientId, optional The specific client ID for the command. If ``None`` then will be inferred from the venue. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. """ Condition.not_none(instrument_id, "instrument_id") @@ -1808,7 +1844,7 @@ cdef class Actor(Component): cdef Unsubscribe command = Unsubscribe( client_id=client_id, venue=instrument_id.venue, - data_type=DataType(InstrumentStatus), + data_type=DataType(InstrumentStatus, metadata={"params": params}), command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) @@ -1952,6 +1988,7 @@ cdef class Actor(Component): ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, bint update_catalog = False, + dict params = None, ): """ Request `Instrument` data for the given instrument ID. @@ -1975,6 +2012,8 @@ cdef class Actor(Component): completed processing. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2014,6 +2053,7 @@ cdef class Actor(Component): "start": start, "end": end, "update_catalog": update_catalog, + "params": params, }), callback=self._handle_instrument_response, request_id=request_id, @@ -2024,6 +2064,7 @@ cdef class Actor(Component): self._send_data_req(request) return request_id + cpdef UUID4 request_instruments( self, Venue venue, @@ -2032,6 +2073,7 @@ cdef class Actor(Component): ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, bint update_catalog = False, + dict params = None, ): """ Request all `Instrument` data for the given venue. @@ -2055,6 +2097,8 @@ cdef class Actor(Component): completed processing. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2094,6 +2138,7 @@ cdef class Actor(Component): "start": start, "end": end, "update_catalog": update_catalog, + "params": params, }), callback=self._handle_instruments_response, request_id=request_id, @@ -2110,7 +2155,8 @@ cdef class Actor(Component): InstrumentId instrument_id, int limit, ClientId client_id=None, - callback: Callable[[UUID4], None] | None=None + callback: Callable[[UUID4], None] | None=None, + dict params = None, ): """ Request an order book snapshot. @@ -2128,6 +2174,8 @@ cdef class Actor(Component): The registered callback, to be called with the request ID when the response has completed processing. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2153,6 +2201,7 @@ cdef class Actor(Component): data_type=DataType(OrderBookDeltas, metadata={ "instrument_id": instrument_id, "limit": limit, + "params": params, }), callback=self._handle_data_response, request_id=request_id, @@ -2171,8 +2220,8 @@ cdef class Actor(Component): datetime end = None, ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, - str quote_type = "", bint update_catalog = False, + dict params = None, ): """ Request historical `QuoteTick` data. @@ -2194,10 +2243,10 @@ cdef class Actor(Component): callback : Callable[[UUID4], None], optional The registered callback, to be called with the request ID when the response has completed processing. - quote_type : str, default '' - The specified quote type applicable to certain client implementations. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2237,7 +2286,7 @@ cdef class Actor(Component): "start": start, "end": end, "update_catalog": update_catalog, - "quote_type": quote_type, + "params": params, }), callback=self._handle_quote_ticks_response, request_id=request_id, @@ -2257,6 +2306,7 @@ cdef class Actor(Component): ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, bint update_catalog = False, + dict params = None, ): """ Request historical `TradeTick` data. @@ -2280,6 +2330,8 @@ cdef class Actor(Component): completed processing. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2319,6 +2371,7 @@ cdef class Actor(Component): "start": start, "end": end, "update_catalog": update_catalog, + "params": params, }), callback=self._handle_trade_ticks_response, request_id=request_id, @@ -2338,6 +2391,7 @@ cdef class Actor(Component): ClientId client_id = None, callback: Callable[[UUID4], None] | None = None, bint update_catalog = False, + dict params = None, ): """ Request historical `Bar` data. @@ -2361,6 +2415,8 @@ cdef class Actor(Component): completed processing. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2400,6 +2456,7 @@ cdef class Actor(Component): "start": start, "end": end, "update_catalog": update_catalog, + "params": params, }), callback=self._handle_bars_response, request_id=request_id, @@ -2421,6 +2478,7 @@ cdef class Actor(Component): bint include_external_data = False, bint update_existing_subscriptions = False, bint update_catalog = False, + dict params = None, ): """ Request historical aggregated `Bar` data for multiple bar types. @@ -2453,6 +2511,8 @@ cdef class Actor(Component): If True, updates the aggregators of any existing subscription with the queried external data. update_catalog : bool, default False If True then updates the catalog with new data received from a client. + params : dict, optional + A dictionary of additional parameters potentially used by a specific client serving the request. Returns ------- @@ -2494,14 +2554,14 @@ cdef class Actor(Component): return first = bar_types[0] - market_data_type = "" + bars_market_data_type = "" if first.is_composite(): - market_data_type = "bars" + bars_market_data_type = "bars" elif first.spec.price_type == PriceType.LAST: - market_data_type = "trade_ticks" + bars_market_data_type = "trade_ticks" else: - market_data_type = "quote_ticks" + bars_market_data_type = "quote_ticks" cdef UUID4 request_id = UUID4() cdef DataRequest request = DataRequest( @@ -2509,7 +2569,7 @@ cdef class Actor(Component): venue=first.instrument_id.venue, data_type=DataType(Bar, metadata={ "bar_types": tuple(bar_types), - "market_data_type": market_data_type, + "bars_market_data_type": bars_market_data_type, "instrument_id": first.instrument_id, "bar_type": first.composite(), "start": start, @@ -2517,6 +2577,7 @@ cdef class Actor(Component): "include_external_data": include_external_data, "update_existing_subscriptions": update_existing_subscriptions, "update_catalog": update_catalog, + "params": params, }), callback=self._handle_aggregated_bars_response, request_id=request_id, diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index c392f320bc7b..e67973cd366a 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -4019,6 +4019,8 @@ class DatabentoHistoricalClient: start: int, end: int | None = None, limit: int | None = None, + price_precision: int | None = None, + schema: str | None = None, ) -> list[QuoteTick]: ... async def get_range_trades( self, diff --git a/nautilus_trader/data/client.pxd b/nautilus_trader/data/client.pxd index 7a615f62d649..9d8fbb238933 100644 --- a/nautilus_trader/data/client.pxd +++ b/nautilus_trader/data/client.pxd @@ -82,24 +82,24 @@ cdef class MarketDataClient(DataClient): cpdef list subscribed_instrument_status(self) cpdef list subscribed_instrument_close(self) - cpdef void subscribe_instruments(self) - cpdef void subscribe_instrument(self, InstrumentId instrument_id) - cpdef void subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth=*, dict kwargs=*) - cpdef void subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth=*, dict kwargs=*) - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id) - cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id) - cpdef void subscribe_bars(self, BarType bar_type) - cpdef void subscribe_instrument_status(self, InstrumentId instrument_id) - cpdef void subscribe_instrument_close(self, InstrumentId instrument_id) - cpdef void unsubscribe_instruments(self) - cpdef void unsubscribe_instrument(self, InstrumentId instrument_id) - cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id) - cpdef void unsubscribe_order_book_snapshots(self, InstrumentId instrument_id) - cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id) - cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id) - cpdef void unsubscribe_bars(self, BarType bar_type) - cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id) - cpdef void unsubscribe_instrument_close(self, InstrumentId instrument_id) + cpdef void subscribe_instruments(self, dict metadata=*) + cpdef void subscribe_instrument(self, InstrumentId instrument_id, dict metadata=*) + cpdef void subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth=*, dict metadata=*) + cpdef void subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth=*, dict metadata=*) + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata=*) + cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, dict metadata=*) + cpdef void subscribe_bars(self, BarType bar_type, dict metadata=*) + cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, dict metadata=*) + cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_instruments(self, dict metadata=*) + cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_order_book_snapshots(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_bars(self, BarType bar_type, dict metadata=*) + cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, dict metadata=*) + cpdef void unsubscribe_instrument_close(self, InstrumentId instrument_id, dict metadata=*) cpdef void _add_subscription_instrument(self, InstrumentId instrument_id) cpdef void _add_subscription_order_book_deltas(self, InstrumentId instrument_id) @@ -140,7 +140,8 @@ cdef class MarketDataClient(DataClient): self, InstrumentId instrument_id, int limit, - UUID4 correlation_id + UUID4 correlation_id, + dict metadata=*, ) cpdef void request_quote_ticks( self, diff --git a/nautilus_trader/data/client.pyx b/nautilus_trader/data/client.pyx index 7b572593b1a0..584680c4aa8a 100644 --- a/nautilus_trader/data/client.pyx +++ b/nautilus_trader/data/client.pyx @@ -368,7 +368,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe` must be implemented in the subclass") - cpdef void subscribe_instruments(self): + cpdef void subscribe_instruments(self, dict metadata = None): """ Subscribe to all `Instrument` data. @@ -379,7 +379,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_instruments` must be implemented in the subclass") - cpdef void subscribe_instrument(self, InstrumentId instrument_id): + cpdef void subscribe_instrument(self, InstrumentId instrument_id, dict metadata = None): """ Subscribe to the `Instrument` with the given instrument ID. @@ -390,7 +390,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_instrument` must be implemented in the subclass") - cpdef void subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth = 0, dict kwargs = None): + cpdef void subscribe_order_book_deltas(self, InstrumentId instrument_id, BookType book_type, int depth = 0, dict metadata = None): """ Subscribe to `OrderBookDeltas` data for the given instrument ID. @@ -412,7 +412,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_order_book_deltas` must be implemented in the subclass") - cpdef void subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth = 0, dict kwargs = None): + cpdef void subscribe_order_book_snapshots(self, InstrumentId instrument_id, BookType book_type, int depth = 0, dict metadata = None): """ Subscribe to `OrderBook` snapshots data for the given instrument ID. @@ -434,7 +434,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_order_book_snapshots` must be implemented in the subclass") - cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id): + cpdef void subscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None): """ Subscribe to `QuoteTick` data for the given instrument ID. @@ -450,7 +450,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_quote_ticks` must be implemented in the subclass") - cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id): + cpdef void subscribe_trade_ticks(self, InstrumentId instrument_id, dict metadata = None): """ Subscribe to `TradeTick` data for the given instrument ID. @@ -466,7 +466,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_trade_ticks` must be implemented in the subclass") - cpdef void subscribe_instrument_status(self, InstrumentId instrument_id): + cpdef void subscribe_instrument_status(self, InstrumentId instrument_id, dict metadata = None): """ Subscribe to `InstrumentStatus` data for the given instrument ID. @@ -482,7 +482,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_instrument_status` must be implemented in the subclass") - cpdef void subscribe_instrument_close(self, InstrumentId instrument_id): + cpdef void subscribe_instrument_close(self, InstrumentId instrument_id, dict metadata = None): """ Subscribe to `InstrumentClose` updates for the given instrument ID. @@ -498,7 +498,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `subscribe_instrument_close` must be implemented in the subclass") - cpdef void subscribe_bars(self, BarType bar_type): + cpdef void subscribe_bars(self, BarType bar_type, dict metadata = None): """ Subscribe to `Bar` data for the given bar type. @@ -529,7 +529,7 @@ cdef class MarketDataClient(DataClient): f"You can implement by overriding the `unsubscribe` method for this client", ) - cpdef void unsubscribe_instruments(self): + cpdef void unsubscribe_instruments(self, dict metadata = None): """ Unsubscribe from all `Instrument` data. @@ -540,7 +540,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_instruments` must be implemented in the subclass") - cpdef void unsubscribe_instrument(self, InstrumentId instrument_id): + cpdef void unsubscribe_instrument(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `Instrument` data for the given instrument ID. @@ -556,7 +556,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_instrument` must be implemented in the subclass") - cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id): + cpdef void unsubscribe_order_book_deltas(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `OrderBookDeltas` data for the given instrument ID. @@ -572,7 +572,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_order_book_deltas` must be implemented in the subclass") - cpdef void unsubscribe_order_book_snapshots(self, InstrumentId instrument_id): + cpdef void unsubscribe_order_book_snapshots(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `OrderBook` snapshots data for the given instrument ID. @@ -588,7 +588,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_order_book_snapshots` must be implemented in the subclass") - cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id): + cpdef void unsubscribe_quote_ticks(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `QuoteTick` data for the given instrument ID. @@ -604,7 +604,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_quote_ticks` must be implemented in the subclass") - cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id): + cpdef void unsubscribe_trade_ticks(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `TradeTick` data for the given instrument ID. @@ -620,7 +620,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_trade_ticks` must be implemented in the subclass") - cpdef void unsubscribe_bars(self, BarType bar_type): + cpdef void unsubscribe_bars(self, BarType bar_type, dict metadata = None): """ Unsubscribe from `Bar` data for the given bar type. @@ -636,7 +636,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_bars` must be implemented in the subclass") - cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id): + cpdef void unsubscribe_instrument_status(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `InstrumentStatus` data for the given instrument ID. @@ -652,7 +652,7 @@ cdef class MarketDataClient(DataClient): ) raise NotImplementedError("method `unsubscribe_instrument_status` must be implemented in the subclass") - cpdef void unsubscribe_instrument_close(self, InstrumentId instrument_id): + cpdef void unsubscribe_instrument_close(self, InstrumentId instrument_id, dict metadata = None): """ Unsubscribe from `InstrumentClose` data for the given instrument ID. @@ -826,6 +826,7 @@ cdef class MarketDataClient(DataClient): InstrumentId instrument_id, int limit, UUID4 correlation_id, + dict metadata = None, ): """ Request order book snapshot data. diff --git a/nautilus_trader/data/engine.pxd b/nautilus_trader/data/engine.pxd index 59bfc385861c..c34a7e2fa53e 100644 --- a/nautilus_trader/data/engine.pxd +++ b/nautilus_trader/data/engine.pxd @@ -13,9 +13,11 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- +from cpython.datetime cimport datetime from libc.stdint cimport uint64_t from nautilus_trader.persistence.catalog import ParquetDataCatalog + from nautilus_trader.cache.cache cimport Cache from nautilus_trader.common.component cimport Component from nautilus_trader.common.component cimport TimeEvent @@ -128,27 +130,34 @@ cdef class DataEngine(Component): cpdef void _execute_command(self, DataCommand command) cpdef void _handle_subscribe(self, DataClient client, Subscribe command) cpdef void _handle_unsubscribe(self, DataClient client, Unsubscribe command) - cpdef void _handle_subscribe_instrument(self, MarketDataClient client, InstrumentId instrument_id) + cpdef void _handle_subscribe_instrument(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) cpdef void _handle_subscribe_order_book_deltas(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) # noqa cpdef void _handle_subscribe_order_book(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) # noqa - cpdef void _setup_order_book(self, MarketDataClient client, InstrumentId instrument_id, dict metadata, bint only_deltas, bint managed) # noqa + cpdef void _setup_order_book(self, MarketDataClient client, InstrumentId instrument_id, bint only_deltas, bint managed, dict metadata) # noqa cpdef void _create_new_book(self, Instrument instrument, BookType book_type) - cpdef void _handle_subscribe_quote_ticks(self, MarketDataClient client, InstrumentId instrument_id) + cpdef void _handle_subscribe_quote_ticks(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) cpdef void _handle_subscribe_synthetic_quote_ticks(self, InstrumentId instrument_id) - cpdef void _handle_subscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id) + cpdef void _handle_subscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) cpdef void _handle_subscribe_synthetic_trade_ticks(self, InstrumentId instrument_id) - cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, bint await_partial) + cpdef void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, bint await_partial, dict metadata) cpdef void _handle_subscribe_data(self, DataClient client, DataType data_type) - cpdef void _handle_subscribe_instrument_status(self, MarketDataClient client, InstrumentId instrument_id) - cpdef void _handle_subscribe_instrument_close(self, MarketDataClient client, InstrumentId instrument_id) - cpdef void _handle_unsubscribe_instrument(self, MarketDataClient client, InstrumentId instrument_id) + cpdef void _handle_subscribe_instrument_status(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) + cpdef void _handle_subscribe_instrument_close(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) + cpdef void _handle_unsubscribe_instrument(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) cpdef void _handle_unsubscribe_order_book_deltas(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) # noqa cpdef void _handle_unsubscribe_order_book(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) # noqa - cpdef void _handle_unsubscribe_quote_ticks(self, MarketDataClient client, InstrumentId instrument_id) - cpdef void _handle_unsubscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id) - cpdef void _handle_unsubscribe_bars(self, MarketDataClient client, BarType bar_type) + cpdef void _handle_unsubscribe_quote_ticks(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) + cpdef void _handle_unsubscribe_trade_ticks(self, MarketDataClient client, InstrumentId instrument_id, dict metadata) + cpdef void _handle_unsubscribe_bars(self, MarketDataClient client, BarType bar_type, dict metadata) cpdef void _handle_unsubscribe_data(self, DataClient client, DataType data_type) cpdef void _handle_request(self, DataRequest request) + cpdef void _handle_request_instruments(self, DataRequest request, DataClient client, datetime start, datetime end, dict metadata) + cpdef void _handle_request_instrument(self, DataRequest request, DataClient client, InstrumentId instrument_id, datetime start, datetime end, dict metadata) + cpdef void _handle_request_order_book_deltas(self, DataRequest request, DataClient client, dict metadata) + cpdef void _handle_request_quote_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata) + cpdef void _handle_request_trade_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata) + cpdef void _handle_request_bars(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata) + cpdef void _handle_request_data(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now) cpdef void _query_catalog(self, DataRequest request) # -- DATA HANDLERS -------------------------------------------------------------------------------- @@ -185,8 +194,8 @@ cdef class DataEngine(Component): cpdef void _update_order_book(self, Data data) cpdef void _snapshot_order_book(self, TimeEvent snap_event) cpdef void _publish_order_book(self, InstrumentId instrument_id, str topic) - cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type, bint await_partial) - cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type) + cpdef void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type, bint await_partial, dict metadata) + cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type, dict metadata) cpdef void _update_synthetics_with_quote(self, list synthetics, QuoteTick update) cpdef void _update_synthetic_with_quote(self, SyntheticInstrument synthetic, QuoteTick update) cpdef void _update_synthetics_with_trade(self, list synthetics, TradeTick update) diff --git a/nautilus_trader/data/engine.pyx b/nautilus_trader/data/engine.pyx index c967aedc8f45..645645f5693c 100644 --- a/nautilus_trader/data/engine.pyx +++ b/nautilus_trader/data/engine.pyx @@ -664,6 +664,7 @@ cdef class DataEngine(Component): self._handle_subscribe_instrument( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == OrderBookDelta: self._handle_subscribe_order_book_deltas( @@ -681,27 +682,32 @@ cdef class DataEngine(Component): self._handle_subscribe_quote_ticks( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == TradeTick: self._handle_subscribe_trade_ticks( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == Bar: self._handle_subscribe_bars( client, command.data_type.metadata.get("bar_type"), command.data_type.metadata.get("await_partial"), + command.data_type.metadata, ) elif command.data_type.type == InstrumentStatus: self._handle_subscribe_instrument_status( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == InstrumentClose: self._handle_subscribe_instrument_close( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) else: self._handle_subscribe_data(client, command.data_type) @@ -711,6 +717,7 @@ cdef class DataEngine(Component): self._handle_unsubscribe_instrument( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == OrderBook: self._handle_unsubscribe_order_book( @@ -728,16 +735,19 @@ cdef class DataEngine(Component): self._handle_unsubscribe_quote_ticks( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == TradeTick: self._handle_unsubscribe_trade_ticks( client, command.data_type.metadata.get("instrument_id"), + command.data_type.metadata, ) elif command.data_type.type == Bar: self._handle_unsubscribe_bars( client, command.data_type.metadata.get("bar_type"), + command.data_type.metadata, ) else: self._handle_unsubscribe_data(client, command.data_type) @@ -746,6 +756,7 @@ cdef class DataEngine(Component): self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(client, "client") @@ -758,7 +769,7 @@ cdef class DataEngine(Component): return if instrument_id not in client.subscribed_instruments(): - client.subscribe_instrument(instrument_id) + client.subscribe_instrument(instrument_id, metadata) cpdef void _handle_subscribe_order_book_deltas( self, @@ -777,9 +788,9 @@ cdef class DataEngine(Component): self._setup_order_book( client, instrument_id, - metadata, only_deltas=True, - managed=metadata["managed"] + managed=metadata["managed"], + metadata=metadata, ) cpdef void _handle_subscribe_order_book( @@ -838,18 +849,18 @@ cdef class DataEngine(Component): self._setup_order_book( client, instrument_id, - metadata, only_deltas=False, - managed=metadata["managed"] + managed=metadata["managed"], + metadata=metadata, ) cpdef void _setup_order_book( self, MarketDataClient client, InstrumentId instrument_id, - dict metadata, bint only_deltas, bint managed, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(instrument_id, "instrument_id") @@ -879,7 +890,7 @@ cdef class DataEngine(Component): instrument_id=instrument_id, book_type=book_type, depth=metadata["depth"], - kwargs=metadata.get("kwargs"), + metadata=metadata, ) except NotImplementedError: if only_deltas: @@ -889,7 +900,7 @@ cdef class DataEngine(Component): instrument_id=instrument_id, book_type=metadata["book_type"], depth=metadata["depth"], - kwargs=metadata.get("kwargs"), + metadata=metadata, ) # Set up subscriptions @@ -936,6 +947,7 @@ cdef class DataEngine(Component): self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(instrument_id, "instrument_id") if instrument_id.is_synthetic(): @@ -944,7 +956,7 @@ cdef class DataEngine(Component): Condition.not_none(client, "client") if instrument_id not in client.subscribed_quote_ticks(): - client.subscribe_quote_ticks(instrument_id) + client.subscribe_quote_ticks(instrument_id, metadata) cpdef void _handle_subscribe_synthetic_quote_ticks(self, InstrumentId instrument_id): cdef SyntheticInstrument synthetic = self._cache.synthetic(instrument_id) @@ -976,6 +988,7 @@ cdef class DataEngine(Component): self, MarketDataClient client, InstrumentId instrument_id, + dict metadata ): Condition.not_none(instrument_id, "instrument_id") if instrument_id.is_synthetic(): @@ -984,7 +997,7 @@ cdef class DataEngine(Component): Condition.not_none(client, "client") if instrument_id not in client.subscribed_trade_ticks(): - client.subscribe_trade_ticks(instrument_id) + client.subscribe_trade_ticks(instrument_id, metadata) cpdef void _handle_subscribe_synthetic_trade_ticks(self, InstrumentId instrument_id): cdef SyntheticInstrument synthetic = self._cache.synthetic(instrument_id) @@ -1017,6 +1030,7 @@ cdef class DataEngine(Component): MarketDataClient client, BarType bar_type, bint await_partial, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(bar_type, "bar_type") @@ -1024,7 +1038,7 @@ cdef class DataEngine(Component): if bar_type.is_internally_aggregated(): # Internal aggregation if bar_type.standard() not in self._bar_aggregators: - self._start_bar_aggregator(client, bar_type, await_partial) + self._start_bar_aggregator(client, bar_type, await_partial, metadata) else: # External aggregation if bar_type.instrument_id.is_synthetic(): @@ -1034,7 +1048,7 @@ cdef class DataEngine(Component): return if bar_type not in client.subscribed_bars(): - client.subscribe_bars(bar_type) + client.subscribe_bars(bar_type, metadata) cpdef void _handle_subscribe_data( self, @@ -1058,6 +1072,7 @@ cdef class DataEngine(Component): self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(instrument_id, "instrument_id") @@ -1069,12 +1084,13 @@ cdef class DataEngine(Component): return if instrument_id not in client.subscribed_instrument_status(): - client.subscribe_instrument_status(instrument_id) + client.subscribe_instrument_status(instrument_id, metadata) cpdef void _handle_subscribe_instrument_close( self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(instrument_id, "instrument_id") @@ -1084,19 +1100,20 @@ cdef class DataEngine(Component): return if instrument_id not in client.subscribed_instrument_close(): - client.subscribe_instrument_close(instrument_id) + client.subscribe_instrument_close(instrument_id, metadata) cpdef void _handle_unsubscribe_instrument( self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(client, "client") if instrument_id is None: if not self._msgbus.has_subscribers(f"data.instrument.{client.id.value}.*"): if client.subscribed_instruments(): - client.unsubscribe_instruments() + client.unsubscribe_instruments(metadata) return else: if instrument_id.is_synthetic(): @@ -1109,7 +1126,7 @@ cdef class DataEngine(Component): f".{instrument_id.symbol}", ): if instrument_id in client.subscribed_instruments(): - client.unsubscribe_instrument(instrument_id) + client.unsubscribe_instrument(instrument_id, metadata) cpdef void _handle_unsubscribe_order_book_deltas( self, @@ -1142,7 +1159,7 @@ cdef class DataEngine(Component): if not self._msgbus.has_subscribers(topic): if instrument_id in client.subscribed_order_book_deltas(): - client.unsubscribe_order_book_deltas(instrument_id) + client.unsubscribe_order_book_deltas(instrument_id, metadata) cpdef void _handle_unsubscribe_order_book( self, @@ -1185,16 +1202,17 @@ cdef class DataEngine(Component): if not self._msgbus.has_subscribers(deltas_topic): if instrument_id in client.subscribed_order_book_deltas(): - client.unsubscribe_order_book_deltas(instrument_id) + client.unsubscribe_order_book_deltas(instrument_id, metadata) if not self._msgbus.has_subscribers(snapshots_topic): if instrument_id in client.subscribed_order_book_snapshots(): - client.unsubscribe_order_book_snapshots(instrument_id) + client.unsubscribe_order_book_snapshots(instrument_id, metadata) cpdef void _handle_unsubscribe_quote_ticks( self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(instrument_id, "instrument_id") @@ -1205,12 +1223,13 @@ cdef class DataEngine(Component): f".{instrument_id.symbol}", ): if instrument_id in client.subscribed_quote_ticks(): - client.unsubscribe_quote_ticks(instrument_id) + client.unsubscribe_quote_ticks(instrument_id, metadata) cpdef void _handle_unsubscribe_trade_ticks( self, MarketDataClient client, InstrumentId instrument_id, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(instrument_id, "instrument_id") @@ -1221,12 +1240,13 @@ cdef class DataEngine(Component): f".{instrument_id.symbol}", ): if instrument_id in client.subscribed_trade_ticks(): - client.unsubscribe_trade_ticks(instrument_id) + client.unsubscribe_trade_ticks(instrument_id, metadata) cpdef void _handle_unsubscribe_bars( self, MarketDataClient client, BarType bar_type, + dict metadata, ): Condition.not_none(client, "client") Condition.not_none(bar_type, "bar_type") @@ -1237,11 +1257,11 @@ cdef class DataEngine(Component): if bar_type.is_internally_aggregated(): # Internal aggregation if bar_type.standard() in self._bar_aggregators: - self._stop_bar_aggregator(client, bar_type) + self._stop_bar_aggregator(client, bar_type, metadata) else: # External aggregation if bar_type in client.subscribed_bars(): - client.unsubscribe_bars(bar_type) + client.unsubscribe_bars(bar_type, metadata) cpdef void _handle_unsubscribe_data( self, @@ -1305,8 +1325,7 @@ cdef class DataEngine(Component): Condition.is_true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") cdef dict[str, object] metadata = request.data_type.metadata - cdef str aggregated_bars_market_data_type = metadata.get("market_data_type", "") - cdef bint update_catalog = metadata.get("update_catalog", False) + cdef str bars_market_data_type = metadata.get("bars_market_data_type", "") cdef datetime now = self._clock.utc_now() cdef datetime start = time_object_to_dt(metadata.get("start")) # Can be None @@ -1316,180 +1335,204 @@ cdef class DataEngine(Component): instrument_id = request.data_type.metadata.get("instrument_id") if instrument_id is None: - if self._catalogs and not update_catalog: - self._query_catalog(request) - return - - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request - - client.request_instruments( - request.data_type.metadata.get("venue"), - request.id, - start, - end, - metadata, - ) + self._handle_request_instruments(request, client, start, end, metadata) else: - last_timestamp, _ = self._catalogs_last_timestamp( - Instrument, - instrument_id, - ) + self._handle_request_instrument(request, client, instrument_id, start, end, metadata) + elif request.data_type.type == OrderBookDeltas: + self._handle_request_order_book_deltas(request, client, metadata) + elif request.data_type.type == QuoteTick or bars_market_data_type == "quote_ticks": + self._handle_request_quote_ticks(request, client, start, end, now, metadata) + elif request.data_type.type == TradeTick or bars_market_data_type == "trade_ticks": + self._handle_request_trade_ticks(request, client, start, end, now, metadata) + elif request.data_type.type == Bar or bars_market_data_type == "bars": + self._handle_request_bars(request, client, start, end, now, metadata) + else: + self._handle_request_data(request, client, start, end, now) - if last_timestamp: - self._query_catalog(request) - return + cpdef void _handle_request_instruments(self, DataRequest request, DataClient client, datetime start, datetime end, dict metadata): + cdef bint update_catalog = metadata.get("update_catalog", False) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request - - client.request_instrument( - instrument_id, - request.id, - start, - end, - metadata, - ) - elif request.data_type.type == OrderBookDeltas: - instrument_id = request.data_type.metadata.get("instrument_id") + if self._catalogs and not update_catalog: + self._query_catalog(request) + return - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request - - client.request_order_book_snapshot( - instrument_id, - request.data_type.metadata.get("limit", 0), - request.id - ) - elif request.data_type.type == QuoteTick or aggregated_bars_market_data_type == "quote_ticks": - instrument_id = request.data_type.metadata.get("instrument_id") + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request + + client.request_instruments( + request.data_type.metadata.get("venue"), + request.id, + start, + end, + metadata, + ) - last_timestamp, _ = self._catalogs_last_timestamp( - QuoteTick, - instrument_id, - ) + cpdef void _handle_request_instrument(self, DataRequest request, DataClient client, InstrumentId instrument_id, datetime start, datetime end, dict metadata): + last_timestamp, _ = self._catalogs_last_timestamp( + Instrument, + instrument_id, + ) - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + if last_timestamp: + self._query_catalog(request) + return - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) - self._query_catalog(request) + client.request_instrument( + instrument_id, + request.id, + start, + end, + metadata, + ) - client_start = max_date(start, last_timestamp) - client.request_quote_ticks( - instrument_id, - request.data_type.metadata.get("limit", 0), - request.id, - client_start, - end, - metadata, - ) - elif request.data_type.type == TradeTick or aggregated_bars_market_data_type == "trade_ticks": - instrument_id = request.data_type.metadata.get("instrument_id") + cpdef void _handle_request_order_book_deltas(self, DataRequest request, DataClient client, dict metadata): + instrument_id = request.data_type.metadata.get("instrument_id") - last_timestamp, _ = self._catalogs_last_timestamp( - TradeTick, - instrument_id, - ) + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + client.request_order_book_snapshot( + instrument_id, + request.data_type.metadata.get("limit", 0), + request.id, + metadata, + ) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + cpdef void _handle_request_quote_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata): + instrument_id = request.data_type.metadata.get("instrument_id") - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) + last_timestamp, _ = self._catalogs_last_timestamp( + QuoteTick, + instrument_id, + ) + + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): self._query_catalog(request) + return - client_start = max_date(start, last_timestamp) - client.request_trade_ticks( - instrument_id, - request.data_type.metadata.get("limit", 0), - request.id, - client_start, - end, - metadata, - ) - elif request.data_type.type == Bar or aggregated_bars_market_data_type == "bars": - bar_type = request.data_type.metadata.get("bar_type") + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - last_timestamp, _ = self._catalogs_last_timestamp( - Bar, - bar_type=bar_type, - ) + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + client_start = max_date(start, last_timestamp) + client.request_quote_ticks( + instrument_id, + request.data_type.metadata.get("limit", 0), + request.id, + client_start, + end, + metadata, + ) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + cpdef void _handle_request_trade_ticks(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata): + instrument_id = request.data_type.metadata.get("instrument_id") + + last_timestamp, _ = self._catalogs_last_timestamp( + TradeTick, + instrument_id, + ) - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): self._query_catalog(request) + return - client_start = max_date(start, last_timestamp) - client.request_bars( - bar_type, - request.data_type.metadata.get("limit", 0), - request.id, - client_start, - end, - metadata, - ) - else: - last_timestamp, _ = self._catalogs_last_timestamp( - request.data_type.type, - ) + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request - if last_timestamp: - if (now <= last_timestamp) or (end and end <= last_timestamp): - self._query_catalog(request) - return + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) - if client is None: - self._log.error( - f"Cannot handle request: " - f"no client registered for '{request.client_id}', {request}") - return # No client to handle request + client_start = max_date(start, last_timestamp) + client.request_trade_ticks( + instrument_id, + request.data_type.metadata.get("limit", 0), + request.id, + client_start, + end, + metadata, + ) - if last_timestamp and start and start <= last_timestamp: - self._new_query_group(request.id, 2) + cpdef void _handle_request_bars(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now, dict metadata): + bar_type = request.data_type.metadata.get("bar_type") + + last_timestamp, _ = self._catalogs_last_timestamp( + Bar, + bar_type=bar_type, + ) + + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): self._query_catalog(request) + return - try: - client.request(request.data_type, request.id) - except NotImplementedError: - self._log.error(f"Cannot handle request: unrecognized data type {request.data_type}") + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request + + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) + + client_start = max_date(start, last_timestamp) + client.request_bars( + bar_type, + request.data_type.metadata.get("limit", 0), + request.id, + client_start, + end, + metadata, + ) + + cpdef void _handle_request_data(self, DataRequest request, DataClient client, datetime start, datetime end, datetime now): + last_timestamp, _ = self._catalogs_last_timestamp( + request.data_type.type, + ) + + if last_timestamp: + if (now <= last_timestamp) or (end and end <= last_timestamp): + self._query_catalog(request) + return + + if client is None: + self._log.error( + f"Cannot handle request: " + f"no client registered for '{request.client_id}', {request}") + return # No client to handle request + + if last_timestamp and start and start <= last_timestamp: + self._new_query_group(request.id, 2) + self._query_catalog(request) + + try: + client.request(request.data_type, request.id) + except NotImplementedError: + self._log.error(f"Cannot handle request: unrecognized data type {request.data_type}") cpdef void _query_catalog(self, DataRequest request): cdef datetime start = request.data_type.metadata.get("start") @@ -1509,8 +1552,7 @@ cdef class DataEngine(Component): ) ts_end = ts_now - # Field defined when using actor.request_aggregated_bars - market_data_type = request.data_type.metadata.get("market_data_type") + bars_market_data_type = request.data_type.metadata.get("bars_market_data_type", "") data = [] if request.data_type.type == Instrument: @@ -1521,21 +1563,21 @@ cdef class DataEngine(Component): else: for catalog in self._catalogs.values(): data += catalog.instruments(instrument_ids=[str(instrument_id)]) - elif request.data_type.type == QuoteTick or (market_data_type and market_data_type == "quote_ticks"): + elif request.data_type.type == QuoteTick or bars_market_data_type == "quote_ticks": for catalog in self._catalogs.values(): data += catalog.quote_ticks( instrument_ids=[str(request.data_type.metadata.get("instrument_id"))], start=ts_start, end=ts_end, ) - elif request.data_type.type == TradeTick or (market_data_type and market_data_type == "trade_ticks"): + elif request.data_type.type == TradeTick or bars_market_data_type == "trade_ticks": for catalog in self._catalogs.values(): data += catalog.trade_ticks( instrument_ids=[str(request.data_type.metadata.get("instrument_id"))], start=ts_start, end=ts_end, ) - elif request.data_type.type == Bar or (market_data_type and market_data_type == "bars"): + elif request.data_type.type == Bar or bars_market_data_type == "bars": bar_type = request.data_type.metadata.get("bar_type") if bar_type is None: self._log.error("No bar type provided for bars request") @@ -1830,7 +1872,7 @@ cdef class DataEngine(Component): elif response.data_type.type == TradeTick: self._handle_trade_ticks(response.data) elif response.data_type.type == Bar: - if response.data_type.metadata.get("market_data_type"): + if response.data_type.metadata.get("bars_market_data_type"): response.data = self._handle_aggregated_bars(response.data, response.data_type.metadata) else: self._handle_bars(response.data, response.data_type.metadata.get("Partial")) @@ -1953,16 +1995,16 @@ cdef class DataEngine(Component): bars_result = {} if metadata["include_external_data"]: - if metadata["market_data_type"] == "quote_ticks": + if metadata["bars_market_data_type"] == "quote_ticks": self._cache.add_quote_ticks(ticks) result["quote_ticks"] = ticks - elif metadata["market_data_type"] == "trade_ticks": + elif metadata["bars_market_data_type"] == "trade_ticks": self._cache.add_trade_ticks(ticks) result["trade_ticks"] = ticks - elif metadata["market_data_type"] == "bars": + elif metadata["bars_market_data_type"] == "bars": self._cache.add_bars(ticks) - if metadata["market_data_type"] == "bars": + if metadata["bars_market_data_type"] == "bars": bars_result[metadata["bar_type"]] = ticks for bar_type in metadata["bar_types"]: @@ -2011,12 +2053,12 @@ cdef class DataEngine(Component): handler=handler, ) - if metadata["market_data_type"] == "quote_ticks" and not bar_type.is_composite(): + if metadata["bars_market_data_type"] == "quote_ticks" and not bar_type.is_composite(): aggregator.start_batch_update(handler, ticks[0].ts_event) for tick in ticks: aggregator.handle_quote_tick(tick) - elif metadata["market_data_type"] == "trade_ticks" and not bar_type.is_composite(): + elif metadata["bars_market_data_type"] == "trade_ticks" and not bar_type.is_composite(): aggregator.start_batch_update(handler, ticks[0].ts_event) for tick in ticks: @@ -2033,7 +2075,7 @@ cdef class DataEngine(Component): aggregator.stop_batch_update() bars_result[bar_type.standard()] = aggregated_bars - if not metadata["include_external_data"] and metadata["market_data_type"] == "bars": + if not metadata["include_external_data"] and metadata["bars_market_data_type"] == "bars": del bars_result[metadata["bar_type"]] # we need a second final dict as a we can't delete keys in a loop @@ -2104,6 +2146,7 @@ cdef class DataEngine(Component): MarketDataClient client, BarType bar_type, bint await_partial, + dict metadata, ): cdef Instrument instrument = self._cache.instrument(bar_type.instrument_id) if instrument is None: @@ -2164,7 +2207,7 @@ cdef class DataEngine(Component): topic=f"data.bars.{composite_bar_type}", handler=aggregator.handle_bar, ) - self._handle_subscribe_bars(client, composite_bar_type, False) + self._handle_subscribe_bars(client, composite_bar_type, False, metadata) elif bar_type.spec.price_type == PriceType.LAST: self._msgbus.subscribe( topic=f"data.trades" @@ -2173,7 +2216,7 @@ cdef class DataEngine(Component): handler=aggregator.handle_trade_tick, priority=5, ) - self._handle_subscribe_trade_ticks(client, bar_type.instrument_id) + self._handle_subscribe_trade_ticks(client, bar_type.instrument_id, metadata) else: self._msgbus.subscribe( topic=f"data.quotes" @@ -2182,9 +2225,9 @@ cdef class DataEngine(Component): handler=aggregator.handle_quote_tick, priority=5, ) - self._handle_subscribe_quote_ticks(client, bar_type.instrument_id) + self._handle_subscribe_quote_ticks(client, bar_type.instrument_id, metadata) - cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type): + cpdef void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type, dict metadata): cdef aggregator = self._bar_aggregators.get(bar_type.standard()) if aggregator is None: self._log.warning( @@ -2204,7 +2247,7 @@ cdef class DataEngine(Component): topic=f"data.bars.{composite_bar_type}", handler=aggregator.handle_bar, ) - self._handle_unsubscribe_bars(client, composite_bar_type) + self._handle_unsubscribe_bars(client, composite_bar_type, metadata) elif bar_type.spec.price_type == PriceType.LAST: self._msgbus.unsubscribe( topic=f"data.trades" @@ -2212,7 +2255,7 @@ cdef class DataEngine(Component): f".{bar_type.instrument_id.symbol}", handler=aggregator.handle_trade_tick, ) - self._handle_unsubscribe_trade_ticks(client, bar_type.instrument_id) + self._handle_unsubscribe_trade_ticks(client, bar_type.instrument_id, metadata) else: self._msgbus.unsubscribe( topic=f"data.quotes" @@ -2220,7 +2263,7 @@ cdef class DataEngine(Component): f".{bar_type.instrument_id.symbol}", handler=aggregator.handle_quote_tick, ) - self._handle_unsubscribe_quote_ticks(client, bar_type.instrument_id) + self._handle_unsubscribe_quote_ticks(client, bar_type.instrument_id, metadata) # Remove from aggregators del self._bar_aggregators[bar_type.standard()] diff --git a/nautilus_trader/live/data_client.py b/nautilus_trader/live/data_client.py index bd9c3ef99438..34861b8bfa27 100644 --- a/nautilus_trader/live/data_client.py +++ b/nautilus_trader/live/data_client.py @@ -26,7 +26,6 @@ from asyncio import Task from collections.abc import Callable from collections.abc import Coroutine -from typing import Any import pandas as pd @@ -450,20 +449,24 @@ def subscribe(self, data_type: DataType) -> None: success_color=LogColor.BLUE, ) - def subscribe_instruments(self) -> None: + def subscribe_instruments(self, metadata: dict | None = None) -> None: instrument_ids = list(self._instrument_provider.get_all().keys()) [self._add_subscription_instrument(i) for i in instrument_ids] self.create_task( - self._subscribe_instruments(), + self._subscribe_instruments(metadata), log_msg=f"subscribe: instruments {self.venue}", success_msg=f"Subscribed {self.venue} instruments", success_color=LogColor.BLUE, ) - def subscribe_instrument(self, instrument_id: InstrumentId) -> None: + def subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._add_subscription_instrument(instrument_id) self.create_task( - self._subscribe_instrument(instrument_id), + self._subscribe_instrument(instrument_id, metadata), log_msg=f"subscribe: instrument {instrument_id}", success_msg=f"Subscribed {instrument_id} instrument", success_color=LogColor.BLUE, @@ -474,7 +477,7 @@ def subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: self._add_subscription_order_book_deltas(instrument_id) self.create_task( @@ -482,7 +485,7 @@ def subscribe_order_book_deltas( instrument_id=instrument_id, book_type=book_type, depth=depth, - kwargs=kwargs, + metadata=metadata, ), log_msg=f"subscribe: order_book_deltas {instrument_id}", success_msg=f"Subscribed {instrument_id} order book deltas depth={depth}", @@ -494,7 +497,7 @@ def subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: self._add_subscription_order_book_snapshots(instrument_id) self.create_task( @@ -502,32 +505,40 @@ def subscribe_order_book_snapshots( instrument_id=instrument_id, book_type=book_type, depth=depth, - kwargs=kwargs, + metadata=metadata, ), log_msg=f"subscribe: order_book_snapshots {instrument_id}", success_msg=f"Subscribed {instrument_id} order book snapshots depth={depth}", success_color=LogColor.BLUE, ) - def subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + def subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._add_subscription_quote_ticks(instrument_id) self.create_task( - self._subscribe_quote_ticks(instrument_id), + self._subscribe_quote_ticks(instrument_id, metadata), log_msg=f"subscribe: quote_ticks {instrument_id}", success_msg=f"Subscribed {instrument_id} quotes", success_color=LogColor.BLUE, ) - def subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + def subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._add_subscription_trade_ticks(instrument_id) self.create_task( - self._subscribe_trade_ticks(instrument_id), + self._subscribe_trade_ticks(instrument_id, metadata), log_msg=f"subscribe: trade_ticks {instrument_id}", success_msg=f"Subscribed {instrument_id} trades", success_color=LogColor.BLUE, ) - def subscribe_bars(self, bar_type: BarType) -> None: + def subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: PyCondition.is_true( bar_type.is_externally_aggregated(), "aggregation_source is not EXTERNAL", @@ -535,25 +546,33 @@ def subscribe_bars(self, bar_type: BarType) -> None: self._add_subscription_bars(bar_type) self.create_task( - self._subscribe_bars(bar_type), + self._subscribe_bars(bar_type, metadata), log_msg=f"subscribe: bars {bar_type}", success_msg=f"Subscribed {bar_type} bars", success_color=LogColor.BLUE, ) - def subscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + def subscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._add_subscription_instrument_status(instrument_id) self.create_task( - self._subscribe_instrument_status(instrument_id), + self._subscribe_instrument_status(instrument_id, metadata), log_msg=f"subscribe: instrument_status {instrument_id}", success_msg=f"Subscribed {instrument_id} instrument status ", success_color=LogColor.BLUE, ) - def subscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + def subscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._add_subscription_instrument_close(instrument_id) self.create_task( - self._subscribe_instrument_close(instrument_id), + self._subscribe_instrument_close(instrument_id, metadata), log_msg=f"subscribe: instrument_close {instrument_id}", success_msg=f"Subscribed {instrument_id} instrument close", success_color=LogColor.BLUE, @@ -568,83 +587,111 @@ def unsubscribe(self, data_type: DataType) -> None: success_color=LogColor.BLUE, ) - def unsubscribe_instruments(self) -> None: + def unsubscribe_instruments(self, metadata: dict | None = None) -> None: instrument_ids = list(self._instrument_provider.get_all().keys()) [self._remove_subscription_instrument(i) for i in instrument_ids] self.create_task( - self._unsubscribe_instruments(), + self._unsubscribe_instruments(metadata), log_msg=f"unsubscribe: instruments {self.venue}", success_msg=f"Unsubscribed {self.venue} instruments", success_color=LogColor.BLUE, ) - def unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + def unsubscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_instrument(instrument_id) self.create_task( - self._unsubscribe_instrument(instrument_id), + self._unsubscribe_instrument(instrument_id, metadata), log_msg=f"unsubscribe: instrument {instrument_id}", success_msg=f"Unsubscribed {instrument_id} instrument", success_color=LogColor.BLUE, ) - def unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + def unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_order_book_deltas(instrument_id) self.create_task( - self._unsubscribe_order_book_deltas(instrument_id), + self._unsubscribe_order_book_deltas(instrument_id, metadata), log_msg=f"unsubscribe: order_book_deltas {instrument_id}", success_msg=f"Unsubscribed {instrument_id} order book deltas", success_color=LogColor.BLUE, ) - def unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + def unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_order_book_snapshots(instrument_id) self.create_task( - self._unsubscribe_order_book_snapshots(instrument_id), + self._unsubscribe_order_book_snapshots(instrument_id, metadata), log_msg=f"unsubscribe: order_book_snapshots {instrument_id}", success_msg=f"Unsubscribed {instrument_id} order book snapshots", success_color=LogColor.BLUE, ) - def unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + def unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_quote_ticks(instrument_id) self.create_task( - self._unsubscribe_quote_ticks(instrument_id), + self._unsubscribe_quote_ticks(instrument_id, metadata), log_msg=f"unsubscribe: quote_ticks {instrument_id}", success_msg=f"Unsubscribed {instrument_id} quotes", success_color=LogColor.BLUE, ) - def unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + def unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_trade_ticks(instrument_id) self.create_task( - self._unsubscribe_trade_ticks(instrument_id), + self._unsubscribe_trade_ticks(instrument_id, metadata), log_msg=f"unsubscribe: trade_ticks {instrument_id}", success_msg=f"Unsubscribed {instrument_id} trades", success_color=LogColor.BLUE, ) - def unsubscribe_bars(self, bar_type: BarType) -> None: + def unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: self._remove_subscription_bars(bar_type) self.create_task( - self._unsubscribe_bars(bar_type), + self._unsubscribe_bars(bar_type, metadata), log_msg=f"unsubscribe: bars {bar_type}", success_msg=f"Unsubscribed {bar_type} bars", success_color=LogColor.BLUE, ) - def unsubscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + def unsubscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_instrument_status(instrument_id) self.create_task( - self._unsubscribe_instrument_status(instrument_id), + self._unsubscribe_instrument_status(instrument_id, metadata), log_msg=f"unsubscribe: instrument_status {instrument_id}", success_msg=f"Unsubscribed {instrument_id} instrument status", success_color=LogColor.BLUE, ) - def unsubscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + def unsubscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: self._remove_subscription_instrument_close(instrument_id) self.create_task( - self._unsubscribe_instrument_close(instrument_id), + self._unsubscribe_instrument_close(instrument_id, metadata), log_msg=f"unsubscribe: instrument_close {instrument_id}", success_msg=f"Unsubscribed {instrument_id} instrument close", success_color=LogColor.BLUE, @@ -781,6 +828,7 @@ def request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: limit_str = f" limit={limit}" if limit else "" self._log.info(f"Request {instrument_id} order_book_snapshot{limit_str}", LogColor.BLUE) @@ -789,6 +837,7 @@ def request_order_book_snapshot( instrument_id=instrument_id, limit=limit, correlation_id=correlation_id, + metadata=metadata, ), log_msg=f"request: order_book_snapshot {instrument_id}", ) @@ -811,12 +860,16 @@ async def _subscribe(self, data_type: DataType) -> None: "implement the `_subscribe` coroutine", # pragma: no cover ) - async def _subscribe_instruments(self) -> None: + async def _subscribe_instruments(self, metadata: dict | None = None) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_instruments` coroutine", # pragma: no cover ) - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_instrument` coroutine", # pragma: no cover ) @@ -826,7 +879,7 @@ async def _subscribe_order_book_deltas( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_order_book_deltas` coroutine", # pragma: no cover @@ -837,33 +890,49 @@ async def _subscribe_order_book_snapshots( instrument_id: InstrumentId, book_type: BookType, depth: int | None = None, - kwargs: dict[str, Any] | None = None, + metadata: dict | None = None, ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_order_book_snapshots` coroutine", # pragma: no cover ) - async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_quote_ticks` coroutine", # pragma: no cover ) - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _subscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_trade_ticks` coroutine", # pragma: no cover ) - async def _subscribe_bars(self, bar_type: BarType) -> None: + async def _subscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_bars` coroutine", # pragma: no cover ) - async def _subscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_instrument_status` coroutine", # pragma: no cover ) - async def _subscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _subscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_subscribe_instrument_close` coroutine", # pragma: no cover ) @@ -873,47 +942,75 @@ async def _unsubscribe(self, data_type: DataType) -> None: "implement the `_unsubscribe` coroutine", # pragma: no cover ) - async def _unsubscribe_instruments(self) -> None: + async def _unsubscribe_instruments(self, params: dict | None = None) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_instruments` coroutine", # pragma: no cover ) - async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_instrument` coroutine", # pragma: no cover ) - async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_deltas( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_order_book_deltas` coroutine", # pragma: no cover ) - async def _unsubscribe_order_book_snapshots(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_order_book_snapshots( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_order_book_snapshots` coroutine", # pragma: no cover ) - async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_quote_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_quote_ticks` coroutine", # pragma: no cover ) - async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_trade_ticks( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_trade_ticks` coroutine", # pragma: no cover ) - async def _unsubscribe_bars(self, bar_type: BarType) -> None: + async def _unsubscribe_bars(self, bar_type: BarType, metadata: dict | None = None) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_bars` coroutine", # pragma: no cover ) - async def _unsubscribe_instrument_status(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_status( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_instrument_status` coroutine", # pragma: no cover ) - async def _unsubscribe_instrument_close(self, instrument_id: InstrumentId) -> None: + async def _unsubscribe_instrument_close( + self, + instrument_id: InstrumentId, + metadata: dict | None = None, + ) -> None: raise NotImplementedError( # pragma: no cover "implement the `_unsubscribe_instrument_close` coroutine", # pragma: no cover ) @@ -991,6 +1088,7 @@ async def _request_order_book_snapshot( instrument_id: InstrumentId, limit: int, correlation_id: UUID4, + metadata: dict | None = None, ) -> None: raise NotImplementedError( "implement the `_request_order_book_snapshot` coroutine", # pragma: no cover