Skip to content

Commit

Permalink
Exec can return stdout data even after stdin is closed. #1689
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Webster <[email protected]>
  • Loading branch information
esw-amzn committed Feb 11, 2025
1 parent 0bcc625 commit 7e03490
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 51 deletions.
51 changes: 34 additions & 17 deletions kube-client/src/api/remote_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
select,
};
use tokio_tungstenite::{
tungstenite::{self as ws},
WebSocketStream,
};
use tokio_tungstenite::tungstenite as ws;

use crate::client::Connection;

use super::AttachParams;

Expand Down Expand Up @@ -112,9 +111,7 @@ pub struct AttachedProcess {
}

impl AttachedProcess {
pub(crate) fn new<S>(stream: WebSocketStream<S>, ap: &AttachParams) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
pub(crate) fn new(connection: Connection, ap: &AttachParams) -> Self
{
// To simplify the implementation, always create a pipe for stdin.
// The caller does not have access to it unless they had requested.
Expand All @@ -140,7 +137,7 @@ impl AttachedProcess {
};

let task = tokio::spawn(start_message_loop(
stream,
connection,
stdin_reader,
stdout_writer,
stderr_writer,
Expand Down Expand Up @@ -259,32 +256,37 @@ impl AttachedProcess {
}
}

// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34
// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57
const STDIN_CHANNEL: u8 = 0;
const STDOUT_CHANNEL: u8 = 1;
const STDERR_CHANNEL: u8 = 2;
// status channel receives `Status` object on exit.
const STATUS_CHANNEL: u8 = 3;
// resize channel is use to send TerminalSize object to change the size of the terminal
const RESIZE_CHANNEL: u8 = 4;
/// Used to signal that a channel has reached EOF. Only works on V5 of the protocol.
const CLOSE_CHANNEL: u8 = 255;

async fn start_message_loop<S>(
stream: WebSocketStream<S>,
async fn start_message_loop(
connection: Connection,
stdin: impl AsyncRead + Unpin,
mut stdout: Option<impl AsyncWrite + Unpin>,
mut stderr: Option<impl AsyncWrite + Unpin>,
status_tx: StatusSender,
mut terminal_size_rx: Option<TerminalSizeReceiver>,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
{
let supports_stream_close = connection.supports_stream_close();
let stream = connection.into_stream();
let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
let (mut server_send, raw_server_recv) = stream.split();
// Work with filtered messages to reduce noise.
let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
let mut have_terminal_size_rx = terminal_size_rx.is_some();

// True until we reach EOF for stdin.
let mut stdin_is_open = true;

loop {
let terminal_size_next = async {
match terminal_size_rx.as_mut() {
Expand Down Expand Up @@ -319,7 +321,7 @@ where
},
}
},
stdin_message = stdin_stream.next() => {
stdin_message = stdin_stream.next(), if stdin_is_open => {
match stdin_message {
Some(Ok(bytes)) => {
if !bytes.is_empty() {
Expand All @@ -337,9 +339,24 @@ where
}
None => {
// Stdin closed (writer half dropped).
// Let the server know and disconnect.
server_send.close().await.map_err(Error::SendClose)?;
break;
// Let the server know we reached the end of stdin.
if supports_stream_close {
// Signal stdin has reached EOF.
// See: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go#L346
let vec = vec![CLOSE_CHANNEL, STDIN_CHANNEL];
server_send
.send(ws::Message::binary(vec))
.await
.map_err(Error::SendStdin)?;
} else {
// Best we can do is trigger the whole websocket to close.
// We may miss out on any remaining stdout data that has not
// been sent yet.
server_send.close().await.map_err(Error::SendClose)?;
}

// Do not check stdin_stream for data in future loops.
stdin_is_open = false;
}
}
},
Expand Down
8 changes: 4 additions & 4 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ where
pub async fn attach(&self, name: &str, ap: &AttachParams) -> Result<AttachedProcess> {
let mut req = self.request.attach(name, ap).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("attach");
let stream = self.client.connect(req).await?;
let stream = self.client.connect_exec(req).await?;
Ok(AttachedProcess::new(stream, ap))
}
}
Expand Down Expand Up @@ -565,7 +565,7 @@ where
.exec(name, command, ap)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("exec");
let stream = self.client.connect(req).await?;
let stream = self.client.connect_exec(req).await?;
Ok(AttachedProcess::new(stream, ap))
}
}
Expand Down Expand Up @@ -606,7 +606,7 @@ where
.request
.portforward(name, ports)
.map_err(Error::BuildRequest)?;
let stream = self.client.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.client.connect_portforward(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}
}
8 changes: 4 additions & 4 deletions kube-client/src/client/kubelet_debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Client {
let mut req =
Request::kubelet_node_attach(kubelet_params, container, ap).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_attach");
let stream = self.connect(req).await?;
let stream = self.connect_exec(req).await?;
Ok(AttachedProcess::new(stream, ap))
}

Expand All @@ -53,7 +53,7 @@ impl Client {
let mut req = Request::kubelet_node_exec(kubelet_params, container, command, ap)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_exec");
let stream = self.connect(req).await?;
let stream = self.connect_exec(req).await?;
Ok(AttachedProcess::new(stream, ap))
}

Expand All @@ -69,8 +69,8 @@ impl Client {
let mut req =
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_portforward");
let stream = self.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.connect_portforward(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}

/// Stream logs directly from node
Expand Down
66 changes: 52 additions & 14 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ pub struct Client {
default_ns: String,
}

/// Represents a WebSocket connection.
/// Value returned by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
protocol: upgrade::StreamProtocol,
}

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
/// Return true if the stream supports graceful close signaling.
pub fn supports_stream_close(&self) -> bool {
self.protocol.supports_stream_close()
}

/// Transform into the raw WebSocketStream.
pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
self.stream
}
}

/// Constructors and low-level api interfaces.
///
/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
Expand Down Expand Up @@ -187,13 +210,36 @@ impl Client {
Ok(res)
}

/// Make WebSocket connection exec operations.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect_exec(
&self,
mut request: Request<Vec<u8>>,
) -> Result<Connection> {
upgrade::StreamProtocol::add_to_exec_headers(request.headers_mut());
self.connect_internal(request).await
}

/// Make WebSocket connection for portforward operations.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect_portforward(
&self,
mut request: Request<Vec<u8>>,
) -> Result<Connection> {
upgrade::StreamProtocol::add_to_portforward_headers(request.headers_mut());
self.connect_internal(request).await
}

/// Make WebSocket connection.
/// The http::header::SEC_WEBSOCKET_PROTOCOL header must be set on the request before calling this.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(
async fn connect_internal(
&self,
request: Request<Vec<u8>>,
) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
) -> Result<Connection> {
use http::header::HeaderValue;
let (mut parts, body) = request.into_parts();
parts
Expand All @@ -211,25 +257,17 @@ impl Client {
http::header::SEC_WEBSOCKET_KEY,
key.parse().expect("valid header value"),
);
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
parts.headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(upgrade::WS_PROTOCOL),
);

let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
match hyper::upgrade::on(res).await {
Ok(upgraded) => Ok(WebSocketStream::from_raw_socket(
Ok(upgraded) => Ok(Connection {
stream: WebSocketStream::from_raw_socket(
TokioIo::new(upgraded),
ws::protocol::Role::Client,
None,
)
.await),
.await, protocol}),

Err(e) => Err(Error::UpgradeConnection(
UpgradeConnectionError::GetPendingUpgrade(e),
Expand Down
93 changes: 81 additions & 12 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,64 @@
use http::{self, Response, StatusCode};
use http::{self, HeaderValue, Response, StatusCode};
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

use crate::client::Body;

// Binary subprotocol v4. See `Client::connect`.
pub const WS_PROTOCOL: &str = "v4.channel.k8s.io";
pub enum StreamProtocol {
/// Binary subprotocol v4. See `Client::connect`.
V4,

/// Binary subprotocol v5. See `Client::connect`.
/// v5 supports CLOSE signals.
/// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
V5,
}

impl StreamProtocol {
pub fn as_str(&self) -> &'static str {
match self {
StreamProtocol::V4 => "v4.channel.k8s.io",
StreamProtocol::V5 => "v5.channel.k8s.io"
}
}

fn as_bytes(&self) -> &'static [u8] {
self.as_str().as_bytes()
}

pub fn supports_stream_close(&self) -> bool {
match self {
StreamProtocol::V4 => false,
StreamProtocol::V5 => true
}
}

pub fn add_to_exec_headers(headers: &mut http::HeaderMap) {
// v5 supports CLOSE signals.
headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(StreamProtocol::V5.as_str()),
);
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
headers.append(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(StreamProtocol::V4.as_str()),
);
}

pub fn add_to_portforward_headers(headers: &mut http::HeaderMap) {
// Seems only v4 is supported for portforwarding.
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kubelet/pkg/cri/streaming/portforward/websocket.go#L43C31-L43C34
headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(StreamProtocol::V4.as_str()),
);
}
}

/// Possible errors from upgrading to a WebSocket connection
#[cfg(feature = "ws")]
Expand Down Expand Up @@ -42,7 +95,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down Expand Up @@ -75,16 +128,32 @@ pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeCon
return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
}

// Make sure that the server returned the correct subprotocol.
if !headers
// Make sure that the server returned an expected subprotocol.
let protocol = match get_subprotocol(res) {
Some(p) => p,
None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch)
};

Ok(protocol)
}

/// Return the subprotocol of the response.
fn get_subprotocol(res: &Response<Body>) -> Option<StreamProtocol> {
let headers = res.headers();

match headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h == WS_PROTOCOL)
.unwrap_or(false)
{
return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch);
}
.map(|h| h.as_bytes()) {
Some(protocol) => if protocol == StreamProtocol::V4.as_bytes() {
Some(StreamProtocol::V4)
} else if protocol == StreamProtocol::V5.as_bytes() {
Some(StreamProtocol::V5)
} else {
None
},
_ => None,
}

Ok(())
}

/// Generate a random key for the `Sec-WebSocket-Key` header.
Expand Down
Loading

0 comments on commit 7e03490

Please sign in to comment.