Skip to content

Commit

Permalink
Merge branch 'main' into fix-1675
Browse files Browse the repository at this point in the history
  • Loading branch information
clux authored Mar 10, 2025
2 parents 827f876 + b27cbcd commit a99263f
Show file tree
Hide file tree
Showing 11 changed files with 989 additions and 85 deletions.
2 changes: 1 addition & 1 deletion e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ Then, run `just e2e-incluster openssl,latest` or `just e2e-incluster rustls,late

Build the `boot` bin against various `k8s-openapi` version features, and check that it runs. Uses local auth; not dockerised.

To run this with all feature combinations combinations, run `just e2e-mink8s`.
To run this with all feature combinations, run `just e2e-mink8s`.
33 changes: 31 additions & 2 deletions kube-client/src/api/core_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ where
}
}

/// [Get Metadata](`Api::get_metadata`) for a named resource if it exists, returns [`None`] if it doesn't exit
/// [Get Metadata](`Api::get_metadata`) for a named resource if it exists, returns [`None`] if it doesn't exist
///
/// ```no_run
/// # use kube::Api;
Expand All @@ -162,7 +162,36 @@ where
///
/// Note that [`PartialObjectMeta`] embeds the raw `ObjectMeta`.
pub async fn get_metadata_opt(&self, name: &str) -> Result<Option<PartialObjectMeta<K>>> {
match self.get_metadata(name).await {
self.get_metadata_opt_with(name, &GetParams::default()).await
}

/// [Get Metadata](`Api::get_metadata`) of an object if it exists, using an explicit `resourceVersion`.
/// Returns [`None`] if it doesn't exist.
///
/// ```no_run
/// # use kube::Api;
/// use k8s_openapi::api::core::v1::Pod;
/// use kube_core::params::GetParams;
///
/// async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let pods: Api<Pod> = Api::namespaced(client, "apps");
/// if let Some(pod) = pods.get_metadata_opt_with("blog", &GetParams::any()).await? {
/// // Pod was found
/// } else {
/// // Pod was not found
/// }
/// # Ok(())
/// # }
/// ```
///
/// Note that [`PartialObjectMeta`] embeds the raw `ObjectMeta`.
pub async fn get_metadata_opt_with(
&self,
name: &str,
gp: &GetParams,
) -> Result<Option<PartialObjectMeta<K>>> {
match self.get_metadata_with(name, gp).await {
Ok(meta) => Ok(Some(meta)),
Err(Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => Ok(None),
Err(err) => Err(err),
Expand Down
55 changes: 35 additions & 20 deletions kube-client/src/api/remote_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
select,
};
use tokio_tungstenite::{
tungstenite::{self as ws},
WebSocketStream,
};
use tokio_tungstenite::tungstenite as ws;

use crate::client::Connection;

use super::AttachParams;

Expand Down Expand Up @@ -112,10 +111,7 @@ pub struct AttachedProcess {
}

impl AttachedProcess {
pub(crate) fn new<S>(stream: WebSocketStream<S>, ap: &AttachParams) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
{
pub(crate) fn new(connection: Connection, ap: &AttachParams) -> Self {
// To simplify the implementation, always create a pipe for stdin.
// The caller does not have access to it unless they had requested.
let (stdin_writer, stdin_reader) = tokio::io::duplex(ap.max_stdin_buf_size.unwrap_or(MAX_BUF_SIZE));
Expand All @@ -140,7 +136,7 @@ impl AttachedProcess {
};

let task = tokio::spawn(start_message_loop(
stream,
connection,
stdin_reader,
stdout_writer,
stderr_writer,
Expand Down Expand Up @@ -259,32 +255,36 @@ impl AttachedProcess {
}
}

// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34
// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57
const STDIN_CHANNEL: u8 = 0;
const STDOUT_CHANNEL: u8 = 1;
const STDERR_CHANNEL: u8 = 2;
// status channel receives `Status` object on exit.
const STATUS_CHANNEL: u8 = 3;
// resize channel is use to send TerminalSize object to change the size of the terminal
const RESIZE_CHANNEL: u8 = 4;
/// Used to signal that a channel has reached EOF. Only works on V5 of the protocol.
const CLOSE_CHANNEL: u8 = 255;

async fn start_message_loop<S>(
stream: WebSocketStream<S>,
async fn start_message_loop(
connection: Connection,
stdin: impl AsyncRead + Unpin,
mut stdout: Option<impl AsyncWrite + Unpin>,
mut stderr: Option<impl AsyncWrite + Unpin>,
status_tx: StatusSender,
mut terminal_size_rx: Option<TerminalSizeReceiver>,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
{
) -> Result<(), Error> {
let supports_stream_close = connection.supports_stream_close();
let stream = connection.into_stream();
let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
let (mut server_send, raw_server_recv) = stream.split();
// Work with filtered messages to reduce noise.
let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
let mut have_terminal_size_rx = terminal_size_rx.is_some();

// True until we reach EOF for stdin.
let mut stdin_is_open = true;

loop {
let terminal_size_next = async {
match terminal_size_rx.as_mut() {
Expand Down Expand Up @@ -319,7 +319,7 @@ where
},
}
},
stdin_message = stdin_stream.next() => {
stdin_message = stdin_stream.next(), if stdin_is_open => {
match stdin_message {
Some(Ok(bytes)) => {
if !bytes.is_empty() {
Expand All @@ -337,9 +337,24 @@ where
}
None => {
// Stdin closed (writer half dropped).
// Let the server know and disconnect.
server_send.close().await.map_err(Error::SendClose)?;
break;
// Let the server know we reached the end of stdin.
if supports_stream_close {
// Signal stdin has reached EOF.
// See: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go#L346
let vec = vec![CLOSE_CHANNEL, STDIN_CHANNEL];
server_send
.send(ws::Message::binary(vec))
.await
.map_err(Error::SendStdin)?;
} else {
// Best we can do is trigger the whole websocket to close.
// We may miss out on any remaining stdout data that has not
// been sent yet.
server_send.close().await.map_err(Error::SendClose)?;
}

// Do not check stdin_stream for data in future loops.
stdin_is_open = false;
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ where
.request
.portforward(name, ports)
.map_err(Error::BuildRequest)?;
let stream = self.client.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.client.connect(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}
}
4 changes: 2 additions & 2 deletions kube-client/src/client/kubelet_debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl Client {
let mut req =
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_portforward");
let stream = self.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.connect(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}

/// Stream logs directly from node
Expand Down
55 changes: 35 additions & 20 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,29 @@ pub struct Client {
valid_until: Option<DateTime<Utc>>,
}

/// Represents a WebSocket connection.
/// Value returned by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
protocol: upgrade::StreamProtocol,
}

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
/// Return true if the stream supports graceful close signaling.
pub fn supports_stream_close(&self) -> bool {
self.protocol.supports_stream_close()
}

/// Transform into the raw WebSocketStream.
pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
self.stream
}
}

/// Constructors and low-level api interfaces.
///
/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
Expand Down Expand Up @@ -203,10 +226,7 @@ impl Client {
/// Make WebSocket connection.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(
&self,
request: Request<Vec<u8>>,
) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
use http::header::HeaderValue;
let (mut parts, body) = request.into_parts();
parts
Expand All @@ -224,25 +244,20 @@ impl Client {
http::header::SEC_WEBSOCKET_KEY,
key.parse().expect("valid header value"),
);
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
parts.headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(upgrade::WS_PROTOCOL),
);
upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;

let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
match hyper::upgrade::on(res).await {
Ok(upgraded) => Ok(WebSocketStream::from_raw_socket(
TokioIo::new(upgraded),
ws::protocol::Role::Client,
None,
)
.await),
Ok(upgraded) => Ok(Connection {
stream: WebSocketStream::from_raw_socket(
TokioIo::new(upgraded),
ws::protocol::Role::Client,
None,
)
.await,
protocol,
}),

Err(e) => Err(Error::UpgradeConnection(
UpgradeConnectionError::GetPendingUpgrade(e),
Expand Down
99 changes: 85 additions & 14 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,85 @@
use http::{self, Response, StatusCode};
use http::{self, HeaderValue, Response, StatusCode};
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

use crate::client::Body;
use crate::{client::Body, Error, Result};

// Binary subprotocol v4. See `Client::connect`.
pub const WS_PROTOCOL: &str = "v4.channel.k8s.io";
#[derive(Debug)]
pub enum StreamProtocol {
/// Binary subprotocol v4. See `Client::connect`.
V4,

/// Binary subprotocol v5. See `Client::connect`.
/// v5 supports CLOSE signals.
/// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
V5,
}

impl StreamProtocol {
pub fn as_str(&self) -> &'static str {
match self {
Self::V4 => "v4.channel.k8s.io",
Self::V5 => "v5.channel.k8s.io",
}
}

fn as_bytes(&self) -> &'static [u8] {
self.as_str().as_bytes()
}

pub fn supports_stream_close(&self) -> bool {
match self {
Self::V4 => false,
Self::V5 => true,
}
}

/// Add HTTP header SEC_WEBSOCKET_PROTOCOL with a list of supported protocol.
pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
// Protocols we support in our preferred order.
let supported_protocols = [
// v5 supports CLOSE signals.
Self::V5.as_str(),
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
Self::V4.as_str(),
];

let header_value_string = supported_protocols.join(", ");

// Note: Multiple headers does not work. Only a single CSV works.
headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
);

Ok(())
}

/// Return the subprotocol of an HTTP response.
fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
let headers = res.headers();

match headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h.as_bytes())
{
Some(protocol) => {
if protocol == Self::V4.as_bytes() {
Some(Self::V4)
} else if protocol == Self::V5.as_bytes() {
Some(Self::V5)
} else {
None
}
}
_ => None,
}
}
}

/// Possible errors from upgrading to a WebSocket connection
#[cfg(feature = "ws")]
Expand Down Expand Up @@ -42,7 +116,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down Expand Up @@ -75,14 +149,11 @@ pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeCon
return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
}

// Make sure that the server returned the correct subprotocol.
if !headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h == WS_PROTOCOL)
.unwrap_or(false)
{
return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch);
}
// Make sure that the server returned an expected subprotocol.
let protocol = match StreamProtocol::get_from_response(res) {
Some(p) => p,
None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch),
};

Ok(())
Ok(protocol)
}
Loading

0 comments on commit a99263f

Please sign in to comment.