Skip to content

Commit

Permalink
Make Config::clusters dynamic (#1125)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored Feb 20, 2025
1 parent 96e3bd0 commit d7a3ed8
Show file tree
Hide file tree
Showing 24 changed files with 324 additions and 184 deletions.
2 changes: 1 addition & 1 deletion benches/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl QuilkinLoop {
let thread = spawn("quilkin", move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let config = Arc::new(quilkin::Config::default_non_agent());
config.clusters.modify(|clusters| {
config.dyn_cfg.clusters().unwrap().modify(|clusters| {
clusters
.insert_default([quilkin::net::endpoint::Endpoint::new(endpoint.into())].into())
});
Expand Down
4 changes: 3 additions & 1 deletion crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,9 @@ impl Pail {

if !endpoints.is_empty() {
config
.clusters
.dyn_cfg
.clusters()
.unwrap()
.modify(|clusters| clusters.insert_default(endpoints));
}

Expand Down
8 changes: 6 additions & 2 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ trace_test!(uring_receiver, {

let config = std::sync::Arc::new(quilkin::Config::default_non_agent());
config
.clusters
.dyn_cfg
.clusters()
.unwrap()
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));

let socket = sb.client();
Expand Down Expand Up @@ -133,7 +135,9 @@ trace_test!(

let config = std::sync::Arc::new(quilkin::Config::default_non_agent());
config
.clusters
.dyn_cfg
.clusters()
.unwrap()
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));

let pending_sends: Vec<_> = [
Expand Down
10 changes: 7 additions & 3 deletions src/cli/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,13 @@ impl Service {
let filters = config
.dyn_cfg
.filters()
.clone()
.context("XDP requires a filter chain")?;
let clusters = config.clusters.clone();
.context("XDP requires a filter chain")?
.clone();
let clusters = config
.dyn_cfg
.clusters()
.context("XDP requires a cluster map")?
.clone();

let config = crate::net::xdp::process::ConfigState { filters, clusters };

Expand Down
8 changes: 5 additions & 3 deletions src/components/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ impl Manage {
mut shutdown_rx,
}: RunArgs<Ready>,
) -> crate::Result<()> {
let Some(clusters) = config.dyn_cfg.clusters() else {
eyre::bail!("clusters were not configured, this is a configuration issue");
};

if let Some(locality) = &self.locality {
config
.clusters
.modify(|map| map.update_unlocated_endpoints(locality.clone()));
clusters.modify(|map| map.update_unlocated_endpoints(locality.clone()));
}

let provider_task = match self.provider {
Expand Down
8 changes: 6 additions & 2 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl Proxy {
})
});

let Some(clusters) = config.dyn_cfg.clusters() else {
eyre::bail!("empty clusters were not created")
};

if !self.to.is_empty() {
let endpoints = if let Some(tt) = &self.to_tokens {
let (unique, overflow) = 256u64.overflowing_pow(tt.length as _);
Expand Down Expand Up @@ -192,12 +196,12 @@ impl Proxy {
.collect()
};

config.clusters.modify(|clusters| {
clusters.modify(|clusters| {
clusters.insert(None, endpoints);
});
}

if !config.clusters.read().has_endpoints() && self.management_servers.is_empty() {
if !clusters.read().has_endpoints() && self.management_servers.is_empty() {
return Err(eyre::eyre!(
"`quilkin proxy` requires at least one `to` address or `management_server` endpoint."
));
Expand Down
10 changes: 7 additions & 3 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,16 @@ impl<P: PacketMut> DownstreamPacket<P> {
sessions: &S,
destinations: &mut Vec<crate::net::EndpointAddress>,
) -> Result<(), PipelineError> {
if !config.clusters.read().has_endpoints() {
let Some(clusters) = config
.dyn_cfg
.clusters()
.filter(|c| c.read().has_endpoints())
else {
tracing::trace!("no upstream endpoints");
return Err(PipelineError::NoUpstreamEndpoints);
}
};

let cm = config.clusters.clone_value();
let cm = clusters.clone_value();
let Some(filters) = config.dyn_cfg.filters() else {
return Err(PipelineError::Filter(crate::filters::FilterError::Custom(
"no filters loaded",
Expand Down
Loading

0 comments on commit d7a3ed8

Please sign in to comment.