Skip to content

Release 0.9 #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d95d0eb
Handle logging via spans
Threated Sep 11, 2023
bb57a89
Beam id api improvements
Threated Sep 14, 2023
c94bf68
Better beam proxy auth logging
Threated Sep 14, 2023
3964160
ip logging in broker
Threated Sep 14, 2023
5fc9eb5
Refactor error handeling
Threated Nov 7, 2023
0406392
chore: Replace `docker-compose` with `docker compose`
Threated Aug 8, 2024
a492fcd
Merge pull request #207 from samply/chore/docker-compose
lablans Aug 12, 2024
ea8957d
refactor: Migrate retry handeling form `backoff` to `tryhard`
Threated Aug 8, 2024
1ee3597
fix: retry 502 status without delay in control connection
Threated Aug 8, 2024
7e4ff7e
Update dashmap requirement from 5.5 to 6.0
dependabot[bot] Aug 12, 2024
8f2def0
Merge pull request #206 from samply/timeout-changes
lablans Aug 12, 2024
bf7ef35
Update itertools requirement from 0.12.0 to 0.13.0
dependabot[bot] Aug 12, 2024
cda88eb
Merge pull request #162 from samply/feature/tracing
lablans Aug 12, 2024
6d7cf28
Merge pull request #205 from samply/dependabot/cargo/develop/dashmap-6.0
lablans Aug 12, 2024
5b5d67d
Merge pull request #204 from samply/dependabot/cargo/develop/itertool…
lablans Aug 12, 2024
7920fef
fix: Better timout timings
Threated Aug 12, 2024
0d0042a
Merge pull request #208 from samply/fix/timeout-time
lablans Aug 12, 2024
9d031e6
fix(sockets): return right headers for request upgrade
Threated Oct 24, 2024
99a3aab
fix(tests): fix issue with static reqwest clients
Threated Sep 11, 2024
81b1fe2
fix(broker): increase result channel size to prevent lag (#212)
Threated Nov 4, 2024
f32b58b
fix(sockets): correctly index into encrypt and decrypt buffers
Threated Nov 4, 2024
8a82238
Merge pull request #213 from samply/fix/sockets
TKussel Nov 4, 2024
aa7ce7d
fix(sockets): correctly handle pending writes to prevent double write…
Threated Nov 8, 2024
b52a1b4
chore: print more context for connection error (#217)
Threated Nov 15, 2024
3d6e1ca
Disable daily rebuild (#218)
lablans Nov 25, 2024
c3049c7
Show correct authentication header (#219)
askask Dec 18, 2024
cb06ca9
chore(deps): update thiserror from 1 to 2 (#216)
dependabot[bot] Dec 18, 2024
47c0266
fix: dont override successfull result with claimed (#224)
Threated Jan 14, 2025
f8fee4f
docs: explain weird `wait_count` behavior (#225)
Threated Jan 15, 2025
bab8f39
chore(deps): update itertools requirement from 0.13.0 to 0.14.0 (#222)
dependabot[bot] Jan 15, 2025
8768b6c
chore(deps): update axum requirement from 0.7 to 0.8 (#221)
dependabot[bot] Jan 15, 2025
1bc153d
refactor: remove unnesessary env setup
Threated Jan 21, 2025
ba6bbae
refactor(health): dont use channels to update state
Threated Jan 21, 2025
945af5b
feat: Change health connection to SSE (#209)
Threated Jan 29, 2025
94b9021
chore: bump version to 0.9
Threated Mar 12, 2025
f232a0f
chore: update changelog for 0.9
Threated Mar 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ on:
push:
workflow_dispatch:
pull_request:
schedule:
# Fetch new base image updates every night at 1am
- cron: '0 1 * * *'

jobs:
build-with-samply:
Expand Down
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# Samply.Beam 0.9.0 - 2025-03-12

This major release of Beam 0.9 features a lot of internal changes and some bugfixes.

## Breaking changes

* Internal health connection now uses SSE to get around timeouts making it more reliable. This will not cause outdated beam proxies or brokers to crash but might show outdated sites as offline.

## Bugfixes

* Fixed a race condition where a claimed result would override a successful result.
* Fixed a socket relaying bug causing decryption of the tunnel to fail when the chunks where to large.

## Minor changes

* Improved logging
* beam-lib api improvements

# Samply.Beam 0.8.0 - 2024-07-26

This major release of Beam 0.8 features many changes "under the hood", such as the highly anticipated upgrade of our `hyper` dependency to version 1, as well as many bug fixes. We were able to decrease the communication overhead between Beam.Proxies and the Beam.Broker and streamlined the behavior of some endpoints to make the usage of Samply.Beam simpler.
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = ["proxy", "broker", "shared", "tests", "beam-lib"]
resolver = "2"
package.version = "0.9.0"

[workspace.dependencies]
beam-lib = { path = "./beam-lib", features = [ "strict-ids" ] }
Expand Down
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,13 @@ Date: Mon, 27 Jun 2022 14:26:45 GMT

As part of making this API performant, all reading endpoints support long-polling as an efficient alternative to regular (repeated) polling. Using this function requires the following parameters:

- `wait_count`: The API call will block until this many results are available ...
- `wait_count`: The API call will block until at least this many results are available. If there are more matching tasks/results available all of them will be returned.
- `wait_time`: ... or this time has passed (if not stated differently, e.g., by adding 'm', 'h', 'ms', ..., this is interpreted as seconds), whichever comes first.

For example, retrieving a task's results:

- `GET /v1/tasks/<task_id>/results` will return immediately with however many results are available,
- `GET /v1/tasks/<task_id>/results?wait_count=5` will block forever until 5 results are available,
- `GET /v1/tasks/<task_id>/results?wait_count=5` will block until at least 5 results are available,
- `GET /v1/tasks/<task_id>/results?wait_count=5&wait_time=30s` will block until 5 results are available or 30 seconds have passed (whichever comes first). In the latter case, HTTP code `206 (Partial Content)` is returned to indicate that the result is incomplete.

### Server-sent Events (SSE) API (experimental)
Expand Down Expand Up @@ -576,7 +576,7 @@ Alternatively, you can run the services in the background and get the logs as fo

```shell
./dev/beamdev start_bg
docker-compose logs -f
docker compose logs -f
```

Confirm that your setup works by running `./dev/test noci`, which runs the tests against your instances.
Expand Down Expand Up @@ -629,6 +629,12 @@ Samply.Beam encrypts all information in the `body` fields of both Tasks and Resu

The data is symmetrically encrypted using the Authenticated Encryption with Authenticated Data (AEAD) algorithm "XChaCha20Poly1305", a widespread algorithm (e.g., mandatory for the TLS protocol), regarded as highly secure by experts. The used [chacha20poly1305 library](https://docs.rs/chacha20poly1305/latest/chacha20poly1305/) was sublected to a [security audit](https://research.nccgroup.com/2020/02/26/public-report-rustcrypto-aes-gcm-and-chacha20poly1305-implementation-review/), with no significant findings. The randomly generated symmetric keys are encapsulated in a RSA encrypted ciphertext using OAEP Padding. This ensures, that only the intended recipients can decrypt the key and subsequently the transferred data.

### Health check connection

The beam proxy tries to keep a permanent connection to the broker to make it possible to see which sites are currently connected.
This also allows us to detected invalid connection states such as multiple proxies with the same proxy id connecting simultaneously.
In that case the second proxy trying to connect will receive a 409 status code and shut down.

## Roadmap

- [X] API Key authentication of local applications
Expand Down
4 changes: 2 additions & 2 deletions beam-lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beam-lib"
version = "0.8.0"
version = { workspace = true }
edition = "2021"
license = "Apache-2.0"

Expand All @@ -10,7 +10,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4", "serde"] }
reqwest = { version = "0.12", features = ["json"], default-features = false, optional = true }
thiserror = { version = "1.0", optional = true }
thiserror = { version = "2.0", optional = true }

[features]
strict-ids = []
Expand Down
66 changes: 35 additions & 31 deletions beam-lib/src/ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,10 @@ impl AppOrProxyId {
self.as_ref().ends_with(other.as_ref())
}

pub fn hide_broker(&self) -> String {
pub fn hide_broker(&self) -> &str {
match self {
AppOrProxyId::App(app) => {
let without_broker = strip_broker_id(&app.0).expect("Is valid id");
without_broker[..without_broker.len() - 1].to_owned()
}
AppOrProxyId::Proxy(proxy) => proxy
.0
.split_once('.')
.map(|(proxy, _broker)| proxy)
.unwrap_or_default()
.to_string(),
AppOrProxyId::App(app) => app.hide_broker_name(),
AppOrProxyId::Proxy(proxy) => proxy.proxy_name(),
}
}
}
Expand Down Expand Up @@ -137,7 +129,7 @@ pub(crate) enum BeamIdType {
BrokerId,
}

macro_rules! impl_new {
macro_rules! impl_id {
($id:ident) => {
impl $id {
#[cfg(feature = "strict-ids")]
Expand All @@ -161,11 +153,23 @@ macro_rules! impl_new {
self.as_ref().ends_with(other.as_ref())
}
}

impl AsRef<str> for $id {
fn as_ref(&self) -> &str {
&self.0
}
}

impl Display for $id {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
};
}

impl_new!(AppId);
impl_new!(ProxyId);
impl_id!(AppId);
impl_id!(ProxyId);

#[cfg(feature = "strict-ids")]
fn get_id_type(id: &str) -> Result<BeamIdType, BeamIdError> {
Expand Down Expand Up @@ -209,32 +213,32 @@ impl AppId {
.expect("AppId should be valid");
ProxyId(proxy_id.to_string())
}
}

impl AsRef<str> for AppId {
fn as_ref(&self) -> &str {
&self.0
/// Returns the AppId as a string slice without the broker part of the string
/// ## Example
/// app1.proxy1.broker => app1.proxy1
#[cfg(feature = "strict-ids")]
pub fn hide_broker_name(&self) -> &str {
let without_broker = strip_broker_id(&self.0).expect("Is valid id");
&without_broker[..without_broker.len() - 1]
}
}

impl Display for AppId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}

#[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash)]
pub struct ProxyId(String);

impl AsRef<str> for ProxyId {
fn as_ref(&self) -> &str {
&self.0
}
}
impl ProxyId {

impl Display for ProxyId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
/// Returns the proxies name without the broker id
/// ## Example
/// proxy1.broker => proxy1
#[cfg(feature = "strict-ids")]
pub fn proxy_name(&self) -> &str {
self.0
.split_once('.')
.map(|(proxy, _broker)| proxy)
.expect("This is a valid proxy id")
}
}

Expand Down
10 changes: 5 additions & 5 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beam-broker"
version = "0.8.0"
version = { workspace = true }
edition = "2021"
license = "Apache-2.0"
documentation = "https://github.com/samply/beam"
Expand All @@ -14,12 +14,12 @@ beam-lib = { workspace = true }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
axum = { version = "0.7", features = [ "query" ] }
axum = { version = "0.8", features = [ "query" ] }
#axum-macros = "0.3.7"
dashmap = "5.4"
dashmap = "6.0"

anyhow = "1"
thiserror = "1"
thiserror = "2"

# Subscriber is setup through shared
tracing = "0.1"
Expand All @@ -30,7 +30,7 @@ futures-core = { version = "0.3", default-features = false }
once_cell = "1"
# Socket dependencies
bytes = { version = "1", optional = true }
axum-extra = { version = "0.9", features = ["typed-header"] }
axum-extra = { version = "0.10", features = ["typed-header"] }
hyper = { version = "1", default-features = false, optional = true}
hyper-util = { version = "0.1", default-features = false, features = ["tokio"], optional = true}

Expand Down
39 changes: 10 additions & 29 deletions broker/src/crypto.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
use std::{future::Future, mem::discriminant};
use std::{future::Future, mem::discriminant, sync::Arc};

use axum::{
async_trait,
http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri},
};
use axum::http::{header, method, uri::Scheme, Method, Request, StatusCode, Uri};
use serde::{Deserialize, Serialize};
use shared::{
config,
crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts},
errors::SamplyBeamError,
http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url},
async_trait, config, crypto::{parse_crl, CertificateCache, CertificateCacheUpdate, GetCerts}, errors::SamplyBeamError, http_client::{self, SamplyHttpClient}, openssl::x509::X509Crl, reqwest::{self, Url}
};
use std::time::Duration;
use tokio::time::timeout;
use tokio::{sync::RwLock, time::timeout};
use tracing::{debug, error, warn, info};

use crate::health::{self, VaultStatus};
use crate::serve_health::{Health, VaultStatus};

pub struct GetCertsFromPki {
pki_realm: String,
hyper_client: SamplyHttpClient,
health_report_sender: tokio::sync::watch::Sender<health::VaultStatus>,
health: Arc<RwLock<Health>>,
}

#[derive(Debug, Deserialize, Clone, Hash)]
Expand All @@ -41,7 +35,7 @@ struct PkiListResponse {

impl GetCertsFromPki {
pub(crate) fn new(
health_report_sender: tokio::sync::watch::Sender<health::VaultStatus>,
health: Arc<RwLock<Health>>,
) -> Result<Self, SamplyBeamError> {
let mut certs: Vec<String> = Vec::new();
if let Some(dir) = &config::CONFIG_CENTRAL.tls_ca_certificates_dir {
Expand All @@ -67,19 +61,12 @@ impl GetCertsFromPki {
Ok(Self {
pki_realm,
hyper_client,
health_report_sender,
health,
})
}

async fn report_vault_health(&self, status: VaultStatus) {
self.health_report_sender.send_if_modified(|val| {
if discriminant(val) != discriminant(&status) {
*val = status;
true
} else {
false
}
});
self.health.write().await.vault = status;
}

pub(crate) async fn check_vault_health(&self) -> Result<(), SamplyBeamError> {
Expand Down Expand Up @@ -267,12 +254,6 @@ impl GetCerts for GetCertsFromPki {
}
}

pub(crate) fn build_cert_getter(
sender: tokio::sync::watch::Sender<VaultStatus>,
) -> Result<GetCertsFromPki, SamplyBeamError> {
GetCertsFromPki::new(sender)
}

pub(crate) fn pki_url_builder(location: &str) -> Url {
fn pki_url_builder(location: &str) -> Url {
config::CONFIG_CENTRAL.pki_address.join(&format!("/v1/{location}")).unwrap()
}
Loading
Loading