Skip to content

Commit

Permalink
Exploring shared cache based on multi reflectors
Browse files Browse the repository at this point in the history
Signed-off-by: Danil-Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev committed Feb 10, 2025
1 parent 550e50f commit 450df09
Show file tree
Hide file tree
Showing 15 changed files with 500 additions and 208 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

[dev-dependencies]
parking_lot.workspace = true
tokio-util.workspace = true
assert-json-diff.workspace = true
garde = { version = "0.22.0", default-features = false, features = ["derive"] }
Expand Down
186 changes: 97 additions & 89 deletions examples/multi_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,99 @@
use futures::{future, StreamExt};
use futures::{future, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
api::{ApiResource, DynamicObject, GroupVersionKind},
core::TypedResource,
runtime::{
reflector,
reflector::{ObjectRef, Store},
watcher, WatchStreamExt,
reflector::{
store::{CacheWriter, Writer},
ObjectRef, Store,
},
watcher::{self, dynamic_watcher},
WatchStreamExt,
},
Api, Client,
Api, Client, Resource,
};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tracing::*;

// This does not work because Resource trait is not dyn safe.
/*
use std::any::TypeId;
use std::collections::HashMap;
use k8s_openapi::NamespaceResourceScope;
use kube::api::{Resource, ResourceExt};
struct MultiStore {
stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
}
impl MultiStore {
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
let oref = ObjectRef::<K>::new(name).within(ns);
if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
store.get(oref)
} else {
None
}
}
}*/

// explicit store can work
type Cache = Arc<RwLock<HashMap<GroupVersionKind, Writer<DynamicObject>>>>;

#[derive(Default, Clone, Copy)]
struct MultiWriter {
store: Cache,
buffer: Cache,
}

#[derive(Default, Clone)]
struct MultiStore {
deploys: Store<Deployment>,
cms: Store<ConfigMap>,
secs: Store<Secret>,
store: Cache,
}
// but using generics to help out won't because the K needs to be concretised
/*
impl MultiStore {
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
let oref = ObjectRef::<K>::new(name).within(ns);
let kind = K::kind(&()).to_owned();
match kind.as_ref() {
"Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
"ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
"Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
_ => None,
}
None

impl MultiWriter {
fn as_reader(&self) -> MultiStore {
MultiStore { store: self.store }
}
}
*/
// so left with this

impl MultiStore {
fn get_deploy(&self, name: &str, ns: &str) -> Option<Arc<Deployment>> {
self.deploys.get(&ObjectRef::<Deployment>::new(name).within(ns))
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
&self,
name: &str,
ns: &str,
) -> Option<Arc<K>> {
let oref = ObjectRef::<K>::new(name).within(ns).erase();
let store = self.get_store::<K>()?;
let obj = store.get(&oref)?.as_ref().clone();
obj.try_parse().ok().map(Arc::new)
}

fn get_secret(&self, name: &str, ns: &str) -> Option<Arc<Secret>> {
self.secs.get(&ObjectRef::<Secret>::new(name).within(ns))
fn get_store<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
&self,
) -> Option<Store<DynamicObject>> {
Some(self.store.read().get(&K::gvk(&Default::default()))?.as_reader())
}
}

fn get_cm(&self, name: &str, ns: &str) -> Option<Arc<ConfigMap>> {
self.cms.get(&ObjectRef::<ConfigMap>::new(name).within(ns))
impl CacheWriter<DynamicObject> for MultiWriter {
/// Applies a single watcher event to the store
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
match event {
watcher::Event::Init | watcher::Event::InitDone(None) => {}
watcher::Event::Apply(obj) | watcher::Event::Delete(obj) => {
let mut stores = self.store.write();
if stores.get(&obj.gvk()).is_none() {
let store = Writer::new(ApiResource::from_gvk(&obj.gvk()));
stores.insert(obj.gvk(), store);
};
if let Some(store) = stores.get_mut(&obj.gvk()) {
store.apply_watcher_event(event);
};
}
watcher::Event::InitApply(obj) => {
let mut buffer = self.buffer.write();
if buffer.get(&obj.gvk()).is_none() {
let store = Writer::new(ApiResource::from_gvk(&obj.gvk()));
buffer.insert(obj.gvk(), store);
};
if let Some(store) = buffer.get_mut(&obj.gvk()) {
store.apply_watcher_event(event);
};
}
watcher::Event::InitDone(Some(obj)) => {
let mut buffer = self.buffer.write();
if let Some(mut store) = buffer.remove(&obj.gvk()) {
store.apply_watcher_event(event);
self.store.write().insert(obj.gvk(), store);
}
}
}
}
}

Expand All @@ -77,60 +102,43 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
let secret: Api<Secret> = Api::default_namespaced(client.clone());

let (dep_reader, dep_writer) = reflector::store::<Deployment>();
let (cm_reader, cm_writer) = reflector::store::<ConfigMap>();
let (sec_reader, sec_writer) = reflector::store::<Secret>();
// multistore
let combo_stream = stream::select_all(vec![
dynamic_watcher(Api::<Deployment>::all(client.clone()), Default::default()).boxed(),
dynamic_watcher(Api::<ConfigMap>::all(client.clone()), Default::default()).boxed(),
dynamic_watcher(Api::<Secret>::all(client.clone()), Default::default()).boxed(),
]);

let cfg = watcher::Config::default();
let dep_watcher = watcher(deploys, cfg.clone())
.reflect(dep_writer)
.applied_objects()
.for_each(|_| future::ready(()));
let cm_watcher = watcher(cms, cfg.clone())
.reflect(cm_writer)
.applied_objects()
.for_each(|_| future::ready(()));
let sec_watcher = watcher(secret, cfg)
.reflect(sec_writer)
let multi_writer = MultiWriter::default();
let watcher = combo_stream
.reflect(multi_writer)
.applied_objects()
.for_each(|_| future::ready(()));
// poll these forever

// multistore
let stores = MultiStore {
deploys: dep_reader,
cms: cm_reader,
secs: sec_reader,
};

// simulate doing stuff with the stores from some other thread
tokio::spawn(async move {
// Show state every 5 seconds of watching
info!("waiting for them to be ready");
stores.deploys.wait_until_ready().await.unwrap();
stores.cms.wait_until_ready().await.unwrap();
stores.secs.wait_until_ready().await.unwrap();
info!("stores initialised");
// can use helper accessors
info!(
"common cm: {:?}",
stores.get_cm("kube-root-ca.crt", "kube-system").unwrap()
);
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!(
"cache content: {:?}",
multi_writer.as_reader().store.read().keys()
);
info!(
"common cm: {:?}",
multi_writer
.as_reader()
.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
);
// access individual sub stores
info!("Current deploys count: {}", stores.deploys.state().len());
if let Some(deploys) = multi_writer.as_reader().get_store::<Deployment>() {
info!("Current deploys count: {}", deploys.state().len());
}
}
});
// info!("long watches starting");
info!("long watches starting");
tokio::select! {
r = dep_watcher => println!("dep watcher exit: {r:?}"),
r = cm_watcher => println!("cm watcher exit: {r:?}"),
r = sec_watcher => println!("sec watcher exit: {r:?}"),
r = watcher => println!("watcher exit: {r:?}"),
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct ParseGroupVersionError(pub String);

/// Core information about an API Resource.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct GroupVersionKind {
/// API group
pub group: String,
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod gvk;
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};

pub mod metadata;
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};

pub mod labels;

Expand Down
119 changes: 118 additions & 1 deletion kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{borrow::Cow, marker::PhantomData};
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta};
use serde::{Deserialize, Serialize};

use crate::{DynamicObject, Resource};
use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource};

/// Type information that is flattened into every kubernetes object
#[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -175,6 +175,123 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
}
}

///

Check warning on line 178 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:178:1 | 178 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs = note: `#[warn(clippy::empty_docs)]` on by default
pub trait TypedResource: Resource + Sized {
///

Check warning on line 180 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:180:5 | 180 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn type_meta(&self) -> TypeMeta;
///

Check warning on line 182 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:182:5 | 182 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn gvk(&self) -> GroupVersionKind;
///

Check warning on line 184 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:184:5 | 184 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn kind(&self) -> Cow<'_, str>;
///

Check warning on line 186 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:186:5 | 186 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn group(&self) -> Cow<'_, str>;
///

Check warning on line 188 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:188:5 | 188 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn version(&self) -> Cow<'_, str>;
///

Check warning on line 190 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:190:5 | 190 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn plural(&self) -> Cow<'_, str>;
}

impl<K> TypedResource for K
where
K: Resource,
(K, K::DynamicType): TypedResourceImpl<Resource = K>,
{
fn type_meta(&self) -> TypeMeta {
<(K, K::DynamicType) as TypedResourceImpl>::type_meta(self)
}

fn gvk(&self) -> GroupVersionKind {
<(K, K::DynamicType) as TypedResourceImpl>::gvk(self)
}

fn kind(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::kind(self)
}
///

Check warning on line 210 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:210:5 | 210 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn group(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::group(self)
}
///

Check warning on line 214 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:214:5 | 214 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn version(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::version(self)
}
///

Check warning on line 218 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:218:5 | 218 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn plural(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::plural(self)
}
}

#[doc(hidden)]
// Workaround for https://github.com/rust-lang/rust/issues/20400
pub trait TypedResourceImpl {
type Resource: Resource;
fn type_meta(res: &Self::Resource) -> TypeMeta;
fn gvk(res: &Self::Resource) -> GroupVersionKind;
fn kind(res: &Self::Resource) -> Cow<'_, str>;
fn group(res: &Self::Resource) -> Cow<'_, str>;
fn version(res: &Self::Resource) -> Cow<'_, str>;
fn plural(res: &Self::Resource) -> Cow<'_, str>;
}

impl<K> TypedResourceImpl for (K, ())
where
K: Resource<DynamicType = ()>,
{
type Resource = K;

fn type_meta(_: &Self::Resource) -> TypeMeta {
TypeMeta::resource::<K>()
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind())
}

fn kind(_: &Self::Resource) -> Cow<'_, str> {
K::kind(&())
}

fn group(_: &Self::Resource) -> Cow<'_, str> {
K::group(&())
}

fn version(_: &Self::Resource) -> Cow<'_, str> {
K::version(&())
}

fn plural(_: &Self::Resource) -> Cow<'_, str> {
K::plural(&())
}
}

impl TypedResourceImpl for (DynamicObject, ApiResource) {
type Resource = DynamicObject;

fn type_meta(obj: &Self::Resource) -> TypeMeta {
obj.types.clone().unwrap_or_default()
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
res.type_meta().try_into().unwrap_or_default()
}

fn kind(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.type_meta().kind)
}

fn group(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().group)
}

fn version(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().version)
}

fn plural(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(ApiResource::from_gvk(&res.gvk()).plural)
}
}

#[cfg(test)]
mod test {
use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
Expand Down
Loading

0 comments on commit 450df09

Please sign in to comment.