diff --git a/Cargo.lock b/Cargo.lock index 6cd64b2fc..e2708b77f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4883,6 +4883,7 @@ dependencies = [ "clap", "config", "coverage-map", + "csv", "custom-tracing", "db-store", "file-store", diff --git a/Cargo.toml b/Cargo.toml index 98b8de21b..ed76a1ecf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,7 @@ tokio-util = "0" uuid = { version = "1", features = ["v4", "serde"] } tower-http = { version = "0", features = ["trace"] } derive_builder = "0" +csv = "*" aws-config = "0.51" aws-sdk-s3 = "0.21" aws-types = { version = "0.51", features = ["hardcoded-credentials"]} diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml index 41fb754fc..9eba79fe1 100644 --- a/file_store/Cargo.toml +++ b/file_store/Cargo.toml @@ -28,7 +28,6 @@ tracing = { workspace = true } chrono = { workspace = true } helium-proto = { workspace = true } helium-crypto = { workspace = true } -csv = "*" http = { workspace = true } aws-config = { workspace = true } aws-sdk-s3 = { workspace = true } @@ -49,6 +48,7 @@ derive_builder = { workspace = true } retainer = { workspace = true } uuid = { workspace = true } h3o = { workspace = true } +csv = { workspace = true } task-manager = { path = "../task_manager" } [dev-dependencies] diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index 77b5f2d4a..bd6dbe639 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -43,7 +43,7 @@ tower-http = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } triggered = { workspace = true } - +csv = {workspace = true} coverage-map = { path = "../coverage_map" } custom-tracing = { path = "../custom_tracing", features = ["grpc"] } db-store = { path = "../db_store" } diff --git a/mobile_config/migrations/8_mob_rad_track_add_location.sql b/mobile_config/migrations/8_mob_rad_track_add_location.sql new file mode 100644 index 000000000..048c04eb6 --- /dev/null +++ b/mobile_config/migrations/8_mob_rad_track_add_location.sql @@ -0,0 +1,3 @@ +ALTER TABLE mobile_radio_tracker +ADD COLUMN asserted_location BIGINT, +ADD COLUMN asserted_location_changed_at TIMESTAMPTZ; diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index 61db4f316..2886da2c2 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -10,10 +10,16 @@ use helium_proto::services::{ sub_dao::SubDaoServer, }; use mobile_config::{ - admin_service::AdminService, authorization_service::AuthorizationService, - carrier_service::CarrierService, entity_service::EntityService, - gateway_service::GatewayService, hex_boosting_service::HexBoostingService, key_cache::KeyCache, - mobile_radio_tracker::MobileRadioTracker, settings::Settings, sub_dao_service::SubDaoService, + admin_service::AdminService, + authorization_service::AuthorizationService, + carrier_service::CarrierService, + entity_service::EntityService, + gateway_service::GatewayService, + hex_boosting_service::HexBoostingService, + key_cache::KeyCache, + mobile_radio_tracker::{migrate_mobile_tracker_locations, MobileRadioTracker}, + settings::Settings, + sub_dao_service::SubDaoService, }; use std::{net::SocketAddr, path::PathBuf, time::Duration}; use task_manager::{ManagedTask, TaskManager}; @@ -36,21 +42,32 @@ pub struct Cli { impl Cli { pub async fn run(self) -> Result<()> { let settings = Settings::new(self.config)?; - self.cmd.run(settings).await + + match self.cmd { + Cmd::Server(daemon) => daemon.run(&settings).await, + Cmd::MigrateMobileTracker(csv_file) => { + custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; + let mobile_config_pool = settings.database.connect("mobile-config-store").await?; + let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?; + sqlx::migrate!().run(&mobile_config_pool).await?; + migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool, &csv_file.path) + .await?; + Ok(()) + } + } } } #[derive(Debug, clap::Subcommand)] pub enum Cmd { Server(Daemon), + // Oneshot command to migrate location data for mobile tracker + MigrateMobileTracker(CsvFile), } -impl Cmd { - pub async fn run(&self, settings: Settings) -> Result<()> { - match self { - Self::Server(cmd) => cmd.run(&settings).await, - } - } +#[derive(Debug, clap::Args)] +pub struct CsvFile { + pub path: String, } #[derive(Debug, clap::Args)] diff --git a/mobile_config/src/mobile_radio_tracker.rs b/mobile_config/src/mobile_radio_tracker.rs index 83444e9d2..5715d6614 100644 --- a/mobile_config/src/mobile_radio_tracker.rs +++ b/mobile_config/src/mobile_radio_tracker.rs @@ -1,6 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, str::FromStr, time::Duration}; use chrono::{DateTime, Utc}; +use csv::Reader; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use sqlx::{Pool, Postgres, QueryBuilder}; use task_manager::ManagedTask; @@ -11,6 +12,7 @@ type EntityKey = Vec; struct MobileRadio { entity_key: EntityKey, refreshed_at: DateTime, + created_at: DateTime, location: Option, is_full_hotspot: Option, num_location_asserts: Option, @@ -77,6 +79,8 @@ pub struct TrackedMobileRadio { pub hash: String, pub last_changed_at: DateTime, pub last_checked_at: DateTime, + pub asserted_location: Option, + pub asserted_location_changed_at: Option>, } impl TrackedMobileRadio { @@ -86,17 +90,23 @@ impl TrackedMobileRadio { hash: radio.hash(), last_changed_at: radio.refreshed_at, last_checked_at: Utc::now(), + asserted_location: radio.location, + asserted_location_changed_at: None, } } fn update_from_radio(mut self, radio: &MobileRadio) -> Self { let new_hash = radio.hash(); + self.last_checked_at = Utc::now(); if self.hash != new_hash { self.hash = new_hash; self.last_changed_at = radio.refreshed_at; } + if self.asserted_location != radio.location { + self.asserted_location = radio.location; + self.asserted_location_changed_at = Some(radio.refreshed_at); + } - self.last_checked_at = Utc::now(); self } } @@ -131,7 +141,7 @@ impl MobileRadioTracker { } async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { - tracing::info!("starting"); + tracing::info!("starting with interval: {:?}", self.interval); let mut interval = tokio::time::interval(self.interval); loop { @@ -192,7 +202,9 @@ pub async fn get_tracked_radios( entity_key, hash, last_changed_at, - last_checked_at + last_checked_at, + asserted_location, + asserted_location_changed_at FROM mobile_radio_tracker "#, ) @@ -213,6 +225,7 @@ fn get_all_mobile_radios(metadata: &Pool) -> impl Stream anyhow::Result<()> { let mut txn = pool.begin().await?; - const BATCH_SIZE: usize = (u16::MAX / 4) as usize; + const BATCH_SIZE: usize = (u16::MAX / 6) as usize; for chunk in tracked_radios.chunks(BATCH_SIZE) { QueryBuilder::new( - "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at)", + "INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at, asserted_location, asserted_location_changed_at)", ) .push_values(chunk, |mut b, tracked_radio| { b.push_bind(&tracked_radio.entity_key) .push_bind(&tracked_radio.hash) .push_bind(tracked_radio.last_changed_at) - .push_bind(tracked_radio.last_checked_at); + .push_bind(tracked_radio.last_checked_at) + .push_bind(tracked_radio.asserted_location) + .push_bind(tracked_radio.asserted_location_changed_at); }) .push( r#" ON CONFLICT (entity_key) DO UPDATE SET hash = EXCLUDED.hash, last_changed_at = EXCLUDED.last_changed_at, - last_checked_at = EXCLUDED.last_checked_at + last_checked_at = EXCLUDED.last_checked_at, + asserted_location = EXCLUDED.asserted_location, + asserted_location_changed_at = EXCLUDED.asserted_location_changed_at "#, ) .build() @@ -274,6 +291,128 @@ async fn update_tracked_radios( Ok(()) } +// This function can be removed after migration is done. +// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos +// 2. Read data from csv report. Fill mobile_radio_tracker.asserted_location_changed_at if location from csv and in mobile_hotspot_infos table matches +// 3. Set `asserted_location_changed_at = created_at` for others (num_location_asserts > 0) +pub async fn migrate_mobile_tracker_locations( + mobile_config_pool: Pool, + metadata_pool: Pool, + csv_file_path: &str, +) -> anyhow::Result<()> { + // 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos + + // get_all_mobile_radios + tracing::info!("Exporting data from mobile_hotspot_infos"); + let mobile_infos = get_all_mobile_radios(&metadata_pool) + .filter(|v| futures::future::ready(v.location.is_some())) + .filter(|v| { + futures::future::ready( + v.num_location_asserts.is_some() && v.num_location_asserts.unwrap() > 0, + ) + }) + .collect::>() + .await; + + let mut txn = mobile_config_pool.begin().await?; + + const BATCH_SIZE: usize = (u16::MAX / 3) as usize; + + // Set asserted_location in mobile_radio_tracker from metadata_pool + for chunk in mobile_infos.chunks(BATCH_SIZE) { + let mut query_builder = QueryBuilder::new( + "UPDATE mobile_radio_tracker AS mrt SET asserted_location = data.location + FROM ( ", + ); + + query_builder.push_values(chunk, |mut builder, mob_info| { + builder + .push_bind(mob_info.location) + .push_bind(&mob_info.entity_key); + }); + + query_builder.push( + ") AS data(location, entity_key) + WHERE mrt.entity_key = data.entity_key", + ); + + let built = query_builder.build(); + built.execute(&mut *txn).await?; + } + + // 2. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches + let mobile_infos_map: HashMap<_, _> = mobile_infos + .iter() + .map(|v| (bs58::encode(v.entity_key.clone()).into_string(), v.location)) + .collect(); + tracing::info!("Exporting data from CSV"); + let mut rdr = Reader::from_path(csv_file_path)?; + let headers = rdr.headers().unwrap().clone(); + let pub_key_idx = headers.iter().position(|h| h == "public_key").unwrap(); + let location_idx = headers.iter().position(|h| h == "h3").unwrap(); + let time_idx = headers.iter().position(|h| h == "time").unwrap(); + + let mut mobile_infos_to_update_map: HashMap> = HashMap::new(); + + let mut csv_migrated_counter = 0; + for record in rdr.records() { + let record = record?; + let pub_key: &str = record.get(pub_key_idx).unwrap(); + + if let Some(v) = mobile_infos_map.get(&String::from(pub_key)) { + let loc = i64::from_str_radix(record.get(location_idx).unwrap(), 16); + if v.unwrap() == loc.unwrap() { + let date: &str = record.get(time_idx).unwrap(); + let date_time: DateTime = DateTime::from_str(date).unwrap(); + let entity_key = bs58::decode(pub_key.to_string()).into_vec()?; + + mobile_infos_to_update_map.insert(entity_key, date_time); + csv_migrated_counter += 1; + } + } else { + tracing::warn!( + "Pubkey: {} exist in csv but not found in metadata database", + pub_key + ) + } + } + tracing::info!("Count radios migrated from CSV: {csv_migrated_counter}"); + + // 3. Set `asserted_location_changed_at = created_at` for others (num_location_asserts > 0) + for mi in mobile_infos.into_iter() { + mobile_infos_to_update_map + .entry(mi.entity_key) + .or_insert(mi.created_at); + } + + let mobile_infos_to_update: Vec<(EntityKey, DateTime)> = + mobile_infos_to_update_map.into_iter().collect(); + + tracing::info!("Updating asserted_location_changed_at in db"); + for chunk in mobile_infos_to_update.chunks(BATCH_SIZE) { + let mut query_builder = QueryBuilder::new( + "UPDATE mobile_radio_tracker AS mrt SET asserted_location_changed_at = data.asserted_location_changed_at + FROM ( ", + ); + + query_builder.push_values(chunk, |mut builder, mob_info| { + builder.push_bind(&mob_info.0).push_bind(mob_info.1); + }); + + query_builder.push( + ") AS data(entity_key, asserted_location_changed_at) + WHERE mrt.entity_key = data.entity_key", + ); + + let built = query_builder.build(); + built.execute(&mut *txn).await?; + } + + txn.commit().await?; + + Ok(()) +} + #[cfg(test)] mod tests { @@ -324,10 +463,68 @@ mod tests { assert_eq!(radio.hash(), result[0].hash); } + #[tokio::test] + async fn last_asserted_location_will_not_updated_if_nothing_changes() { + // location None + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = None; + let tracked_radio = TrackedMobileRadio::new(&radio); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert!(result[0].asserted_location_changed_at.is_none()); + assert!(result[0].asserted_location.is_none()); + + // location is 1 + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = Some(1); + let tracked_radio = TrackedMobileRadio::new(&radio); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + assert!(result[0].asserted_location_changed_at.is_none()); + assert_eq!(result[0].asserted_location, Some(1)); + } + + #[tokio::test] + async fn will_update_last_asserted_location_changed_at_when_location_changes() { + let mut radio = mobile_radio(vec![1, 2, 3]); + radio.location = None; + let tracked_radio = TrackedMobileRadio::new(&radio); + radio.location = Some(1); + + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert_eq!( + result[0].asserted_location_changed_at.unwrap(), + result[0].last_changed_at + ); + assert_eq!(result[0].asserted_location.unwrap(), 1); + + let tracked_radio = TrackedMobileRadio::new(&radio); + radio.location = Some(2); + let mut tracked_radios = HashMap::new(); + tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio); + let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await; + + assert_eq!( + result[0].asserted_location_changed_at.unwrap(), + result[0].last_changed_at, + ); + assert_eq!(result[0].asserted_location.unwrap(), 2); + } + fn mobile_radio(entity_key: EntityKey) -> MobileRadio { MobileRadio { entity_key, refreshed_at: Utc::now() - chrono::Duration::hours(1), + created_at: Utc::now() - chrono::Duration::hours(1), location: Some(1), is_full_hotspot: Some(1), num_location_asserts: Some(1),