diff --git a/src/commands/run.rs b/src/commands/run.rs index 2435523..a2d26d4 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -2,7 +2,7 @@ use clap::Parser; use pid::Pid; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::time::Duration; -use tokio::sync::mpsc; +use tokio::sync::watch; use tokio::task; use tokio::time; @@ -33,7 +33,7 @@ impl RunArgs { .await .unwrap(); - let (tx, mut rx) = mpsc::channel(32); + let (tx, mut rx) = watch::channel(0); { let client = client.clone(); @@ -41,17 +41,14 @@ impl RunArgs { task::spawn(async move { let mut interval = time::interval(Duration::from_millis(1000)); - let mut last_input_value: f32 = 0.0; let mut last_output_value: u8 = 0; loop { interval.tick().await; - while let Ok(value) = rx.try_recv() { - last_input_value = value; - } + let last_input_value = *rx.borrow_and_update(); let output = pid - .next_control_output(last_input_value) + .next_control_output(last_input_value as f32) .output .clamp(0.0, 100.0) .round() as u8; @@ -76,8 +73,8 @@ impl RunArgs { let notification = eventloop.poll().await.unwrap(); if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = notification { if let Ok(payload) = std::str::from_utf8(&publish.payload) { - if let Ok(value) = payload.parse::() { - tx.send(value).await.unwrap(); + if let Ok(value) = payload.parse::() { + tx.send(value).unwrap(); } } }