Skip to content

Commit

Permalink
Continue DataEngine in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 17, 2024
1 parent 2f016af commit 64576b1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 39 deletions.
17 changes: 17 additions & 0 deletions nautilus_core/common/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ use std::{
time::{Duration, Instant},
};

use log::LevelFilter;
use nautilus_core::uuid::UUID4;
use nautilus_model::identifiers::TraderId;

use crate::logging::{init_logging, logger::LoggerConfig, writer::FileWriterConfig};

pub fn init_logger_for_testing(stdout_level: LevelFilter) {
let mut config = LoggerConfig::default();
config.stdout_level = stdout_level;
init_logging(
TraderId::default(),
UUID4::new(),
config,
FileWriterConfig::default(),
);
}

/// Repeatedly evaluates a condition with a delay until it becomes true or a timeout occurs.
///
/// * `condition`: A closure that represents the condition to be met. This closure should return `true`
Expand Down
50 changes: 23 additions & 27 deletions nautilus_core/data/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use std::{
cell::{Ref, RefCell},
collections::{HashMap, HashSet, VecDeque},
num::NonZeroU64,
ops::Deref,
rc::Rc,
sync::Arc,
};
Expand Down Expand Up @@ -102,7 +101,7 @@ pub struct DataEngine {
bar_aggregators: Vec<Box<dyn BarAggregator>>, // TODO: dyn for now
synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>,
buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, // TODO: Use OrderBookDeltas?
msgbus_priority: u8,
command_queue: VecDeque<SubscriptionCommand>,
config: DataEngineConfig,
Expand Down Expand Up @@ -342,38 +341,28 @@ impl DataEngine {
}

pub fn execute(&mut self, cmd: SubscriptionCommand) {
match cmd.action {
let result = match cmd.action {
Action::Subscribe => match cmd.data_type.type_name() {
stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
stringify!(Bar) => self.handle_subscribe_bars(&cmd),
type_name => Err(anyhow::anyhow!(
"Cannot handle subscription, type `{type_name}` is unrecognized"
)),
_ => Ok(()), // No other actions for engine
},
Action::Unsubscribe => match cmd.data_type.type_name() {
stringify!(OrderBookDelta) => self.handle_unsubscribe_book_deltas(&cmd),
stringify!(OrderBook) => self.handle_unsubscribe_book_snapshots(&cmd),
stringify!(Bar) => self.handle_unsubscribe_bars(&cmd),
type_name => Err(anyhow::anyhow!(
"Cannot handle subscription, type `{type_name}` is unrecognized"
)),
_ => Ok(()), // No other actions for engine
},
};

if let Err(e) = result {
log::error!("{e}");
return;
}
.unwrap_or_else(|e| log::error!("{e}"));

if let Some(client) = self.get_client_mut(&cmd.client_id, &cmd.venue) {
client.execute(cmd.clone());

// TBD if we want to do the below instead
// if client.handles_order_book_deltas {
// client.subscribe_order_book_deltas(instrument_id, book_type, depth)?;
// } else if client.handles_order_book_snapshots {
// client.subscribe_order_book_snapshots(instrument_id, book_type, depth)?;
// } else {
// anyhow::bail!("Cannot subscribe order book for {instrument_id}: client does not handle book subscriptions");
// }
// client.execute(command);
client.execute(cmd);
} else {
log::error!(
"Cannot handle command: no client found for {}",
Expand Down Expand Up @@ -405,7 +394,7 @@ impl DataEngine {
pub fn process_data(&mut self, data: Data) {
match data {
Data::Delta(delta) => self.handle_delta(delta),
Data::Deltas(deltas) => self.handle_deltas(deltas.deref().clone()), // TODO: Optimize
Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
Data::Depth10(depth) => self.handle_depth10(depth),
Data::Quote(quote) => self.handle_quote(quote),
Data::Trade(trade) => self.handle_trade(trade),
Expand Down Expand Up @@ -472,10 +461,13 @@ impl DataEngine {
return; // Not the last delta for event
}

// TODO: Improve efficiency, the FFI API will go along with Cython
OrderBookDeltas::new(delta.instrument_id, buffer_deltas.clone())
// SAFETY: We know the deltas exists already
let deltas = self
.buffered_deltas_map
.remove(&delta.instrument_id)
.unwrap();
OrderBookDeltas::new(delta.instrument_id, deltas)
} else {
// TODO: Improve efficiency, the FFI API will go along with Cython
OrderBookDeltas::new(delta.instrument_id, vec![delta])
};

Expand Down Expand Up @@ -503,8 +495,12 @@ impl DataEngine {
return;
}

// TODO: Improve efficiency, the FFI API will go along with Cython
OrderBookDeltas::new(deltas.instrument_id, buffer_deltas.clone())
// SAFETY: We know the deltas exists already
let buffer_deltas = self
.buffered_deltas_map
.remove(&deltas.instrument_id)
.unwrap();
OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
} else {
deltas
};
Expand Down
12 changes: 0 additions & 12 deletions nautilus_core/data/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,6 @@ use crate::{
mocks::MockDataClient,
};

// TODO: Used for development
// fn init_logger(stdout_level: LevelFilter) {
// let mut config = LoggerConfig::default();
// config.stdout_level = stdout_level;
// init_logging(
// TraderId::default(),
// UUID4::new(),
// config,
// FileWriterConfig::default(),
// );
// }

#[fixture]
fn trader_id() -> TraderId {
TraderId::default()
Expand Down
6 changes: 6 additions & 0 deletions nautilus_core/model/src/data/deltas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ impl OrderBookDeltas_API {
pub fn new(deltas: OrderBookDeltas) -> Self {
Self(Box::new(deltas))
}

/// Consumes the wrapper and returns the inner `OrderBookDeltas`.
#[must_use]
pub fn into_inner(self) -> OrderBookDeltas {
*self.0
}
}

impl Deref for OrderBookDeltas_API {
Expand Down

0 comments on commit 64576b1

Please sign in to comment.