Skip to content

Commit

Permalink
Use tokio watch rather than mpsc
Browse files Browse the repository at this point in the history
  • Loading branch information
Gordy F committed Nov 12, 2024
1 parent 9749b39 commit 0958975
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use pid::Pid;
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::task;
use tokio::time;

Expand Down Expand Up @@ -33,25 +33,22 @@ impl RunArgs {
.await
.unwrap();

let (tx, mut rx) = mpsc::channel(32);
let (tx, mut rx) = watch::channel(0);

{
let client = client.clone();
let output_topic = self.output_topic.clone();

task::spawn(async move {
let mut interval = time::interval(Duration::from_millis(1000));
let mut last_input_value: f32 = 0.0;
let mut last_output_value: u8 = 0;
loop {
interval.tick().await;

while let Ok(value) = rx.try_recv() {
last_input_value = value;
}
let last_input_value = *rx.borrow_and_update();

let output = pid
.next_control_output(last_input_value)
.next_control_output(last_input_value as f32)
.output
.clamp(0.0, 100.0)
.round() as u8;
Expand All @@ -76,8 +73,8 @@ impl RunArgs {
let notification = eventloop.poll().await.unwrap();
if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = notification {
if let Ok(payload) = std::str::from_utf8(&publish.payload) {
if let Ok(value) = payload.parse::<f32>() {
tx.send(value).await.unwrap();
if let Ok(value) = payload.parse::<u16>() {
tx.send(value).unwrap();
}
}
}
Expand Down

0 comments on commit 0958975

Please sign in to comment.