Skip to content

Commit

Permalink
apply rust 1.79 clippy lints
Browse files Browse the repository at this point in the history
  • Loading branch information
sebadob committed Jun 21, 2024
1 parent 4a9042b commit aff5afc
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 144 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "redhac"
version = "0.10.3"
version = "0.10.4"
edition = "2021"
rust-version = "1.70.0"
license = "Apache-2.0 OR MIT"
Expand Down
32 changes: 14 additions & 18 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::quorum::{AckLevel, RpcServer, RpcServerState};
use crate::rpc::cache;
use crate::rpc::cache::cache_client::CacheClient;
use crate::rpc::cache::mgmt_ack::Method;
use crate::rpc::cache::{ack, cache_request, CacheRequest};
use crate::{get_cache_req_id, get_rand_between, CacheError, QuorumReq, TLS};
use cached::Cached;
use futures_util::TryStreamExt;
use lazy_static::lazy_static;
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use cached::Cached;
use futures_util::TryStreamExt;
use lazy_static::lazy_static;
use tokio::sync::{mpsc, watch};
use tokio::{fs, time};
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -16,13 +21,6 @@ use tonic::{Request, Status};
use tower::util::ServiceExt;
use tracing::{debug, error, info, warn};

use crate::quorum::{AckLevel, RpcServer, RpcServerState};
use crate::rpc::cache;
use crate::rpc::cache::cache_client::CacheClient;
use crate::rpc::cache::mgmt_ack::Method;
use crate::rpc::cache::{ack, cache_request, CacheRequest};
use crate::{get_cache_req_id, get_rand_between, CacheError, QuorumReq, TLS};

lazy_static! {
static ref BUF_SIZE_CLIENT: usize = env::var("CACHE_BUF_CLIENT")
.unwrap_or_else(|_| String::from("64"))
Expand Down Expand Up @@ -360,19 +358,20 @@ async fn run_client(
});

debug!("Starting the Sending Stream to the Server");
if let Err(err)= tx_quorum
if let Err(err) = tx_quorum
.send_async(QuorumReq::UpdateServer {
server: server.clone(),
})
.await {
.await
{
// can fail in case of a conflict resolution, if the other side has just shut down
debug!("tx_quorum send error: {:?}", err);
callback_handle.abort();
time::sleep(Duration::from_millis(get_rand_between(
*RECONNECT_TIMEOUT_LOWER,
*RECONNECT_TIMEOUT_UPPER,
)))
.await;
.await;
continue;
}

Expand Down Expand Up @@ -696,10 +695,7 @@ async fn run_client(
match res_stream.try_next().await {
Ok(recv) => {
if recv.is_none() {
warn!(
"Lost connection to remote cache host {}",
host
);
warn!("Lost connection to remote cache host {}", host);
break;
}

Expand Down
219 changes: 99 additions & 120 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,25 @@
//! The way it works:
//!
//! 1. **A node gets its own hostname from the OS**<br>
//! This is the reason, why you use a StatefulSet for the deployment, even without any volumes
//! attached. For a `StatefulSet` called `rauthy`, the replicas will always have the names `rauthy-0`,
//! `rauthy-1`, ..., which are at the same time the hostnames inside the pod.
//! This is the reason, why you use a StatefulSet for the deployment, even without any volumes
//! attached. For a `StatefulSet` called `rauthy`, the replicas will always have the names `rauthy-0`,
//! `rauthy-1`, ..., which are at the same time the hostnames inside the pod.
//! 2. **Find "me" inside the `HA_HOSTS` variable**<br>
//! If the hostname cannot be found in the `HA_HOSTS`, the application will panic and exit because
//! of a misconfiguration.
//! If the hostname cannot be found in the `HA_HOSTS`, the application will panic and exit because
//! of a misconfiguration.
//! 3. **Use the port from the "me"-Entry that was found for the server part**<br>
//! This means you do not need to specify the port in another variable which eliminates the risk of
//! having inconsistencies
//! or a bad config in that case.
//! This means you do not need to specify the port in another variable which eliminates the risk of
//! having inconsistencies
//! or a bad config in that case.
//! 4. **Extract "me" from the `HA_HOSTS`**<br>
//! then take the leftover nodes as all cache members and connect to them
//! then take the leftover nodes as all cache members and connect to them
//! 5. **Once a quorum has been reached, a leader will be elected**<br>
//! From that point on, the cache will start accepting requests
//! From that point on, the cache will start accepting requests
//! 6. **If the leader is lost - elect a new one - No values will be lost**
//! 7. **If quorum is lost, the cache will be invalidated**<br>
//! This happens for security reasons to provide cache inconsistencies. Better invalidate the cache
//! and fetch the values fresh from the DB or other cache members than working with possibly invalid
//! values, which is especially true in an authn / authz situation.
//! This happens for security reasons to provide cache inconsistencies. Better invalidate the
//! cache and fetch the values fresh from the DB or other cache members than working with
//! possibly invalid values, which is especially true in an authn / authz situation.
//!
//! **NOTE:**<br>
//! If you are in an environment where the described mechanism with extracting the hostname would
Expand Down Expand Up @@ -375,6 +375,7 @@
use crate::client::{cache_clients, RpcRequest};
use crate::quorum::{quorum_handler, QuorumReq};
pub use crate::quorum::{AckLevel, QuorumHealth, QuorumHealthState, QuorumState};
use crate::server::{CacheMap, RpcCacheService};
use bincode::ErrorKind;
use cached::Cached;
Expand All @@ -394,8 +395,6 @@ use tokio::sync::{mpsc, oneshot, watch};
use tokio::time;
use tracing::{debug, error, info, warn};

pub use crate::quorum::{AckLevel, QuorumHealth, QuorumHealthState, QuorumState};

mod client;
pub mod quorum;
#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -432,22 +431,20 @@ lazy_static! {
.expect("Cannot parse CACHE_MTLS to bool");
}

/**
This is a simple macro to get values from the cache and deserialize them properly at the same
time.
## Example usage:
```ignore
use redhac::{cache_get, cache_get_value, cache_get_from};
// cache_get!(<ReturnType>, <CacheNameAsString>, <KeyAsString>, &CacheConfig, <IFNotFoundLocally - DoRemoteLookip?>).await?;
cache_get!(String, "cache_name".to_string(), "key_name".to_string(), &cache_config, false).await?;
```
This would look for the entry `key_name` in the cache with the name `cache_name` and will try to
deserialize the value, if it is `Some(_)`, into a `String`. If it does not find it in
the local cache, it will not try to do a remote lookup on the other ha cache instances.<br />
The `&cache_config` is of type `&CacheConfig`, which is being created, when the caches are created.
*/
/// This is a simple macro to get values from the cache and deserialize them properly at the same
/// time.
///
/// ## Example usage:
/// ```ignore
/// use redhac::{cache_get, cache_get_value, cache_get_from};
/// // cache_get!(<ReturnType>, <CacheNameAsString>, <KeyAsString>, &CacheConfig, <IFNotFoundLocally - DoRemoteLookip?>).await?;
/// cache_get!(String, "cache_name".to_string(), "key_name".to_string(), &cache_config, false).await?;
/// ```
///
/// This would look for the entry `key_name` in the cache with the name `cache_name` and will try to
/// deserialize the value, if it is `Some(_)`, into a `String`. If it does not find it in
/// the local cache, it will not try to do a remote lookup on the other ha cache instances.<br />
/// The `&cache_config` is of type `&CacheConfig`, which is being created, when the caches are created.
#[macro_export]
macro_rules! cache_get {
($type:ty, $name:expr, $entry:expr, $config:expr, $lookup:expr) => {
Expand Down Expand Up @@ -502,13 +499,11 @@ pub struct CacheConfig {
}

impl CacheConfig {
/**
This returns a tuple with the first value being the watch receiver channel, which returns the
current `QuorumHealthState`.
This function looks for the `HA_MODE` environment variable and if it is `true`, will setup the
channels for HA communication, otherwise they will be `None`.
*/
/// This returns a tuple with the first value being the watch receiver channel, which returns the
/// current `QuorumHealthState`.
///
/// This function looks for the `HA_MODE` environment variable and if it is `true`, will setup the
/// channels for HA communication, otherwise they will be `None`.
pub fn new() -> (watch::Sender<Option<QuorumHealthState>>, Self) {
let cache_map = HashMap::new();
let (tx_watch, rx_watch) = watch::channel::<Option<QuorumHealthState>>(None);
Expand Down Expand Up @@ -684,17 +679,15 @@ pub enum CacheReq {
Reset,
}

/**
Gets values out of the cache.
This should not be used directly, but the `cache_get!` macro instead, which needs lees
boilerplate code. The [cache_get_from](cache_get_from), which deserializes values from the
cache, needs to exist on its own for different reasons.<br>
Everything should be pretty straight forward. If a value does not exist in the local cache,
for instance after a restart or late join, when `remote_lookup` is `true`, the function will try
to get the value from any other instance. This is useful, if you have values which only live
inside the cache.
*/
/// Gets values out of the cache.
///
/// This should not be used directly, but the `cache_get!` macro instead, which needs lees
/// boilerplate code. The [cache_get_from](cache_get_from), which deserializes values from the
/// cache, needs to exist on its own for different reasons.<br>
/// Everything should be pretty straight forward. If a value does not exist in the local cache,
/// for instance after a restart or late join, when `remote_lookup` is `true`, the function will try
/// to get the value from any other instance. This is useful, if you have values which only live
/// inside the cache.
pub async fn cache_get_value(
cache_name: String,
entry: String,
Expand Down Expand Up @@ -775,11 +768,9 @@ pub async fn cache_get_value(
}
}

/**
This is the deserializer function for cached values. If you do not need deserialization, you can
skip using the `cache_get!` macro and only use [cache_get_value](cache_get_value) to not
deserialize.
*/
/// This is the deserializer function for cached values. If you do not need deserialization, you can
/// skip using the `cache_get!` macro and only use [cache_get_value](cache_get_value) to not
/// deserialize.
pub async fn cache_get_from<'a, T>(value: &'a [u8]) -> Result<Option<T>, CacheError>
where
T: Debug + serde::Deserialize<'a>,
Expand All @@ -790,17 +781,15 @@ where
Ok(Some(res))
}

/**
Put values into the cache.
This is a direct put, which means that if the cache is running in a HA cluster, the leader
will be ignored and the update will be sent to all participants directly. This could produce conflicts
of course, if 2 hosts modify the same key in the same cache at the exact same time.
However, if you are inserting a newly generated random key for instance, which cannot conflict with
another host, or if you are sure, that this cannot happen, it gives a huge performance and latency
boost compared to `cache_insert`.
*/
/// Put values into the cache.
///
/// This is a direct put, which means that if the cache is running in a HA cluster, the leader
/// will be ignored and the update will be sent to all participants directly. This could produce conflicts
/// of course, if 2 hosts modify the same key in the same cache at the exact same time.
///
/// However, if you are inserting a newly generated random key for instance, which cannot conflict with
/// another host, or if you are sure, that this cannot happen, it gives a huge performance and latency
/// boost compared to `cache_insert`.
pub async fn cache_put<T>(
cache_name: String,
entry: String,
Expand Down Expand Up @@ -854,23 +843,21 @@ where
Ok(())
}

/**
The HA pendant to [cache_put](cache_put) - defaults to [cache_put](cache_put) in non-HA mode
This makes Put's safe and conflict free. If the current host is the leader, it will push the cache
operation to all nodes, if it is a Follower, the request will be forwarded to the current leader to
avoid any conflicts, if it could happen, that 2 hosts modify the same key in the same cache at the
exact same time.
The different [AckLevel](AckLevel)'s will provide different levels of safety vs performance.<br>
For instance, if you request an ack from at least "quorum" nodes, it means that the value will be
persisted, even if the cache will end up in split brain mode. However, this will provide the least
amount of performance of course.
If `HA_MODE` is not active, this function will default back to the faster [cache_put](cache_put).
This makes it possible to use `cache_insert` in a HA context with the given [AckLevel](AckLevel)
and just have the direct insert otherwise.
*/
/// The HA pendant to [cache_put](cache_put) - defaults to [cache_put](cache_put) in non-HA mode
///
/// This makes Put's safe and conflict free. If the current host is the leader, it will push the cache
/// operation to all nodes, if it is a Follower, the request will be forwarded to the current leader to
/// avoid any conflicts, if it could happen, that 2 hosts modify the same key in the same cache at the
/// exact same time.
///
/// The different [AckLevel](AckLevel)'s will provide different levels of safety vs performance.<br>
/// For instance, if you request an ack from at least "quorum" nodes, it means that the value will be
/// persisted, even if the cache will end up in split brain mode. However, this will provide the least
/// amount of performance of course.
///
/// If `HA_MODE` is not active, this function will default back to the faster [cache_put](cache_put).
/// This makes it possible to use `cache_insert` in a HA context with the given [AckLevel](AckLevel)
/// and just have the direct insert otherwise.
pub async fn cache_insert<T>(
cache_name: String,
entry: String,
Expand Down Expand Up @@ -1087,10 +1074,8 @@ pub(crate) async fn insert_from_leader(
Ok(true)
}

/**
Deletes a value from the cache. Will return immediately with `Ok(())` if quorum is Bad in `HA_MODE`.
Does ignore the quorum health state, if the cache is running in HA mode.
*/
/// Deletes a value from the cache. Will return immediately with `Ok(())` if quorum is Bad in `HA_MODE`.
/// Does ignore the quorum health state, if the cache is running in HA mode.
pub async fn cache_del(
cache_name: String,
entry: String,
Expand Down Expand Up @@ -1131,18 +1116,16 @@ pub async fn cache_del(
Ok(())
}

/**
The HA pendant to [cache_del](cache_del) - defaults to [cache_del](cache_del) in non-HA mode
This is the HA version of [cache_del](cache_del). It works like [cache_insert](cache_insert), just
for deletions. Values are removed from the cache and requests are routed over the current leader to
avoid possible conflicts. This is of course less performing than the direct [cache_del](cache_del),
which should be favored, if this works out for you.
If `HA_MODE` is not active, this function will default back to the faster [cache_del](cache_del).
This makes it possible to use `cache_remove` in a HA context with the given [AckLevel](AckLevel)
and just have the direct delete otherwise.
*/
/// The HA pendant to [cache_del](cache_del) - defaults to [cache_del](cache_del) in non-HA mode
///
/// This is the HA version of [cache_del](cache_del). It works like [cache_insert](cache_insert), just
/// for deletions. Values are removed from the cache and requests are routed over the current leader to
/// avoid possible conflicts. This is of course less performing than the direct [cache_del](cache_del),
/// which should be favored, if this works out for you.
///
/// If `HA_MODE` is not active, this function will default back to the faster [cache_del](cache_del).
/// This makes it possible to use `cache_remove` in a HA context with the given [AckLevel](AckLevel)
/// and just have the direct delete otherwise.
pub async fn cache_remove(
cache_name: String,
entry: String,
Expand Down Expand Up @@ -1387,21 +1370,19 @@ pub async fn clear_caches(cache_config: &CacheConfig) -> Result<(), CacheError>
Ok(())
}

/**
The main function to start the whole backend when `HA_MODE` == true. It cares about starting the
server and one client for each remote server configured in `HA_HOSTS`, as well as the internal
'quorum_handler'.
# Panics
- If a bad [CacheConfig](CacheConfig) was given, in which the channels needed for HA communication
are `None`. This cannot happen, if [CacheConfig::new](CacheConfig::new) is used for the
initialization.
- If `HA_HOSTS` has a bad format and if the hostname of the current instance does not appear in it
and IP addresses are used instead. Currently, the cache instances must contain their hostnames
in the DNS address somehow. This function executes a `.contains` on the CSV to decide, which of
the hosts in the list this very instance is.<br>
You can overwrite the current hostname to make this check pass with the `HOSTNAME_OVERWRITE` env var.
*/
/// The main function to start the whole backend when `HA_MODE` == true. It cares about starting the
/// server and one client for each remote server configured in `HA_HOSTS`, as well as the internal
/// 'quorum_handler'.
///
/// # Panics
/// - If a bad [CacheConfig](CacheConfig) was given, in which the channels needed for HA communication
/// are `None`. This cannot happen, if [CacheConfig::new](CacheConfig::new) is used for the
/// initialization.
/// - If `HA_HOSTS` has a bad format and if the hostname of the current instance does not appear in it
/// and IP addresses are used instead. Currently, the cache instances must contain their hostnames
/// in the DNS address somehow. This function executes a `.contains` on the CSV to decide, which of
/// the hosts in the list this very instance is.<br>
/// You can overwrite the current hostname to make this check pass with the `HOSTNAME_OVERWRITE` env var.
pub async fn start_cluster(
tx_watch: watch::Sender<Option<QuorumHealthState>>,
cache_config: &mut CacheConfig,
Expand Down Expand Up @@ -1521,13 +1502,11 @@ pub async fn start_cluster(
Ok(())
}

/**
The main function to start up a new cache handler.<br />
- `cache` accepts any of the Cache structs from the [Cached](https://crates.io/crates/cached) crate.
This means, if you need caches with different configs, just start up a new `cache_recv`.<br />
- `name` is only used for logging and debugging.
- `rx` for [CacheReq](CacheReq)s
*/
/// The main function to start up a new cache handler.<br />
/// - `cache` accepts any of the Cache structs from the [Cached](https://crates.io/crates/cached) crate.
/// This means, if you need caches with different configs, just start up a new `cache_recv`.<br />
/// - `name` is only used for logging and debugging.
/// - `rx` for [CacheReq](CacheReq)s
pub(crate) async fn cache_recv<C>(mut cache: C, name: String, rx: flume::Receiver<CacheReq>)
where
C: Cached<String, Vec<u8>>,
Expand Down
Loading

0 comments on commit aff5afc

Please sign in to comment.