Skip to content

Commit

Permalink
Exec can return stdout data even after stdin is closed. #1689
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Webster <[email protected]>
  • Loading branch information
esw-amzn committed Feb 19, 2025
1 parent 8876639 commit 41c2313
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 49 deletions.
51 changes: 34 additions & 17 deletions kube-client/src/api/remote_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
select,
};
use tokio_tungstenite::{
tungstenite::{self as ws},
WebSocketStream,
};
use tokio_tungstenite::tungstenite as ws;

use crate::client::Connection;

use super::AttachParams;

Expand Down Expand Up @@ -112,9 +111,7 @@ pub struct AttachedProcess {
}

impl AttachedProcess {
pub(crate) fn new<S>(stream: WebSocketStream<S>, ap: &AttachParams) -> Self
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
pub(crate) fn new(connection: Connection, ap: &AttachParams) -> Self
{
// To simplify the implementation, always create a pipe for stdin.
// The caller does not have access to it unless they had requested.
Expand All @@ -140,7 +137,7 @@ impl AttachedProcess {
};

let task = tokio::spawn(start_message_loop(
stream,
connection,
stdin_reader,
stdout_writer,
stderr_writer,
Expand Down Expand Up @@ -259,32 +256,37 @@ impl AttachedProcess {
}
}

// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34
// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57
const STDIN_CHANNEL: u8 = 0;
const STDOUT_CHANNEL: u8 = 1;
const STDERR_CHANNEL: u8 = 2;
// status channel receives `Status` object on exit.
const STATUS_CHANNEL: u8 = 3;
// resize channel is use to send TerminalSize object to change the size of the terminal
const RESIZE_CHANNEL: u8 = 4;
/// Used to signal that a channel has reached EOF. Only works on V5 of the protocol.
const CLOSE_CHANNEL: u8 = 255;

async fn start_message_loop<S>(
stream: WebSocketStream<S>,
async fn start_message_loop(
connection: Connection,
stdin: impl AsyncRead + Unpin,
mut stdout: Option<impl AsyncWrite + Unpin>,
mut stderr: Option<impl AsyncWrite + Unpin>,
status_tx: StatusSender,
mut terminal_size_rx: Option<TerminalSizeReceiver>,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
{
let supports_stream_close = connection.supports_stream_close();
let stream = connection.into_stream();
let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
let (mut server_send, raw_server_recv) = stream.split();
// Work with filtered messages to reduce noise.
let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
let mut have_terminal_size_rx = terminal_size_rx.is_some();

// True until we reach EOF for stdin.
let mut stdin_is_open = true;

loop {
let terminal_size_next = async {
match terminal_size_rx.as_mut() {
Expand Down Expand Up @@ -319,7 +321,7 @@ where
},
}
},
stdin_message = stdin_stream.next() => {
stdin_message = stdin_stream.next(), if stdin_is_open => {
match stdin_message {
Some(Ok(bytes)) => {
if !bytes.is_empty() {
Expand All @@ -337,9 +339,24 @@ where
}
None => {
// Stdin closed (writer half dropped).
// Let the server know and disconnect.
server_send.close().await.map_err(Error::SendClose)?;
break;
// Let the server know we reached the end of stdin.
if supports_stream_close {

Check warning on line 343 in kube-client/src/api/remote_command.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/api/remote_command.rs#L343

Added line #L343 was not covered by tests
// Signal stdin has reached EOF.
// See: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go#L346
let vec = vec![CLOSE_CHANNEL, STDIN_CHANNEL];
server_send
.send(ws::Message::binary(vec))
.await
.map_err(Error::SendStdin)?;

Check warning on line 350 in kube-client/src/api/remote_command.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/api/remote_command.rs#L346-L350

Added lines #L346 - L350 were not covered by tests
} else {
// Best we can do is trigger the whole websocket to close.
// We may miss out on any remaining stdout data that has not
// been sent yet.
server_send.close().await.map_err(Error::SendClose)?;

Check warning on line 355 in kube-client/src/api/remote_command.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/api/remote_command.rs#L355

Added line #L355 was not covered by tests
}

// Do not check stdin_stream for data in future loops.
stdin_is_open = false;

Check warning on line 359 in kube-client/src/api/remote_command.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/api/remote_command.rs#L359

Added line #L359 was not covered by tests
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ where
.request
.portforward(name, ports)
.map_err(Error::BuildRequest)?;
let stream = self.client.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.client.connect(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))

Check warning on line 610 in kube-client/src/api/subresource.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/api/subresource.rs#L609-L610

Added lines #L609 - L610 were not covered by tests
}
}
4 changes: 2 additions & 2 deletions kube-client/src/client/kubelet_debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl Client {
let mut req =
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_portforward");
let stream = self.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
let connection = self.connect(req).await?;
Ok(Portforwarder::new(connection.into_stream(), ports))
}

/// Stream logs directly from node
Expand Down
42 changes: 29 additions & 13 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ pub struct Client {
default_ns: String,
}

/// Represents a WebSocket connection.
/// Value returned by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
protocol: upgrade::StreamProtocol,
}

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
/// Return true if the stream supports graceful close signaling.
pub fn supports_stream_close(&self) -> bool {
self.protocol.supports_stream_close()
}

/// Transform into the raw WebSocketStream.
pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
self.stream
}
}

/// Constructors and low-level api interfaces.
///
/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
Expand Down Expand Up @@ -193,7 +216,7 @@ impl Client {
pub async fn connect(
&self,
request: Request<Vec<u8>>,
) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
) -> Result<Connection> {
use http::header::HeaderValue;
let (mut parts, body) = request.into_parts();
parts
Expand All @@ -211,25 +234,18 @@ impl Client {
http::header::SEC_WEBSOCKET_KEY,
key.parse().expect("valid header value"),
);
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
parts.headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_static(upgrade::WS_PROTOCOL),
);
upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;

let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
match hyper::upgrade::on(res).await {
Ok(upgraded) => Ok(WebSocketStream::from_raw_socket(
Ok(upgraded) => Ok(Connection {
stream: WebSocketStream::from_raw_socket(
TokioIo::new(upgraded),
ws::protocol::Role::Client,
None,
)
.await),
.await, protocol}),

Err(e) => Err(Error::UpgradeConnection(
UpgradeConnectionError::GetPendingUpgrade(e),
Expand Down
96 changes: 82 additions & 14 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,82 @@
use http::{self, Response, StatusCode};
use http::{self, HeaderValue, Response, StatusCode};
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

use crate::client::Body;
use crate::{client::Body, Error, Result};

// Binary subprotocol v4. See `Client::connect`.
pub const WS_PROTOCOL: &str = "v4.channel.k8s.io";
#[derive(Debug)]
pub enum StreamProtocol {
/// Binary subprotocol v4. See `Client::connect`.
V4,

/// Binary subprotocol v5. See `Client::connect`.
/// v5 supports CLOSE signals.
/// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
V5,
}

impl StreamProtocol {
pub fn as_str(&self) -> &'static str {
match self {
StreamProtocol::V4 => "v4.channel.k8s.io",
StreamProtocol::V5 => "v5.channel.k8s.io"
}
}

fn as_bytes(&self) -> &'static [u8] {
self.as_str().as_bytes()
}

pub fn supports_stream_close(&self) -> bool {
match self {
StreamProtocol::V4 => false,
StreamProtocol::V5 => true

Check warning on line 33 in kube-client/src/client/upgrade.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/upgrade.rs#L33

Added line #L33 was not covered by tests
}
}

/// Add HTTP header SEC_WEBSOCKET_PROTOCOL with a list of supported protocol.
pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
// Protocols we support in our preferred order.
let supported_protocols = vec![
// v5 supports CLOSE signals.
StreamProtocol::V5.as_str(),
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
// There's no official documentation about this protocol, but it's described in
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
// There's a comment about v4 and `Status` object in
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
StreamProtocol::V4.as_str(),
];

Check warning on line 49 in kube-client/src/client/upgrade.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

useless use of `vec!`

warning: useless use of `vec!` --> kube-client/src/client/upgrade.rs:40:35 | 40 | let supported_protocols = vec![ | ___________________________________^ 41 | | // v5 supports CLOSE signals. 42 | | StreamProtocol::V5.as_str(), ... | 48 | | StreamProtocol::V4.as_str(), 49 | | ]; | |_________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#useless_vec = note: `#[warn(clippy::useless_vec)]` on by default help: you can use an array directly | 40 ~ let supported_protocols = [StreamProtocol::V5.as_str(), 41 + // Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3). 42 + // There's no official documentation about this protocol, but it's described in 43 + // [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED). 44 + // There's a comment about v4 and `Status` object in 45 + // [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh). 46 ~ StreamProtocol::V4.as_str()]; |

let header_value_string = supported_protocols.join(", ");

// Note: Multiple headers does not work. Only a single CSV works.
headers.insert(
http::header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
);

Ok(())
}

/// Return the subprotocol of an HTTP response.
fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
let headers = res.headers();

match headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)

Check warning on line 67 in kube-client/src/client/upgrade.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/upgrade.rs#L67

Added line #L67 was not covered by tests
.map(|h| h.as_bytes()) {
Some(protocol) => if protocol == StreamProtocol::V4.as_bytes() {
Some(StreamProtocol::V4)
} else if protocol == StreamProtocol::V5.as_bytes() {
Some(StreamProtocol::V5)

Check warning on line 72 in kube-client/src/client/upgrade.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/upgrade.rs#L71-L72

Added lines #L71 - L72 were not covered by tests
} else {
None

Check warning on line 74 in kube-client/src/client/upgrade.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/upgrade.rs#L74

Added line #L74 was not covered by tests
},
_ => None,

Check warning on line 76 in kube-client/src/client/upgrade.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/upgrade.rs#L76

Added line #L76 was not covered by tests
}
}
}

/// Possible errors from upgrading to a WebSocket connection
#[cfg(feature = "ws")]
Expand Down Expand Up @@ -42,7 +113,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down Expand Up @@ -75,14 +146,11 @@ pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeCon
return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
}

// Make sure that the server returned the correct subprotocol.
if !headers
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.map(|h| h == WS_PROTOCOL)
.unwrap_or(false)
{
return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch);
}
// Make sure that the server returned an expected subprotocol.
let protocol = match StreamProtocol::get_from_response(res) {
Some(p) => p,
None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch)

Check warning on line 152 in kube-client/src/client/upgrade.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/upgrade.rs#L152

Added line #L152 was not covered by tests
};

Ok(())
Ok(protocol)
}
36 changes: 35 additions & 1 deletion kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ mod test {
#[cfg(feature = "ws")]
async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
use tokio::io::AsyncWriteExt;

let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
Expand Down Expand Up @@ -348,9 +349,42 @@ mod test {
assert_eq!(out, "1\n2\n3\n");
}

// Verify we read from stdout after stdin is closed.
{
let name = "busybox-kube2";
let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
let ap = AttachParams::default().stdin(true).stderr(false);

// Make a connection so we can determine if the K8s cluster supports stream closing.
let mut req = pods.request.exec(name, command.clone(), &ap)?;
req.extensions_mut().insert("exec");
let stream = pods.client.connect(req).await?;

// This only works is the cluster supports protocol version v5.channel.k8s.io
// Skip for older protocols.
if stream.supports_stream_close() {
let mut attached = pods.exec(name, command, &ap).await?;
let mut stdin_writer = attached.stdin().unwrap();
let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());

Check warning on line 368 in kube-client/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/lib.rs#L366-L368

Added lines #L366 - L368 were not covered by tests

stdin_writer.write_all(b"this will be ignored\n").await?;
_ = stdin_writer.shutdown().await;

Check warning on line 371 in kube-client/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/lib.rs#L370-L371

Added lines #L370 - L371 were not covered by tests

let next_stdout = stdout_stream.next();
let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
assert_eq!(stdout, "test string 2\n");

Check warning on line 375 in kube-client/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/lib.rs#L373-L375

Added lines #L373 - L375 were not covered by tests

// AttachedProcess resolves with status object.
let status = attached.take_status().unwrap();
if let Some(status) = status.await {
assert_eq!(status.status, Some("Success".to_owned()));
assert_eq!(status.reason, None);

Check warning on line 381 in kube-client/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/lib.rs#L378-L381

Added lines #L378 - L381 were not covered by tests
}
}
}

// Verify we can write to Stdin
{
use tokio::io::AsyncWriteExt;
let mut attached = pods
.exec(
"busybox-kube2",
Expand Down

0 comments on commit 41c2313

Please sign in to comment.