Skip to content

Commit

Permalink
Continue DataEngine in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 18, 2024
1 parent e41fce4 commit 913ef55
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 15 deletions.
5 changes: 2 additions & 3 deletions nautilus_core/common/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use std::{
time::{Duration, Instant},
};

use log::LevelFilter;
use nautilus_core::uuid::UUID4;
use nautilus_model::identifiers::TraderId;

use crate::logging::{init_logging, logger::LoggerConfig, writer::FileWriterConfig};

pub fn init_logger_for_testing(stdout_level: LevelFilter) {
pub fn init_logger_for_testing(stdout_level: Option<log::LevelFilter>) {
let mut config = LoggerConfig::default();
config.stdout_level = stdout_level;
config.stdout_level = stdout_level.unwrap_or(log::LevelFilter::Trace);
init_logging(
TraderId::default(),
UUID4::new(),
Expand Down
92 changes: 84 additions & 8 deletions nautilus_core/data/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ use nautilus_model::{
trade::TradeTick,
Data, DataType,
},
enums::{BookType, RecordFlag},
enums::{AggregationSource, BookType, PriceType, RecordFlag},
identifiers::{ClientId, InstrumentId, Venue},
instruments::{any::InstrumentAny, synthetic::SyntheticInstrument},
orderbook::book::OrderBook,
Expand All @@ -98,7 +98,7 @@ pub struct DataEngine {
book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
bar_aggregators: Vec<Box<dyn BarAggregator>>, // TODO: dyn for now
bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, // TODO: Use OrderBookDeltas?
Expand Down Expand Up @@ -127,7 +127,7 @@ impl DataEngine {
book_intervals: HashMap::new(),
book_updaters: HashMap::new(),
book_snapshotters: HashMap::new(),
bar_aggregators: Vec::new(),
bar_aggregators: HashMap::new(),
synthetic_quote_feeds: HashMap::new(),
synthetic_trade_feeds: HashMap::new(),
buffered_deltas_map: HashMap::new(),
Expand Down Expand Up @@ -579,7 +579,7 @@ impl DataEngine {
) -> anyhow::Result<()> {
let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
anyhow::anyhow!(
"Invalid order book deltas subscription: did not contain an `instrument_id`, {}",
"Invalid order book deltas subscription: did not contain an 'instrument_id', {}",
command.data_type
)
})?;
Expand Down Expand Up @@ -608,7 +608,7 @@ impl DataEngine {
) -> anyhow::Result<()> {
let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
anyhow::anyhow!(
"Invalid order book snapshots subscription: did not contain an `instrument_id`, {}",
"Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
command.data_type
)
})?;
Expand Down Expand Up @@ -677,7 +677,23 @@ impl DataEngine {
}

fn handle_subscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
// TODO: Handle aggregators
let bar_type = command.data_type.bar_type();

match bar_type.aggregation_source() {
AggregationSource::Internal => {
if !self.bar_aggregators.contains_key(&bar_type.standard()) {
self.start_bar_aggregator(bar_type)?;
}
}
AggregationSource::External => {
if bar_type.instrument_id().is_synthetic() {
anyhow::bail!(
"Cannot subscribe for externally aggregated synthetic instrument bar data"
);
};
}
}

Ok(())
}

Expand All @@ -687,7 +703,7 @@ impl DataEngine {
) -> anyhow::Result<()> {
let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
anyhow::anyhow!(
"Invalid order book snapshots subscription: did not contain an `instrument_id`, {}",
"Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
command.data_type
)
})?;
Expand Down Expand Up @@ -718,7 +734,7 @@ impl DataEngine {
) -> anyhow::Result<()> {
let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
anyhow::anyhow!(
"Invalid order book snapshots subscription: did not contain an `instrument_id`, {}",
"Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
command.data_type
)
})?;
Expand Down Expand Up @@ -860,6 +876,66 @@ impl DataEngine {

Ok(())
}

fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
let instrument = self
.cache
.borrow()
.instrument(&bar_type.instrument_id())
.ok_or_else(|| {
anyhow::anyhow!(
"Cannot start bar aggregation: no instrument found for {}",
bar_type.instrument_id(),
)
})?;

// Create aggregator
// TODO: Determine how to handle generic Clock vs dyn Clock
// let aggregator = if bar_type.spec().is_time_aggregated() {
// TimeBarAggregator::new(
// instrument,
// bar_type,
// |b| self.handle_bar(b),
// false,
// self.clock,
// self.config.time_bars_build_with_no_updates,
// self.config.time_bars_timestamp_on_close,
// &self.config.time_bars_interval_type,
// )
// };

Ok(())
}

fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
let aggregator = self
.bar_aggregators
.remove(&bar_type.standard())
.ok_or_else(|| {
anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
})?;

// TODO: If its a `TimeBarAggregator` then call `.stop()`
// if let Some(aggregator) = (aggregator as &dyn BarAggregator)
// .as_any()
// .downcast_ref::<TimeBarAggregator<_, _>>()
// {
// aggregator.stop();
// };

if bar_type.is_composite() {
let composite_bar_type = bar_type.composite();
// TODO: Unsubscribe the `aggregator.handle_bar`
} else if bar_type.spec().price_type == PriceType::Last {
// TODO: Unsubscribe `aggregator.handle_trade_tick`
todo!()
} else {
// TODO: Unsubscribe `aggregator.handle_quote_tick`
todo!()
};

Ok(())
}
}

pub struct SubscriptionCommandHandler {
Expand Down
16 changes: 12 additions & 4 deletions nautilus_core/data/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use nautilus_common::{
switchboard::MessagingSwitchboard,
MessageBus,
},
testing::init_logger_for_testing,
};
use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
use nautilus_model::{
Expand Down Expand Up @@ -470,14 +471,13 @@ fn test_execute_subscribe_trade_ticks(

#[rstest]
fn test_execute_subscribe_bars(
audusd_sim: CurrencyPair,
msgbus: Rc<RefCell<MessageBus>>,
switchboard: MessagingSwitchboard,
data_engine: Rc<RefCell<DataEngine>>,
data_client: DataClientAdapter,
) {
let client_id = data_client.client_id;
let venue = data_client.venue;
data_engine.borrow_mut().register_client(data_client, None);
init_logger_for_testing(None); // TODO: Remove once initial development completed

let endpoint = switchboard.data_engine_execute;
let handler = ShareableMessageHandler(Rc::new(SubscriptionCommandHandler {
Expand All @@ -486,7 +486,14 @@ fn test_execute_subscribe_bars(
}));
msgbus.borrow_mut().register(endpoint, handler);

let bar_type = BarType::from("AUDUSD.SIM-1-MINUTE-LAST-INTERNAL");
let audusd_sim = InstrumentAny::CurrencyPair(audusd_sim);
data_engine.borrow_mut().process(&audusd_sim as &dyn Any);

let client_id = data_client.client_id;
let venue = data_client.venue;
data_engine.borrow_mut().register_client(data_client, None);

let bar_type = BarType::from("AUD/USD.SIM-1-MINUTE-LAST-INTERNAL");
let metadata = indexmap! {
"bar_type".to_string() => bar_type.to_string(),
};
Expand Down Expand Up @@ -515,6 +522,7 @@ fn test_execute_subscribe_bars(
msgbus.borrow().send(&endpoint, &cmd as &dyn Any);
data_engine.borrow_mut().run();

assert_eq!(audusd_sim.id(), bar_type.instrument_id());
assert!(!data_engine.borrow().subscribed_bars().contains(&bar_type));
}

Expand Down

0 comments on commit 913ef55

Please sign in to comment.