From 0958975a8864e24d6f1f7eeebb201fa6371300e8 Mon Sep 17 00:00:00 2001 From: Gordy F Date: Mon, 11 Nov 2024 17:08:51 -0800 Subject: [PATCH] Use tokio watch rather than mpsc --- src/commands/run.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/commands/run.rs b/src/commands/run.rs index 2435523..a2d26d4 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -2,7 +2,7 @@ use clap::Parser; use pid::Pid; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::time::Duration; -use tokio::sync::mpsc; +use tokio::sync::watch; use tokio::task; use tokio::time; @@ -33,7 +33,7 @@ impl RunArgs { .await .unwrap(); - let (tx, mut rx) = mpsc::channel(32); + let (tx, mut rx) = watch::channel(0); { let client = client.clone(); @@ -41,17 +41,14 @@ impl RunArgs { task::spawn(async move { let mut interval = time::interval(Duration::from_millis(1000)); - let mut last_input_value: f32 = 0.0; let mut last_output_value: u8 = 0; loop { interval.tick().await; - while let Ok(value) = rx.try_recv() { - last_input_value = value; - } + let last_input_value = *rx.borrow_and_update(); let output = pid - .next_control_output(last_input_value) + .next_control_output(last_input_value as f32) .output .clamp(0.0, 100.0) .round() as u8; @@ -76,8 +73,8 @@ impl RunArgs { let notification = eventloop.poll().await.unwrap(); if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = notification { if let Ok(payload) = std::str::from_utf8(&publish.payload) { - if let Ok(value) = payload.parse::() { - tx.send(value).await.unwrap(); + if let Ok(value) = payload.parse::() { + tx.send(value).unwrap(); } } }