From 9a51db62f17992360a8c7009867b2e74087bd6f7 Mon Sep 17 00:00:00 2001 From: Cappy Ishihara Date: Sat, 11 Jan 2025 04:32:30 +0700 Subject: [PATCH] Implement a locked cache for assigned exit nodes, Add exit node assignment check to local exit node selection logic. (Properly fixes #143) --- src/daemon.rs | 203 ++++++++++++++++++++++++++++++++++------------ src/deployment.rs | 8 +- src/error.rs | 3 + src/main.rs | 12 ++- 4 files changed, 170 insertions(+), 56 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 2798fc9..fad342e 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -76,7 +76,7 @@ pub struct Context { pub client: Client, // Let's implement a lock here to prevent multiple reconciles assigning the same exit node // to multiple services implicitly (#143) - pub exit_node_lock: tokio::sync::Mutex<()>, + pub exit_node_lock: Arc>>, } /// Parses the `query` string to extract the namespace and name. @@ -168,58 +168,129 @@ async fn check_service_managed(service: &Service) -> bool { const OPERATOR_CLASS: &str = "chisel-operator.io/chisel-operator-class"; const OPERATOR_MANAGER: &str = "chisel-operator"; +const BACKOFF_TIME_SECS: u64 = 5; + +async fn find_free_exit_nodes(ctx: Arc) -> Result, ReconcileError> { + let svc_api: Api = Api::all(ctx.client.clone()); + let exit_node_api: Api = Api::all(ctx.client.clone()); + + let svc_list = svc_api.list(&ListParams::default().timeout(30)).await?; + let exit_node_list = exit_node_api + .list(&ListParams::default().timeout(30)) + .await?; + + let svc_list_filtered = svc_list + .items + .into_iter() + .flat_map(|svc| { + svc.status + .and_then(|status| status.load_balancer) + .and_then(|lb| lb.ingress) + .and_then(|ingress| ingress.first().cloned()) + .and_then(|ingress| ingress.ip) + // .map(|ip| ip) + }) + .collect::>(); + + let exit_node_list_filtered = exit_node_list.items.into_iter().filter(|node| { + let host = node.get_host(); + !svc_list_filtered.contains(&host) + }); + + Ok(exit_node_list_filtered.collect()) +} + #[instrument(skip(ctx))] async fn select_exit_node_local( ctx: &Arc, service: &Service, ) -> Result { // Lock to prevent race conditions when assigning exit nodes to services - let _lock = ctx.exit_node_lock.lock().await; + let mut lock = match ctx.exit_node_lock.try_lock() { + Ok(lock) => lock, + Err(_) => { + warn!("Exit node lock is already held, requeuing"); + return Err(ReconcileError::NoAvailableExitNodes); + } + }; // if service has label with exit node name, use that and error if not found - if let Some(exit_node_name) = service - .metadata - .labels - .as_ref() - .and_then(|labels| labels.get(EXIT_NODE_NAME_LABEL)) - { - info!( - ?exit_node_name, - "Service explicitly set to use a named exit node, using that" - ); - find_exit_node_from_label( - ctx.clone(), - exit_node_name, - &service.namespace().expect("Service namespace not found"), - ) - .await - .ok_or(ReconcileError::NoAvailableExitNodes) - } else { - // otherwise, use the first available exit node - // (one to one mapping) - let nodes: Api = Api::all(ctx.client.clone()); - let node_list: kube::core::ObjectList = - nodes.list(&ListParams::default().timeout(30)).await?; - node_list - .items - .into_iter() - .filter(|node| { - let is_cloud_provisioned = node - .metadata - .annotations - .as_ref() - .map(|annotations: &BTreeMap| { - annotations.contains_key(EXIT_NODE_PROVISIONER_LABEL) - }) - .unwrap_or(false); - - // Is the ExitNode not cloud provisioned or is the status set? - !is_cloud_provisioned || node.status.is_some() - }) - .collect::>() - .first() + let exit_node_selection = { + if let Some(exit_node_name) = service + .metadata + .labels + .as_ref() + .and_then(|labels| labels.get(EXIT_NODE_NAME_LABEL)) + { + info!( + ?exit_node_name, + "Service explicitly set to use a named exit node, using that" + ); + find_exit_node_from_label( + ctx.clone(), + exit_node_name, + &service.namespace().expect("Service namespace not found"), + ) + .await .ok_or(ReconcileError::NoAvailableExitNodes) - .cloned() - } + } else { + // otherwise, use the first available exit node + // (one to one mapping) + // let nodes: Api = Api::all(ctx.client.clone()); + // let node_list: kube::core::ObjectList = + // nodes.list(&ListParams::default().timeout(30)).await?; + let node_list = find_free_exit_nodes(ctx.clone()).await?; + debug!(?node_list, "Exit node list"); + node_list + .into_iter() + .filter(|node| { + let is_cloud_provisioned = node + .metadata + .annotations + .as_ref() + .map(|annotations: &BTreeMap| { + annotations.contains_key(EXIT_NODE_PROVISIONER_LABEL) + }) + .unwrap_or(false); + + // Is the ExitNode not cloud provisioned or is the status set? + !is_cloud_provisioned || node.status.is_some() + }) + .filter(|node| { + // debug!(?node, "Checking exit node"); + let host = node.get_host(); + if let Some((instant, ip_filter)) = lock.as_ref() { + // Skip this exit node if it was recently assigned and the backoff period hasn't elapsed + if instant.elapsed().as_secs() < BACKOFF_TIME_SECS { + host != *ip_filter + } else { + true + } + } else { + // No lock present, this exit node is available + true + } + }) + .collect::>() + .first() + .ok_or(ReconcileError::NoAvailableExitNodes) + .cloned() + } + }; + // .inspect(|node| { + // let exit_node_ip = node.get_host(); + // debug!(?exit_node_ip, "Selected exit node"); + // drop(lock); + // }) + + // Add the selected exit node to the lock, with the current time and hostname + // This will prevent other services within the backoff period from selecting the same exit node + // Fixes #143 by filtering out exit nodes that were recently assigned + // when applying multiple objects in parallel + exit_node_selection.inspect(|node| { + let exit_node_ip = node.get_host(); + debug!(?exit_node_ip, "Selected exit node"); + *lock = Some((std::time::Instant::now(), node.get_host())); + }) } #[instrument(skip(ctx))] @@ -309,7 +380,7 @@ async fn exit_node_for_service( } // #[instrument(skip(ctx), fields(trace_id))] /// Reconcile cluster state -#[instrument(skip(ctx))] +#[instrument(skip(ctx, obj))] async fn reconcile_svcs(obj: Arc, ctx: Arc) -> Result { // Return if service is not LoadBalancer or if the loadBalancerClass is not blank or set to $OPERATOR_CLASS @@ -337,8 +408,36 @@ async fn reconcile_svcs(obj: Arc, ctx: Arc) -> Result = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap()); let nodes: Api = Api::all(ctx.client.clone()); + // --- Let's skip reconciling services whose exit node IP addresses still exit in the cluster + // only list IP addresses of exit nodes + let nodes_by_ip: BTreeMap = nodes + .list(&ListParams::default().timeout(30)) + .await? + .items + .into_iter() + .filter_map(|node| { + let host = node.get_host(); + if let Some(_status) = &node.status { + Some((host, node)) + } else { + None + } + }) + .collect(); + let mut svc = services.get_status(&obj.name_any()).await?; + let svc_lb_ip = svc + .status + .as_ref() + .and_then(|status| status.load_balancer.as_ref()) + .and_then(|lb| lb.ingress.as_ref()) + .and_then(|ingress| ingress.first()) + .and_then(|ingress| ingress.ip.clone()) + .unwrap_or_default(); + + let existing_bound_node = nodes_by_ip.get(&svc_lb_ip); + let obj = svc.clone(); let node_list = nodes.list(&ListParams::default().timeout(30)).await?; @@ -373,7 +472,9 @@ async fn reconcile_svcs(obj: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result { info!("exit node reconcile request: {}", obj.name_any()); let is_managed = check_exit_node_managed(&obj).await; @@ -716,6 +817,8 @@ pub async fn run() -> color_eyre::Result<()> { let mut reconcilers = vec![]; + let lock = Arc::new(tokio::sync::Mutex::new(None)); + info!("Starting reconcilers..."); // TODO: figure out how to do this in a single controller because there is a potential race where the exit node reconciler runs at the same time as the service one @@ -743,7 +846,7 @@ pub async fn run() -> color_eyre::Result<()> { error_policy, Arc::new(Context { client: client.clone(), - exit_node_lock: tokio::sync::Mutex::new(()), + exit_node_lock: lock.clone(), }), ) .for_each(|_| futures::future::ready(())) @@ -771,7 +874,7 @@ pub async fn run() -> color_eyre::Result<()> { error_policy_exit_node, Arc::new(Context { client, - exit_node_lock: tokio::sync::Mutex::new(()), + exit_node_lock: lock, }), ) .for_each(|_| futures::future::ready(())) diff --git a/src/deployment.rs b/src/deployment.rs index 75314a9..0b74386 100644 --- a/src/deployment.rs +++ b/src/deployment.rs @@ -13,7 +13,7 @@ use k8s_openapi::{ apimachinery::pkg::apis::meta::v1::LabelSelector, }; use kube::{api::ResourceExt, core::ObjectMeta, error::ErrorResponse, Resource}; -use tracing::{debug, info, instrument}; +use tracing::{info, instrument, trace}; const CHISEL_IMAGE: &str = "jpillora/chisel"; @@ -69,7 +69,7 @@ pub fn generate_remote_arg(node: &ExitNode) -> String { let host = node.get_host(); - debug!(host = ?host, "Host"); + trace!(host = ?host, "Host"); // Determine if the host is an IPv6 address and format accordingly let formatted_host = match host.parse::() { @@ -78,7 +78,7 @@ pub fn generate_remote_arg(node: &ExitNode) -> String { }; let output = format!("{}:{}", formatted_host, node.spec.port); - debug!(output = ?output, "Output"); + trace!(output = ?output, "Output"); output } @@ -134,7 +134,7 @@ pub fn generate_tunnel_args(svc: &Service) -> Result, ReconcileError .collect(); info!("Generated arguments: {:?}", ports); - debug!(svc = ?svc, "Source service"); + trace!(svc = ?svc, "Source service"); Ok(ports) } diff --git a/src/error.rs b/src/error.rs index 59ec112..0016df0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,6 +8,9 @@ pub enum ReconcileError { #[error("There are no exit nodes available to assign")] NoAvailableExitNodes, + #[error("Exit Node being already assigned and its backoff time has not expired")] + ExitNodeBackoff, + #[error("There are no ports set on this LoadBalancer")] NoPortsSet, diff --git a/src/main.rs b/src/main.rs index 2b2e273..5d804a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,11 +54,19 @@ async fn main() -> Result<()> { let logfmt_logger = tracing_logfmt::layer().boxed(); - let pretty_logger = tracing_subscriber::fmt::layer().pretty().boxed(); + let pretty_logger = tracing_subscriber::fmt::layer() + .pretty() + .with_thread_ids(true) + .with_thread_names(true) + .boxed(); let json_logger = tracing_subscriber::fmt::layer().json().boxed(); - let compact_logger = tracing_subscriber::fmt::layer().compact().boxed(); + let compact_logger = tracing_subscriber::fmt::layer() + .compact() + .with_thread_ids(true) + .with_thread_names(true) + .boxed(); let logger = match logger_env.as_str() { "logfmt" => logfmt_logger,