Skip to content

Commit

Permalink
update semtech-udp (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
lthiery authored Dec 27, 2022
1 parent b770001 commit bb25b7b
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 142 deletions.
20 changes: 17 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ publish = false

[dependencies]
anyhow = "1"
config = { version="0.11", default-features = false, features = ["toml"] }
env_logger = "0"
heapless = "0"
hex = "0"
hyper = { version = "0", features = ["full"] }
log = "0"
lorawan = { git = "https://github.com/helium/rust-lorawan.git" }
lorawan-device = { git = "https://github.com/helium/rust-lorawan.git" }
semtech-udp = { version = ">=0.7,<0.8", features=["client"] }
prometheus = "0"
semtech-udp = { git = "https://github.com/helium/semtech-udp.git", features = ["client"] }
serde = "1"
structopt = "0"
thiserror = "1"
config = { version="0.11", default-features=false, features=["toml"]}
rand = "0"
prometheus = "0"
hyper = { version = "0", features = ["full"] }
triggered = "0"

[dependencies.tokio]
version = "1"
Expand Down
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ pub enum Error {
SemtechUdpClientRuntime(#[from] semtech_udp::client_runtime::Error),
#[error("invalid region string")]
InvalidRegionString(String),
#[error("error sending downlink to upd_radio instance: {0}")]
SendingDownlinkToUdpRadio(mpsc::error::SendError<virtual_device::IntermediateEvent>),
#[error("receive channel from semtech_udp::client_runtime unexpectedly closed")]
RxChannelSemtechUdpClientRuntimeClosed,
}
144 changes: 110 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use log::{debug, error, info, warn};
use metrics::Metrics;
use semtech_udp::client_runtime::UdpRuntime;
use semtech_udp::client_runtime;
use semtech_udp::client_runtime::{ClientRx, ClientTx, UdpRuntime};
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
path::PathBuf,
time::Instant,
};
use structopt::StructOpt;
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};

mod error;
mod metrics;
Expand Down Expand Up @@ -58,9 +63,8 @@ async fn main() -> Result<()> {
} else {
usize::MAX
};

let pf_map = setup_packet_forwarders(settings.packet_forwarder).await?;

let (trigger, trigger_listener) = triggered::trigger();
let mut pf_map = setup_packet_forwarders(settings.packet_forwarder).await?;
for (label, device) in settings.device.into_iter().take(device_limit) {
let packet_forwarder = if let Some(pf) = &device.packet_forwarder {
pf
Expand All @@ -74,43 +78,61 @@ async fn main() -> Result<()> {
&settings.default_server
});

let lorawan_app = virtual_device::VirtualDevice::new(
label.clone(),
instant,
if let Some(pf) = pf_map.get(packet_forwarder) {
pf
} else {
panic!("{} is invalid packet forwarder", packet_forwarder)
},
device.credentials,
metrics_sender,
device.rejoin_frames,
device.secs_between_transmits,
device.secs_between_join_transmits,
device.region,
)
.await?;

tokio::spawn(async move {
if let Err(e) = lorawan_app.run().await {
error!("{} device threw error: {:?}", label, e)
}
});
if let Some((_udp_runtime, client_tx, _client_rx, senders)) =
pf_map.get_mut(packet_forwarder)
{
let (packet_sender, lorawan_app) = virtual_device::VirtualDevice::new(
label.clone(),
instant,
client_tx.clone(),
device.credentials,
metrics_sender,
device.rejoin_frames,
device.secs_between_transmits,
device.secs_between_join_transmits,
device.region,
)
.await?;

senders.push(packet_sender);

tokio::spawn(async move {
if let Err(e) = lorawan_app.run().await {
error!("{} device threw error: {:?}", label, e)
}
});
} else {
panic!("Unknown macaddress linked to device!");
}
}

for (_, runtime) in pf_map {
tokio::spawn(runtime.run());
for (_label, (udp_runtime, _client_tx, client_rx, senders)) in pf_map {
let shutdown_trigger = trigger_listener.clone();
tokio::spawn(udp_runtime.run(shutdown_trigger));
let shutdown_trigger = trigger_listener.clone();
tokio::spawn(packet_muxer(instant, client_rx, senders, shutdown_trigger));
}

tokio::signal::ctrl_c().await?;
trigger.trigger();
info!("User exit via ctrl C");
Ok(())
}

async fn setup_packet_forwarders(
mut packet_forwarder: HashMap<String, settings::PacketForwarder>,
) -> Result<HashMap<String, UdpRuntime>> {
// prune the deafult packet forwarder if we have more than one
) -> Result<
HashMap<
String,
(
UdpRuntime,
ClientTx,
ClientRx,
Vec<virtual_device::PacketSender>,
),
>,
> {
// prune the default packet forwarder if we have more than one
if packet_forwarder.len() != 1 && packet_forwarder.contains_key("default") {
packet_forwarder.remove("default");
}
Expand All @@ -124,14 +146,68 @@ async fn setup_packet_forwarders(
packet_forwarder.host,
outbound.to_string()
);
let udp_runtime = UdpRuntime::new(
packet_forwarder.mac_cloned_into_buf().unwrap(),
outbound,
let (sender, receiver, udp_runtime) = UdpRuntime::new(
packet_forwarder.mac_cloned_into_buf().unwrap().into(),
packet_forwarder.host,
)
.await?;
pf_map.insert(label, udp_runtime);
pf_map.insert(label, (udp_runtime, sender, receiver, vec![]));
}

Ok(pf_map)
}

async fn packet_muxer(
instant: Instant,
mut client_rx: ClientRx,
senders: Vec<virtual_device::PacketSender>,
trigger: triggered::Listener,
) -> Result {
tokio::select!(
_ = trigger => Ok(()),
resp = async move {
loop {
let msg = client_rx.recv().await.ok_or(Error::RxChannelSemtechUdpClientRuntimeClosed)?;
if let client_runtime::Event::DownlinkRequest(downlink) = msg {

if let Some(scheduled_time) = downlink.pull_resp.data.txpk.time.tmst() {
let time = instant.elapsed().as_micros() as u32;
if scheduled_time > time {
let downlink = Box::new(downlink).clone();
let delay = scheduled_time - time;
for sender in &senders {
let sender = sender.clone();
let downlink = downlink.clone();
tokio::spawn(async move {
sleep(Duration::from_micros(delay as u64 + 50_000)).await;
if let Err(e) = sender.send(downlink, delay as u64).await {
error!("Error sending packet to virtual-lorawan-device instance: {e}");
}
});
}
downlink.ack().await?;
} else {
let time_since_scheduled_time = time - scheduled_time;
if time_since_scheduled_time > 1000 {
warn!(
"UDP packet received after tx time by {} ms",
time_since_scheduled_time / 1000
);
} else {
warn!(
"UDP packet received after tx time by {} μs",
time_since_scheduled_time
);
}
downlink.nack(semtech_udp::tx_ack::Error::TooLate).await?;
}
} else {
warn!(
"Unexpected! UDP packet to transmit radio packet immediately"
);
}
}
}
} => resp
)
}
Loading

0 comments on commit bb25b7b

Please sign in to comment.