diff --git a/common/src/mqtt.rs b/common/src/mqtt.rs index d37bc60..fdb04d2 100644 --- a/common/src/mqtt.rs +++ b/common/src/mqtt.rs @@ -1,7 +1,7 @@ use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, Outgoing, Publish, QoS}; use serde::Deserialize; use std::time::Duration; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; #[derive(Debug, Deserialize)] pub struct MqttConfig { @@ -53,15 +53,16 @@ impl MqttClient { None } } - Ok(Event::Incoming(Incoming::ConnAck(event))) => { - info!("ConnAck - {:?}", event); + Ok(Event::Incoming(Incoming::ConnAck(_))) => { + info!("Connected"); self.subscribe_to_topic().await; None } - Ok(event) => { - debug!("rumqttc event: {:?}", event); + Ok(Event::Incoming(Incoming::Disconnect)) => { + warn!("Disconnected"); None } + Ok(_) => None, Err(e) => { warn!("rumqttc error: {:?}", e); None @@ -78,9 +79,7 @@ impl MqttClient { info!("Disconnected successfully"); break; } - Ok(event) => { - debug!("rumqttc event: {:?}", event); - } + Ok(_) => {} Err(e) => { warn!("rumqttc error: {:?}", e); } @@ -113,9 +112,7 @@ impl MqttClient { break; } } - Ok(event) => { - debug!("rumqttc event: {:?}", event); - } + Ok(_) => {} Err(e) => { warn!("rumqttc error: {:?}", e); } diff --git a/integration-tests/src/integration_tests/mqtt_reconnect.rs b/integration-tests/src/integration_tests/mqtt_reconnect.rs index 93f7abd..4d2ab3a 100644 --- a/integration-tests/src/integration_tests/mqtt_reconnect.rs +++ b/integration-tests/src/integration_tests/mqtt_reconnect.rs @@ -174,12 +174,11 @@ async fn mqtt_reconnect() { .await .unwrap(); - tokio::time::sleep(Duration::from_secs(2)).await; - // The event trigger message should be received assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -189,7 +188,8 @@ async fn mqtt_reconnect() { // Segment archive command for camera1 should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -199,7 +199,8 @@ async fn mqtt_reconnect() { // Event metadata archive command should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -249,10 +250,11 @@ async fn mqtt_reconnect() { ] ); - tokio::time::sleep(Duration::from_secs(1)).await; - // There should be no more MQTT messages at this point - assert!(mqtt_client.pop_message().is_none()); + assert!(mqtt_client + .wait_for_message(Duration::from_secs(5)) + .await + .is_err()); mqtt_client.stop().await; diff --git a/integration-tests/src/integration_tests/one.rs b/integration-tests/src/integration_tests/one.rs index 5f4edf6..860707e 100644 --- a/integration-tests/src/integration_tests/one.rs +++ b/integration-tests/src/integration_tests/one.rs @@ -193,12 +193,11 @@ async fn one() { .await .unwrap(); - tokio::time::sleep(Duration::from_secs(2)).await; - // The event trigger message should be received assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -208,7 +207,8 @@ async fn one() { // Segment archive command for camera1 should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -218,7 +218,8 @@ async fn one() { // Segment archive command for camera3 should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -228,7 +229,8 @@ async fn one() { // Event metadata archive command should be sent assert_eq!( mqtt_client - .pop_message() + .wait_for_message(Duration::from_secs(5)) + .await .unwrap() .try_payload_str() .unwrap(), @@ -286,10 +288,11 @@ async fn one() { ] ); - tokio::time::sleep(Duration::from_secs(1)).await; - // There should be no more MQTT messages at this point - assert!(mqtt_client.pop_message().is_none()); + assert!(mqtt_client + .wait_for_message(Duration::from_secs(5)) + .await + .is_err()); mqtt_client.stop().await; diff --git a/testing-utils/src/mosquitto.rs b/testing-utils/src/mosquitto.rs index b1acf00..d20ce1b 100644 --- a/testing-utils/src/mosquitto.rs +++ b/testing-utils/src/mosquitto.rs @@ -45,8 +45,4 @@ impl MosquittoDriver { pub fn port(&self) -> u16 { self.port } - - pub fn address(&self) -> String { - format!("tcp://localhost:{}", self.port) - } } diff --git a/testing-utils/src/mqtt_client.rs b/testing-utils/src/mqtt_client.rs index 371bf2e..2b97c6d 100644 --- a/testing-utils/src/mqtt_client.rs +++ b/testing-utils/src/mqtt_client.rs @@ -1,9 +1,5 @@ use rumqttc::{mqttbytes::v4::Publish, AsyncClient, Event, Incoming, MqttOptions}; -use std::{ - collections::VecDeque, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::time::Duration; use tokio::{ sync::{ broadcast::{self, Receiver, Sender}, @@ -13,14 +9,11 @@ use tokio::{ }; use tracing::{error, info}; -type MessageQueue = Arc>>; - pub struct TestMqttClient { handle: Option>, exit_tx: Sender<()>, client: AsyncClient, - recevied_mqtt_messages: MessageQueue, message_rx: Receiver, } @@ -31,15 +24,11 @@ impl TestMqttClient { let (client, mut event_loop) = AsyncClient::new(options, 10); - let recevied_mqtt_messages = MessageQueue::default(); - let (exit_tx, mut exit_rx) = broadcast::channel(1); let (message_tx, message_rx) = broadcast::channel(16); let (connected_tx, mut connected_rx) = watch::channel(false); let handle = { - let recevied_mqtt_messages = recevied_mqtt_messages.clone(); - Some(tokio::spawn(async move { loop { tokio::select! { @@ -50,7 +39,6 @@ impl TestMqttClient { } Ok(Event::Incoming(Incoming::Publish(msg))) => { info!("Received message: {:?}", msg); - recevied_mqtt_messages.lock().unwrap().push_back(msg.clone()); message_tx.send(msg).unwrap(); } Err(e) => { @@ -74,7 +62,6 @@ impl TestMqttClient { handle, exit_tx, client, - recevied_mqtt_messages, message_rx, } } @@ -93,10 +80,6 @@ impl TestMqttClient { &self.client } - pub fn pop_message(&self) -> Option { - self.recevied_mqtt_messages.lock().unwrap().pop_front() - } - pub async fn wait_for_message(&mut self, timeout: Duration) -> Result { match tokio::time::timeout(timeout, self.message_rx.recv()).await { Ok(Ok(msg)) => Ok(msg), @@ -120,69 +103,6 @@ mod test { .init(); } - #[tokio::test] - async fn basic() { - let mosquitto = MosquittoDriver::default(); - - let mut client = TestMqttClient::new(mosquitto.port()).await; - - client - .client() - .subscribe("test-1", rumqttc::QoS::AtLeastOnce) - .await - .unwrap(); - - client - .client() - .subscribe("test-2", rumqttc::QoS::AtLeastOnce) - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(100)).await; - - assert!(client.pop_message().is_none()); - - client - .client() - .publish("test-1", rumqttc::QoS::AtLeastOnce, false, "Hello 1") - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(100)).await; - - { - let msg = client.pop_message(); - assert!(msg.is_some()); - let msg = msg.unwrap(); - assert_eq!(msg.topic, "test-1".to_string()); - assert_eq!(msg.try_payload_str().unwrap(), "Hello 1"); - } - - assert!(client.pop_message().is_none()); - - client - .client() - .publish("test-2", rumqttc::QoS::AtLeastOnce, false, "Hello 2") - .await - .unwrap(); - - tokio::time::sleep(Duration::from_millis(100)).await; - - { - let msg = client.pop_message(); - assert!(msg.is_some()); - let msg = msg.unwrap(); - assert_eq!(msg.topic, "test-2".to_string()); - assert_eq!(msg.try_payload_str().unwrap(), "Hello 2"); - } - - assert!(client.pop_message().is_none()); - - tokio::time::sleep(Duration::from_millis(100)).await; - - client.stop().await; - } - #[tokio::test] async fn wait_for_message() { let mosquitto = MosquittoDriver::default();