Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional MQTT tidy #62

Merged
merged 5 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions common/src/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, Outgoing, Publish, QoS};
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};

#[derive(Debug, Deserialize)]
pub struct MqttConfig {
Expand Down Expand Up @@ -53,15 +53,16 @@ impl MqttClient {
None
}
}
Ok(Event::Incoming(Incoming::ConnAck(event))) => {
info!("ConnAck - {:?}", event);
Ok(Event::Incoming(Incoming::ConnAck(_))) => {
info!("Connected");
self.subscribe_to_topic().await;
None
}
Ok(event) => {
debug!("rumqttc event: {:?}", event);
Ok(Event::Incoming(Incoming::Disconnect)) => {
warn!("Disconnected");
None
}
Ok(_) => None,
Err(e) => {
warn!("rumqttc error: {:?}", e);
None
Expand All @@ -78,9 +79,7 @@ impl MqttClient {
info!("Disconnected successfully");
break;
}
Ok(event) => {
debug!("rumqttc event: {:?}", event);
}
Ok(_) => {}
Err(e) => {
warn!("rumqttc error: {:?}", e);
}
Expand Down Expand Up @@ -113,9 +112,7 @@ impl MqttClient {
break;
}
}
Ok(event) => {
debug!("rumqttc event: {:?}", event);
}
Ok(_) => {}
Err(e) => {
warn!("rumqttc error: {:?}", e);
}
Expand Down
18 changes: 10 additions & 8 deletions integration-tests/src/integration_tests/mqtt_reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,11 @@ async fn mqtt_reconnect() {
.await
.unwrap();

tokio::time::sleep(Duration::from_secs(2)).await;

// The event trigger message should be received
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand All @@ -189,7 +188,8 @@ async fn mqtt_reconnect() {
// Segment archive command for camera1 should be sent
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand All @@ -199,7 +199,8 @@ async fn mqtt_reconnect() {
// Event metadata archive command should be sent
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand Down Expand Up @@ -249,10 +250,11 @@ async fn mqtt_reconnect() {
]
);

tokio::time::sleep(Duration::from_secs(1)).await;

// There should be no more MQTT messages at this point
assert!(mqtt_client.pop_message().is_none());
assert!(mqtt_client
.wait_for_message(Duration::from_secs(5))
.await
.is_err());

mqtt_client.stop().await;

Expand Down
21 changes: 12 additions & 9 deletions integration-tests/src/integration_tests/one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,11 @@ async fn one() {
.await
.unwrap();

tokio::time::sleep(Duration::from_secs(2)).await;

// The event trigger message should be received
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand All @@ -208,7 +207,8 @@ async fn one() {
// Segment archive command for camera1 should be sent
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand All @@ -218,7 +218,8 @@ async fn one() {
// Segment archive command for camera3 should be sent
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand All @@ -228,7 +229,8 @@ async fn one() {
// Event metadata archive command should be sent
assert_eq!(
mqtt_client
.pop_message()
.wait_for_message(Duration::from_secs(5))
.await
.unwrap()
.try_payload_str()
.unwrap(),
Expand Down Expand Up @@ -286,10 +288,11 @@ async fn one() {
]
);

tokio::time::sleep(Duration::from_secs(1)).await;

// There should be no more MQTT messages at this point
assert!(mqtt_client.pop_message().is_none());
assert!(mqtt_client
.wait_for_message(Duration::from_secs(5))
.await
.is_err());

mqtt_client.stop().await;

Expand Down
4 changes: 0 additions & 4 deletions testing-utils/src/mosquitto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,4 @@ impl MosquittoDriver {
pub fn port(&self) -> u16 {
self.port
}

pub fn address(&self) -> String {
format!("tcp://localhost:{}", self.port)
}
}
82 changes: 1 addition & 81 deletions testing-utils/src/mqtt_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use rumqttc::{mqttbytes::v4::Publish, AsyncClient, Event, Incoming, MqttOptions};
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
time::Duration,
};
use std::time::Duration;
use tokio::{
sync::{
broadcast::{self, Receiver, Sender},
Expand All @@ -13,14 +9,11 @@ use tokio::{
};
use tracing::{error, info};

type MessageQueue = Arc<Mutex<VecDeque<Publish>>>;

pub struct TestMqttClient {
handle: Option<JoinHandle<()>>,
exit_tx: Sender<()>,

client: AsyncClient,
recevied_mqtt_messages: MessageQueue,
message_rx: Receiver<Publish>,
}

Expand All @@ -31,15 +24,11 @@ impl TestMqttClient {

let (client, mut event_loop) = AsyncClient::new(options, 10);

let recevied_mqtt_messages = MessageQueue::default();

let (exit_tx, mut exit_rx) = broadcast::channel(1);
let (message_tx, message_rx) = broadcast::channel(16);
let (connected_tx, mut connected_rx) = watch::channel(false);

let handle = {
let recevied_mqtt_messages = recevied_mqtt_messages.clone();

Some(tokio::spawn(async move {
loop {
tokio::select! {
Expand All @@ -50,7 +39,6 @@ impl TestMqttClient {
}
Ok(Event::Incoming(Incoming::Publish(msg))) => {
info!("Received message: {:?}", msg);
recevied_mqtt_messages.lock().unwrap().push_back(msg.clone());
message_tx.send(msg).unwrap();
}
Err(e) => {
Expand All @@ -74,7 +62,6 @@ impl TestMqttClient {
handle,
exit_tx,
client,
recevied_mqtt_messages,
message_rx,
}
}
Expand All @@ -93,10 +80,6 @@ impl TestMqttClient {
&self.client
}

pub fn pop_message(&self) -> Option<Publish> {
self.recevied_mqtt_messages.lock().unwrap().pop_front()
}

pub async fn wait_for_message(&mut self, timeout: Duration) -> Result<Publish, ()> {
match tokio::time::timeout(timeout, self.message_rx.recv()).await {
Ok(Ok(msg)) => Ok(msg),
Expand All @@ -120,69 +103,6 @@ mod test {
.init();
}

#[tokio::test]
async fn basic() {
let mosquitto = MosquittoDriver::default();

let mut client = TestMqttClient::new(mosquitto.port()).await;

client
.client()
.subscribe("test-1", rumqttc::QoS::AtLeastOnce)
.await
.unwrap();

client
.client()
.subscribe("test-2", rumqttc::QoS::AtLeastOnce)
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;

assert!(client.pop_message().is_none());

client
.client()
.publish("test-1", rumqttc::QoS::AtLeastOnce, false, "Hello 1")
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;

{
let msg = client.pop_message();
assert!(msg.is_some());
let msg = msg.unwrap();
assert_eq!(msg.topic, "test-1".to_string());
assert_eq!(msg.try_payload_str().unwrap(), "Hello 1");
}

assert!(client.pop_message().is_none());

client
.client()
.publish("test-2", rumqttc::QoS::AtLeastOnce, false, "Hello 2")
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;

{
let msg = client.pop_message();
assert!(msg.is_some());
let msg = msg.unwrap();
assert_eq!(msg.topic, "test-2".to_string());
assert_eq!(msg.try_payload_str().unwrap(), "Hello 2");
}

assert!(client.pop_message().is_none());

tokio::time::sleep(Duration::from_millis(100)).await;

client.stop().await;
}

#[tokio::test]
async fn wait_for_message() {
let mosquitto = MosquittoDriver::default();
Expand Down