From 02a36511ac0c8a060786651e22e8cf87b07aa58b Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 4 Dec 2024 12:23:46 +0200 Subject: [PATCH 1/4] Add location tracking --- .../migrations/7_mobile_radio_tracker.sql | 2 ++ mobile_config/src/mobile_radio_tracker.rs | 26 ++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/mobile_config/migrations/7_mobile_radio_tracker.sql b/mobile_config/migrations/7_mobile_radio_tracker.sql index d2560347e..66cf5b029 100644 --- a/mobile_config/migrations/7_mobile_radio_tracker.sql +++ b/mobile_config/migrations/7_mobile_radio_tracker.sql @@ -3,5 +3,7 @@ CREATE TABLE IF NOT EXISTS mobile_radio_tracker ( hash TEXT NOT NULL, last_changed_at TIMESTAMPTZ NOT NULL, last_checked_at TIMESTAMPTZ NOT NULL, + asserted_location NUMERIC, + asserted_location_change_at TIMESTAMPTZ, PRIMARY KEY (entity_key) ); diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 0dc03a8c3..0db4493d6 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -77,6 +77,8 @@ struct TrackedMobileRadio { hash: String, last_changed_at: DateTime, last_checked_at: DateTime, + asserted_location: Option, + asserted_location_change_at: Option>, } impl TrackedMobileRadio { @@ -86,17 +88,23 @@ impl TrackedMobileRadio { hash: radio.hash(), last_changed_at: radio.refreshed_at, last_checked_at: Utc::now(), + asserted_location: radio.location, + asserted_location_change_at: None, } } fn update_from_radio(mut self, radio: &MobileRadio) -> Self { let new_hash = radio.hash(); + self.last_checked_at = Utc::now(); if self.hash != new_hash { self.hash = new_hash; self.last_changed_at = radio.refreshed_at; } + if self.asserted_location != radio.location { + self.asserted_location = radio.location; + self.asserted_location_change_at = Some(self.last_checked_at); + } - self.last_checked_at = Utc::now(); self } } @@ -192,7 +200,9 @@ async fn get_tracked_radios( entity_key, hash, last_changed_at, - last_checked_at + last_checked_at, + asserted_location, + asserted_location_change_at FROM mobile_radio_tracker "#, ) @@ -241,24 +251,28 @@ async fn update_tracked_radios( ) -> anyhow::Result<()> { let mut txn = pool.begin().await?; - const BATCH_SIZE: usize = (u16::MAX / 4) as usize; + const BATCH_SIZE: usize = (u16::MAX / 6) as usize; for chunk in tracked_radios.chunks(BATCH_SIZE) { QueryBuilder::new( - "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at)", + "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at, asserted_location, asserted_location_change_at)", ) .push_values(chunk, |mut b, tracked_radio| { b.push_bind(&tracked_radio.entity_key) .push_bind(&tracked_radio.hash) .push_bind(tracked_radio.last_changed_at) - .push_bind(tracked_radio.last_checked_at); + .push_bind(tracked_radio.last_checked_at) + .push_bind(tracked_radio.asserted_location) + .push_bind(tracked_radio.asserted_location_change_at); }) .push( r#" ON CONFLICT (entity_key) DO UPDATE SET hash = EXCLUDED.hash, last_changed_at = EXCLUDED.last_changed_at, - last_checked_at = EXCLUDED.last_checked_at + last_checked_at = EXCLUDED.last_checked_at, + asserted_location = EXCLUDED.last_asserted_location, + asserted_location_change_at = EXCLUDED.asserted_location_change_at, "#, ) .build() From 630657a4e4525463fa3a2d17bc9a9bf6d35c6f35 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 4 Dec 2024 13:18:49 +0200 Subject: [PATCH 2/4] Fix bugs --- mobile_config/src/mobile_radio_tracker.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 0db4493d6..c4eadb9ab 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -139,7 +139,7 @@ impl MobileRadioTracker { } async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { - tracing::info!("starting"); + tracing::info!("starting with interval: {:?}", self.interval); let mut interval = tokio::time::interval(self.interval); loop { @@ -201,7 +201,7 @@ async fn get_tracked_radios( hash, last_changed_at, last_checked_at, - asserted_location, + asserted_location::bigint, asserted_location_change_at FROM mobile_radio_tracker "#, @@ -271,8 +271,8 @@ async fn update_tracked_radios( hash = EXCLUDED.hash, last_changed_at = EXCLUDED.last_changed_at, last_checked_at = EXCLUDED.last_checked_at, - asserted_location = EXCLUDED.last_asserted_location, - asserted_location_change_at = EXCLUDED.asserted_location_change_at, + asserted_location = EXCLUDED.asserted_location, + asserted_location_change_at = EXCLUDED.asserted_location_change_at "#, ) .build() From cbe7e5cd90037fe2eecfb9a97b880712c139b236 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 4 Dec 2024 13:34:34 +0200 Subject: [PATCH 3/4] Add tests --- mobile_config/src/mobile_radio_tracker.rs | 57 +++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index c4eadb9ab..ab0028c2b 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -335,6 +335,63 @@ mod tests { assert_eq!(radio.hash(), result[0].hash); } + #[tokio::test] + async fn last_asserted_location_will_not_updated_if_nothing_changes() { + // location None + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = None; + let tracked_radio = TrackedMobileRadio::new(&radio); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert!(result[0].asserted_location_change_at.is_none()); + assert!(result[0].asserted_location.is_none()); + + // location is 1 + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = Some(1); + let tracked_radio = TrackedMobileRadio::new(&radio); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + assert!(result[0].asserted_location_change_at.is_none()); + assert_eq!(result[0].asserted_location, Some(1)); + } + + #[tokio::test] + async fn will_update_last_asserted_location_change_at_when_location_changes() { + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = None; + let tracked_radio = TrackedMobileRadio::new(&radio); + radio.location = Some(1); + + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert_eq!( + result[0].asserted_location_change_at.unwrap(), + result[0].last_checked_at + ); + assert_eq!(result[0].asserted_location.unwrap(), 1); + + let tracked_radio = TrackedMobileRadio::new(&radio); + radio.location = Some(2); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert_eq!( + result[0].asserted_location_change_at.unwrap(), + result[0].last_checked_at + ); + assert_eq!(result[0].asserted_location.unwrap(), 2); + } + fn mobile_radio(entity_key: EntityKey) -> MobileRadio { MobileRadio { entity_key, From d66811ddfa0bfd957cedc6ea2319f9f490498afb Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 4 Dec 2024 15:34:13 +0200 Subject: [PATCH 4/4] Fix comments --- .../migrations/7_mobile_radio_tracker.sql | 2 +- mobile_config/src/mobile_radio_tracker.rs | 28 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/mobile_config/migrations/7_mobile_radio_tracker.sql b/mobile_config/migrations/7_mobile_radio_tracker.sql index 66cf5b029..b80d396bc 100644 --- a/mobile_config/migrations/7_mobile_radio_tracker.sql +++ b/mobile_config/migrations/7_mobile_radio_tracker.sql @@ -4,6 +4,6 @@ CREATE TABLE IF NOT EXISTS mobile_radio_tracker ( last_changed_at TIMESTAMPTZ NOT NULL, last_checked_at TIMESTAMPTZ NOT NULL, asserted_location NUMERIC, - asserted_location_change_at TIMESTAMPTZ, + asserted_location_changed_at TIMESTAMPTZ, PRIMARY KEY (entity_key) ); diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index ab0028c2b..cd6fefbd5 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -78,7 +78,7 @@ struct TrackedMobileRadio { last_changed_at: DateTime, last_checked_at: DateTime, asserted_location: Option, - asserted_location_change_at: Option>, + asserted_location_changed_at: Option>, } impl TrackedMobileRadio { @@ -89,7 +89,7 @@ impl TrackedMobileRadio { last_changed_at: radio.refreshed_at, last_checked_at: Utc::now(), asserted_location: radio.location, - asserted_location_change_at: None, + asserted_location_changed_at: None, } } @@ -102,7 +102,7 @@ impl TrackedMobileRadio { } if self.asserted_location != radio.location { self.asserted_location = radio.location; - self.asserted_location_change_at = Some(self.last_checked_at); + self.asserted_location_changed_at = Some(radio.refreshed_at); } self @@ -202,7 +202,7 @@ async fn get_tracked_radios( last_changed_at, last_checked_at, asserted_location::bigint, - asserted_location_change_at + asserted_location_changed_at FROM mobile_radio_tracker "#, ) @@ -255,7 +255,7 @@ async fn update_tracked_radios( for chunk in tracked_radios.chunks(BATCH_SIZE) { QueryBuilder::new( - "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at, asserted_location, asserted_location_change_at)", + "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at, asserted_location, asserted_location_changed_at)", ) .push_values(chunk, |mut b, tracked_radio| { b.push_bind(&tracked_radio.entity_key) @@ -263,7 +263,7 @@ async fn update_tracked_radios( .push_bind(tracked_radio.last_changed_at) .push_bind(tracked_radio.last_checked_at) .push_bind(tracked_radio.asserted_location) - .push_bind(tracked_radio.asserted_location_change_at); + .push_bind(tracked_radio.asserted_location_changed_at); }) .push( r#" @@ -272,7 +272,7 @@ async fn update_tracked_radios( last_changed_at = EXCLUDED.last_changed_at, last_checked_at = EXCLUDED.last_checked_at, asserted_location = EXCLUDED.asserted_location, - asserted_location_change_at = EXCLUDED.asserted_location_change_at + asserted_location_changed_at = EXCLUDED.asserted_location_changed_at "#, ) .build() @@ -346,7 +346,7 @@ mod tests { let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; - assert!(result[0].asserted_location_change_at.is_none()); + assert!(result[0].asserted_location_changed_at.is_none()); assert!(result[0].asserted_location.is_none()); // location is 1 @@ -357,12 +357,12 @@ mod tests { tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; - assert!(result[0].asserted_location_change_at.is_none()); + assert!(result[0].asserted_location_changed_at.is_none()); assert_eq!(result[0].asserted_location, Some(1)); } #[tokio::test] - async fn will_update_last_asserted_location_change_at_when_location_changes() { + async fn will_update_last_asserted_location_changed_at_when_location_changes() { let mut radio = mobile_radio(vec![1, 2, 3]); radio.location = None; let tracked_radio = TrackedMobileRadio::new(&radio); @@ -374,8 +374,8 @@ mod tests { let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; assert_eq!( - result[0].asserted_location_change_at.unwrap(), - result[0].last_checked_at + result[0].asserted_location_changed_at.unwrap(), + result[0].last_changed_at ); assert_eq!(result[0].asserted_location.unwrap(), 1); @@ -386,8 +386,8 @@ mod tests { let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; assert_eq!( - result[0].asserted_location_change_at.unwrap(), - result[0].last_checked_at + result[0].asserted_location_changed_at.unwrap(), + result[0].last_changed_at, ); assert_eq!(result[0].asserted_location.unwrap(), 2); }