From 308e9bc308caf339694a09c6f0faa8531d4107c5 Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Mon, 11 Dec 2023 17:47:31 +0000 Subject: [PATCH 1/5] Remove `address()` function from Mosquitto test helper --- testing-utils/src/mosquitto.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/testing-utils/src/mosquitto.rs b/testing-utils/src/mosquitto.rs index b1acf004..d20ce1b7 100644 --- a/testing-utils/src/mosquitto.rs +++ b/testing-utils/src/mosquitto.rs @@ -45,8 +45,4 @@ impl MosquittoDriver { pub fn port(&self) -> u16 { self.port } - - pub fn address(&self) -> String { - format!("tcp://localhost:{}", self.port) - } } From 2f98940f2b90a7d62a48ca1f1cfa77196eb3d519 Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Mon, 11 Dec 2023 17:55:31 +0000 Subject: [PATCH 2/5] Remove use of `pop_message()` in integration tests --- .../src/integration_tests/mqtt_reconnect.rs | 18 +++++++++------- .../src/integration_tests/one.rs | 21 +++++++++++-------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/integration-tests/src/integration_tests/mqtt_reconnect.rs b/integration-tests/src/integration_tests/mqtt_reconnect.rs index 93f7abd0..4d2ab3a4 100644 --- a/integration-tests/src/integration_tests/mqtt_reconnect.rs +++ b/integration-tests/src/integration_tests/mqtt_reconnect.rs @@ -174,12 +174,11 @@ async fn mqtt_reconnect() { .await .unwrap(); - tokio::time::sleep(Duration::from_secs(2)).await; - // The event trigger message should be received assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -189,7 +188,8 @@ async fn mqtt_reconnect() { // Segment archive command for camera1 should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -199,7 +199,8 @@ async fn mqtt_reconnect() { // Event metadata archive command should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -249,10 +250,11 @@ async fn mqtt_reconnect() { ] ); - tokio::time::sleep(Duration::from_secs(1)).await; - // There should be no more MQTT messages at this point - assert!(mqtt_client.pop_message().is_none()); + assert!(mqtt_client + .wait_for_message(Duration::from_secs(5)) + .await + .is_err()); mqtt_client.stop().await; diff --git a/integration-tests/src/integration_tests/one.rs b/integration-tests/src/integration_tests/one.rs index 5f4edf67..860707ed 100644 --- a/integration-tests/src/integration_tests/one.rs +++ b/integration-tests/src/integration_tests/one.rs @@ -193,12 +193,11 @@ async fn one() { .await .unwrap(); - tokio::time::sleep(Duration::from_secs(2)).await; - // The event trigger message should be received assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -208,7 +207,8 @@ async fn one() { // Segment archive command for camera1 should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -218,7 +218,8 @@ async fn one() { // Segment archive command for camera3 should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -228,7 +229,8 @@ async fn one() { // Event metadata archive command should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -286,10 +288,11 @@ async fn one() { ] ); - tokio::time::sleep(Duration::from_secs(1)).await; - // There should be no more MQTT messages at this point - assert!(mqtt_client.pop_message().is_none()); + assert!(mqtt_client + .wait_for_message(Duration::from_secs(5)) + .await + .is_err()); mqtt_client.stop().await; From 1c26bc370a09d5a99d0cd63d8d795a704b55d733 Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Mon, 11 Dec 2023 18:14:22 +0000 Subject: [PATCH 3/5] Remove received message queue in test MQTT client --- testing-utils/src/mqtt_client.rs | 82 +------------------------------- 1 file changed, 1 insertion(+), 81 deletions(-) diff --git a/testing-utils/src/mqtt_client.rs b/testing-utils/src/mqtt_client.rs index 371bf2e4..2b97c6d5 100644 --- a/testing-utils/src/mqtt_client.rs +++ b/testing-utils/src/mqtt_client.rs @@ -1,9 +1,5 @@ use rumqttc::{mqttbytes::v4::Publish, AsyncClient, Event, Incoming, MqttOptions}; -use std::{ - collections::VecDeque, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::time::Duration; use tokio::{ sync::{ broadcast::{self, Receiver, Sender}, @@ -13,14 +9,11 @@ use tokio::{ }; use tracing::{error, info}; -type MessageQueue = Arc>>; - pub struct TestMqttClient { handle: Option>, exit_tx: Sender<()>, client: AsyncClient, - recevied_mqtt_messages: MessageQueue, message_rx: Receiver, } @@ -31,15 +24,11 @@ impl TestMqttClient { let (client, mut event_loop) = AsyncClient::new(options, 10); - let recevied_mqtt_messages = MessageQueue::default(); - let (exit_tx, mut exit_rx) = broadcast::channel(1); let (message_tx, message_rx) = broadcast::channel(16); let (connected_tx, mut connected_rx) = watch::channel(false); let handle = { - let recevied_mqtt_messages = recevied_mqtt_messages.clone(); - Some(tokio::spawn(async move { loop { tokio::select! { @@ -50,7 +39,6 @@ impl TestMqttClient { } Ok(Event::Incoming(Incoming::Publish(msg))) => { info!("Received message: {:?}", msg); - recevied_mqtt_messages.lock().unwrap().push_back(msg.clone()); message_tx.send(msg).unwrap(); } Err(e) => { @@ -74,7 +62,6 @@ impl TestMqttClient { handle, exit_tx, client, - recevied_mqtt_messages, message_rx, } } @@ -93,10 +80,6 @@ impl TestMqttClient { &self.client } - pub fn pop_message(&self) -> Option { - self.recevied_mqtt_messages.lock().unwrap().pop_front() - } - pub async fn wait_for_message(&mut self, timeout: Duration) -> Result { match tokio::time::timeout(timeout, self.message_rx.recv()).await { Ok(Ok(msg)) => Ok(msg), @@ -120,69 +103,6 @@ mod test { .init(); } - #[tokio::test] - async fn basic() { - let mosquitto = MosquittoDriver::default(); - - let mut client = TestMqttClient::new(mosquitto.port()).await; - - client - .client() - .subscribe("test-1", rumqttc::QoS::AtLeastOnce) - .await - .unwrap(); - - client - .client() - .subscribe("test-2", rumqttc::QoS::AtLeastOnce) - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(100)).await; - - assert!(client.pop_message().is_none()); - - client - .client() - .publish("test-1", rumqttc::QoS::AtLeastOnce, false, "Hello 1") - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(100)).await; - - { - let msg = client.pop_message(); - assert!(msg.is_some()); - let msg = msg.unwrap(); - assert_eq!(msg.topic, "test-1".to_string()); - assert_eq!(msg.try_payload_str().unwrap(), "Hello 1"); - } - - assert!(client.pop_message().is_none()); - - client - .client() - .publish("test-2", rumqttc::QoS::AtLeastOnce, false, "Hello 2") - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(100)).await; - - { - let msg = client.pop_message(); - assert!(msg.is_some()); - let msg = msg.unwrap(); - assert_eq!(msg.topic, "test-2".to_string()); - assert_eq!(msg.try_payload_str().unwrap(), "Hello 2"); - } - - assert!(client.pop_message().is_none()); - - tokio::time::sleep(Duration::from_millis(100)).await; - - client.stop().await; - } - #[tokio::test] async fn wait_for_message() { let mosquitto = MosquittoDriver::default(); From 205f7d6dbdcfb24b699734e8f450a35f451d8e06 Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Mon, 11 Dec 2023 18:32:04 +0000 Subject: [PATCH 4/5] Remove excess debug logging --- common/src/mqtt.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/common/src/mqtt.rs b/common/src/mqtt.rs index d37bc608..f9777f4c 100644 --- a/common/src/mqtt.rs +++ b/common/src/mqtt.rs @@ -1,7 +1,7 @@ use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, Outgoing, Publish, QoS}; use serde::Deserialize; use std::time::Duration; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; #[derive(Debug, Deserialize)] pub struct MqttConfig { @@ -53,15 +53,12 @@ impl MqttClient { None } } - Ok(Event::Incoming(Incoming::ConnAck(event))) => { - info!("ConnAck - {:?}", event); + Ok(Event::Incoming(Incoming::ConnAck(_))) => { + info!("Connected"); self.subscribe_to_topic().await; None } - Ok(event) => { - debug!("rumqttc event: {:?}", event); - None - } + Ok(_) => None, Err(e) => { warn!("rumqttc error: {:?}", e); None @@ -78,9 +75,7 @@ impl MqttClient { info!("Disconnected successfully"); break; } - Ok(event) => { - debug!("rumqttc event: {:?}", event); - } + Ok(_) => {} Err(e) => { warn!("rumqttc error: {:?}", e); } @@ -113,9 +108,7 @@ impl MqttClient { break; } } - Ok(event) => { - debug!("rumqttc event: {:?}", event); - } + Ok(_) => {} Err(e) => { warn!("rumqttc error: {:?}", e); } From aa50068d0d7667d04d32bd34b79a68ec2df826ad Mon Sep 17 00:00:00 2001 From: Dan Nixon Date: Mon, 11 Dec 2023 18:33:14 +0000 Subject: [PATCH 5/5] Warn on disconnection --- common/src/mqtt.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/src/mqtt.rs b/common/src/mqtt.rs index f9777f4c..fdb04d2e 100644 --- a/common/src/mqtt.rs +++ b/common/src/mqtt.rs @@ -58,6 +58,10 @@ impl MqttClient { self.subscribe_to_topic().await; None } + Ok(Event::Incoming(Incoming::Disconnect)) => { + warn!("Disconnected"); + None + } Ok(_) => None, Err(e) => { warn!("rumqttc error: {:?}", e);