Skip to content

Commit

Permalink
Continue DataEngine in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 17, 2024
1 parent ea14b0a commit f2f8d9c
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 44 deletions.
14 changes: 13 additions & 1 deletion nautilus_core/data/src/engine/book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl MessageHandler for BookUpdater {

pub struct BookSnapshotter {
pub id: Ustr,
pub timer_name: Ustr,
pub snap_info: BookSnapshotInfo,
pub cache: Rc<RefCell<Cache>>,
pub msgbus: Rc<RefCell<MessageBus>>,
Expand All @@ -100,8 +101,19 @@ impl BookSnapshotter {
cache: Rc<RefCell<Cache>>,
msgbus: Rc<RefCell<MessageBus>>,
) -> Self {
let id_str = format!(
"{}-{}",
stringify!(BookSnapshotter),
snap_info.instrument_id
);
let timer_name = format!(
"OrderBook|{}|{}",
snap_info.instrument_id, snap_info.interval_ms
);

Self {
id: Ustr::from(&format!("{}-{:?}", stringify!(BookSnapshotter), snap_info)),
id: Ustr::from(&id_str),
timer_name: Ustr::from(&timer_name),
snap_info,
cache,
msgbus,
Expand Down
154 changes: 111 additions & 43 deletions nautilus_core/data/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use nautilus_common::{
cache::Cache,
clock::Clock,
logging::{RECV, RES},
messages::data::{DataRequest, DataResponse, SubscriptionCommand},
messages::data::{Action, DataRequest, DataResponse, SubscriptionCommand},
msgbus::{
handler::{MessageHandler, ShareableMessageHandler},
MessageBus,
Expand Down Expand Up @@ -96,7 +96,9 @@ pub struct DataEngine {
default_client: Option<DataClientAdapter>,
external_clients: HashSet<ClientId>,
routing_map: IndexMap<Venue, ClientId>,
order_book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
bar_aggregators: Vec<Box<dyn BarAggregator>>, // TODO: dyn for now
synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
Expand All @@ -123,7 +125,9 @@ impl DataEngine {
default_client: None,
external_clients: HashSet::new(),
routing_map: IndexMap::new(),
order_book_intervals: HashMap::new(),
book_intervals: HashMap::new(),
book_updaters: HashMap::new(),
book_snapshotters: HashMap::new(),
bar_aggregators: Vec::new(),
synthetic_quote_feeds: HashMap::new(),
synthetic_trade_feeds: HashMap::new(),
Expand Down Expand Up @@ -338,15 +342,23 @@ impl DataEngine {
}

pub fn execute(&mut self, cmd: SubscriptionCommand) {
match cmd.data_type.type_name() {
stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
// stringify!(QuoteTick) => self.handle_subscribe_quote_ticks(cmd),
// stringify!(TradeTick) => self.handle_subscribe_trade_ticks(cmd),
// stringify!(Bar) => self.handle_subscribe_bars(cmd),
type_name => Err(anyhow::anyhow!(
"Cannot handle subscription, type `{type_name}` is unrecognized"
)),
match cmd.action {
Action::Subscribe => match cmd.data_type.type_name() {
stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
stringify!(Bar) => self.handle_subscribe_bars(&cmd),
type_name => Err(anyhow::anyhow!(
"Cannot handle subscription, type `{type_name}` is unrecognized"
)),
},
Action::Unsubscribe => match cmd.data_type.type_name() {
stringify!(OrderBookDelta) => self.handle_unsubscribe_book_deltas(&cmd),
stringify!(OrderBook) => self.handle_unsubscribe_book_snapshots(&cmd),
stringify!(Bar) => self.handle_unsubscribe_bars(&cmd),
type_name => Err(anyhow::anyhow!(
"Cannot handle subscription, type `{type_name}` is unrecognized"
)),
},
}
.unwrap_or_else(|e| log::error!("{e}"));

Expand Down Expand Up @@ -620,8 +632,7 @@ impl DataEngine {
let managed = data_type.managed();

{
if !self.order_book_intervals.contains_key(&interval_ms) {
let timer_name = format!("OrderBook|{instrument_id}|{interval_ms}");
if !self.book_intervals.contains_key(&interval_ms) {
let interval_ns = millis_to_nanos(interval_ms.get() as f64);
let mut msgbus = self.msgbus.borrow_mut();
let topic = msgbus.switchboard.get_snapshots_topic(instrument_id);
Expand All @@ -642,8 +653,15 @@ impl DataEngine {
start_time_ns += NANOSECONDS_IN_SECOND; // Add one second
}

let snapshotter =
BookSnapshotter::new(snap_info, self.cache.clone(), self.msgbus.clone());
let snapshotter = Rc::new(BookSnapshotter::new(
snap_info,
self.cache.clone(),
self.msgbus.clone(),
));
self.book_snapshotters
.insert(instrument_id, snapshotter.clone());
let timer_name = snapshotter.timer_name;

let callback =
TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));

Expand All @@ -662,7 +680,12 @@ impl DataEngine {
Ok(())
}

fn handle_unsubscribe_order_book_deltas(
fn handle_subscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
// TODO: Handle aggregators
Ok(())
}

fn handle_unsubscribe_book_deltas(
&mut self,
command: &SubscriptionCommand,
) -> anyhow::Result<()> {
Expand All @@ -678,24 +701,22 @@ impl DataEngine {
return Ok(());
}

let mut msgbus = self.msgbus.borrow_mut();

let deltas_topic = msgbus.switchboard.get_deltas_topic(instrument_id);
let depth_topic = msgbus.switchboard.get_depth_topic(instrument_id);
let snapshots_topic = msgbus.switchboard.get_snapshots_topic(instrument_id);

let topics = vec![deltas_topic, depth_topic];
let topics = {
let mut msgbus = self.msgbus.borrow_mut();
vec![
msgbus.switchboard.get_deltas_topic(instrument_id),
msgbus.switchboard.get_depth_topic(instrument_id),
msgbus.switchboard.get_snapshots_topic(instrument_id),
]
};

for topic in topics {
let num_subscribers = msgbus.subscriptions_count(topic);
// TODO: Check if internal book subscriber
// TODO: Remove the subscription for the internal order book if it is the last subscription
}
self.maintain_book_updater(&instrument_id, &topics);
self.maintain_book_snapshotter(&instrument_id);

Ok(())
}

fn handle_unsubscribe_order_book_snapshots(
fn handle_unsubscribe_book_snapshots(
&mut self,
command: &SubscriptionCommand,
) -> anyhow::Result<()> {
Expand All @@ -711,23 +732,68 @@ impl DataEngine {
return Ok(());
}

let mut msgbus = self.msgbus.borrow_mut();
let topics = {
let mut msgbus = self.msgbus.borrow_mut();
vec![
msgbus.switchboard.get_deltas_topic(instrument_id),
msgbus.switchboard.get_depth_topic(instrument_id),
msgbus.switchboard.get_snapshots_topic(instrument_id),
]
};

self.maintain_book_updater(&instrument_id, &topics);
self.maintain_book_snapshotter(&instrument_id);

Ok(())
}

let deltas_topic = msgbus.switchboard.get_deltas_topic(instrument_id);
let depth_topic = msgbus.switchboard.get_depth_topic(instrument_id);
let snapshots_topic = msgbus.switchboard.get_snapshots_topic(instrument_id);
fn handle_unsubscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
// TODO: Handle aggregators
Ok(())
}

let topics = vec![deltas_topic, depth_topic];
fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
if let Some(updater) = self.book_updaters.get(instrument_id) {
let handler = ShareableMessageHandler(updater.clone());
let mut msgbus = self.msgbus.borrow_mut();

for topic in topics {
let num_subscribers = msgbus.subscriptions_count(topic);
// TODO: Check if internal book subscriber
// TODO: Remove the subscription for the internal order book if it is the last subscription
// Unsubscribe handler if it is the last subscriber
for topic in topics {
if msgbus.subscriptions_count(*topic) == 1
&& msgbus.is_subscribed(*topic, handler.clone())
{
log::debug!("Unsubscribing BookUpdater from {topic}");
msgbus.unsubscribe(*topic, handler.clone());
}
}

// Check remaining subscriptions, if none then remove updater
let still_subscribed = topics
.iter()
.any(|topic| msgbus.is_subscribed(*topic, handler.clone()));
if !still_subscribed {
self.book_updaters.remove(instrument_id);
log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
}
}
}

// TODO: WIP
fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
let mut msgbus = self.msgbus.borrow_mut();

Ok(())
let topic = msgbus.switchboard.get_snapshots_topic(*instrument_id);

// Check remaining snapshot subscriptions, if none then remove snapshotter
if msgbus.subscriptions_count(topic) == 0 {
let timer_name = snapshotter.timer_name;
self.book_snapshotters.remove(instrument_id);
if self.clock.timer_names().contains(&timer_name.as_str()) {
self.clock.cancel_timer(&timer_name);
}
log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
}
}
}

// -- RESPONSE HANDLERS -----------------------------------------------------------------------
Expand Down Expand Up @@ -781,8 +847,10 @@ impl DataEngine {
let mut msgbus = self.msgbus.borrow_mut();

// Set up subscriptions
let updater = BookUpdater::new(instrument_id, self.cache.clone());
let handler = ShareableMessageHandler(Rc::new(updater));
let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
self.book_updaters.insert(*instrument_id, updater.clone());

let handler = ShareableMessageHandler(updater);

let topic = msgbus.switchboard.get_deltas_topic(*instrument_id);
if !msgbus.is_subscribed(topic, handler.clone()) {
Expand Down

0 comments on commit f2f8d9c

Please sign in to comment.