-
-
Notifications
You must be signed in to change notification settings - Fork 328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Native multi reflector #1692
base: main
Are you sure you want to change the base?
Native multi reflector #1692
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
use futures::{future, stream, StreamExt}; | ||
use k8s_openapi::api::{ | ||
apps::v1::Deployment, | ||
core::v1::{ConfigMap, Secret}, | ||
}; | ||
use kube::{ | ||
api::{ApiResource, DynamicObject, GroupVersionKind}, | ||
core::TypedResource, | ||
runtime::{reflector::store::CacheWriter, watcher, WatchStreamExt}, | ||
Api, Client, Resource, | ||
}; | ||
use parking_lot::RwLock; | ||
use serde::de::DeserializeOwned; | ||
use std::sync::Arc; | ||
use tracing::*; | ||
|
||
use std::collections::HashMap; | ||
|
||
type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>; | ||
|
||
#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)] | ||
struct LookupKey { | ||
gvk: GroupVersionKind, | ||
name: Option<String>, | ||
namespace: Option<String>, | ||
} | ||
|
||
impl LookupKey { | ||
fn new<R: TypedResource>(resource: &R) -> LookupKey { | ||
let meta = resource.meta(); | ||
LookupKey { | ||
gvk: resource.gvk(), | ||
name: meta.name.clone(), | ||
namespace: meta.namespace.clone(), | ||
} | ||
} | ||
} | ||
|
||
#[derive(Default, Clone)] | ||
struct MultiCache { | ||
store: Cache, | ||
} | ||
|
||
impl MultiCache { | ||
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>( | ||
&self, | ||
name: &str, | ||
ns: &str, | ||
) -> Option<Arc<K>> { | ||
let obj = self | ||
.store | ||
.read() | ||
.get(&LookupKey { | ||
gvk: K::gvk(&Default::default()), | ||
name: Some(name.into()), | ||
namespace: if !ns.is_empty() { Some(ns.into()) } else { None }, | ||
})? | ||
.as_ref() | ||
.clone(); | ||
obj.try_parse().ok().map(Arc::new) | ||
} | ||
} | ||
|
||
impl CacheWriter<DynamicObject> for MultiCache { | ||
/// Applies a single watcher event to the store | ||
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) { | ||
match event { | ||
watcher::Event::Init | watcher::Event::InitDone => {} | ||
watcher::Event::Delete(obj) => { | ||
self.store.write().remove(&LookupKey::new(obj)); | ||
} | ||
watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => { | ||
self.store | ||
.write() | ||
.insert(LookupKey::new(obj), Arc::new(obj.clone())); | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> anyhow::Result<()> { | ||
tracing_subscriber::fmt::init(); | ||
let client = Client::try_default().await?; | ||
|
||
// multistore | ||
let mut combo_stream = stream::select_all(vec![]); | ||
combo_stream.push( | ||
watcher::watcher( | ||
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())), | ||
Default::default(), | ||
) | ||
.boxed(), | ||
); | ||
combo_stream.push( | ||
watcher::watcher( | ||
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())), | ||
Default::default(), | ||
) | ||
.boxed(), | ||
); | ||
|
||
// // Duplicate streams with narrowed down selection | ||
combo_stream.push( | ||
watcher::watcher( | ||
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())), | ||
Default::default(), | ||
) | ||
.boxed(), | ||
); | ||
combo_stream.push( | ||
watcher::watcher( | ||
Api::all_with(client.clone(), &ApiResource::erase::<Secret>(&())), | ||
Default::default(), | ||
) | ||
.boxed(), | ||
); | ||
|
||
let multi_writer = MultiCache::default(); | ||
let watcher = combo_stream | ||
.reflect(multi_writer.clone()) | ||
.applied_objects() | ||
.for_each(|_| future::ready(())); | ||
|
||
// simulate doing stuff with the stores from some other thread | ||
tokio::spawn(async move { | ||
// can use helper accessors | ||
loop { | ||
tokio::time::sleep(std::time::Duration::from_secs(5)).await; | ||
info!("cache content: {:?}", multi_writer.store.read().keys()); | ||
info!( | ||
"common cm: {:?}", | ||
multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system") | ||
); | ||
// access individual sub stores | ||
info!("Current objects count: {}", multi_writer.store.read().len()); | ||
} | ||
}); | ||
info!("long watches starting"); | ||
tokio::select! { | ||
r = watcher => println!("watcher exit: {r:?}"), | ||
} | ||
|
||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta}; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
use crate::{DynamicObject, Resource}; | ||
use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource}; | ||
|
||
/// Type information that is flattened into every kubernetes object | ||
#[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)] | ||
|
@@ -51,6 +51,24 @@ | |
kind: K::kind(&()).into(), | ||
} | ||
} | ||
|
||
/// Construct a new `TypeMeta` for the object from the list `TypeMeta`. | ||
/// | ||
/// ``` | ||
/// # use k8s_openapi::api::core::v1::Pod; | ||
/// # use kube_core::TypeMeta; | ||
/// | ||
/// let mut type_meta = TypeMeta::resource::<Pod>(); | ||
/// type_meta.kind = "PodList".to_string(); | ||
/// assert_eq!(type_meta.clone().singular().kind, "Pod"); | ||
/// assert_eq!(type_meta.clone().singular().api_version, "v1"); | ||
/// ``` | ||
pub fn singular(self) -> Self { | ||
Self { | ||
kind: self.kind.strip_suffix("List").unwrap_or(&self.kind).to_string(), | ||
..self | ||
} | ||
} | ||
} | ||
|
||
/// A generic representation of any object with `ObjectMeta`. | ||
|
@@ -175,6 +193,123 @@ | |
} | ||
} | ||
|
||
/// | ||
Check warning on line 196 in kube-core/src/metadata.rs
|
||
pub trait TypedResource: Resource + Sized { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this trait differ from one of our existing ones here? How does this help out with solving the issue? I see a lot of trait magic here, and it's non-trivial for me to try to decipher this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a problem with anything else available. Unfortunately for anything generated by k8s-openapi or derive This ✨ magic ✨ makes it possible to have GVK of a resource available as if in a blanket implementation, and it is used for identification of resources in dynamic watchers cache, for routing of events. This dynamic cache can later be used as a source to establish dynamic watches on statically typed resources, if used as a stream which filters resources by GVK and serializes it to the expected type. This is not a part of this implementation yet, thought. Such cache, if passed around one or multiple controllers, may serve as an up-to-date state of all watched objects for read operations. |
||
/// | ||
Check warning on line 198 in kube-core/src/metadata.rs
|
||
fn type_meta(&self) -> TypeMeta; | ||
/// | ||
Check warning on line 200 in kube-core/src/metadata.rs
|
||
fn gvk(&self) -> GroupVersionKind; | ||
/// | ||
Check warning on line 202 in kube-core/src/metadata.rs
|
||
fn kind(&self) -> Cow<'_, str>; | ||
/// | ||
Check warning on line 204 in kube-core/src/metadata.rs
|
||
fn group(&self) -> Cow<'_, str>; | ||
/// | ||
Check warning on line 206 in kube-core/src/metadata.rs
|
||
fn version(&self) -> Cow<'_, str>; | ||
/// | ||
Check warning on line 208 in kube-core/src/metadata.rs
|
||
fn plural(&self) -> Cow<'_, str>; | ||
} | ||
|
||
impl<K> TypedResource for K | ||
where | ||
K: Resource, | ||
(K, K::DynamicType): TypedResourceImpl<Resource = K>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lol. yeah, this is a bit too much magic imo. |
||
{ | ||
fn type_meta(&self) -> TypeMeta { | ||
<(K, K::DynamicType) as TypedResourceImpl>::type_meta(self) | ||
} | ||
|
||
fn gvk(&self) -> GroupVersionKind { | ||
<(K, K::DynamicType) as TypedResourceImpl>::gvk(self) | ||
} | ||
|
||
fn kind(&self) -> Cow<'_, str> { | ||
<(K, K::DynamicType) as TypedResourceImpl>::kind(self) | ||
} | ||
/// | ||
Check warning on line 228 in kube-core/src/metadata.rs
|
||
fn group(&self) -> Cow<'_, str> { | ||
<(K, K::DynamicType) as TypedResourceImpl>::group(self) | ||
} | ||
/// | ||
Check warning on line 232 in kube-core/src/metadata.rs
|
||
fn version(&self) -> Cow<'_, str> { | ||
<(K, K::DynamicType) as TypedResourceImpl>::version(self) | ||
} | ||
/// | ||
Check warning on line 236 in kube-core/src/metadata.rs
|
||
fn plural(&self) -> Cow<'_, str> { | ||
<(K, K::DynamicType) as TypedResourceImpl>::plural(self) | ||
} | ||
} | ||
|
||
#[doc(hidden)] | ||
// Workaround for https://github.com/rust-lang/rust/issues/20400 | ||
pub trait TypedResourceImpl { | ||
type Resource: Resource; | ||
fn type_meta(res: &Self::Resource) -> TypeMeta; | ||
fn gvk(res: &Self::Resource) -> GroupVersionKind; | ||
fn kind(res: &Self::Resource) -> Cow<'_, str>; | ||
fn group(res: &Self::Resource) -> Cow<'_, str>; | ||
fn version(res: &Self::Resource) -> Cow<'_, str>; | ||
fn plural(res: &Self::Resource) -> Cow<'_, str>; | ||
} | ||
|
||
impl<K> TypedResourceImpl for (K, ()) | ||
where | ||
K: Resource<DynamicType = ()>, | ||
{ | ||
type Resource = K; | ||
|
||
fn type_meta(_: &Self::Resource) -> TypeMeta { | ||
TypeMeta::resource::<K>() | ||
} | ||
|
||
fn gvk(res: &Self::Resource) -> GroupVersionKind { | ||
GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind()) | ||
} | ||
|
||
fn kind(_: &Self::Resource) -> Cow<'_, str> { | ||
K::kind(&()) | ||
} | ||
|
||
fn group(_: &Self::Resource) -> Cow<'_, str> { | ||
K::group(&()) | ||
} | ||
|
||
fn version(_: &Self::Resource) -> Cow<'_, str> { | ||
K::version(&()) | ||
} | ||
|
||
fn plural(_: &Self::Resource) -> Cow<'_, str> { | ||
K::plural(&()) | ||
} | ||
} | ||
|
||
impl TypedResourceImpl for (DynamicObject, ApiResource) { | ||
type Resource = DynamicObject; | ||
|
||
fn type_meta(obj: &Self::Resource) -> TypeMeta { | ||
obj.types.clone().unwrap_or_default() | ||
} | ||
|
||
fn gvk(res: &Self::Resource) -> GroupVersionKind { | ||
res.type_meta().try_into().unwrap_or_default() | ||
Comment on lines
+292
to
+293
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have derived |
||
} | ||
|
||
fn kind(res: &Self::Resource) -> Cow<'_, str> { | ||
Cow::from(res.type_meta().kind) | ||
} | ||
|
||
fn group(res: &Self::Resource) -> Cow<'_, str> { | ||
Cow::from(res.gvk().group) | ||
} | ||
|
||
fn version(res: &Self::Resource) -> Cow<'_, str> { | ||
Cow::from(res.gvk().version) | ||
} | ||
|
||
fn plural(res: &Self::Resource) -> Cow<'_, str> { | ||
Cow::from(ApiResource::from_gvk(&res.gvk()).plural) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this fn is nice by itself. but it relies on context that is not clear from the name;
maybe it's better to have this fn named as
fn singularize_list
to resolve this.feel free to do this as it's own pr.