diff --git a/mobile_config/migrations/7_mobile_radio_tracker.sql b/mobile_config/migrations/7_mobile_radio_tracker.sql index d2560347e..b80d396bc 100644 --- a/mobile_config/migrations/7_mobile_radio_tracker.sql +++ b/mobile_config/migrations/7_mobile_radio_tracker.sql @@ -3,5 +3,7 @@ CREATE TABLE IF NOT EXISTS mobile_radio_tracker ( hash TEXT NOT NULL, last_changed_at TIMESTAMPTZ NOT NULL, last_checked_at TIMESTAMPTZ NOT NULL, + asserted_location NUMERIC, + asserted_location_changed_at TIMESTAMPTZ, PRIMARY KEY (entity_key) ); diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 881503bb0..56ee615fc 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -77,6 +77,8 @@ pub struct TrackedMobileRadio { pub hash: String, pub last_changed_at: DateTime, pub last_checked_at: DateTime, + pub asserted_location: Option, + pub asserted_location_changed_at: Option>, } impl TrackedMobileRadio { @@ -86,17 +88,23 @@ impl TrackedMobileRadio { hash: radio.hash(), last_changed_at: radio.refreshed_at, last_checked_at: Utc::now(), + asserted_location: radio.location, + asserted_location_changed_at: None, } } fn update_from_radio(mut self, radio: &MobileRadio) -> Self { let new_hash = radio.hash(); + self.last_checked_at = Utc::now(); if self.hash != new_hash { self.hash = new_hash; self.last_changed_at = radio.refreshed_at; } + if self.asserted_location != radio.location { + self.asserted_location = radio.location; + self.asserted_location_changed_at = Some(radio.refreshed_at); + } - self.last_checked_at = Utc::now(); self } } @@ -131,7 +139,7 @@ impl MobileRadioTracker { } async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { - tracing::info!("starting"); + tracing::info!("starting with interval: {:?}", self.interval); let mut interval = tokio::time::interval(self.interval); loop { @@ -192,7 +200,9 @@ pub async fn get_tracked_radios( entity_key, hash, last_changed_at, - last_checked_at + last_checked_at, + asserted_location::bigint, + asserted_location_changed_at FROM mobile_radio_tracker "#, ) @@ -244,24 +254,28 @@ async fn update_tracked_radios( ) -> anyhow::Result<()> { let mut txn = pool.begin().await?; - const BATCH_SIZE: usize = (u16::MAX / 4) as usize; + const BATCH_SIZE: usize = (u16::MAX / 6) as usize; for chunk in tracked_radios.chunks(BATCH_SIZE) { QueryBuilder::new( - "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at)", + "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at, asserted_location, asserted_location_changed_at)", ) .push_values(chunk, |mut b, tracked_radio| { b.push_bind(&tracked_radio.entity_key) .push_bind(&tracked_radio.hash) .push_bind(tracked_radio.last_changed_at) - .push_bind(tracked_radio.last_checked_at); + .push_bind(tracked_radio.last_checked_at) + .push_bind(tracked_radio.asserted_location) + .push_bind(tracked_radio.asserted_location_changed_at); }) .push( r#" ON CONFLICT (entity_key) DO UPDATE SET hash = EXCLUDED.hash, last_changed_at = EXCLUDED.last_changed_at, - last_checked_at = EXCLUDED.last_checked_at + last_checked_at = EXCLUDED.last_checked_at, + asserted_location = EXCLUDED.asserted_location, + asserted_location_changed_at = EXCLUDED.asserted_location_changed_at "#, ) .build() @@ -324,6 +338,63 @@ mod tests { assert_eq!(radio.hash(), result[0].hash); } + #[tokio::test] + async fn last_asserted_location_will_not_updated_if_nothing_changes() { + // location None + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = None; + let tracked_radio = TrackedMobileRadio::new(&radio); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert!(result[0].asserted_location_changed_at.is_none()); + assert!(result[0].asserted_location.is_none()); + + // location is 1 + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = Some(1); + let tracked_radio = TrackedMobileRadio::new(&radio); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + assert!(result[0].asserted_location_changed_at.is_none()); + assert_eq!(result[0].asserted_location, Some(1)); + } + + #[tokio::test] + async fn will_update_last_asserted_location_changed_at_when_location_changes() { + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = None; + let tracked_radio = TrackedMobileRadio::new(&radio); + radio.location = Some(1); + + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert_eq!( + result[0].asserted_location_changed_at.unwrap(), + result[0].last_changed_at + ); + assert_eq!(result[0].asserted_location.unwrap(), 1); + + let tracked_radio = TrackedMobileRadio::new(&radio); + radio.location = Some(2); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert_eq!( + result[0].asserted_location_changed_at.unwrap(), + result[0].last_changed_at, + ); + assert_eq!(result[0].asserted_location.unwrap(), 2); + } + fn mobile_radio(entity_key: EntityKey) -> MobileRadio { MobileRadio { entity_key,