Skip to content

Commit

Permalink
add valid_until to Client (#1707)
Browse files Browse the repository at this point in the history
* add `valid_until` to Client

Signed-off-by: goenning <[email protected]>

* fix unit tests

Signed-off-by: goenning <[email protected]>

---------

Signed-off-by: goenning <[email protected]>
  • Loading branch information
goenning authored Mar 10, 2025
1 parent b27cbcd commit 55e865c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 17 deletions.
21 changes: 14 additions & 7 deletions kube-client/src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub(crate) enum Auth {
Basic(String, SecretString),
Bearer(SecretString),
RefreshableToken(RefreshableToken),
Certificate(String, SecretString),
Certificate(String, SecretString, Option<DateTime<Utc>>),
}

// Token file reference. Reloads at least once per minute.
Expand Down Expand Up @@ -227,7 +227,7 @@ impl RefreshableToken {
if Utc::now() + SIXTY_SEC >= locked_data.1 {
// TODO Improve refreshing exec to avoid `Auth::try_from`
match Auth::try_from(&locked_data.2)? {
Auth::None | Auth::Basic(_, _) | Auth::Bearer(_) | Auth::Certificate(_, _) => {
Auth::None | Auth::Basic(_, _) | Auth::Bearer(_) | Auth::Certificate(_, _, _) => {
return Err(Error::UnrefreshableTokenResponse);
}

Expand Down Expand Up @@ -350,16 +350,23 @@ impl TryFrom<&AuthInfo> for Auth {
if let Some(exec) = &auth_info.exec {
let creds = auth_exec(exec)?;
let status = creds.status.ok_or(Error::ExecPluginFailed)?;
if let (Some(client_certificate_data), Some(client_key_data)) =
(status.client_certificate_data, status.client_key_data)
{
return Ok(Self::Certificate(client_certificate_data, client_key_data.into()));
}
let expiration = status
.expiration_timestamp
.map(|ts| ts.parse())
.transpose()
.map_err(Error::MalformedTokenExpirationDate)?;


if let (Some(client_certificate_data), Some(client_key_data)) =
(status.client_certificate_data, status.client_key_data)
{
return Ok(Self::Certificate(
client_certificate_data,
client_key_data.into(),
expiration,
));
}

match (status.token.map(SecretString::from), expiration) {
(Some(token), Some(expire)) => Ok(Self::RefreshableToken(RefreshableToken::Exec(Arc::new(
Mutex::new((token, expire, auth_info.clone())),
Expand Down
26 changes: 23 additions & 3 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;
use chrono::{DateTime, Utc};
use http::{header::HeaderMap, Request, Response};
use hyper::{
body::Incoming,
Expand Down Expand Up @@ -30,6 +31,7 @@ pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send +
pub struct ClientBuilder<Svc> {
service: Svc,
default_ns: String,
valid_until: Option<DateTime<Utc>>,
}

impl<Svc> ClientBuilder<Svc> {
Expand All @@ -44,6 +46,7 @@ impl<Svc> ClientBuilder<Svc> {
Self {
service,
default_ns: default_namespace.into(),
valid_until: None,
}
}

Expand All @@ -52,10 +55,21 @@ impl<Svc> ClientBuilder<Svc> {
let Self {
service: stack,
default_ns,
valid_until,
} = self;
ClientBuilder {
service: layer.layer(stack),
default_ns,
valid_until,
}
}

/// Sets an expiration timestamp for the client.
pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
ClientBuilder {
service: self.service,
default_ns: self.default_ns,
valid_until,
}
}

Expand All @@ -68,7 +82,7 @@ impl<Svc> ClientBuilder<Svc> {
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
Client::new(self.service, self.default_ns)
Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
}
}

Expand Down Expand Up @@ -242,15 +256,21 @@ where
.map_err(BoxError::from)
.service(client);

Ok(ClientBuilder::new(

let (_, expiration) = config.exec_identity_pem();

let client = ClientBuilder::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
})
.layer(service),
),
default_ns,
))
)
.with_valid_until(expiration);

Ok(client)
}

#[cfg(test)]
Expand Down
15 changes: 8 additions & 7 deletions kube-client/src/client/config_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use chrono::{DateTime, Utc};
use http::{header::HeaderName, HeaderValue};
#[cfg(feature = "openssl-tls")] use hyper::rt::{Read, Write};
use hyper_util::client::legacy::connect::HttpConnector;
Expand Down Expand Up @@ -176,7 +177,7 @@ impl ConfigExt for Config {
Auth::RefreshableToken(refreshable) => {
Some(AuthLayer(Either::Right(AsyncFilterLayer::new(refreshable))))
}
Auth::Certificate(_client_certificate_data, _client_key_data) => None,
Auth::Certificate(_client_certificate_data, _client_key_data, _) => None,
})
}

Expand Down Expand Up @@ -207,7 +208,7 @@ impl ConfigExt for Config {

#[cfg(feature = "rustls-tls")]
fn rustls_client_config(&self) -> Result<rustls::ClientConfig> {
let identity = self.exec_identity_pem().or_else(|| self.identity_pem());
let identity = self.exec_identity_pem().0.or_else(|| self.identity_pem());
tls::rustls_tls::rustls_client_config(
identity.as_deref(),
self.root_cert.as_deref(),
Expand Down Expand Up @@ -249,7 +250,7 @@ impl ConfigExt for Config {

#[cfg(feature = "openssl-tls")]
fn openssl_ssl_connector_builder(&self) -> Result<openssl::ssl::SslConnectorBuilder> {
let identity = self.exec_identity_pem().or_else(|| self.identity_pem());
let identity = self.exec_identity_pem().0.or_else(|| self.identity_pem());
// TODO: pass self.tls_server_name for openssl
tls::openssl_tls::ssl_connector_builder(identity.as_ref(), self.root_cert.as_ref())
.map_err(|e| Error::OpensslTls(tls::openssl_tls::Error::CreateSslConnector(e)))
Expand Down Expand Up @@ -295,18 +296,18 @@ impl Config {
// returns a client certificate and key instead of a token.
// This has be to be checked on TLS configuration vs tokens
// which can be added in as an AuthLayer.
fn exec_identity_pem(&self) -> Option<Vec<u8>> {
pub(crate) fn exec_identity_pem(&self) -> (Option<Vec<u8>>, Option<DateTime<Utc>>) {
match Auth::try_from(&self.auth_info) {
Ok(Auth::Certificate(client_certificate_data, client_key_data)) => {
Ok(Auth::Certificate(client_certificate_data, client_key_data, expiratiom)) => {
const NEW_LINE: u8 = b'\n';

let mut buffer = client_key_data.expose_secret().as_bytes().to_vec();
buffer.push(NEW_LINE);
buffer.extend_from_slice(client_certificate_data.as_bytes());
buffer.push(NEW_LINE);
Some(buffer)
(Some(buffer), expiratiom)
}
_ => None,
_ => (None, None),
}
}
}
13 changes: 13 additions & 0 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the kubernetes API.
use chrono::{DateTime, Utc};
use either::{Either, Left, Right};
use futures::{future::BoxFuture, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response};
Expand Down Expand Up @@ -78,6 +79,7 @@ pub struct Client {
// - `BoxFuture` for dynamic response future type
inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
default_ns: String,
valid_until: Option<DateTime<Utc>>,
}

/// Represents a WebSocket connection.
Expand Down Expand Up @@ -154,9 +156,20 @@ impl Client {
Self {
inner: Buffer::new(BoxService::new(service), 1024),
default_ns: default_namespace.into(),
valid_until: None,
}
}

/// Sets an expiration timestamp to the client, which has to be checked by the user using [`Client::valid_until`] function.
pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
Client { valid_until, ..self }
}

/// Get the expiration timestamp of the client, if it has been set.
pub fn valid_until(&self) -> &Option<DateTime<Utc>> {
&self.valid_until
}

/// Create and initialize a [`Client`] using the inferred configuration.
///
/// Will use [`Config::infer`] which attempts to load the local kubeconfig first,
Expand Down

0 comments on commit 55e865c

Please sign in to comment.