Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native multi reflector #1692

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

[dev-dependencies]
parking_lot.workspace = true
tokio-util.workspace = true
assert-json-diff.workspace = true
garde = { version = "0.22.0", default-features = false, features = ["derive"] }
Expand Down Expand Up @@ -130,6 +131,10 @@ path = "log_stream.rs"
name = "multi_watcher"
path = "multi_watcher.rs"

[[example]]
name = "multi_reflector"
path = "multi_reflector.rs"

[[example]]
name = "pod_api"
path = "pod_api.rs"
Expand Down
145 changes: 145 additions & 0 deletions examples/multi_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use futures::{future, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
api::{ApiResource, DynamicObject, GroupVersionKind},
core::TypedResource,
runtime::{reflector::store::CacheWriter, watcher, WatchStreamExt},
Api, Client, Resource,
};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tracing::*;

use std::collections::HashMap;

type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>;

#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)]
struct LookupKey {
gvk: GroupVersionKind,
name: Option<String>,
namespace: Option<String>,
}

impl LookupKey {
fn new<R: TypedResource>(resource: &R) -> LookupKey {
let meta = resource.meta();
LookupKey {
gvk: resource.gvk(),
name: meta.name.clone(),
namespace: meta.namespace.clone(),
}
}
}

#[derive(Default, Clone)]
struct MultiCache {
store: Cache,
}

impl MultiCache {
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
&self,
name: &str,
ns: &str,
) -> Option<Arc<K>> {
let obj = self
.store
.read()
.get(&LookupKey {
gvk: K::gvk(&Default::default()),
name: Some(name.into()),
namespace: if !ns.is_empty() { Some(ns.into()) } else { None },
})?
.as_ref()
.clone();
obj.try_parse().ok().map(Arc::new)
}
}

impl CacheWriter<DynamicObject> for MultiCache {
/// Applies a single watcher event to the store
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
match event {
watcher::Event::Init | watcher::Event::InitDone => {}
watcher::Event::Delete(obj) => {
self.store.write().remove(&LookupKey::new(obj));
}
watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => {
self.store
.write()
.insert(LookupKey::new(obj), Arc::new(obj.clone()));
}
}
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

// multistore
let mut combo_stream = stream::select_all(vec![]);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
Default::default(),
)
.boxed(),
);

// // Duplicate streams with narrowed down selection
combo_stream.push(
watcher::watcher(
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);

let multi_writer = MultiCache::default();
let watcher = combo_stream
.reflect(multi_writer.clone())
.applied_objects()
.for_each(|_| future::ready(()));

// simulate doing stuff with the stores from some other thread
tokio::spawn(async move {
// can use helper accessors
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("cache content: {:?}", multi_writer.store.read().keys());
info!(
"common cm: {:?}",
multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
);
// access individual sub stores
info!("Current objects count: {}", multi_writer.store.read().len());
}
});
info!("long watches starting");
tokio::select! {
r = watcher => println!("watcher exit: {r:?}"),
}

Ok(())
}
2 changes: 1 addition & 1 deletion kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct ParseGroupVersionError(pub String);

/// Core information about an API Resource.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct GroupVersionKind {
/// API group
pub group: String,
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod gvk;
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};

pub mod metadata;
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};

pub mod labels;

Expand Down
137 changes: 136 additions & 1 deletion kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta};
use serde::{Deserialize, Serialize};

use crate::{DynamicObject, Resource};
use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource};

/// Type information that is flattened into every kubernetes object
#[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -51,6 +51,24 @@
kind: K::kind(&()).into(),
}
}

/// Construct a new `TypeMeta` for the object from the list `TypeMeta`.
///
/// ```
/// # use k8s_openapi::api::core::v1::Pod;
/// # use kube_core::TypeMeta;
///
/// let mut type_meta = TypeMeta::resource::<Pod>();
/// type_meta.kind = "PodList".to_string();
/// assert_eq!(type_meta.clone().singular().kind, "Pod");
/// assert_eq!(type_meta.clone().singular().api_version, "v1");
/// ```
pub fn singular(self) -> Self {
Self {
kind: self.kind.strip_suffix("List").unwrap_or(&self.kind).to_string(),
..self
}
}
Comment on lines +55 to +71
Copy link
Member

@clux clux Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this fn is nice by itself. but it relies on context that is not clear from the name;

  • it's not clear that it works on a List type
  • it's not clear it's a noop on a non-list type

maybe it's better to have this fn named as fn singularize_list to resolve this.
feel free to do this as it's own pr.

}

/// A generic representation of any object with `ObjectMeta`.
Expand Down Expand Up @@ -175,6 +193,123 @@
}
}

///

Check warning on line 196 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:196:1 | 196 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs = note: `#[warn(clippy::empty_docs)]` on by default
pub trait TypedResource: Resource + Sized {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this trait differ from one of our existing ones here? How does this help out with solving the issue? I see a lot of trait magic here, and it's non-trivial for me to try to decipher this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem with anything else available. DynamicObject is a special case of resource, and unlike any typed resource it has types field we can use.

Unfortunately for anything generated by k8s-openapi or derive CustomResourceDefinition types, TypeMeta is available from constant kind, group and version. There is no type field we can use, so data, even if returned from API server, is lost.

This ✨ magic ✨ makes it possible to have GVK of a resource available as if in a blanket implementation, and it is used for identification of resources in dynamic watchers cache, for routing of events.

This dynamic cache can later be used as a source to establish dynamic watches on statically typed resources, if used as a stream which filters resources by GVK and serializes it to the expected type. This is not a part of this implementation yet, thought.

Such cache, if passed around one or multiple controllers, may serve as an up-to-date state of all watched objects for read operations.

///

Check warning on line 198 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:198:5 | 198 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn type_meta(&self) -> TypeMeta;
///

Check warning on line 200 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:200:5 | 200 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn gvk(&self) -> GroupVersionKind;
///

Check warning on line 202 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:202:5 | 202 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn kind(&self) -> Cow<'_, str>;
///

Check warning on line 204 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:204:5 | 204 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn group(&self) -> Cow<'_, str>;
///

Check warning on line 206 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:206:5 | 206 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn version(&self) -> Cow<'_, str>;
///

Check warning on line 208 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:208:5 | 208 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn plural(&self) -> Cow<'_, str>;
}

impl<K> TypedResource for K
where
K: Resource,
(K, K::DynamicType): TypedResourceImpl<Resource = K>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol. yeah, this is a bit too much magic imo.

{
fn type_meta(&self) -> TypeMeta {
<(K, K::DynamicType) as TypedResourceImpl>::type_meta(self)

Check warning on line 218 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L217-L218

Added lines #L217 - L218 were not covered by tests
}

fn gvk(&self) -> GroupVersionKind {
<(K, K::DynamicType) as TypedResourceImpl>::gvk(self)

Check warning on line 222 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L221-L222

Added lines #L221 - L222 were not covered by tests
}

fn kind(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::kind(self)

Check warning on line 226 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L225-L226

Added lines #L225 - L226 were not covered by tests
}
///

Check warning on line 228 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:228:5 | 228 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn group(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::group(self)

Check warning on line 230 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L229-L230

Added lines #L229 - L230 were not covered by tests
}
///

Check warning on line 232 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:232:5 | 232 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn version(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::version(self)

Check warning on line 234 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L233-L234

Added lines #L233 - L234 were not covered by tests
}
///

Check warning on line 236 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:236:5 | 236 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn plural(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::plural(self)

Check warning on line 238 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L237-L238

Added lines #L237 - L238 were not covered by tests
}
}

#[doc(hidden)]
// Workaround for https://github.com/rust-lang/rust/issues/20400
pub trait TypedResourceImpl {
type Resource: Resource;
fn type_meta(res: &Self::Resource) -> TypeMeta;
fn gvk(res: &Self::Resource) -> GroupVersionKind;
fn kind(res: &Self::Resource) -> Cow<'_, str>;
fn group(res: &Self::Resource) -> Cow<'_, str>;
fn version(res: &Self::Resource) -> Cow<'_, str>;
fn plural(res: &Self::Resource) -> Cow<'_, str>;
}

impl<K> TypedResourceImpl for (K, ())
where
K: Resource<DynamicType = ()>,
{
type Resource = K;

fn type_meta(_: &Self::Resource) -> TypeMeta {
TypeMeta::resource::<K>()

Check warning on line 261 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L260-L261

Added lines #L260 - L261 were not covered by tests
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind())

Check warning on line 265 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L264-L265

Added lines #L264 - L265 were not covered by tests
}

fn kind(_: &Self::Resource) -> Cow<'_, str> {
K::kind(&())

Check warning on line 269 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L268-L269

Added lines #L268 - L269 were not covered by tests
}

fn group(_: &Self::Resource) -> Cow<'_, str> {
K::group(&())

Check warning on line 273 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L272-L273

Added lines #L272 - L273 were not covered by tests
}

fn version(_: &Self::Resource) -> Cow<'_, str> {
K::version(&())

Check warning on line 277 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L276-L277

Added lines #L276 - L277 were not covered by tests
}

fn plural(_: &Self::Resource) -> Cow<'_, str> {
K::plural(&())

Check warning on line 281 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L280-L281

Added lines #L280 - L281 were not covered by tests
}
}

impl TypedResourceImpl for (DynamicObject, ApiResource) {
type Resource = DynamicObject;

fn type_meta(obj: &Self::Resource) -> TypeMeta {
obj.types.clone().unwrap_or_default()

Check warning on line 289 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L288-L289

Added lines #L288 - L289 were not covered by tests
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
res.type_meta().try_into().unwrap_or_default()

Check warning on line 293 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L292-L293

Added lines #L292 - L293 were not covered by tests
Comment on lines +292 to +293
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have derived Default on GroupVersionKind for this afaikt, but this is not particularly useful. I would rather not derive this Default (because it's not useful in the default state), and have this fn return an Option

}

fn kind(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.type_meta().kind)

Check warning on line 297 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L296-L297

Added lines #L296 - L297 were not covered by tests
}

fn group(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().group)

Check warning on line 301 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L300-L301

Added lines #L300 - L301 were not covered by tests
}

fn version(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().version)

Check warning on line 305 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L304-L305

Added lines #L304 - L305 were not covered by tests
}

fn plural(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(ApiResource::from_gvk(&res.gvk()).plural)

Check warning on line 309 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L308-L309

Added lines #L308 - L309 were not covered by tests
}
}

#[cfg(test)]
mod test {
use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
Expand Down
Loading
Loading