Skip to content

Commit

Permalink
Exploring shared cache based on multi reflectors
Browse files Browse the repository at this point in the history
Signed-off-by: Danil-Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev committed Feb 13, 2025
1 parent 550e50f commit f26a217
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 188 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

[dev-dependencies]
parking_lot.workspace = true
tokio-util.workspace = true
assert-json-diff.workspace = true
garde = { version = "0.22.0", default-features = false, features = ["derive"] }
Expand Down
192 changes: 100 additions & 92 deletions examples/multi_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,80 @@
use futures::{future, StreamExt};
use futures::{future, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
runtime::{
reflector,
reflector::{ObjectRef, Store},
watcher, WatchStreamExt,
},
Api, Client,
api::{ApiResource, DynamicObject, GroupVersionKind},
core::TypedResource,
runtime::{reflector::store::CacheWriter, watcher, WatchStreamExt},
Api, Client, Resource,
};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tracing::*;

// This does not work because Resource trait is not dyn safe.
/*
use std::any::TypeId;
use std::collections::HashMap;
use k8s_openapi::NamespaceResourceScope;
use kube::api::{Resource, ResourceExt};
struct MultiStore {
stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
}
impl MultiStore {
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
let oref = ObjectRef::<K>::new(name).within(ns);
if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
store.get(oref)
} else {
None
}
}
}*/

// explicit store can work
struct MultiStore {
deploys: Store<Deployment>,
cms: Store<ConfigMap>,
secs: Store<Secret>,
type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>;

#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)]
struct LookupKey {
gvk: GroupVersionKind,
name: Option<String>,
namespace: Option<String>,
}
// but using generics to help out won't because the K needs to be concretised
/*
impl MultiStore {
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
let oref = ObjectRef::<K>::new(name).within(ns);
let kind = K::kind(&()).to_owned();
match kind.as_ref() {
"Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
"ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
"Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
_ => None,

impl LookupKey {
fn new<R: TypedResource>(resource: &R) -> LookupKey {
let meta = resource.meta();
LookupKey {
gvk: resource.gvk(),
name: meta.name.clone(),
namespace: meta.namespace.clone(),
}
None
}
}
*/
// so left with this

impl MultiStore {
fn get_deploy(&self, name: &str, ns: &str) -> Option<Arc<Deployment>> {
self.deploys.get(&ObjectRef::<Deployment>::new(name).within(ns))
}
#[derive(Default, Clone)]
struct MultiCache {
store: Cache,
}

fn get_secret(&self, name: &str, ns: &str) -> Option<Arc<Secret>> {
self.secs.get(&ObjectRef::<Secret>::new(name).within(ns))
impl MultiCache {
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
&self,
name: &str,
ns: &str,
) -> Option<Arc<K>> {
let obj = self
.store
.read()
.get(&LookupKey {
gvk: K::gvk(&Default::default()),
name: Some(name.into()),
namespace: if !ns.is_empty() { Some(ns.into()) } else { None },
})?
.as_ref()
.clone();
obj.try_parse().ok().map(Arc::new)
}
}

fn get_cm(&self, name: &str, ns: &str) -> Option<Arc<ConfigMap>> {
self.cms.get(&ObjectRef::<ConfigMap>::new(name).within(ns))
impl CacheWriter<DynamicObject> for MultiCache {
/// Applies a single watcher event to the store
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
match event {
watcher::Event::Init | watcher::Event::InitDone => {}
watcher::Event::Delete(obj) => {
self.store.write().remove(&LookupKey::new(obj));
}
watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => {
self.store
.write()
.insert(LookupKey::new(obj), Arc::new(obj.clone()));
}
}
}
}

Expand All @@ -77,60 +83,62 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
let secret: Api<Secret> = Api::default_namespaced(client.clone());
// multistore
let mut combo_stream = stream::select_all(vec![]);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
Default::default(),
)
.boxed(),
);

let (dep_reader, dep_writer) = reflector::store::<Deployment>();
let (cm_reader, cm_writer) = reflector::store::<ConfigMap>();
let (sec_reader, sec_writer) = reflector::store::<Secret>();
// // Duplicate streams with narrowed down selection
combo_stream.push(
watcher::watcher(
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);

let cfg = watcher::Config::default();
let dep_watcher = watcher(deploys, cfg.clone())
.reflect(dep_writer)
.applied_objects()
.for_each(|_| future::ready(()));
let cm_watcher = watcher(cms, cfg.clone())
.reflect(cm_writer)
let multi_writer = MultiCache::default();
let watcher = combo_stream
.reflect(multi_writer.clone())
.applied_objects()
.for_each(|_| future::ready(()));
let sec_watcher = watcher(secret, cfg)
.reflect(sec_writer)
.applied_objects()
.for_each(|_| future::ready(()));
// poll these forever

// multistore
let stores = MultiStore {
deploys: dep_reader,
cms: cm_reader,
secs: sec_reader,
};

// simulate doing stuff with the stores from some other thread
tokio::spawn(async move {
// Show state every 5 seconds of watching
info!("waiting for them to be ready");
stores.deploys.wait_until_ready().await.unwrap();
stores.cms.wait_until_ready().await.unwrap();
stores.secs.wait_until_ready().await.unwrap();
info!("stores initialised");
// can use helper accessors
info!(
"common cm: {:?}",
stores.get_cm("kube-root-ca.crt", "kube-system").unwrap()
);
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("cache content: {:?}", multi_writer.store.read().keys());
info!(
"common cm: {:?}",
multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
);
// access individual sub stores
info!("Current deploys count: {}", stores.deploys.state().len());
info!("Current objects count: {}", multi_writer.store.read().len());
}
});
// info!("long watches starting");
info!("long watches starting");
tokio::select! {
r = dep_watcher => println!("dep watcher exit: {r:?}"),
r = cm_watcher => println!("cm watcher exit: {r:?}"),
r = sec_watcher => println!("sec watcher exit: {r:?}"),
r = watcher => println!("watcher exit: {r:?}"),
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct ParseGroupVersionError(pub String);

/// Core information about an API Resource.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct GroupVersionKind {
/// API group
pub group: String,
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod gvk;
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};

pub mod metadata;
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};

pub mod labels;

Expand Down
Loading

0 comments on commit f26a217

Please sign in to comment.