Skip to content

Commit

Permalink
websocket: fix message send corruption
Browse files Browse the repository at this point in the history
- Fix a bug in EAGAIN handling when sending frames that led to a
  corrupted last byte of the frame sent.

- Restore sanity to curl_ws_send() behaviour:

  - Partial writes are reported as OK with the actual number of
    payload bytes sent.

  - CURLE_AGAIN is only returned when none of the payload bytes
    (or for 0-length frames, not all of the frame header bytes)
    could be sent.

  - curl_ws_send() now behaves like a common send() call.

- Change 'ws-data' test client to allow concurrent send/recv
  operations and vary frame sizes and repeat count.

- Add DEBUG env var CURL_WS_CHUNK_EAGAIN to simulate blocking
  after a chunk of an encoded websocket frame has been sent.

- Add tests.


Prior to this change data corruption may occur when sending websocket
messages due to two bugs:

1) 3e64569 (precedes 8.10.0) caused a data corruption bug in the last
   byte of frame of large messages.

2) curl_ws_send had non-traditional send behavior and could return
   CURLE_AGAIN with bytes sent and expect the caller to adjust buffer
   and buflen in a subsequent call. That behavior was not documented.


Reported-by: [email protected]

Fixes curl#15865
Fixes curl#15865 (comment)
Closes curl#15901
  • Loading branch information
icing authored and jay committed Jan 16, 2025
1 parent 86f5653 commit 02edae5
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 224 deletions.
5 changes: 5 additions & 0 deletions docs/libcurl/libcurl-env-dbg.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ greater. There is a number of debug levels, refer to *openldap.c* comments.
Used to influence the buffer chunk size used for WebSocket encoding and
decoding.

## CURL_WS_CHUNK_EAGAIN

Used to simulate blocking sends after this chunk size for WebSocket
connections.

## CURL_FORBID_REUSE

Used to set the CURLOPT_FORBID_REUSE flag on each transfer initiated
Expand Down
23 changes: 0 additions & 23 deletions lib/bufq.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ static size_t chunk_len(const struct buf_chunk *chunk)
return chunk->w_offset - chunk->r_offset;
}

static size_t chunk_space(const struct buf_chunk *chunk)
{
return chunk->dlen - chunk->w_offset;
}

static void chunk_reset(struct buf_chunk *chunk)
{
chunk->next = NULL;
Expand Down Expand Up @@ -287,24 +282,6 @@ size_t Curl_bufq_len(const struct bufq *q)
return len;
}

size_t Curl_bufq_space(const struct bufq *q)
{
size_t space = 0;
if(q->tail)
space += chunk_space(q->tail);
if(q->spare) {
struct buf_chunk *chunk = q->spare;
while(chunk) {
space += chunk->dlen;
chunk = chunk->next;
}
}
if(q->chunk_count < q->max_chunks) {
space += (q->max_chunks - q->chunk_count) * q->chunk_size;
}
return space;
}

bool Curl_bufq_is_empty(const struct bufq *q)
{
return !q->head || chunk_is_empty(q->head);
Expand Down
8 changes: 0 additions & 8 deletions lib/bufq.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,6 @@ void Curl_bufq_free(struct bufq *q);
*/
size_t Curl_bufq_len(const struct bufq *q);

/**
* Return the total amount of free space in the queue.
* The returned length is the number of bytes that can
* be expected to be written successfully to the bufq,
* providing no memory allocations fail.
*/
size_t Curl_bufq_space(const struct bufq *q);

/**
* Returns TRUE iff there is no data in the buffer queue.
*/
Expand Down
156 changes: 88 additions & 68 deletions lib/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -1009,8 +1009,28 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
CURLcode result;
const unsigned char *out;
size_t outlen, n;
#ifdef DEBUGBUILD
/* Simulate a blocking send after this chunk has been sent */
bool eagain_next = FALSE;
size_t chunk_egain = 0;
char *p = getenv("CURL_WS_CHUNK_EAGAIN");
if(p) {
long l = strtol(p, NULL, 10);
if(l > 0 && l <= (1*1024*1024)) {
chunk_egain = (size_t)l;
}
}
#endif

while(Curl_bufq_peek(&ws->sendbuf, &out, &outlen)) {
#ifdef DEBUGBUILD
if(eagain_next)
return CURLE_AGAIN;
if(chunk_egain && (outlen > chunk_egain)) {
outlen = chunk_egain;
eagain_next = TRUE;
}
#endif
if(blocking) {
result = ws_send_raw_blocking(data, ws, (char *)out, outlen);
n = result ? 0 : outlen;
Expand Down Expand Up @@ -1119,15 +1139,15 @@ static CURLcode ws_send_raw(struct Curl_easy *data, const void *buffer,
return result;
}

CURL_EXTERN CURLcode curl_ws_send(CURL *d, const void *buffer,
CURL_EXTERN CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
size_t buflen, size_t *sent,
curl_off_t fragsize,
unsigned int flags)
{
struct websocket *ws;
const unsigned char *buffer = buffer_arg;
ssize_t n;
size_t space, payload_added;
CURLcode result;
CURLcode result = CURLE_OK;
struct Curl_easy *data = d;

CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" FMT_OFF_T
Expand All @@ -1151,13 +1171,13 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *d, const void *buffer,
}
ws = data->conn->proto.ws;

/* try flushing any content still waiting to be sent. */
result = ws_flush(data, ws, FALSE);
if(result)
goto out;

if(data->set.ws_raw_mode) {
/* In raw mode, we write directly to the connection */
/* try flushing any content still waiting to be sent. */
result = ws_flush(data, ws, FALSE);
if(result)
goto out;

if(fragsize || flags) {
failf(data, "ws_send, raw mode: fragsize and flags cannot be non-zero");
return CURLE_BAD_FUNCTION_ARGUMENT;
Expand All @@ -1167,87 +1187,87 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *d, const void *buffer,
}

/* Not RAW mode, buf we do the frame encoding */
space = Curl_bufq_space(&ws->sendbuf);
CURL_TRC_WS(data, "curl_ws_send(len=%zu), sendbuf=%zu space_left=%zu",
buflen, Curl_bufq_len(&ws->sendbuf), space);
if(space < 14) {
result = CURLE_AGAIN;
goto out;
}

if(flags & CURLWS_OFFSET) {
if(fragsize) {
/* a frame series 'fragsize' bytes big, this is the first */
n = ws_enc_write_head(data, &ws->enc, flags, fragsize,
&ws->sendbuf, &result);
if(n < 0)
goto out;
if(ws->enc.payload_remain || !Curl_bufq_is_empty(&ws->sendbuf)) {
/* a frame is ongoing with payload buffered or more payload
* that needs to be encoded into the buffer */
if(buflen < ws->sendbuf_payload) {
/* We have been called with LESS buffer data than before. This
* is not how it's supposed too work. */
failf(data, "curl_ws_send() called with smaller 'buflen' than "
"bytes already buffered in previous call, %zu vs %zu",
buflen, ws->sendbuf_payload);
result = CURLE_BAD_FUNCTION_ARGUMENT;
goto out;
}
else {
if((curl_off_t)buflen > ws->enc.payload_remain) {
infof(data, "WS: unaligned frame size (sending %zu instead of %"
FMT_OFF_T ")",
buflen, ws->enc.payload_remain);
}
if((curl_off_t)buflen >
(ws->enc.payload_remain + (curl_off_t)ws->sendbuf_payload)) {
/* too large buflen beyond payload length of frame */
infof(data, "WS: unaligned frame size (sending %zu instead of %"
FMT_OFF_T ")",
buflen, ws->enc.payload_remain + ws->sendbuf_payload);
result = CURLE_BAD_FUNCTION_ARGUMENT;
goto out;
}
}
else if(!ws->enc.payload_remain) {
n = ws_enc_write_head(data, &ws->enc, flags, (curl_off_t)buflen,
else {
/* starting a new frame, we want a clean sendbuf */
curl_off_t payload_len = (flags & CURLWS_OFFSET) ?
fragsize : (curl_off_t)buflen;
result = ws_flush(data, ws, Curl_is_in_callback(data));
if(result)
goto out;

n = ws_enc_write_head(data, &ws->enc, flags, payload_len,
&ws->sendbuf, &result);
if(n < 0)
goto out;
}

n = ws_enc_write_payload(&ws->enc, data,
buffer, buflen, &ws->sendbuf, &result);
if(n < 0)
goto out;
payload_added = (size_t)n;
/* While there is either sendbuf to flush OR more payload to encode... */
while(!Curl_bufq_is_empty(&ws->sendbuf) || (buflen > ws->sendbuf_payload)) {
/* Try to add more payload to sendbuf */
if(buflen > ws->sendbuf_payload) {
size_t prev_len = Curl_bufq_len(&ws->sendbuf);
n = ws_enc_write_payload(&ws->enc, data,
buffer + ws->sendbuf_payload,
buflen - ws->sendbuf_payload,
&ws->sendbuf, &result);
if(n < 0 && (result != CURLE_AGAIN))
goto out;
ws->sendbuf_payload += Curl_bufq_len(&ws->sendbuf) - prev_len;
}

while(!result && (buflen || !Curl_bufq_is_empty(&ws->sendbuf))) {
/* flush, blocking when in callback */
result = ws_flush(data, ws, Curl_is_in_callback(data));
if(!result) {
DEBUGASSERT(payload_added <= buflen);
/* all buffered data sent. Try sending the rest if there is any. */
*sent += payload_added;
buffer = (const char *)buffer + payload_added;
buflen -= payload_added;
payload_added = 0;
if(buflen) {
n = ws_enc_write_payload(&ws->enc, data,
buffer, buflen, &ws->sendbuf, &result);
if(n < 0)
goto out;
payload_added = Curl_bufq_len(&ws->sendbuf);
}
*sent += ws->sendbuf_payload;
buffer += ws->sendbuf_payload;
buflen -= ws->sendbuf_payload;
ws->sendbuf_payload = 0;
}
else if(result == CURLE_AGAIN) {
/* partially sent. how much of the call data has been part of it? what
* should we report to out caller so it can retry/send the rest? */
if(payload_added < buflen) {
/* We did not add everything the caller wanted. Return just
* the partial write to our buffer. */
*sent = payload_added;
if(ws->sendbuf_payload > Curl_bufq_len(&ws->sendbuf)) {
/* blocked, part of payload bytes remain, report length
* that we managed to send. */
size_t flushed = (ws->sendbuf_payload - Curl_bufq_len(&ws->sendbuf));
*sent += flushed;
ws->sendbuf_payload -= flushed;
result = CURLE_OK;
goto out;
}
else if(!buflen) {
/* We have no payload to report a partial write. EAGAIN would make
* the caller repeat this and add the frame again.
* Flush blocking seems the only way out of this. */
*sent = (size_t)n;
result = ws_flush(data, ws, TRUE);
else {
/* blocked before sending headers or 1st payload byte. We cannot report
* OK on 0-length send (caller counts only payload) and EAGAIN */
CURL_TRC_WS(data, "EAGAIN flushing sendbuf, payload_encoded: %zu/%zu",
ws->sendbuf_payload, buflen);
DEBUGASSERT(*sent == 0);
result = CURLE_AGAIN;
goto out;
}
/* We added the complete data to our sendbuf. Report one byte less as
* sent. This partial success should make the caller invoke us again
* with the last byte. */
*sent = payload_added - 1;
result = Curl_bufq_unwrite(&ws->sendbuf, 1);
if(!result)
result = CURLE_AGAIN;
}
else
goto out; /* real error sending the data */
}

out:
Expand Down
3 changes: 2 additions & 1 deletion lib/ws.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct ws_encoder {
unsigned int xori; /* xor index */
unsigned char mask[4]; /* 32-bit mask for this connection */
unsigned char firstbyte; /* first byte of frame we encode */
bool contfragment; /* set TRUE if the previous fragment sent was not final */
BIT(contfragment); /* set TRUE if the previous fragment sent was not final */
};

/* A websocket connection with en- and decoder that treat frames
Expand All @@ -65,6 +65,7 @@ struct websocket {
struct bufq recvbuf; /* raw data from the server */
struct bufq sendbuf; /* raw data to be sent to the server */
struct curl_ws_frame frame; /* the current WS FRAME received */
size_t sendbuf_payload; /* number of payload bytes in sendbuf */
};

CURLcode Curl_ws_request(struct Curl_easy *data, struct dynbuf *req);
Expand Down
Loading

0 comments on commit 02edae5

Please sign in to comment.