From d95d0eb974d4cc3e32352aa519e52b8de73e0273 Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 11 Sep 2023 13:17:14 +0000 Subject: [PATCH 01/29] Handle logging via spans --- proxy/src/auth.rs | 8 +-- shared/src/crypto_jwt.rs | 11 +--- shared/src/logger.rs | 8 ++- shared/src/middleware.rs | 121 ++++++--------------------------------- 4 files changed, 31 insertions(+), 117 deletions(-) diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index 37d79e76..bd65dcc9 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -7,10 +7,10 @@ use axum::{ }; use beam_lib::{AppId, AppOrProxyId}; use shared::{ - config, config_proxy, middleware::ProxyLogger, + config, config_proxy }; -use tracing::debug; +use tracing::{debug, Span}; pub(crate) struct AuthenticatedApp(pub(crate) AppId); @@ -41,9 +41,7 @@ impl FromRequestParts for AuthenticatedApp { return Err(UNAUTH_ERR); } debug!("Request authenticated (ClientID {})", client_id); - _ = parts.extensions.remove::() - .expect("Added by middleware") - .send(AppOrProxyId::App(client_id.clone())); + Span::current().record("from", AppOrProxyId::App(client_id.clone()).hide_broker()); Ok(Self(client_id)) } else { Err(UNAUTH_ERR) diff --git a/shared/src/crypto_jwt.rs b/shared/src/crypto_jwt.rs index b19cf5e6..7f3dc60c 100644 --- a/shared/src/crypto_jwt.rs +++ b/shared/src/crypto_jwt.rs @@ -4,7 +4,6 @@ use crate::{ config_shared::ConfigCrypto, crypto::{self, CryptoPublicPortion}, errors::{CertificateInvalidReason, SamplyBeamError}, - middleware::{LoggingInfo, ProxyLogger}, Msg, MsgEmpty, MsgId, MsgSigned, }; use axum::{async_trait, body::HttpBody, extract::{FromRequest, Request}, http::{header, request::Parts, uri::PathAndQuery, HeaderMap, HeaderName, Method, StatusCode, Uri}, BoxError, RequestExt}; @@ -20,7 +19,7 @@ use once_cell::unsync::Lazy; use openssl::base64; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; -use tracing::{debug, error, warn}; +use tracing::{debug, error, warn, Span}; const ERR_SIG: 
(StatusCode, &str) = (StatusCode::UNAUTHORIZED, "Signature could not be verified"); // const ERR_CERT: (StatusCode, &str) = (StatusCode::BAD_REQUEST, "Unable to retrieve matching certificate."); @@ -126,7 +125,6 @@ pub const JWT_VERIFICATION_OPTIONS: Lazy = Lazy::new(|| Ver ..Default::default() }); -#[tracing::instrument(skip(token_without_extended_signature))] /// This verifys a Msg from sent to the Broker /// The Message is encoded in the JWT Claims of the body which is a JWT. /// There is never really a [`MsgSigned`] involved in Deserializing the message as the signature is just copied from the body JWT. @@ -165,12 +163,7 @@ pub async fn verify_with_extended_header( ERR_SIG })?; - req.extensions - .remove::() - .expect("Should be set by middleware") - .send(header_claims.custom.from.clone()) - .await - .expect("Receiver still lives in middleware"); + Span::current().record("from", header_claims.custom.from.hide_broker()); // Check extra digest diff --git a/shared/src/logger.rs b/shared/src/logger.rs index c7d3660b..ec983d7a 100644 --- a/shared/src/logger.rs +++ b/shared/src/logger.rs @@ -1,8 +1,14 @@ use tracing::{debug, dispatcher::SetGlobalDefaultError, Level}; +use tracing_subscriber::fmt::format::{debug_fn, self}; #[allow(clippy::if_same_then_else)] // The redundant if-else serves documentation purposes pub fn init_logger() -> Result<(), SetGlobalDefaultError> { - let subscriber = tracing_subscriber::FmtSubscriber::builder().with_max_level(Level::DEBUG); + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .fmt_fields(debug_fn(|w, f, v| match f.name() { + "from" | "message" => write!(w, "{v:?}"), + _ => write!(w, "{f}={v:?} "), + })) + .with_max_level(Level::DEBUG); // TODO: Reduce code complexity. 
let env_filter = match std::env::var("RUST_LOG") { diff --git a/shared/src/middleware.rs b/shared/src/middleware.rs index d3044c4f..70250118 100644 --- a/shared/src/middleware.rs +++ b/shared/src/middleware.rs @@ -1,111 +1,28 @@ -use std::{ - cell::RefCell, - net::{IpAddr, SocketAddr}, - sync::Arc, -}; - use axum::{ - body::HttpBody, - extract::{ConnectInfo, Request}, - http::{header::HeaderName, Method, StatusCode, Uri}, - middleware::{self, Next}, - response::{IntoResponse, Response}, + extract::Request, + http::StatusCode, + middleware::Next, + response::Response, }; -use tokio::sync::mpsc; -use tracing::{error, info, instrument, span, warn, Level}; - -use beam_lib::AppOrProxyId; - -const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for"); - -pub struct LoggingInfo { - // Known from the start - method: Method, - uri: Uri, - ip: IpAddr, - - // Added by SignedMsg extractor - from_proxy: Option, - - // Added after handlers - status_code: Option, -} - -impl LoggingInfo { - fn new(method: Method, uri: Uri, ip: IpAddr) -> Self { - Self { - method, - uri, - ip, - from_proxy: None, - status_code: None, - } - } - - fn set_status_code(&mut self, status: StatusCode) { - self.status_code = Some(status); - } - - fn set_proxy_name(&mut self, proxy: AppOrProxyId) { - self.from_proxy = Some(proxy); - } - - fn get_log(&self) -> String { - let from = self - .from_proxy - .as_ref() - .map(|id| id.hide_broker()) - .unwrap_or(self.ip.to_string()); - format!( - "{} {} {} {}", - from, - self.status_code - .expect("Did not set Statuscode before logging"), - self.method, - self.uri - ) - } -} - -pub type ProxyLogger = mpsc::Sender; +use tracing::{info, warn, info_span, field, Instrument, Span}; pub async fn log( - ConnectInfo(info): ConnectInfo, - mut req: Request, + req: Request, next: Next, ) -> Response { let method = req.method().clone(); let uri = req.uri().clone(); - let ip = get_ip(&req, &info); - - let mut info = LoggingInfo::new(method, uri, ip); - // This 
channel may or may not receive an AppOrProxyId from verify_with_extended_header - // TODO: Solve this with tracing - let (tx, mut rx) = mpsc::channel(1); - req.extensions_mut().insert(tx); - - let resp = next.run(req).await; - info.set_status_code(resp.status()); - - if let Ok(proxy) = rx.try_recv() { - info.set_proxy_name(proxy); - } - - let line = info.get_log(); - // If we get a gateway timeout we won't log it with log level warn as this happens regularly with the long polling api - if resp.status().is_success() || resp.status().is_informational() || resp.status() == StatusCode::GATEWAY_TIMEOUT { - info!(target: "in", "{}", line); - } else { - warn!(target: "in", "{}", line); - } - resp -} - -fn get_ip(req: &Request, info: &SocketAddr) -> IpAddr { - req.headers() - .get(X_FORWARDED_FOR) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.split(',').next()) - .and_then(|v| v.parse().ok()) - .unwrap_or(info.ip()) + let span = info_span!("", from = field::Empty); + + async move { + let resp = next.run(req).instrument(Span::current()).await; + let status = resp.status(); + // If we get a gateway timeout we won't log it with log level warn as this happens regularly with the long polling api + if status.is_success() || status.is_informational() || status == StatusCode::GATEWAY_TIMEOUT { + info!(target: "in", "{method} {uri} {status}"); + } else { + warn!(target: "in", "{method} {uri} {status}"); + }; + resp + }.instrument(span).await } From bb57a8948d6b3545458f1056516e1d673154298e Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 14 Sep 2023 11:31:35 +0000 Subject: [PATCH 02/29] Beam id api improvements --- beam-lib/src/ids.rs | 66 ++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/beam-lib/src/ids.rs b/beam-lib/src/ids.rs index 34802d7f..60614691 100644 --- a/beam-lib/src/ids.rs +++ b/beam-lib/src/ids.rs @@ -71,18 +71,10 @@ impl AppOrProxyId { self.as_ref().ends_with(other.as_ref()) } - pub fn 
hide_broker(&self) -> String { + pub fn hide_broker(&self) -> &str { match self { - AppOrProxyId::App(app) => { - let without_broker = strip_broker_id(&app.0).expect("Is valid id"); - without_broker[..without_broker.len() - 1].to_owned() - } - AppOrProxyId::Proxy(proxy) => proxy - .0 - .split_once('.') - .map(|(proxy, _broker)| proxy) - .unwrap_or_default() - .to_string(), + AppOrProxyId::App(app) => app.hide_broker_name(), + AppOrProxyId::Proxy(proxy) => proxy.proxy_name(), } } } @@ -137,7 +129,7 @@ pub(crate) enum BeamIdType { BrokerId, } -macro_rules! impl_new { +macro_rules! impl_id { ($id:ident) => { impl $id { #[cfg(feature = "strict-ids")] @@ -161,11 +153,23 @@ macro_rules! impl_new { self.as_ref().ends_with(other.as_ref()) } } + + impl AsRef for $id { + fn as_ref(&self) -> &str { + &self.0 + } + } + + impl Display for $id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } + } }; } -impl_new!(AppId); -impl_new!(ProxyId); +impl_id!(AppId); +impl_id!(ProxyId); #[cfg(feature = "strict-ids")] fn get_id_type(id: &str) -> Result { @@ -209,32 +213,32 @@ impl AppId { .expect("AppId should be valid"); ProxyId(proxy_id.to_string()) } -} -impl AsRef for AppId { - fn as_ref(&self) -> &str { - &self.0 + /// Returns the AppId as a string slice without the broker part of the string + /// ## Example + /// app1.proxy1.broker => app1.proxy1 + #[cfg(feature = "strict-ids")] + pub fn hide_broker_name(&self) -> &str { + let without_broker = strip_broker_id(&self.0).expect("Is valid id"); + &without_broker[..without_broker.len() - 1] } } -impl Display for AppId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.0) - } -} #[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)] pub struct ProxyId(String); -impl AsRef for ProxyId { - fn as_ref(&self) -> &str { - &self.0 - } -} +impl ProxyId { -impl Display for ProxyId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - 
f.write_str(&self.0) + /// Returns the proxies name without the broker id + /// ## Example + /// proxy1.broker => proxy1 + #[cfg(feature = "strict-ids")] + pub fn proxy_name(&self) -> &str { + self.0 + .split_once('.') + .map(|(proxy, _broker)| proxy) + .expect("This is a valid proxy id") } } From c94bf684667d457591a3cfb77d87a5f7c3aa548e Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 14 Sep 2023 11:56:27 +0000 Subject: [PATCH 03/29] Better beam proxy auth logging --- proxy/src/auth.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index bd65dcc9..d6fdb409 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -10,7 +10,7 @@ use shared::{ config, config_proxy }; -use tracing::{debug, Span}; +use tracing::{debug, Span, debug_span, warn}; pub(crate) struct AuthenticatedApp(pub(crate) AppId); @@ -25,25 +25,30 @@ impl FromRequestParts for AuthenticatedApp { [(header::WWW_AUTHENTICATE, SCHEME)], ); if let Some(auth) = parts.headers.get(header::AUTHORIZATION) { - let auth = auth.to_str().map_err(|_| UNAUTH_ERR)?; - let mut auth = auth.split(' '); - if auth.next().unwrap_or("") != SCHEME { + let auth_str = auth.to_str().map_err(|_| UNAUTH_ERR)?; + let mut auth = auth_str.split(' '); + if auth.next() != Some(SCHEME) { + warn!(auth_str, "Invalid auth scheme"); return Err(UNAUTH_ERR); } - let client_id = auth.next().unwrap_or(""); - let client_id = AppId::new(client_id).map_err(|_| UNAUTH_ERR)?; - let api_key_actual = config::CONFIG_PROXY - .api_keys - .get(&client_id) - .ok_or(UNAUTH_ERR)?; + let Some(client_id) = auth.next().and_then(|s| AppId::new(s).ok()) else { + warn!(auth_str, "Invalid app id"); + return Err(UNAUTH_ERR); + }; + let Some(api_key_actual) = config::CONFIG_PROXY.api_keys.get(&client_id) else { + warn!("App {client_id} not registered in proxy"); + return Err(UNAUTH_ERR); + }; let api_key_claimed = auth.next().ok_or(UNAUTH_ERR)?; if api_key_claimed != api_key_actual { 
+ warn!("App {client_id} provided the wrong api key"); return Err(UNAUTH_ERR); } debug!("Request authenticated (ClientID {})", client_id); Span::current().record("from", AppOrProxyId::App(client_id.clone()).hide_broker()); Ok(Self(client_id)) } else { + warn!("No auth header provided"); Err(UNAUTH_ERR) } } From 39641605bfb8618fbfc152c63fcae867446b1a52 Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 14 Sep 2023 13:42:54 +0000 Subject: [PATCH 04/29] ip logging in broker --- shared/src/crypto_jwt.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/shared/src/crypto_jwt.rs b/shared/src/crypto_jwt.rs index 7f3dc60c..e552c8a6 100644 --- a/shared/src/crypto_jwt.rs +++ b/shared/src/crypto_jwt.rs @@ -1,3 +1,5 @@ +use std::net::{SocketAddr, IpAddr}; + use beam_lib::{AppOrProxyId, ProxyId}; use crate::{ config, @@ -6,7 +8,7 @@ use crate::{ errors::{CertificateInvalidReason, SamplyBeamError}, Msg, MsgEmpty, MsgId, MsgSigned, }; -use axum::{async_trait, body::HttpBody, extract::{FromRequest, Request}, http::{header, request::Parts, uri::PathAndQuery, HeaderMap, HeaderName, Method, StatusCode, Uri}, BoxError, RequestExt}; +use axum::{async_trait, body::HttpBody, extract::{{FromRequest, ConnectInfo, FromRequestParts}, Request}, http::{header, request::Parts, uri::PathAndQuery, HeaderMap, HeaderName, Method, StatusCode, Uri}, BoxError, RequestExt}; use jwt_simple::{ claims::JWTClaims, prelude::{ @@ -19,7 +21,7 @@ use once_cell::unsync::Lazy; use openssl::base64; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; -use tracing::{debug, error, warn, Span}; +use tracing::{debug, error, warn, Span, info_span}; const ERR_SIG: (StatusCode, &str) = (StatusCode::UNAUTHORIZED, "Signature could not be verified"); // const ERR_CERT: (StatusCode, &str) = (StatusCode::BAD_REQUEST, "Unable to retrieve matching certificate."); @@ -133,17 +135,20 @@ pub async fn verify_with_extended_header( req: &mut Parts, 
token_without_extended_signature: &str, ) -> Result, (StatusCode, &'static str)> { + let ip = get_ip(req).await; + // let a = _span.entered(); let token_with_extended_signature = std::str::from_utf8( req.headers .get(header::AUTHORIZATION) .ok_or_else(|| { - warn!("Missing Authorization header (in verify_with_extended_header)"); + warn!(%ip, "Missing Authorization header (in verify_with_extended_header)"); ERR_SIG })? .as_bytes(), ) .map_err(|e| { warn!( + %ip, "Unable to parse existing Authorization header (in verify_with_extended_header): {}", e ); @@ -157,6 +162,7 @@ pub async fn verify_with_extended_header( .await .map_err(|e| { warn!( + %ip, "Unable to extract header JWT: {}. The full JWT was: {}. The header was: {:?}", e, token_with_extended_signature, req ); @@ -306,3 +312,14 @@ pub fn make_extra_fields_digest( from: from.to_owned(), }) } + +async fn get_ip(parts: &mut Parts) -> IpAddr { + let source_ip = ConnectInfo::::from_request_parts(parts, &()).await.expect("The server is configured to keep connect info").0.ip(); + const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for"); + parts.headers + .get(X_FORWARDED_FOR) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.split(',').next()) + .and_then(|v| v.parse().ok()) + .unwrap_or(source_ip) +} From 5fc9eb5cde1ee82e0a5cef986ed6de26f891352e Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 7 Nov 2023 09:29:14 +0000 Subject: [PATCH 05/29] Refactor error handeling --- shared/src/crypto_jwt.rs | 35 ++++++++++++----------------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/shared/src/crypto_jwt.rs b/shared/src/crypto_jwt.rs index e552c8a6..0d3aee6a 100644 --- a/shared/src/crypto_jwt.rs +++ b/shared/src/crypto_jwt.rs @@ -136,24 +136,17 @@ pub async fn verify_with_extended_header( token_without_extended_signature: &str, ) -> Result, (StatusCode, &'static str)> { let ip = get_ip(req).await; - // let a = _span.entered(); - let token_with_extended_signature = 
std::str::from_utf8( - req.headers - .get(header::AUTHORIZATION) - .ok_or_else(|| { - warn!(%ip, "Missing Authorization header (in verify_with_extended_header)"); - ERR_SIG - })? - .as_bytes(), - ) - .map_err(|e| { - warn!( - %ip, - "Unable to parse existing Authorization header (in verify_with_extended_header): {}", - e - ); - ERR_SIG - })?; + let token_with_extended_signature = req.headers + .get(header::AUTHORIZATION) + .ok_or_else(|| { + warn!(%ip, "Missing Authorization header"); + ERR_SIG + })? + .to_str() + .map_err(|e| { + warn!(%ip, "Unable to parse existing Authorization header: {e}"); + ERR_SIG + })?; let token_with_extended_signature = token_with_extended_signature.trim_start_matches("SamplyJWT "); @@ -161,11 +154,7 @@ pub async fn verify_with_extended_header( extract_jwt::(token_with_extended_signature) .await .map_err(|e| { - warn!( - %ip, - "Unable to extract header JWT: {}. The full JWT was: {}. The header was: {:?}", - e, token_with_extended_signature, req - ); + warn!(%ip, "Unable to extract header JWT: {e}. The full JWT was: {token_with_extended_signature}"); ERR_SIG })?; From 040639216bb0d28b7c7cc5542b7cc5291b482861 Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 8 Aug 2024 13:01:38 +0000 Subject: [PATCH 06/29] chore: Replace `docker-compose` with `docker compose` --- README.md | 2 +- dev/beamdev | 10 +++++----- dev/pki/pki | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 28fe1576..94738bce 100644 --- a/README.md +++ b/README.md @@ -576,7 +576,7 @@ Alternatively, you can run the services in the background and get the logs as fo ```shell ./dev/beamdev start_bg -docker-compose logs -f +docker compose logs -f ``` Confirm that your setup works by running `./dev/test noci`, which runs the tests against your instances. 
diff --git a/dev/beamdev b/dev/beamdev index 868d36b0..8e67fa03 100755 --- a/dev/beamdev +++ b/dev/beamdev @@ -93,7 +93,7 @@ function build() { function build_docker() { BACK2=$(pwd) cd $SD - docker-compose build --build-arg TARGETARCH=$ARCH + docker compose build --build-arg TARGETARCH=$ARCH cd $BACK2 } @@ -121,7 +121,7 @@ function demo { clean pki/pki setup echo "$VAULT_TOKEN" > ./pki/pki.secret - docker-compose up --no-build --no-recreate --abort-on-container-exit + docker compose up --no-build --no-recreate --abort-on-container-exit } function start_bg { @@ -129,7 +129,7 @@ function start_bg { pki/pki devsetup echo "$VAULT_TOKEN" > ./pki/pki.secret build $@ - docker-compose up --no-build --no-recreate -d + docker compose up --no-build --no-recreate -d for ADDR in $P1 $P2; do TRIES=1 while [ $TRIES -ne 0 ]; do @@ -144,7 +144,7 @@ function start_bg { ((TRIES=TRIES+1)) if [ $TRIES -ge 30 ]; then echo "ERROR: $ADDR not available after 30 seconds. Giving up and printing docker compose logs." - docker-compose logs + docker compose logs exit 5 fi fi @@ -166,7 +166,7 @@ function stop { } function clean { - docker-compose down + docker compose down rm -fv pki/*.pem pki/*.json pki/pki.secret pki/pki clean } diff --git a/dev/pki/pki b/dev/pki/pki index 19ada5c1..1d422b16 100755 --- a/dev/pki/pki +++ b/dev/pki/pki @@ -13,12 +13,12 @@ export BROKER_ID=$(echo $PROXY1_ID | cut -d '.' 
-f 2-) export VAULT_ADDR=http://127.0.0.1:8200 function start() { - docker-compose up -d --no-build vault + docker compose up -d --no-build vault } function clean() { rm -vf *.pem *.json *.secret - docker-compose down + docker compose down } function create_root_ca() { From ea8957dd67c705c1b988f43cedd3fb0e43a7323e Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 8 Aug 2024 12:34:15 +0000 Subject: [PATCH 07/29] refactor: Migrate retry handeling form `backoff` to `tryhard` --- proxy/Cargo.toml | 2 +- proxy/src/main.rs | 79 +++++++++++++++++------------------------------ 2 files changed, 30 insertions(+), 51 deletions(-) diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 711d3e00..37dfa470 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -18,7 +18,7 @@ httpdate = "1.0" # Error handling anyhow = "1" -backoff = { version = "0.4", features = ["tokio"] } +tryhard = "0.5" # Subscriber is imported through shared tracing = "0.1" diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 2b9baba9..921fe3a9 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,16 +1,18 @@ #![allow(unused_imports)] +use std::future::Future; use std::time::Duration; use axum::http::{header, HeaderValue, StatusCode}; -use backoff::{future::retry_notify, ExponentialBackoff}; use beam_lib::AppOrProxyId; +use futures::future::Ready; use shared::{reqwest, EncryptedMessage, MsgEmpty, PlainMessage}; use shared::crypto::CryptoPublicPortion; use shared::errors::SamplyBeamError; use shared::http_client::{self, SamplyHttpClient}; use shared::{config, config_proxy::Config}; use tracing::{debug, error, info, warn}; +use tryhard::{backoff_strategies::ExponentialBackoff, RetryFuture, RetryFutureConfig}; use crate::serve_tasks::sign_request; @@ -38,38 +40,18 @@ pub async fn main() -> anyhow::Result<()> { Some(Duration::from_secs(20)), )?; - if let Err(err) = retry_notify( - ExponentialBackoff::default(), - || async { Ok(get_broker_health(&config, &client).await?) 
}, - |err, dur: Duration| { - warn!( - "Still trying to reach Broker: {}. Retrying in {}s", - err, - dur.as_secs() - ); - }, - ) - .await - { - error!("Giving up reaching Broker: {}", err); + if let Err(err) = retry_notify(|| get_broker_health(&config, &client), |err, dur| { + warn!("Still trying to reach Broker: {err}. Retrying in {}s", dur.as_secs()); + }).await { + error!("Giving up reaching Broker: {err}"); std::process::exit(1); } else { info!("Connected to Broker: {}", &config.broker_uri); } - if let Err(err) = retry_notify( - ExponentialBackoff::default(), - || async { Ok(init_crypto(config.clone(), client.clone()).await?) }, - |err, dur: Duration| { - warn!( - "Still trying to initialize certificate chain: {}. Retrying in {}s", - err, - dur.as_secs() - ); - }, - ) - .await - { + if let Err(err) = retry_notify(|| init_crypto(config.clone(), client.clone()), |err, dur| { + warn!("Still trying to initialize certificate chain: {err}. Retrying in {}s", dur.as_secs()); + }).await { error!("Giving up on initializing certificate chain: {}", err); std::process::exit(1); } else { @@ -81,6 +63,20 @@ pub async fn main() -> anyhow::Result<()> { Ok(()) } +fn retry_notify(f: F, on_error: Cb) -> RetryFuture, &E) -> Ready<()>>> +where + F: FnMut() -> Fut, + Fut: Future>, + Cb: Fn(&E, Duration) + 'static, + +{ + tryhard::retry_fn(f) + .retries(100) + .exponential_backoff(Duration::from_secs(1)) + .max_delay(Duration::from_secs(120)) + .on_retry(Box::new(move |_, b, e| futures::future::ready(on_error(e, b.unwrap_or(Duration::MAX))))) +} + async fn init_crypto(config: Config, client: SamplyHttpClient) -> Result<(), SamplyBeamError> { let private_crypto_proxy = shared::config_shared::load_private_crypto_for_proxy()?; shared::crypto::init_cert_getter(crypto::build_cert_getter( @@ -117,28 +113,11 @@ async fn get_broker_health( let uri = config.broker_uri .join("/v1/health") .expect("Uri to be constructed correctly"); - - let resp = retry_notify( - 
backoff::ExponentialBackoffBuilder::default() - .with_max_interval(Duration::from_secs(10)) - .with_max_elapsed_time(Some(Duration::from_secs(30))) - .build(), - || async { - Ok(client - .get(uri.clone()) - .header(header::USER_AGENT, HeaderValue::from_static(env!("SAMPLY_USER_AGENT"))) - .send() - .await?) - }, - |err, b: Duration| { - warn!( - "Unable to connect to Broker: {}. Retrying in {}s", - err, - b.as_secs() - ); - }, - ) - .await?; + let resp = client + .get(uri.clone()) + .header(header::USER_AGENT, HeaderValue::from_static(env!("SAMPLY_USER_AGENT"))) + .send() + .await?; match resp.status() { StatusCode::OK => Ok(()), From 1ee3597ea789bd62dc7ce496d21bca6cfa2ec9ae Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 8 Aug 2024 12:35:56 +0000 Subject: [PATCH 08/29] fix: retry 502 status without delay in control connection --- proxy/src/main.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 921fe3a9..8adc6bd6 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -11,6 +11,7 @@ use shared::crypto::CryptoPublicPortion; use shared::errors::SamplyBeamError; use shared::http_client::{self, SamplyHttpClient}; use shared::{config, config_proxy::Config}; +use tokio::time::Instant; use tracing::{debug, error, info, warn}; use tryhard::{backoff_strategies::ExponentialBackoff, RetryFuture, RetryFutureConfig}; @@ -131,6 +132,8 @@ async fn get_broker_health( fn spawn_controller_polling(client: SamplyHttpClient, config: Config) { const RETRY_INTERVAL: Duration = Duration::from_secs(60); tokio::spawn(async move { + let mut retries_this_min = 0; + let mut reset_interval = std::pin::pin!(tokio::time::sleep(Duration::from_secs(60))); loop { let body = EncryptedMessage::MsgEmpty(MsgEmpty { from: AppOrProxyId::Proxy(config.proxy_id.clone()), @@ -149,8 +152,15 @@ fn spawn_controller_polling(client: SamplyHttpClient, config: Config) { StatusCode::OK => { // Process control task }, - 
StatusCode::GATEWAY_TIMEOUT => { - debug!("Connection to broker timed out; retrying."); + status @ (StatusCode::GATEWAY_TIMEOUT | StatusCode::BAD_GATEWAY) => { + if retries_this_min < 10 { + retries_this_min += 1; + debug!("Connection to broker timed out; retrying."); + } else { + warn!("Retried more then 10 times in one minute getting status code: {status}"); + tokio::time::sleep(RETRY_INTERVAL).await; + continue; + } }, other => { warn!("Got unexpected status getting control tasks from broker: {other}"); @@ -166,6 +176,10 @@ fn spawn_controller_polling(client: SamplyHttpClient, config: Config) { tokio::time::sleep(RETRY_INTERVAL).await; } }; + if reset_interval.is_elapsed() { + retries_this_min = 0; + reset_interval.as_mut().reset(Instant::now() + Duration::from_secs(60)); + } } }); } From 7e4ff7eef1de97c118c4b0162da8b80150dd3a84 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Aug 2024 08:42:07 +0000 Subject: [PATCH 09/29] Update dashmap requirement from 5.5 to 6.0 Updates the requirements on [dashmap](https://github.com/xacrimon/dashmap) to permit the latest version. - [Release notes](https://github.com/xacrimon/dashmap/releases) - [Commits](https://github.com/xacrimon/dashmap/compare/v5.5.0...v6.0.1) --- updated-dependencies: - dependency-name: dashmap dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] --- broker/Cargo.toml | 2 +- proxy/Cargo.toml | 2 +- shared/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 1d462f40..b8ebf746 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -16,7 +16,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" axum = { version = "0.7", features = [ "query" ] } #axum-macros = "0.3.7" -dashmap = "5.4" +dashmap = "6.0" anyhow = "1" thiserror = "1" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 711d3e00..64595adc 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -38,7 +38,7 @@ async-stream = "0.3" # Socket dependencies chacha20poly1305 = { version = "0.10", features = ["stream"], optional = true } -dashmap = { version = "5.5", optional = true} +dashmap = { version = "6.0", optional = true} hyper = { version = "1", default-features = false, optional = true } hyper-util = { version = "0.1", default-features = false, features = ["tokio"], optional = true} diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 32534a14..14508ca5 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -49,7 +49,7 @@ fundu = "2.0" regex = "1" # expire map dependencies -dashmap = { version = "5.4", optional = true} +dashmap = { version = "6.0", optional = true} beam-lib = { workspace = true } From bf7ef35d011bd55c8f3622c6a4756d553a55270d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Aug 2024 08:44:20 +0000 Subject: [PATCH 10/29] Update itertools requirement from 0.12.0 to 0.13.0 Updates the requirements on [itertools](https://github.com/rust-itertools/itertools) to permit the latest version. - [Changelog](https://github.com/rust-itertools/itertools/blob/master/CHANGELOG.md) - [Commits](https://github.com/rust-itertools/itertools/compare/v0.12.0...v0.13.0) --- updated-dependencies: - dependency-name: itertools dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] --- shared/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 32534a14..21167c36 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -33,7 +33,7 @@ rsa = "0.9" sha2 = "0.10" openssl = "0.10" chacha20poly1305 = "0.10" -itertools = "0.12.0" +itertools = "0.13.0" jwt-simple = "0.11" # Global variables From 7920fef2d4ea685ee986e62dbc48554ede75947f Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 12 Aug 2024 11:27:13 +0000 Subject: [PATCH 11/29] fix: Better timout timings --- broker/src/health.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/broker/src/health.rs b/broker/src/health.rs index 3b0a19ac..b9c115e5 100644 --- a/broker/src/health.rs +++ b/broker/src/health.rs @@ -57,7 +57,8 @@ pub struct Health { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProxyStatus { - last_active: SystemTime, + last_connect: SystemTime, + last_disconnect: Option, #[serde(skip)] connections: u8, } @@ -68,18 +69,27 @@ impl ProxyStatus { } pub fn disconnect(&mut self) { + self.last_disconnect = Some(SystemTime::now()); self.connections -= 1; } pub fn connect(&mut self) { self.connections += 1; - self.last_active = SystemTime::now(); + self.last_connect = SystemTime::now(); + } + + pub fn _last_seen(&self) -> SystemTime { + if self.online() { + SystemTime::now() + } else { + self.last_disconnect.expect("Should always exist as the proxy is not online") + } } } impl ProxyStatus { pub fn new() -> ProxyStatus { - ProxyStatus { last_active: SystemTime::now(), connections: 1 } + ProxyStatus { last_connect: SystemTime::now(), connections: 1, last_disconnect: None } } } From 9d031e6f4d1e3f795312942c50d21deac5becc23 Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 24 Oct 2024 12:38:35 +0000 Subject: [PATCH 12/29] fix(sockets): return right headers for request upgrade --- broker/src/serve_sockets.rs | 7 +++++-- proxy/src/serve_sockets.rs | 16 
+++++++++++----- tests/src/socket_test.rs | 30 ++++++++++++++++++++---------- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index e941eb23..8a39c72f 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, collections::{HashMap, HashSet}, ops::Deref, time::Duration}; -use axum::{extract::{Path, Request, State}, http::{header, request::Parts, StatusCode}, response::{IntoResponse, Response}, routing::get, RequestExt, Router}; +use axum::{extract::{Path, Request, State}, http::{header, request::Parts, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::get, RequestExt, Router}; use bytes::BufMut; use hyper_util::rt::TokioIo; use serde::{Serialize, Serializer, ser::SerializeSeq}; @@ -131,5 +131,8 @@ async fn connect_socket( } }); } - Err(StatusCode::SWITCHING_PROTOCOLS) + Ok(([ + (header::UPGRADE, HeaderValue::from_static("tcp")), + (header::CONNECTION, HeaderValue::from_static("upgrade")) + ], StatusCode::SWITCHING_PROTOCOLS).into_response()) } diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 602eea93..92a95b20 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -7,7 +7,7 @@ use std::{ }; use axum::{ - extract::{Path, Request, State}, http::{self, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, RequestPartsExt, Router + extract::{Path, Request, State}, http::{self, header, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, RequestPartsExt, Router }; use bytes::{Buf, BufMut, BytesMut}; use chacha20poly1305::{ @@ -193,7 +193,8 @@ async fn connect_socket( return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } }; - *get_socket_con_req.headers_mut() = req.headers().clone(); + get_socket_con_req.headers_mut().insert(header::CONNECTION, HeaderValue::from_static("upgrade")); + 
get_socket_con_req.headers_mut().insert(header::UPGRADE, HeaderValue::from_static("tcp")); let mut res = match forward_request(get_socket_con_req, &state.config, &sender, &state.client).await { @@ -207,8 +208,10 @@ async fn connect_socket( let broker_conn = match res.extensions_mut().remove::() { Some(other_conn) if res.status() == StatusCode::SWITCHING_PROTOCOLS => other_conn, _ => { - warn!("Failed to create an upgradable connection to the broker. Response was: {res:?}"); - return res.status().into_response(); + let s = res.status(); + let res = res.text().await.unwrap_or_else(|_| "".into()); + warn!("Failed to create an upgradable connection to the broker. {s}: {res}"); + return s.into_response(); } }; @@ -232,7 +235,10 @@ async fn connect_socket( } }); - StatusCode::SWITCHING_PROTOCOLS.into_response() + ([ + (header::UPGRADE, HeaderValue::from_static("tcp")), + (header::CONNECTION, HeaderValue::from_static("upgrade")) + ], StatusCode::SWITCHING_PROTOCOLS).into_response() } #[derive(Debug, Clone, Copy)] diff --git a/tests/src/socket_test.rs b/tests/src/socket_test.rs index 9455ba97..46675b22 100644 --- a/tests/src/socket_test.rs +++ b/tests/src/socket_test.rs @@ -1,33 +1,43 @@ +use std::time::Duration; + +use beam_lib::{BlockingOptions, MsgId}; use rand::RngCore; use tokio::io::{AsyncWriteExt, AsyncReadExt, AsyncRead, AsyncWrite}; use anyhow::Result; use crate::*; async fn test_connection(mut a: T, mut b: T) -> Result<()> { - const N: usize = 2_usize.pow(13); - let test_data: &mut [u8; N] = &mut [0; N]; - rand::thread_rng().fill_bytes(test_data); - let mut read_buf = [0; N]; - a.write_all(test_data).await?; - a.flush().await?; - b.read_exact(&mut read_buf).await?; - assert_eq!(test_data, &read_buf); + const N: usize = 2_usize.pow(8); + for _ in 0..10 { + let test_data: &mut [u8; N] = &mut [0; N]; + rand::thread_rng().fill_bytes(test_data); + let mut read_buf = [0; N]; + a.write_all(test_data).await?; + a.flush().await?; + b.read_exact(&mut read_buf).await?; + 
assert_eq!(test_data, &read_buf); + tokio::time::sleep(Duration::from_millis(500)).await; + } Ok(()) } #[tokio::test] async fn test_full() -> Result<()> { + let id = MsgId::new(); + let id_str = id.to_string(); let metadata = serde_json::json!({ "foo": vec![1, 2, 3], + "id": id }); let app1 = async { CLIENT1.create_socket_with_metadata(&APP2, &metadata).await.map_err(anyhow::Error::from) }; let app2 = async { let task = CLIENT2 - .get_socket_tasks(&beam_lib::BlockingOptions::from_count(1)) + .get_socket_tasks(&BlockingOptions::from_time(Duration::from_secs(1))) .await? - .pop() + .into_iter() + .find(|t| t.metadata["id"].as_str() == Some(&id_str)) .ok_or(anyhow::anyhow!("Failed to get a socket task"))?; assert_eq!(&task.metadata, &metadata); Ok(CLIENT2.connect_socket(&task.id).await?) From 99a3aab591ab58599abc70bbf656830f41f4e85d Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 11 Sep 2024 16:31:57 +0000 Subject: [PATCH 13/29] fix(tests): fix issue with static reqwest clients --- tests/src/lib.rs | 8 ++++++-- tests/src/socket_test.rs | 8 ++++---- tests/src/task_test.rs | 10 +++++----- tests/src/test_sse.rs | 4 ++-- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 4ed220b6..68c14973 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -35,8 +35,12 @@ pub const APP_KEY: &str = match option_env!("APP_KEY") { None => "App1Secret" }; -pub static CLIENT1: Lazy = Lazy::new(|| BeamClient::new(&APP1, APP_KEY, PROXY1.parse().unwrap())); -pub static CLIENT2: Lazy = Lazy::new(|| BeamClient::new(&APP2, APP_KEY, PROXY2.parse().unwrap())); +pub fn client1() -> BeamClient { + BeamClient::new(&APP1, APP_KEY, PROXY1.parse().unwrap()) +} +pub fn client2() -> BeamClient { + BeamClient::new(&APP2, APP_KEY, PROXY2.parse().unwrap()) +} #[tokio::test] async fn test_time_out() { diff --git a/tests/src/socket_test.rs b/tests/src/socket_test.rs index 46675b22..e49723a6 100644 --- a/tests/src/socket_test.rs +++ 
b/tests/src/socket_test.rs @@ -30,17 +30,17 @@ async fn test_full() -> Result<()> { "id": id }); let app1 = async { - CLIENT1.create_socket_with_metadata(&APP2, &metadata).await.map_err(anyhow::Error::from) + client1().create_socket_with_metadata(&APP2, &metadata).await.map_err(anyhow::Error::from) }; let app2 = async { - let task = CLIENT2 - .get_socket_tasks(&BlockingOptions::from_time(Duration::from_secs(1))) + let task = client2() + .get_socket_tasks(&beam_lib::BlockingOptions::from_count(1)) .await? .into_iter() .find(|t| t.metadata["id"].as_str() == Some(&id_str)) .ok_or(anyhow::anyhow!("Failed to get a socket task"))?; assert_eq!(&task.metadata, &metadata); - Ok(CLIENT2.connect_socket(&task.id).await?) + Ok(client2().connect_socket(&task.id).await?) }; let (app1, app2) = tokio::try_join!(app1, app2)?; diff --git a/tests/src/task_test.rs b/tests/src/task_test.rs index 6fd95bb8..f206f434 100644 --- a/tests/src/task_test.rs +++ b/tests/src/task_test.rs @@ -6,7 +6,7 @@ use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tokio::sync::oneshot; -use crate::{CLIENT1, APP1, APP2, CLIENT2}; +use crate::{client1, APP1, APP2, client2}; #[tokio::test] async fn test_full_task_cycle() -> Result<()> { @@ -52,7 +52,7 @@ async fn test_task_claiming() -> Result<()> { pub async fn post_task(body: T) -> Result { let id = MsgId::new(); - CLIENT1.post_task(&TaskRequest { + client1().post_task(&TaskRequest { id, from: APP1.clone(), to: vec![APP2.clone()], @@ -65,7 +65,7 @@ pub async fn post_task(body: T) -> Result { } pub async fn poll_task(expected_id: MsgId) -> Result> { - CLIENT2.poll_pending_tasks::(&BlockingOptions::from_time(Duration::from_secs(1))) + client2().poll_pending_tasks::(&BlockingOptions::from_time(Duration::from_secs(1))) .await? 
.into_iter() .find(|t| t.id == expected_id) @@ -77,14 +77,14 @@ pub async fn poll_task(expected_id: MsgId) -> Res } pub async fn poll_result(task_id: MsgId, block: &BlockingOptions) -> Result> { - CLIENT1.poll_results(&task_id, block) + client1().poll_results(&task_id, block) .await? .pop() .ok_or(anyhow::anyhow!("Got no task")) } pub async fn put_result(task_id: MsgId, body: T, status: Option) -> Result<()> { - CLIENT2.put_result(&TaskResult { + client2().put_result(&TaskResult { from: APP2.clone(), to: vec![APP1.clone()], task: task_id, diff --git a/tests/src/test_sse.rs b/tests/src/test_sse.rs index f994ae4f..2c0fff5d 100644 --- a/tests/src/test_sse.rs +++ b/tests/src/test_sse.rs @@ -6,12 +6,12 @@ use beam_lib::TaskResult; use futures::{StreamExt, TryStreamExt}; use reqwest::{header::{self, HeaderValue}, Method}; -use crate::{CLIENT1, task_test}; +use crate::{client1, task_test}; #[tokio::test] async fn test_sse() -> Result<()> { let id = task_test::post_task("test").await?; - let res = CLIENT1 + let res = client1() .raw_beam_request( Method::GET, &format!("v1/tasks/{id}/results?wait_count=1"), From 81b1fe2c0daacfbbc1718fc0f7fb4131649e5364 Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Mon, 4 Nov 2024 14:20:47 +0100 Subject: [PATCH 14/29] fix(broker): increase result channel size to prevent lag (#212) --- broker/src/task_manager.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/broker/src/task_manager.rs b/broker/src/task_manager.rs index 0b7d607c..475e442d 100644 --- a/broker/src/task_manager.rs +++ b/broker/src/task_manager.rs @@ -179,7 +179,9 @@ impl + Task + Msg> TaskManager { } let max_receivers = task.get_to().len(); self.tasks.insert(id.clone(), task); - let (results_sender, _) = broadcast::channel(1.max(max_receivers)); + // Create a large enough buffer that all receivers can at least create one claimed result and a successfull result + // while the receiver channel is not being polled filling 
up the buffer and causing the channel to lag + let (results_sender, _) = broadcast::channel(1.max(max_receivers) * 2); self.new_results.insert(id.clone(), results_sender); // We dont care if noone is listening _ = self.new_tasks.send(id); From f32b58bf7d422214dd67319213d0b8ebc7bed13a Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 4 Nov 2024 13:39:16 +0000 Subject: [PATCH 15/29] fix(sockets): correctly index into encrypt and decrypt buffers --- broker/src/serve_sockets.rs | 1 + proxy/src/serve_sockets.rs | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index 8a39c72f..d6cbf3ef 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -99,6 +99,7 @@ async fn connect_socket( } let Some(conn) = parts.extensions.remove::() else { + warn!("Failed to upgrade connection: {:#?}", parts.headers); return Err(StatusCode::UPGRADE_REQUIRED); }; diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 92a95b20..1b310751 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -312,17 +312,14 @@ impl DecryptorCodec { } impl EncryptorCodec { - #[inline] - fn tag_overhead() -> usize { - ::TagSize::to_usize() - } + const TAG_SIZE: usize = ::TagSize::USIZE; } impl Encoder<&[u8]> for EncryptorCodec { type Error = io::Error; fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> Result<(), Self::Error> { - let mut enc_buf = EncBuffer::new(dst, item.len() + Self::tag_overhead()); + let mut enc_buf = EncBuffer::new(dst, (item.len() + Self::TAG_SIZE).try_into().expect("item to large")); enc_buf.extend_from_slice(item).expect("Infallible"); self.encryptor .encrypt_next_in_place(b"", &mut enc_buf) @@ -349,7 +346,7 @@ impl Decoder for DecryptorCodec { let plain = self .decryptor - .decrypt_next(&src[Self::SIZE_OVERHEAD..]) + .decrypt_next(&src[Self::SIZE_OVERHEAD..(Self::SIZE_OVERHEAD + size as usize)]) .map_err(|_| 
io::Error::new(io::ErrorKind::InvalidData, "Decryption failed"))?; src.advance(total_frame_size); Ok(Some(plain)) @@ -358,17 +355,20 @@ impl Decoder for DecryptorCodec { struct EncBuffer<'a> { buf: &'a mut BytesMut, + /// The index from which the inplace encryption takes place + enc_idx: usize, } impl<'a> EncBuffer<'a> { - fn new(buffer: &'a mut BytesMut, content_len: usize) -> Self { - buffer.reserve(content_len + Self::SIZE_OVERHEAD); - buffer.extend_from_slice(&u32::to_le_bytes(content_len as u32)); - Self { buf: buffer } + fn new(buffer: &'a mut BytesMut, content_len: u32) -> Self { + let enc_idx = buffer.len() + Self::SIZE_OVERHEAD; + buffer.reserve(content_len as usize + Self::SIZE_OVERHEAD); + buffer.extend_from_slice(&u32::to_le_bytes(content_len)); + Self { buf: buffer, enc_idx } } /// Reserved for size of msg - const SIZE_OVERHEAD: usize = 4; + const SIZE_OVERHEAD: usize = (u32::BITS / 8) as usize; } impl<'a> Buffer for EncBuffer<'a> { @@ -386,13 +386,13 @@ impl<'a> Buffer for EncBuffer<'a> { impl<'a> AsRef<[u8]> for EncBuffer<'a> { fn as_ref(&self) -> &[u8] { - &self.buf[Self::SIZE_OVERHEAD..] + &self.buf[self.enc_idx..] } } impl<'a> AsMut<[u8]> for EncBuffer<'a> { fn as_mut(&mut self) -> &mut [u8] { - &mut self.buf[Self::SIZE_OVERHEAD..] + &mut self.buf[self.enc_idx..] 
} } From aa7ce7dbc5f4149713faf767cc5d812b54c6289c Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:03:37 +0100 Subject: [PATCH 16/29] fix(sockets): correctly handle pending writes to prevent double write (#214) --- proxy/Cargo.toml | 3 ++ proxy/src/serve_sockets.rs | 57 +++++++++++++++++++++++--------------- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 3afb9469..4fdb4575 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -47,3 +47,6 @@ sockets = ["dep:chacha20poly1305", "dep:dashmap", "tokio-util/codec", "tokio-uti [build-dependencies] build-data = "0" + +[dev-dependencies] +rand = "0.8.5" diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 1b310751..681cf164 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -1,9 +1,5 @@ use std::{ - io::{self, Write}, - ops::{Deref, DerefMut}, - pin::Pin, - task::Poll, - time::{Duration, SystemTime, Instant}, sync::Arc, + io::{self, Write}, ops::{Deref, DerefMut}, pin::Pin, sync::{atomic::AtomicUsize, Arc}, task::Poll, time::{Duration, Instant, SystemTime} }; use axum::{ @@ -33,7 +29,7 @@ use shared::{ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf, ReadHalf, WriteHalf}; use tokio_util::{ codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}, - compat::{Compat, FuturesAsyncReadCompatExt}, + compat::{Compat, FuturesAsyncReadCompatExt}, io::SinkWriter, }; use tracing::{warn, debug}; @@ -296,7 +292,7 @@ impl DerefMut for SocketEncKey { struct EncryptedSocket { // inner: Framed, read: Compat, DecryptorCodec>>>, - write: FramedWrite, EncryptorCodec>, + write: SinkWriter, EncryptorCodec>>, } struct EncryptorCodec { @@ -380,7 +376,8 @@ impl<'a> Buffer for EncBuffer<'a> { // This should only be called when decrypting fn truncate(&mut self, len: usize) { - self.buf.truncate(len) + warn!("Buffer got truncated. 
This should never happen as it should be perfectly sized"); + self.buf.truncate(self.enc_idx + len) } } @@ -415,7 +412,7 @@ impl EncryptedSocket { let (r, w) = tokio::io::split(inner); let read = FramedRead::new(r, DecryptorCodec { decryptor }); let read = read.into_async_read().compat(); - let write = FramedWrite::new(w, EncryptorCodec { encryptor }); + let write = SinkWriter::new(FramedWrite::new(w, EncryptorCodec { encryptor })); Ok(Self { read, write }) } @@ -437,27 +434,28 @@ impl AsyncWrite for EncryptedSocket { cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - self.write.send(buf).poll_unpin(cx).map_ok(|_| buf.len()) + Pin::new(&mut self.write).poll_write(cx, buf) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.write.poll_flush_unpin(cx) + Pin::new(&mut self.write).poll_flush(cx) } fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.write.poll_close_unpin(cx) + Pin::new(&mut self.write).poll_shutdown(cx) } } #[cfg(test)] mod tests { use chacha20poly1305::aead::stream::{Decryptor, Encryptor, EncryptorLE31}; + use rand::Rng; use tokio::net::{TcpListener, TcpStream}; use super::*; @@ -466,20 +464,35 @@ mod tests { async fn test_encryption() { let mut key = GenericArray::default(); OsRng.fill_bytes(&mut key); - const N: usize = 2_usize.pow(13); - let test_data: &mut [u8; N] = &mut [0; N]; - OsRng.fill_bytes(test_data); - let mut read_buf = [0; N]; + let data: Arc> = (0..13337).map(|_| { + let mut chunk = vec![0; OsRng.gen_range(1..9999)]; + OsRng.fill_bytes(&mut chunk); + chunk + }).collect::>().into(); start_test_broker().await; let (mut client1, mut client2) = tokio::join!(client(&key), client(&key)); - client1.write_all(test_data).await.unwrap(); - client2.read_exact(&mut read_buf).await.unwrap(); - assert_eq!(test_data, &read_buf); - client2.write_all(test_data).await.unwrap(); - client1.read_exact(&mut 
read_buf).await.unwrap(); - assert_eq!(test_data, &read_buf); + let data_cp = data.clone(); + let a = tokio::spawn(async move { + for c in data_cp.iter() { + client1.write_all(&c).await.unwrap(); + client1.flush().await.unwrap(); + } + }); + let data_cp = data.clone(); + let b = tokio::spawn(async move { + for (i, c) in data_cp.iter().enumerate() { + let mut buf = vec![0; c.len()]; + client2.read_exact(&mut buf).await.unwrap(); + if &buf != c { + let mut remaining = Vec::new(); + client2.read_to_end(&mut remaining).await.unwrap(); + assert_eq!(&buf, c, "{i}: {remaining:?}"); + } + } + }); + tokio::try_join!(a, b).unwrap(); } async fn start_test_broker() { From b52a1b4c2183f4bb17212b1e8e67b075db3961ae Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Fri, 15 Nov 2024 15:47:03 +0100 Subject: [PATCH 17/29] chore: print more context for connection error (#217) --- shared/src/errors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/src/errors.rs b/shared/src/errors.rs index 2fc2d29b..249b5abf 100644 --- a/shared/src/errors.rs +++ b/shared/src/errors.rs @@ -37,7 +37,7 @@ pub enum SamplyBeamError { ConfigurationFailed(String), #[error("Internal synchronization error: {0}")] InternalSynchronizationError(String), - #[error("Error executing HTTP request: {0}")] + #[error("Error executing HTTP request: {0:?}")] HttpRequestError(#[from] reqwest::Error), // #[error("Error building HTTP request: {0}")] // HttpRequestBuildError(#[from] http::Error), From 3d6e1cad75c9de6bcd0f7a6682836268f207c1b1 Mon Sep 17 00:00:00 2001 From: Martin Lablans <6804500+lablans@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:34:04 +0100 Subject: [PATCH 18/29] Disable daily rebuild (#218) --- .github/workflows/rust.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4bdd52c9..7bd5493c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -4,9 +4,6 @@ 
on: push: workflow_dispatch: pull_request: - schedule: - # Fetch new base image updates every night at 1am - - cron: '0 1 * * *' jobs: build-with-samply: From c3049c77b12776b288ef447c88564d77db084819 Mon Sep 17 00:00:00 2001 From: Andreas Date: Wed, 18 Dec 2024 10:18:12 +0100 Subject: [PATCH 19/29] Show correct authentication header (#219) --- dev/beamdev | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev/beamdev b/dev/beamdev index 8e67fa03..e0c10519 100755 --- a/dev/beamdev +++ b/dev/beamdev @@ -158,7 +158,8 @@ function defaults { echo "Broker $BROKER_ID at http://localhost:8080 (this would not be valid in production)" echo "$PROXY1_ID at $P1" echo "$PROXY2_ID at $P2" - echo "Authenticate via header: [ Authorization: ApiKey $APP_ID_SHORT.$PROXY1_ID $APP_KEY ]" + echo "Authenticate via header: [ Authorization: ApiKey $APP1_ID_SHORT.$PROXY1_ID $APP_KEY ]" + echo "Authenticate via header: [ Authorization: ApiKey $APP1_ID_SHORT.$PROXY2_ID $APP_KEY ]" } function stop { From cb06ca98dc444a24c7173b8e6b4882df61903c21 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 Dec 2024 10:20:25 +0100 Subject: [PATCH 20/29] chore(deps): update thiserror from 1 to 2 (#216) Updates the requirements on [thiserror](https://github.com/dtolnay/thiserror) to permit the latest version. - [Release notes](https://github.com/dtolnay/thiserror/releases) - [Commits](https://github.com/dtolnay/thiserror/compare/1.0.0...2.0.3) --- updated-dependencies: - dependency-name: thiserror dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- beam-lib/Cargo.toml | 2 +- broker/Cargo.toml | 2 +- shared/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/beam-lib/Cargo.toml b/beam-lib/Cargo.toml index 3d82262c..bb1348d6 100644 --- a/beam-lib/Cargo.toml +++ b/beam-lib/Cargo.toml @@ -10,7 +10,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" uuid = { version = "1", features = ["v4", "serde"] } reqwest = { version = "0.12", features = ["json"], default-features = false, optional = true } -thiserror = { version = "1.0", optional = true } +thiserror = { version = "2.0", optional = true } [features] strict-ids = [] diff --git a/broker/Cargo.toml b/broker/Cargo.toml index b8ebf746..5fd6554e 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -19,7 +19,7 @@ axum = { version = "0.7", features = [ "query" ] } dashmap = "6.0" anyhow = "1" -thiserror = "1" +thiserror = "2" # Subscriber is setup through shared tracing = "0.1" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 1d9caa4e..a92a1864 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -40,7 +40,7 @@ jwt-simple = "0.11" once_cell = "1" # Error handling -thiserror = "1" +thiserror = "2" # Command Line Interface clap = { version = "4", features = ["env", "derive"] } From 47c0266f028c1a9cbeed454c55f95fb7f3db763c Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Tue, 14 Jan 2025 11:22:52 +0100 Subject: [PATCH 21/29] fix: dont override successfull result with claimed (#224) --- broker/src/task_manager.rs | 19 ++++++++++++++----- tests/src/task_test.rs | 15 +++++++++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/broker/src/task_manager.rs b/broker/src/task_manager.rs index 475e442d..05bad264 100644 --- a/broker/src/task_manager.rs +++ b/broker/src/task_manager.rs @@ -1,7 +1,5 @@ use std::{ - borrow::Cow, - ops::Deref, - 
time::{Duration, SystemTime}, collections::HashMap, sync::Arc, convert::Infallible, + borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::Infallible, ops::Deref, sync::Arc, time::{Duration, SystemTime} }; use axum::{response::{IntoResponse, sse::Event, Sse}, Json, http::StatusCode}; @@ -35,7 +33,18 @@ impl Task for MsgTaskRequest { type Result = MsgSigned>; fn insert_result(&mut self, result: Self::Result) -> bool { - self.results.insert(result.get_from().clone(), result).is_some() + match self.results.entry(result.get_from().clone()) { + // Don't override a successful result. See tests::task_test::test_claim_after_success for more details + Entry::Occupied(prev) if prev.get().msg.status == WorkStatus::Succeeded && result.msg.status == WorkStatus::Claimed => false, + Entry::Occupied(mut prev) => { + prev.insert(result); + true + }, + Entry::Vacant(empty) => { + empty.insert(result); + false + }, + } } fn get_results(&self) -> &HashMap { @@ -179,7 +188,7 @@ impl + Task + Msg> TaskManager { } let max_receivers = task.get_to().len(); self.tasks.insert(id.clone(), task); - // Create a large enough buffer that all receivers can at least create one claimed result and a successfull result + // Create a large enough buffer that all receivers can at least create one claimed result and a successful result // while the receiver channel is not being polled filling up the buffer and causing the channel to lag let (results_sender, _) = broadcast::channel(1.max(max_receivers) * 2); self.new_results.insert(id.clone(), results_sender); diff --git a/tests/src/task_test.rs b/tests/src/task_test.rs index f206f434..a7edd820 100644 --- a/tests/src/task_test.rs +++ b/tests/src/task_test.rs @@ -50,6 +50,21 @@ async fn test_task_claiming() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_claim_after_success() -> Result<()> { + // We dont want to update a successful result to claimed which is almost always a http race condition where we select on claiming and answerering a 
task at the same time. + // Example: + // We might claim a task and have not gotten a response yet so the future is still not completed and might be at some unfair proxy. + // In parallel we are computing the result of that task and finished it so we drop the future thats waiting on the response and imidiatly send the successful result. + // This result might end up arriving before the request that claims the task so when the claiming request arrived we should not override the result. + let id = post_task(()).await?; + put_result(id, (), Some(WorkStatus::Succeeded)).await?; + put_result(id, (), Some(WorkStatus::Claimed)).await?; + let res = tokio::time::timeout(Duration::from_secs(10), poll_result::<()>(id, &BlockingOptions::from_count(1))).await??; + assert_eq!(res.status, WorkStatus::Succeeded); + Ok(()) +} + pub async fn post_task(body: T) -> Result { let id = MsgId::new(); client1().post_task(&TaskRequest { From f8fee4f238fe1ef11900f9bd6c6ac2eebdf3cee4 Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Wed, 15 Jan 2025 08:34:44 +0100 Subject: [PATCH 22/29] docs: explain weird `wait_count` behavior (#225) --- README.md | 4 ++-- tests/src/task_test.rs | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 94738bce..0e0b804b 100644 --- a/README.md +++ b/README.md @@ -399,13 +399,13 @@ Date: Mon, 27 Jun 2022 14:26:45 GMT As part of making this API performant, all reading endpoints support long-polling as an efficient alternative to regular (repeated) polling. Using this function requires the following parameters: -- `wait_count`: The API call will block until this many results are available ... +- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results avalible all of them will be returned. - `wait_time`: ... 
or this time has passed (if not stated differently, e.g., by adding 'm', 'h', 'ms', ..., this is interpreted as seconds), whichever comes first. For example, retrieving a task's results: - `GET /v1/tasks//results` will return immediately with however many results are available, -- `GET /v1/tasks//results?wait_count=5` will block forever until 5 results are available, +- `GET /v1/tasks//results?wait_count=5` will block until at least 5 results are available, - `GET /v1/tasks//results?wait_count=5&wait_time=30s` will block until 5 results are available or 30 seconds have passed (whichever comes first). In the latter case, HTTP code `206 (Partial Content)` is returned to indicate that the result is incomplete. ### Server-sent Events (SSE) API (experimental) diff --git a/tests/src/task_test.rs b/tests/src/task_test.rs index a7edd820..957da30b 100644 --- a/tests/src/task_test.rs +++ b/tests/src/task_test.rs @@ -65,6 +65,15 @@ async fn test_claim_after_success() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_polling_tasks_yields_more_than_specified_wait_count() -> Result<()> { + let id1 = post_task(()).await?; + let id2 = post_task(()).await?; + let tasks = client2().poll_pending_tasks::(&BlockingOptions::from_count(1)).await?; + assert_eq!(tasks.iter().filter(|t| [id1, id2].contains(&t.id)).count(), 2); + Ok(()) +} + pub async fn post_task(body: T) -> Result { let id = MsgId::new(); client1().post_task(&TaskRequest { From bab8f39f1f6253a449ffc86171c6ff98efddaab8 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 15 Jan 2025 09:30:21 +0100 Subject: [PATCH 23/29] chore(deps): update itertools requirement from 0.13.0 to 0.14.0 (#222) Updates the requirements on [itertools](https://github.com/rust-itertools/itertools) to permit the latest version. 
- [Changelog](https://github.com/rust-itertools/itertools/blob/master/CHANGELOG.md) - [Commits](https://github.com/rust-itertools/itertools/compare/v0.13.0...v0.14.0) --- updated-dependencies: - dependency-name: itertools dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- shared/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/Cargo.toml b/shared/Cargo.toml index a92a1864..8e4bc740 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -33,7 +33,7 @@ rsa = "0.9" sha2 = "0.10" openssl = "0.10" chacha20poly1305 = "0.10" -itertools = "0.13.0" +itertools = "0.14.0" jwt-simple = "0.11" # Global variables From 8768b6c9384f6d226ddd72f3af187acef5b1a80b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 15 Jan 2025 09:30:39 +0100 Subject: [PATCH 24/29] chore(deps): update axum requirement from 0.7 to 0.8 (#221) * chore(deps): update axum requirement from 0.7 to 0.8 Updates the requirements on [axum](https://github.com/tokio-rs/axum) to permit the latest version. - [Release notes](https://github.com/tokio-rs/axum/releases) - [Changelog](https://github.com/tokio-rs/axum/blob/main/CHANGELOG.md) - [Commits](https://github.com/tokio-rs/axum/commits) --- updated-dependencies: - dependency-name: axum dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] * chore: migrate to native async traits where possible * chore: migrate routes to new axum syntax --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: janskiba --- broker/Cargo.toml | 4 ++-- broker/src/crypto.rs | 10 ++-------- broker/src/serve_health.rs | 2 +- broker/src/serve_pki.rs | 4 ++-- broker/src/serve_sockets.rs | 2 +- broker/src/serve_tasks.rs | 8 ++++---- proxy/Cargo.toml | 2 +- proxy/src/auth.rs | 2 -- proxy/src/crypto.rs | 4 ++-- proxy/src/serve_sockets.rs | 2 +- proxy/src/serve_tasks.rs | 4 ++-- shared/Cargo.toml | 3 ++- shared/src/config_shared.rs | 1 - shared/src/crypto.rs | 3 ++- shared/src/crypto_jwt.rs | 3 +-- shared/src/http_client.rs | 1 - shared/src/lib.rs | 2 +- shared/src/traits.rs | 2 -- 18 files changed, 24 insertions(+), 35 deletions(-) diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 5fd6554e..7a1eeaab 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -14,7 +14,7 @@ beam-lib = { workspace = true } tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -axum = { version = "0.7", features = [ "query" ] } +axum = { version = "0.8", features = [ "query" ] } #axum-macros = "0.3.7" dashmap = "6.0" @@ -30,7 +30,7 @@ futures-core = { version = "0.3", default-features = false } once_cell = "1" # Socket dependencies bytes = { version = "1", optional = true } -axum-extra = { version = "0.9", features = ["typed-header"] } +axum-extra = { version = "0.10", features = ["typed-header"] } hyper = { version = "1", default-features = false, optional = true} hyper-util = { version = "0.1", default-features = false, features = ["tokio"], optional = true} diff --git a/broker/src/crypto.rs b/broker/src/crypto.rs index 6853710c..c9468148 100644 --- a/broker/src/crypto.rs +++ b/broker/src/crypto.rs @@ -1,15 +1,9 @@ use std::{future::Future, mem::discriminant}; -use 
axum::{ - async_trait, - http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri}, -}; +use axum::http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri}; use serde::{Deserialize, Serialize}; use shared::{ - config, - crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts}, - errors::SamplyBeamError, - http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url}, + async_trait, config, crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts}, errors::SamplyBeamError, http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url} }; use std::time::Duration; use tokio::time::timeout; diff --git a/broker/src/serve_health.rs b/broker/src/serve_health.rs index 81ab706d..9a4d93fb 100644 --- a/broker/src/serve_health.rs +++ b/broker/src/serve_health.rs @@ -19,7 +19,7 @@ struct HealthOutput { pub(crate) fn router(health: Arc>) -> Router { Router::new() .route("/v1/health", get(handler)) - .route("/v1/health/proxies/:proxy_id", get(proxy_health)) + .route("/v1/health/proxies/{proxy_id}", get(proxy_health)) .route("/v1/health/proxies", get(get_all_proxies)) .route("/v1/control", get(get_control_tasks).layer(axum::middleware::from_fn(log_version_mismatch))) .with_state(health) diff --git a/broker/src/serve_pki.rs b/broker/src/serve_pki.rs index 601b9e3f..2e124e67 100644 --- a/broker/src/serve_pki.rs +++ b/broker/src/serve_pki.rs @@ -47,12 +47,12 @@ pub(crate) fn router() -> Router { .route("/v1/pki/certs", get(get_certificate_list)) .route("/v1/pki/certs/im-ca", get(get_im_cert)) .route( - "/v1/pki/certs/by_serial/:serial", + "/v1/pki/certs/by_serial/{serial}", get(get_certificate_by_serial), ) } -#[tracing::instrument(name = "/v1/pki/certs/by_serial/:serial")] +#[tracing::instrument(name = "/v1/pki/certs/by_serial/{serial}")] async fn get_certificate_by_serial( ConnectInfo(addr): ConnectInfo, Path(serial): Path, diff --git a/broker/src/serve_sockets.rs 
b/broker/src/serve_sockets.rs index d6cbf3ef..0e715f83 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -42,7 +42,7 @@ impl Default for SocketState { pub(crate) fn router() -> Router { Router::new() .route("/v1/sockets", get(get_socket_requests).post(post_socket_request)) - .route("/v1/sockets/:id", get(connect_socket)) + .route("/v1/sockets/{id}", get(connect_socket)) .with_state(SocketState::default()) } diff --git a/broker/src/serve_tasks.rs b/broker/src/serve_tasks.rs index 7f98eec0..60e70561 100644 --- a/broker/src/serve_tasks.rs +++ b/broker/src/serve_tasks.rs @@ -40,8 +40,8 @@ pub(crate) fn router() -> Router { let state = TasksState::default(); Router::new() .route("/v1/tasks", get(get_tasks).post(post_task)) - .route("/v1/tasks/:task_id/results", get(get_results_for_task)) - .route("/v1/tasks/:task_id/results/:app_id", put(put_result)) + .route("/v1/tasks/{task_id}/results", get(get_results_for_task)) + .route("/v1/tasks/{task_id}/results/{app_id}", put(put_result)) .with_state(state) } @@ -82,7 +82,7 @@ async fn get_results_for_task( } } -// GET /v1/tasks/:task_id/results +// GET /v1/tasks/{task_id}/results async fn get_results_for_task_nostream( addr: SocketAddr, state: TasksState, @@ -354,7 +354,7 @@ async fn post_task( )) } -// PUT /v1/tasks/:task_id/results/:app_id +// PUT /v1/tasks/{task_id}/results/{app_id} async fn put_result( ConnectInfo(addr): ConnectInfo, Path((task_id, app_id)): Path<(MsgId, AppOrProxyId)>, diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 4fdb4575..5e008857 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -12,7 +12,7 @@ shared = { path = "../shared", features = ["config-for-proxy"] } beam-lib = { workspace = true } tokio = { version = "1", features = ["full"] } -axum = { version = "0.7", features = ["macros"] } +axum = { version = "0.8", features = ["macros"] } bytes = { version = "1" } httpdate = "1.0" diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index d6fdb409..96737bad 100644 
--- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use axum::{ - async_trait, extract::{FromRequest, FromRequestParts}, http::{header::{self, HeaderName}, request::Parts, Request, StatusCode}, }; @@ -14,7 +13,6 @@ use tracing::{debug, Span, debug_span, warn}; pub(crate) struct AuthenticatedApp(pub(crate) AppId); -#[async_trait] impl FromRequestParts for AuthenticatedApp { type Rejection = (StatusCode, [(HeaderName, &'static str); 1]); diff --git a/proxy/src/crypto.rs b/proxy/src/crypto.rs index a9f162ba..85c3f2ef 100644 --- a/proxy/src/crypto.rs +++ b/proxy/src/crypto.rs @@ -1,7 +1,7 @@ -use axum::{async_trait, body::Bytes, http::{header, request, Method, Request, StatusCode, Uri}, response::Response, Json}; +use axum::{body::Bytes, http::{header, request, Method, Request, StatusCode, Uri}, response::Response, Json}; use beam_lib::AppOrProxyId; use shared::{ - config, config_proxy::Config, config_shared::ConfigCrypto, crypto::GetCerts, errors::{CertificateInvalidReason, SamplyBeamError}, http_client::SamplyHttpClient, reqwest, EncryptedMessage, MsgEmpty + async_trait, config, config_proxy::Config, config_shared::ConfigCrypto, crypto::GetCerts, errors::{CertificateInvalidReason, SamplyBeamError}, http_client::SamplyHttpClient, reqwest, EncryptedMessage, MsgEmpty }; use tracing::{debug, info, warn, error}; diff --git a/proxy/src/serve_sockets.rs b/proxy/src/serve_sockets.rs index 681cf164..b5346c56 100644 --- a/proxy/src/serve_sockets.rs +++ b/proxy/src/serve_sockets.rs @@ -58,7 +58,7 @@ pub(crate) fn router(client: SamplyHttpClient) -> Router { Router::new() .route("/v1/sockets", get(get_tasks)) - .route("/v1/sockets/:app_or_id", post(create_socket_con).get(connect_socket)) + .route("/v1/sockets/{app_or_id}", post(create_socket_con).get(connect_socket)) .with_state(state) .layer(Extension(task_secret_map)) } diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index 38fc0065..20bc5a64 100644 --- 
a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -39,8 +39,8 @@ pub(crate) fn router(client: &SamplyHttpClient) -> Router { Router::new() // We need both path variants so the server won't send us into a redirect loop (/tasks, /tasks/, ...) .route("/v1/tasks", get(handler_task).post(handler_task)) - .route("/v1/tasks/:task_id/results", get(handler_task)) - .route("/v1/tasks/:task_id/results/:app_id", put(handler_task)) + .route("/v1/tasks/{task_id}/results", get(handler_task)) + .route("/v1/tasks/{task_id}/results/{app_id}", put(handler_task)) .with_state(state) } diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 8e4bc740..2b0c88ad 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -17,7 +17,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1", features = ["full"] } -axum = { version = "0.7", features = [] } +axum = { version = "0.8", features = [] } bytes = "1.4" # HTTP client with proxy support @@ -52,6 +52,7 @@ regex = "1" dashmap = { version = "6.0", optional = true} beam-lib = { workspace = true } +async-trait = "0.1" [features] expire_map = ["dep:dashmap"] diff --git a/shared/src/config_shared.rs b/shared/src/config_shared.rs index 6adff973..14b15329 100644 --- a/shared/src/config_shared.rs +++ b/shared/src/config_shared.rs @@ -8,7 +8,6 @@ use crate::{ }, SamplyBeamError, }; -use axum::async_trait; use clap::Parser; use jwt_simple::prelude::RS256KeyPair; use openssl::{ diff --git a/shared/src/crypto.rs b/shared/src/crypto.rs index 7774b6e3..582280f0 100644 --- a/shared/src/crypto.rs +++ b/shared/src/crypto.rs @@ -1,4 +1,5 @@ -use axum::{async_trait, body::Body, http::Request, Json}; +use async_trait::async_trait; +use axum::{body::Body, http::Request, Json}; use itertools::Itertools; use once_cell::sync::{Lazy, OnceCell}; diff --git a/shared/src/crypto_jwt.rs b/shared/src/crypto_jwt.rs index 0d3aee6a..068468e4 100644 --- a/shared/src/crypto_jwt.rs +++ b/shared/src/crypto_jwt.rs @@ -8,7 +8,7 @@ 
use crate::{ errors::{CertificateInvalidReason, SamplyBeamError}, Msg, MsgEmpty, MsgId, MsgSigned, }; -use axum::{async_trait, body::HttpBody, extract::{{FromRequest, ConnectInfo, FromRequestParts}, Request}, http::{header, request::Parts, uri::PathAndQuery, HeaderMap, HeaderName, Method, StatusCode, Uri}, BoxError, RequestExt}; +use axum::{body::HttpBody, extract::{{FromRequest, ConnectInfo, FromRequestParts}, Request}, http::{header, request::Parts, uri::PathAndQuery, HeaderMap, HeaderName, Method, StatusCode, Uri}, BoxError, RequestExt}; use jwt_simple::{ claims::JWTClaims, prelude::{ @@ -30,7 +30,6 @@ const ERR_FROM: (StatusCode, &str) = ( "\"from\" field in message does not match your certificate.", ); -#[async_trait] impl FromRequest for MsgSigned where // these trait bounds are copied from `impl FromRequest for axum::Json` diff --git a/shared/src/http_client.rs b/shared/src/http_client.rs index ea236c81..355eedab 100644 --- a/shared/src/http_client.rs +++ b/shared/src/http_client.rs @@ -1,6 +1,5 @@ use std::{collections::HashSet, ops::Deref, time::Duration}; -use axum::async_trait; use axum::http::{Request, Response, Uri}; use itertools::Itertools; use once_cell::sync::OnceCell; diff --git a/shared/src/lib.rs b/shared/src/lib.rs index f6f0f466..283d1a96 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -1,6 +1,5 @@ #![allow(unused_imports)] -use axum::async_trait; use beam_lib::{AppId, AppOrProxyId, ProxyId, FailureStrategy, WorkStatus}; use chacha20poly1305::{ aead::{Aead, AeadCore, KeyInit, OsRng}, @@ -34,6 +33,7 @@ use crate::{crypto_jwt::JWT_VERIFICATION_OPTIONS, serde_helpers::*}; // Reexport b64 implementation pub use jwt_simple::reexports::ct_codecs; pub use reqwest; +pub use async_trait::async_trait; pub type MsgId = beam_lib::MsgId; pub type MsgType = String; diff --git a/shared/src/traits.rs b/shared/src/traits.rs index 822f325a..15f43ac8 100644 --- a/shared/src/traits.rs +++ b/shared/src/traits.rs @@ -1,5 +1,4 @@ use axum::{ - async_trait, 
extract::{self, FromRequest, FromRequestParts, Path, Query}, http::{request::Parts, StatusCode}, BoxError, RequestPartsExt, @@ -22,7 +21,6 @@ fn test_duration_parsing() { assert_eq!(Duration::try_from(parser.parse("1234").unwrap()).unwrap().as_millis(), 1234); } -#[async_trait] impl FromRequestParts for HowLongToBlock where S: Send + Sync, From 1bc153d62ed89770ed2bb61fdff435bdec7529e0 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 21 Jan 2025 14:51:44 +0000 Subject: [PATCH 25/29] refactor: remove unnesessary env setup --- broker/src/main.rs | 1 - proxy/src/main.rs | 1 - shared/src/config.rs | 8 -------- 3 files changed, 10 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index 81015a24..9ad591c4 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -21,7 +21,6 @@ use tracing::{error, info, warn}; #[tokio::main] pub async fn main() -> anyhow::Result<()> { - shared::config::prepare_env(); shared::logger::init_logger()?; banner::print_banner(); diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 8adc6bd6..59bf311a 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -30,7 +30,6 @@ pub(crate) const PROXY_TIMEOUT: u64 = 120; #[tokio::main] pub async fn main() -> anyhow::Result<()> { - shared::config::prepare_env(); shared::logger::init_logger()?; banner::print_banner(); diff --git a/shared/src/config.rs b/shared/src/config.rs index f3bf26dc..bb3e2b7f 100644 --- a/shared/src/config.rs +++ b/shared/src/config.rs @@ -36,11 +36,3 @@ pub static CONFIG_SHARED: Lazy = Lazy::new(|| { }); pub(crate) static CONFIG_SHARED_CRYPTO: OnceCell = OnceCell::new(); - -pub fn prepare_env() { - for var in ["http_proxy", "https_proxy", "all_proxy", "no_proxy"] { - for (k, v) in std::env::vars().filter(|(k, _)| k.to_lowercase() == var) { - std::env::set_var(k.to_uppercase(), v); - } - } -} From ba6bbaee53197c8bc02edfeee61c8cca5d70f2f1 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 21 Jan 2025 14:47:00 +0000 Subject: [PATCH 26/29] refactor(health): dont 
use channels to update state --- broker/src/crypto.rs | 29 +++++------------ broker/src/health.rs | 77 +++----------------------------------------- broker/src/main.rs | 22 +++++++------ 3 files changed, 26 insertions(+), 102 deletions(-) diff --git a/broker/src/crypto.rs b/broker/src/crypto.rs index c9468148..598f53e9 100644 --- a/broker/src/crypto.rs +++ b/broker/src/crypto.rs @@ -1,4 +1,4 @@ -use std::{future::Future, mem::discriminant}; +use std::{future::Future, mem::discriminant, sync::Arc}; use axum::http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri}; use serde::{Deserialize, Serialize}; @@ -6,15 +6,15 @@ use shared::{ async_trait, config, crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts}, errors::SamplyBeamError, http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url} }; use std::time::Duration; -use tokio::time::timeout; +use tokio::{sync::RwLock, time::timeout}; use tracing::{debug, error, warn, info}; -use crate::health::{self, VaultStatus}; +use crate::health::{self, Health, VaultStatus}; pub struct GetCertsFromPki { pki_realm: String, hyper_client: SamplyHttpClient, - health_report_sender: tokio::sync::watch::Sender, + health: Arc>, } #[derive(Debug, Deserialize, Clone, Hash)] @@ -35,7 +35,7 @@ struct PkiListResponse { impl GetCertsFromPki { pub(crate) fn new( - health_report_sender: tokio::sync::watch::Sender, + health: Arc>, ) -> Result { let mut certs: Vec = Vec::new(); if let Some(dir) = &config::CONFIG_CENTRAL.tls_ca_certificates_dir { @@ -61,19 +61,12 @@ impl GetCertsFromPki { Ok(Self { pki_realm, hyper_client, - health_report_sender, + health, }) } async fn report_vault_health(&self, status: VaultStatus) { - self.health_report_sender.send_if_modified(|val| { - if discriminant(val) != discriminant(&status) { - *val = status; - true - } else { - false - } - }); + self.health.write().await.vault = status; } pub(crate) async fn check_vault_health(&self) -> Result<(), 
SamplyBeamError> { @@ -261,12 +254,6 @@ impl GetCerts for GetCertsFromPki { } } -pub(crate) fn build_cert_getter( - sender: tokio::sync::watch::Sender, -) -> Result { - GetCertsFromPki::new(sender) -} - -pub(crate) fn pki_url_builder(location: &str) -> Url { +fn pki_url_builder(location: &str) -> Url { config::CONFIG_CENTRAL.pki_address.join(&format!("/v1/{location}")).unwrap() } diff --git a/broker/src/health.rs b/broker/src/health.rs index b9c115e5..c28e46c3 100644 --- a/broker/src/health.rs +++ b/broker/src/health.rs @@ -19,36 +19,27 @@ impl Default for Verdict { } } -#[derive(Serialize, Clone, Copy)] +#[derive(Debug, Serialize, Clone, Copy, Default)] #[serde(rename_all = "lowercase")] pub enum VaultStatus { Ok, + #[default] Unknown, OtherError, LockedOrSealed, Unreachable, } -impl Default for VaultStatus { - fn default() -> Self { - VaultStatus::Unknown - } -} - -#[derive(Serialize, Clone, Copy)] +#[derive(Debug, Serialize, Clone, Copy, Default)] #[serde(rename_all = "lowercase")] pub enum InitStatus { + #[default] Unknown, FetchingIntermediateCert, Done } -impl Default for InitStatus { - fn default() -> Self { - InitStatus::Unknown - } -} - +#[derive(Debug, Default)] pub struct Health { pub vault: VaultStatus, pub initstatus: InitStatus, @@ -92,61 +83,3 @@ impl ProxyStatus { ProxyStatus { last_connect: SystemTime::now(), connections: 1, last_disconnect: None } } } - -pub struct Senders { - pub vault: tokio::sync::watch::Sender, - pub init: tokio::sync::watch::Sender, -} - -impl Health { - pub fn make() -> (Senders, Arc>) { - let health = Health { - vault: VaultStatus::default(), - initstatus: InitStatus::default(), - proxies: HashMap::default() - }; - let (vault_tx, mut vault_rx) = tokio::sync::watch::channel(VaultStatus::default()); - let (init_tx, mut init_rx) = tokio::sync::watch::channel(InitStatus::default()); - let health = Arc::new(RwLock::new(health)); - let health2 = health.clone(); - let health3 = health.clone(); - - let vault_watcher = async move { 
- while vault_rx.changed().await.is_ok() { - let new_val = vault_rx.borrow().clone(); - let mut health = health2.write().await; - health.vault = new_val; - match &health.vault { - VaultStatus::Ok => info!("Vault connection is now healthy"), - x => warn!( - "Vault connection is degraded: {}", - serde_json::to_string(x).unwrap_or_default() - ), - } - } - }; - - tokio::task::spawn(vault_watcher); - let initstatus_watcher = async move { - while init_rx.changed().await.is_ok() { - let new_val = init_rx.borrow().clone(); - let mut health = health3.write().await; - health.initstatus = new_val; - match &health.initstatus { - InitStatus::Done => { - info!("Initialization is now complete"); - return; - }, - x => warn!( - "Still initializing: {}", - serde_json::to_string(x).unwrap_or_default() - ), - } - } - }; - tokio::task::spawn(initstatus_watcher); - - let senders = Senders { vault: vault_tx, init: init_tx }; - (senders, health) - } -} diff --git a/broker/src/main.rs b/broker/src/main.rs index 9ad591c4..4fe6d51f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -14,9 +14,11 @@ mod compare_client_server_version; use std::{collections::HashMap, sync::Arc, time::Duration}; -use health::{Senders, InitStatus}; +use crypto::GetCertsFromPki; +use health::{Health, InitStatus}; +use once_cell::sync::Lazy; use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError}; -use tokio::sync::{RwLock, watch}; +use tokio::sync::RwLock; use tracing::{error, info, warn}; #[tokio::main] @@ -24,25 +26,27 @@ pub async fn main() -> anyhow::Result<()> { shared::logger::init_logger()?; banner::print_banner(); - let (Senders { init: init_status_sender, vault: vault_status_sender}, health) = health::Health::make(); - let cert_getter = crypto::build_cert_getter(vault_status_sender)?; + let health = Arc::new(RwLock::new(Health::default())); + let cert_getter = GetCertsFromPki::new(health.clone())?; shared::crypto::init_cert_getter(cert_getter); - 
tokio::task::spawn(init_broker_ca_chain(init_status_sender)); + tokio::task::spawn(init_broker_ca_chain(health.clone())); #[cfg(debug_assertions)] if shared::examples::print_example_objects() { return Ok(()); } - let _ = config::CONFIG_CENTRAL.bind_addr; // Initialize config + Lazy::force(&config::CONFIG_CENTRAL); // Initialize config serve::serve(health).await?; Ok(()) } -async fn init_broker_ca_chain(sender: watch::Sender) { - sender.send_replace(health::InitStatus::FetchingIntermediateCert); +async fn init_broker_ca_chain(health: Arc>) { + { + health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert + } shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain"); - sender.send_replace(health::InitStatus::Done); + health.write().await.initstatus = health::InitStatus::Done; } From 945af5b78ddeb342eb437685b4a73e38d453d21e Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Wed, 29 Jan 2025 16:25:49 +0100 Subject: [PATCH 27/29] feat: Change health connection to SSE (#209) * feat: Change health connection to SSE * docs: document health connection --- README.md | 8 ++- broker/src/crypto.rs | 2 +- broker/src/health.rs | 85 ------------------------ broker/src/main.rs | 7 +- broker/src/serve.rs | 2 +- broker/src/serve_health.rs | 129 +++++++++++++++++++++++++------------ proxy/src/main.rs | 70 ++++++++++++-------- 7 files changed, 141 insertions(+), 162 deletions(-) delete mode 100644 broker/src/health.rs diff --git a/README.md b/README.md index 0e0b804b..883c8be9 100644 --- a/README.md +++ b/README.md @@ -399,7 +399,7 @@ Date: Mon, 27 Jun 2022 14:26:45 GMT As part of making this API performant, all reading endpoints support long-polling as an efficient alternative to regular (repeated) polling. Using this function requires the following parameters: -- `wait_count`: The API call will block until at least this many results are available. 
If there are more matching tasks/results avalible all of them will be returned. +- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results available all of them will be returned. - `wait_time`: ... or this time has passed (if not stated differently, e.g., by adding 'm', 'h', 'ms', ..., this is interpreted as seconds), whichever comes first. For example, retrieving a task's results: @@ -629,6 +629,12 @@ Samply.Beam encrypts all information in the `body` fields of both Tasks and Resu The data is symmetrically encrypted using the Authenticated Encryption with Authenticated Data (AEAD) algorithm "XChaCha20Poly1305", a widespread algorithm (e.g., mandatory for the TLS protocol), regarded as highly secure by experts. The used [chacha20poly1305 library](https://docs.rs/chacha20poly1305/latest/chacha20poly1305/) was sublected to a [security audit](https://research.nccgroup.com/2020/02/26/public-report-rustcrypto-aes-gcm-and-chacha20poly1305-implementation-review/), with no significant findings. The randomly generated symmetric keys are encapsulated in a RSA encrypted ciphertext using OAEP Padding. This ensures, that only the intended recipients can decrypt the key and subsequently the transferred data. +### Health check connection + +The beam proxy tries to keep a permanent connection to the broker to make it possible to see which sites are currently connected. +This also allows us to detect invalid connection states such as multiple proxies with the same proxy id connecting simultaneously. +In that case the second proxy trying to connect will receive a 409 status code and shut down.
+ ## Roadmap - [X] API Key authentication of local applications diff --git a/broker/src/crypto.rs b/broker/src/crypto.rs index 598f53e9..1f12c371 100644 --- a/broker/src/crypto.rs +++ b/broker/src/crypto.rs @@ -9,7 +9,7 @@ use std::time::Duration; use tokio::{sync::RwLock, time::timeout}; use tracing::{debug, error, warn, info}; -use crate::health::{self, Health, VaultStatus}; +use crate::serve_health::{Health, VaultStatus}; pub struct GetCertsFromPki { pki_realm: String, diff --git a/broker/src/health.rs b/broker/src/health.rs deleted file mode 100644 index c28e46c3..00000000 --- a/broker/src/health.rs +++ /dev/null @@ -1,85 +0,0 @@ -use std::{fmt::Display, sync::Arc, time::{Duration, SystemTime}, collections::HashMap}; - -use serde::{Serialize, Deserialize}; -use beam_lib::ProxyId; -use tokio::sync::RwLock; -use tracing::{info, warn}; - -#[derive(Serialize)] -#[serde(rename_all = "lowercase")] -pub enum Verdict { - Healthy, - Unhealthy, - Unknown, -} - -impl Default for Verdict { - fn default() -> Self { - Verdict::Unknown - } -} - -#[derive(Debug, Serialize, Clone, Copy, Default)] -#[serde(rename_all = "lowercase")] -pub enum VaultStatus { - Ok, - #[default] - Unknown, - OtherError, - LockedOrSealed, - Unreachable, -} - -#[derive(Debug, Serialize, Clone, Copy, Default)] -#[serde(rename_all = "lowercase")] -pub enum InitStatus { - #[default] - Unknown, - FetchingIntermediateCert, - Done -} - -#[derive(Debug, Default)] -pub struct Health { - pub vault: VaultStatus, - pub initstatus: InitStatus, - pub proxies: HashMap -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ProxyStatus { - last_connect: SystemTime, - last_disconnect: Option, - #[serde(skip)] - connections: u8, -} - -impl ProxyStatus { - pub fn online(&self) -> bool { - self.connections > 0 - } - - pub fn disconnect(&mut self) { - self.last_disconnect = Some(SystemTime::now()); - self.connections -= 1; - } - - pub fn connect(&mut self) { - self.connections += 1; - self.last_connect = 
SystemTime::now(); - } - - pub fn _last_seen(&self) -> SystemTime { - if self.online() { - SystemTime::now() - } else { - self.last_disconnect.expect("Should always exist as the proxy is not online") - } - } -} - -impl ProxyStatus { - pub fn new() -> ProxyStatus { - ProxyStatus { last_connect: SystemTime::now(), connections: 1, last_disconnect: None } - } -} diff --git a/broker/src/main.rs b/broker/src/main.rs index 4fe6d51f..723df949 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -2,7 +2,6 @@ mod banner; mod crypto; -mod health; mod serve; mod serve_health; mod serve_pki; @@ -15,7 +14,7 @@ mod compare_client_server_version; use std::{collections::HashMap, sync::Arc, time::Duration}; use crypto::GetCertsFromPki; -use health::{Health, InitStatus}; +use serve_health::{Health, InitStatus}; use once_cell::sync::Lazy; use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError}; use tokio::sync::RwLock; @@ -45,8 +44,8 @@ pub async fn main() -> anyhow::Result<()> { async fn init_broker_ca_chain(health: Arc>) { { - health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert + health.write().await.initstatus = InitStatus::FetchingIntermediateCert } shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain"); - health.write().await.initstatus = health::InitStatus::Done; + health.write().await.initstatus = InitStatus::Done; } diff --git a/broker/src/serve.rs b/broker/src/serve.rs index 7b728b2e..6ea6c054 100644 --- a/broker/src/serve.rs +++ b/broker/src/serve.rs @@ -20,7 +20,7 @@ use tokio::{ }; use tracing::{debug, info, trace, warn}; -use crate::{banner, crypto, health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version}; +use crate::{banner, crypto, serve_health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version}; pub(crate) async fn serve(health: Arc>) -> anyhow::Result<()> { let app = serve_tasks::router() diff --git a/broker/src/serve_health.rs 
b/broker/src/serve_health.rs index 9a4d93fb..802fdd93 100644 --- a/broker/src/serve_health.rs +++ b/broker/src/serve_health.rs @@ -1,13 +1,14 @@ -use std::{sync::Arc, time::{Duration, SystemTime}}; +use std::{collections::HashMap, convert::Infallible, marker::PhantomData, sync::Arc, time::{Duration, SystemTime}}; -use axum::{extract::{State, Path}, http::StatusCode, routing::get, Json, Router, response::Response}; +use axum::{extract::{Path, State}, http::StatusCode, response::{sse::{Event, KeepAlive}, Response, Sse}, routing::get, Json, Router}; use axum_extra::{headers::{authorization::Basic, Authorization}, TypedHeader}; use beam_lib::ProxyId; +use futures_core::Stream; use serde::{Serialize, Deserialize}; use shared::{crypto_jwt::Authorized, Msg, config::CONFIG_CENTRAL}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, OwnedMutexGuard, RwLock}; -use crate::{health::{Health, VaultStatus, Verdict, ProxyStatus, InitStatus}, compare_client_server_version::log_version_mismatch}; +use crate::compare_client_server_version::log_version_mismatch; #[derive(Serialize)] struct HealthOutput { @@ -16,6 +17,58 @@ struct HealthOutput { init_status: InitStatus } +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Verdict { + Healthy, + Unhealthy, + Unknown, +} + +impl Default for Verdict { + fn default() -> Self { + Verdict::Unknown + } +} + +#[derive(Debug, Serialize, Clone, Copy, Default)] +#[serde(rename_all = "lowercase")] +pub enum VaultStatus { + Ok, + #[default] + Unknown, + OtherError, + LockedOrSealed, + Unreachable, +} + +#[derive(Debug, Serialize, Clone, Copy, Default)] +#[serde(rename_all = "lowercase")] +pub enum InitStatus { + #[default] + Unknown, + FetchingIntermediateCert, + Done +} + +#[derive(Debug, Default)] +pub struct Health { + pub vault: VaultStatus, + pub initstatus: InitStatus, + proxies: HashMap +} + +#[derive(Debug, Clone, Default)] +struct ProxyStatus { + online_guard: Arc>> +} + +impl ProxyStatus { + pub fn is_online(&self) -> 
bool { + self.online_guard.try_lock().is_err() + } +} + pub(crate) fn router(health: Arc>) -> Router { Router::new() .route("/v1/health", get(handler)) @@ -46,14 +99,14 @@ async fn handler( } async fn get_all_proxies(State(state): State>>) -> Json> { - Json(state.read().await.proxies.keys().cloned().collect()) + Json(state.read().await.proxies.iter().filter(|(_, v)| v.is_online()).map(|(k, _)| k).cloned().collect()) } async fn proxy_health( State(state): State>>, Path(proxy): Path, auth: TypedHeader> -) -> Result<(StatusCode, Json), StatusCode> { +) -> Result<(StatusCode, Json), StatusCode> { let Some(ref monitoring_key) = CONFIG_CENTRAL.monitoring_api_key else { return Err(StatusCode::NOT_IMPLEMENTED); }; @@ -63,10 +116,12 @@ async fn proxy_health( } if let Some(reported_back) = state.read().await.proxies.get(&proxy) { - if reported_back.online() { - Err(StatusCode::OK) + if let Ok(last_disconnect) = reported_back.online_guard.try_lock().as_deref().copied() { + Ok((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({ + "last_disconnect": last_disconnect + })))) } else { - Ok((StatusCode::SERVICE_UNAVAILABLE, Json(reported_back.clone()))) + Err(StatusCode::OK) } } else { Err(StatusCode::NOT_FOUND) @@ -76,48 +131,38 @@ async fn proxy_health( async fn get_control_tasks( State(state): State>>, proxy_auth: Authorized, -) -> StatusCode { +) -> Result, StatusCode> { let proxy_id = proxy_auth.get_from().proxy_id(); // Once this is freed the connection will be removed from the map of connected proxies again // This ensures that when the connection is dropped and therefore this response future the status of this proxy will be updated - let _connection_remover = ConnectedGuard::connect(&proxy_id, &state).await; - - // In the future, this will wait for control tasks for the given proxy - tokio::time::sleep(Duration::from_secs(60 * 60)).await; + let status_mutex = state + .write() + .await + .proxies + .entry(proxy_id) + .or_default() + .online_guard + .clone(); + let 
Ok(connect_guard) = tokio::time::timeout(Duration::from_secs(60), status_mutex.lock_owned()).await + else { + return Err(StatusCode::CONFLICT); + }; - StatusCode::OK + Ok(Sse::new(ForeverStream(connect_guard)).keep_alive(KeepAlive::new())) } -struct ConnectedGuard<'a> { - proxy: &'a ProxyId, - state: &'a Arc> -} +struct ForeverStream(OwnedMutexGuard>); -impl<'a> ConnectedGuard<'a> { - async fn connect(proxy: &'a ProxyId, state: &'a Arc>) -> ConnectedGuard<'a> { - { - state.write().await.proxies - .entry(proxy.clone()) - .and_modify(ProxyStatus::connect) - .or_insert(ProxyStatus::new()); - } - Self { proxy, state } +impl Stream for ForeverStream { + type Item = Result; + + fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { + std::task::Poll::Pending } } -impl<'a> Drop for ConnectedGuard<'a> { +impl Drop for ForeverStream { fn drop(&mut self) { - let proxy_id = self.proxy.clone(); - let map = self.state.clone(); - tokio::spawn(async move { - // We wait here for one second to give the client a bit of time to reconnect incrementing the connection count so that it will be one again after the decrement - tokio::time::sleep(Duration::from_secs(1)).await; - map.write() - .await - .proxies - .get_mut(&proxy_id) - .expect("Has to exist as we don't remove items and the constructor of this type inserts the entry") - .disconnect(); - }); + *self.0 = Some(SystemTime::now()); } -} +} \ No newline at end of file diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 59bf311a..c822db99 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -6,6 +6,7 @@ use std::time::Duration; use axum::http::{header, HeaderValue, StatusCode}; use beam_lib::AppOrProxyId; use futures::future::Ready; +use futures::{StreamExt, TryStreamExt}; use shared::{reqwest, EncryptedMessage, MsgEmpty, PlainMessage}; use shared::crypto::CryptoPublicPortion; use shared::errors::SamplyBeamError; @@ -132,8 +133,12 @@ fn spawn_controller_polling(client: 
SamplyHttpClient, config: Config) { const RETRY_INTERVAL: Duration = Duration::from_secs(60); tokio::spawn(async move { let mut retries_this_min = 0; - let mut reset_interval = std::pin::pin!(tokio::time::sleep(Duration::from_secs(60))); + let mut reset_interval = Instant::now() + RETRY_INTERVAL; loop { + if reset_interval < Instant::now() { + retries_this_min = 0; + reset_interval = Instant::now() + RETRY_INTERVAL; + } let body = EncryptedMessage::MsgEmpty(MsgEmpty { from: AppOrProxyId::Proxy(config.proxy_id.clone()), }); @@ -145,39 +150,48 @@ fn spawn_controller_polling(client: SamplyHttpClient, config: Config) { let req = sign_request(body, parts, &config, None).await.expect("Unable to sign request; this should always work"); // In the future this will poll actual control related tasks - match client.execute(req).await { - Ok(res) => { - match res.status() { - StatusCode::OK => { - // Process control task - }, - status @ (StatusCode::GATEWAY_TIMEOUT | StatusCode::BAD_GATEWAY) => { - if retries_this_min < 10 { - retries_this_min += 1; - debug!("Connection to broker timed out; retrying."); - } else { - warn!("Retried more then 10 times in one minute getting status code: {status}"); - tokio::time::sleep(RETRY_INTERVAL).await; - continue; - } - }, - other => { - warn!("Got unexpected status getting control tasks from broker: {other}"); - tokio::time::sleep(RETRY_INTERVAL).await; - } - }; - }, - Err(e) if e.is_timeout() => { - debug!("Connection to broker timed out; retrying: {e}"); + let res = match client.execute(req).await { + Ok(res) if res.status() == StatusCode::CONFLICT => { + error!("A beam proxy with the same id is already running!"); + std::process::exit(409); }, + Ok(res) if res.status() != StatusCode::OK => { + if retries_this_min < 10 { + retries_this_min += 1; + debug!("Unexpected status code getting control tasks from broker: {}", res.status()); + } else { + warn!("Retried more then 10 times in one minute getting status code: {}", res.status()); + 
tokio::time::sleep(RETRY_INTERVAL).await; + } + continue; + } + Ok(res) => res, Err(e) => { warn!("Error getting control tasks from broker; retrying in {}s: {e}", RETRY_INTERVAL.as_secs()); tokio::time::sleep(RETRY_INTERVAL).await; + continue; } }; - if reset_interval.is_elapsed() { - retries_this_min = 0; - reset_interval.as_mut().reset(Instant::now() + Duration::from_secs(60)); + let incoming = res + .bytes_stream() + .map(|result| result.map_err(|error| { + let kind = error.is_timeout().then_some(std::io::ErrorKind::TimedOut).unwrap_or(std::io::ErrorKind::Other); + std::io::Error::new(kind, format!("IO Error: {error}")) + })) + .into_async_read(); + let mut reader = async_sse::decode(incoming); + while let Some(ev) = reader.next().await { + match ev { + Ok(_)=> (), + Err(e) if e.downcast_ref::().unwrap().kind() == std::io::ErrorKind::TimedOut => { + debug!("SSE connection timed out"); + break; + }, + Err(err) => { + error!("Got error reading SSE stream: {err}"); + break; + } + }; } } }); From 94b902169bf54e949d55b7044f4c5069a3fe9e4e Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 12 Mar 2025 14:00:37 +0000 Subject: [PATCH 28/29] chore: bump version to 0.9 --- Cargo.toml | 1 + beam-lib/Cargo.toml | 2 +- broker/Cargo.toml | 2 +- proxy/Cargo.toml | 2 +- shared/Cargo.toml | 2 +- tests/Cargo.toml | 2 +- 6 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c2fb0252..b884863f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = ["proxy", "broker", "shared", "tests", "beam-lib"] resolver = "2" +package.version = "0.9.0" [workspace.dependencies] beam-lib = { path = "./beam-lib", features = [ "strict-ids" ] } diff --git a/beam-lib/Cargo.toml b/beam-lib/Cargo.toml index bb1348d6..922319dd 100644 --- a/beam-lib/Cargo.toml +++ b/beam-lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beam-lib" -version = "0.8.0" +version = { workspace = true } edition = "2021" license = "Apache-2.0" diff --git 
a/broker/Cargo.toml b/broker/Cargo.toml index 7a1eeaab..cf813349 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beam-broker" -version = "0.8.0" +version = { workspace = true } edition = "2021" license = "Apache-2.0" documentation = "https://github.com/samply/beam" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 5e008857..a4064e42 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beam-proxy" -version = "0.8.0" +version = { workspace = true } edition = "2021" license = "Apache-2.0" documentation = "https://github.com/samply/beam" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 2b0c88ad..15180f4c 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shared" -version = "0.8.0" +version = { workspace = true } edition = "2021" license = "Apache-2.0" documentation = "https://github.com/samply/beam" diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 67e95f47..abaa9860 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tests" -version = "0.1.0" +version = { workspace = true } edition = "2021" publish = false license = "Apache-2.0" From f232a0f32ffdef04cdf44da748d849fe04b41ff1 Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 12 Mar 2025 14:01:28 +0000 Subject: [PATCH 29/29] chore: update changelog for 0.9 --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab1737e6..4de9c28c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +# Samply.Beam 0.9.0 - 2025-03-12 + +This major release of Beam 0.9 features a lot of internal changes and some bugfixes. + +## Breaking changes + +* Internal health connection now uses SSE to get around timeouts making it more reliable. This will not cause outdated beam proxies or brokers to crash but might show outdated sites as offline. 
+ +## Bugfixes + +* Fixed a race condition where a claimed result would override a successful result. +* Fixed a socket relaying bug causing decryption of the tunnel to fail when the chunks were too large. + +## Minor changes + +* Improved logging +* beam-lib api improvements + # Samply.Beam 0.8.0 - 2024-07-26 This major release of Beam 0.8 features many changes "under the hood", such as the highly anticipated upgrade of our `hyper` dependency to version 1, as well as many bug fixes. We were able to decrease the communication overhead between Beam.Proxies and the Beam.Broker and streamlined the behavior of some endpoints to make the usage of Samply.Beam simpler.