Skip to content

Commit

Permalink
MQTT reconnect integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
DanNixon committed Dec 9, 2023
1 parent 24fb856 commit b272b3f
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 4 deletions.
1 change: 1 addition & 0 deletions integration-tests/src/integration_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ fn init() {
.init();
}

mod mqtt_reconnect;
mod one;
mod two;
260 changes: 260 additions & 0 deletions integration-tests/src/integration_tests/mqtt_reconnect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use satori_testing_utils::{
DummyHlsServer, DummyStreamParams, MinioDriver, MosquittoDriver, PublishExt, TestMqttClient,
};
use std::{io::Write, time::Duration};
use tempfile::NamedTempFile;

const MQTT_TOPIC: &str = "satori";

#[tokio::test]
#[ignore]
async fn mqtt_reconnect() {
let minio = MinioDriver::default();
minio.wait_for_ready().await;
minio.set_credential_env_vars();
let s3_bucket = minio.create_bucket("satori").await;

// Initially start Mosquitto
let mosquitto = MosquittoDriver::default();

let mut stream_1 = DummyHlsServer::new(
"stream 1".to_string(),
DummyStreamParams::new("2023-01-01T00:00:00Z", Duration::from_secs(6), 100).into(),
)
.await;

let event_processor_events_file = NamedTempFile::new().unwrap();

let event_processor_config_file = {
let contents = format!(
indoc::indoc!(
r#"
event_file = "{}"
interval = 10
event_ttl = 5
[mqtt]
broker = "{}"
client_id = "satori-event-processor"
username = "test"
password = ""
topic = "satori"
[triggers.fallback]
cameras = ["camera1", "camera2", "camera3"]
reason = "Unknown"
pre = 60
post = 60
[[cameras]]
name = "camera1"
url = "{}/stream.m3u8"
"#
),
event_processor_events_file.path().display(),
mosquitto.address(),
stream_1.address(),
);

let file = NamedTempFile::new().unwrap();
file.as_file().write_all(contents.as_bytes()).unwrap();
file
};

let satori_event_processor = satori_testing_utils::CargoBinaryRunner::new(
"satori-event-processor".to_string(),
vec![
"--config".to_string(),
event_processor_config_file.path().display().to_string(),
"--observability-address".to_string(),
"127.0.0.1:9090".to_string(),
],
vec![("RUST_LOG".to_string(), "debug".to_string())],
);

// Wait for the event processor to start
satori_testing_utils::wait_for_url("http://localhost:9090", Duration::from_secs(600))
.await
.expect("event processor should be running");

let archiver_queue_file = NamedTempFile::new().unwrap();

let archiver_config_file = {
let contents = format!(
indoc::indoc!(
r#"
queue_file = "{}"
interval = 10
[storage]
kind = "s3"
bucket = "satori"
region = ""
endpoint = "{}"
[mqtt]
broker = "{}"
client_id = "satori-archiver-s3"
username = "test"
password = ""
topic = "satori"
[[cameras]]
name = "camera1"
url = "{}/stream.m3u8"
"#
),
archiver_queue_file.path().display(),
minio.endpoint(),
mosquitto.address(),
stream_1.address(),
);

let file = NamedTempFile::new().unwrap();
file.as_file().write_all(contents.as_bytes()).unwrap();
file
};

let satori_archiver = satori_testing_utils::CargoBinaryRunner::new(
"satori-archiver".to_string(),
vec![
"--config".to_string(),
archiver_config_file.path().display().to_string(),
"--observability-address".to_string(),
"127.0.0.1:9091".to_string(),
],
vec![
("AWS_ACCESS_KEY_ID".to_string(), "minioadmin".to_string()),
(
"AWS_SECRET_ACCESS_KEY".to_string(),
"minioadmin".to_string(),
),
("RUST_LOG".to_string(), "debug".to_string()),
],
);

// Wait for the archiver to start
satori_testing_utils::wait_for_url("http://localhost:9091", Duration::from_secs(600))
.await
.expect("archiver should be running");

// Wait a short time and stop Mosquitto
tokio::time::sleep(Duration::from_secs(2)).await;
let mqtt_port = mosquitto.port();
drop(mosquitto);

// Wait a little bit longer and start Mosquitto again (using the same port as before)
tokio::time::sleep(Duration::from_secs(10)).await;
let mosquitto = MosquittoDriver::with_port(mqtt_port);

// Wait some more time for components to reconnect to Mosquitto
// For now this must be longer than the archiver interval (this should not be the case and
// should be looked at when moving to rumqttc)
tokio::time::sleep(Duration::from_secs(15)).await;

let mut mqtt_client = TestMqttClient::new(mosquitto.port()).await;
mqtt_client
.client()
.subscribe(MQTT_TOPIC, rumqttc::QoS::ExactlyOnce)
.await
.unwrap();

// Trigger an event
mqtt_client
.client()
.publish(
MQTT_TOPIC,
rumqttc::QoS::ExactlyOnce,
false,
r#"{"kind": "trigger_command", "data": {"id": "test", "timestamp": "2023-01-01T00:02:15Z", "reason": "test", "cameras": ["camera1"], "pre": 50, "post": 30 }}"#.to_string(),
)
.await
.unwrap();

tokio::time::sleep(Duration::from_secs(2)).await;

// The event trigger message should be received
assert_eq!(
mqtt_client
.pop_message()
.unwrap()
.try_payload_str()
.unwrap(),
r#"{"kind": "trigger_command", "data": {"id": "test", "timestamp": "2023-01-01T00:02:15Z", "reason": "test", "cameras": ["camera1"], "pre": 50, "post": 30 }}"#,
);

// Segment archive command for camera1 should be sent
assert_eq!(
mqtt_client
.pop_message()
.unwrap()
.try_payload_str()
.unwrap(),
r#"{"kind":"archive_command","data":{"kind":"segments","data":{"name":"camera1","segment_list":["2023-01-01T00_01_24+0000.ts","2023-01-01T00_01_30+0000.ts","2023-01-01T00_01_36+0000.ts","2023-01-01T00_01_42+0000.ts","2023-01-01T00_01_48+0000.ts","2023-01-01T00_01_54+0000.ts","2023-01-01T00_02_00+0000.ts","2023-01-01T00_02_06+0000.ts","2023-01-01T00_02_12+0000.ts","2023-01-01T00_02_18+0000.ts","2023-01-01T00_02_24+0000.ts","2023-01-01T00_02_30+0000.ts","2023-01-01T00_02_36+0000.ts","2023-01-01T00_02_42+0000.ts"]}}}"#
);

// Event metadata archive command should be sent
assert_eq!(
mqtt_client
.pop_message()
.unwrap()
.try_payload_str()
.unwrap(),
r#"{"kind":"archive_command","data":{"kind":"event_metadata","data":{"metadata":{"id":"test","timestamp":"2023-01-01T00:02:15Z"},"reasons":[{"timestamp":"2023-01-01T00:02:15Z","reason":"test"}],"start":"2023-01-01T00:01:25Z","end":"2023-01-01T00:02:45Z","cameras":[{"name":"camera1","segment_list":["2023-01-01T00_01_24+0000.ts","2023-01-01T00_01_30+0000.ts","2023-01-01T00_01_36+0000.ts","2023-01-01T00_01_42+0000.ts","2023-01-01T00_01_48+0000.ts","2023-01-01T00_01_54+0000.ts","2023-01-01T00_02_00+0000.ts","2023-01-01T00_02_06+0000.ts","2023-01-01T00_02_12+0000.ts","2023-01-01T00_02_18+0000.ts","2023-01-01T00_02_24+0000.ts","2023-01-01T00_02_30+0000.ts","2023-01-01T00_02_36+0000.ts","2023-01-01T00_02_42+0000.ts"]}]}}}"#
);

tokio::time::sleep(Duration::from_secs(1)).await;

// Check correct event metadata is stored in S3
let s3_event = s3_bucket
.get_object("events/2023-01-01T00:02:15+00:00_test.json")
.await
.unwrap();
let s3_event = s3_event.as_str().unwrap();
assert_eq!(
s3_event,
"{\n \"metadata\": {\n \"id\": \"test\",\n \"timestamp\": \"2023-01-01T00:02:15Z\"\n },\n \"reasons\": [\n {\n \"timestamp\": \"2023-01-01T00:02:15Z\",\n \"reason\": \"test\"\n }\n ],\n \"start\": \"2023-01-01T00:01:25Z\",\n \"end\": \"2023-01-01T00:02:45Z\",\n \"cameras\": [\n {\n \"name\": \"camera1\",\n \"segment_list\": [\n \"2023-01-01T00_01_24+0000.ts\",\n \"2023-01-01T00_01_30+0000.ts\",\n \"2023-01-01T00_01_36+0000.ts\",\n \"2023-01-01T00_01_42+0000.ts\",\n \"2023-01-01T00_01_48+0000.ts\",\n \"2023-01-01T00_01_54+0000.ts\",\n \"2023-01-01T00_02_00+0000.ts\",\n \"2023-01-01T00_02_06+0000.ts\",\n \"2023-01-01T00_02_12+0000.ts\",\n \"2023-01-01T00_02_18+0000.ts\",\n \"2023-01-01T00_02_24+0000.ts\",\n \"2023-01-01T00_02_30+0000.ts\",\n \"2023-01-01T00_02_36+0000.ts\",\n \"2023-01-01T00_02_42+0000.ts\"\n ]\n }\n ]\n}"
);

// Check correct segments are stored in S3
let s3_segments_camera1 = s3_bucket
.list("segments/camera1/".to_string(), Some("/".to_string()))
.await
.unwrap();
let s3_segments_camera1 = s3_segments_camera1[0]
.contents
.iter()
.map(|s| s.key.clone())
.collect::<Vec<_>>();
assert_eq!(
s3_segments_camera1,
vec![
"segments/camera1/2023-01-01T00_01_24+0000.ts",
"segments/camera1/2023-01-01T00_01_30+0000.ts",
"segments/camera1/2023-01-01T00_01_36+0000.ts",
"segments/camera1/2023-01-01T00_01_42+0000.ts",
"segments/camera1/2023-01-01T00_01_48+0000.ts",
"segments/camera1/2023-01-01T00_01_54+0000.ts",
"segments/camera1/2023-01-01T00_02_00+0000.ts",
"segments/camera1/2023-01-01T00_02_06+0000.ts",
"segments/camera1/2023-01-01T00_02_12+0000.ts",
"segments/camera1/2023-01-01T00_02_18+0000.ts",
"segments/camera1/2023-01-01T00_02_24+0000.ts",
"segments/camera1/2023-01-01T00_02_30+0000.ts",
"segments/camera1/2023-01-01T00_02_36+0000.ts",
"segments/camera1/2023-01-01T00_02_42+0000.ts",
]
);

tokio::time::sleep(Duration::from_secs(1)).await;

// There should be no more MQTT messages at this point
assert!(mqtt_client.pop_message().is_none());

mqtt_client.stop().await;

satori_event_processor.stop();
satori_archiver.stop();

stream_1.stop().await;
}
11 changes: 7 additions & 4 deletions testing-utils/src/mosquitto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ pub struct MosquittoDriver {

impl Default for MosquittoDriver {
fn default() -> Self {
let port = rand::random::<u16>() % 1000 + 8000;
Self::with_port(port)
}
}

impl MosquittoDriver {
pub fn with_port(port: u16) -> Self {
let temp_config_file = tempfile::NamedTempFile::new().unwrap();
temp_config_file
.as_file()
Expand All @@ -18,8 +25,6 @@ impl Default for MosquittoDriver {
)
.unwrap();

let port = rand::random::<u16>() % 1000 + 8000;

let podman = PodmanDriver::new(
"docker.io/library/eclipse-mosquitto",
&[&format!("{port}:1883")],
Expand All @@ -36,9 +41,7 @@ impl Default for MosquittoDriver {
port,
}
}
}

impl MosquittoDriver {
pub fn port(&self) -> u16 {
self.port
}
Expand Down

0 comments on commit b272b3f

Please sign in to comment.