Skip to content

Commit

Permalink
fix: move pipeline reset to user space
Browse files Browse the repository at this point in the history
The current behavior was broken. The user must decide if and how
a request should be retried.
  • Loading branch information
ronag committed Feb 29, 2024
1 parent c90af4a commit 81563f6
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 53 deletions.
10 changes: 10 additions & 0 deletions lib/core/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ class InformationalError extends UndiciError {
}
}

class ResetError extends UndiciError {
constructor (message) {
super(message)
this.name = 'ResetError'
this.message = message || 'Request reset'
this.code = 'ECONNRESET'
}
}

class RequestContentLengthMismatchError extends UndiciError {
constructor (message) {
super(message)
Expand Down Expand Up @@ -211,6 +220,7 @@ module.exports = {
ClientDestroyedError,
ClientClosedError,
InformationalError,
ResetError,
SocketError,
NotSupportedError,
ResponseContentLengthMismatchError,
Expand Down
30 changes: 9 additions & 21 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
HeadersOverflowError,
SocketError,
InformationalError,
ResetError,
BodyTimeoutError,
HTTPParserError,
ResponseExceededMaxSizeError
Expand All @@ -25,7 +26,6 @@ const {
kParser,
kBlocking,
kRunning,
kPending,
kSize,
kWriting,
kQueue,
Expand Down Expand Up @@ -707,33 +707,21 @@ async function connectH1 (client, socket) {
this[kParser] = null
}

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
this[kError] ??= new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

errorRequest(client, request, err)
if (client[kRunningIdx] < client[kPendingIdx]) {
errorRequest(client, client[kQueue][client[kRunningIdx]++], this[kError])
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)
const err = Object.assign(new ResetError('pipeline reset'), { cause: this[kError] })
while (client[kRunningIdx] < client[kPendingIdx]) {
errorRequest(client, client[kQueue][client[kRunningIdx]++], err)
}

client.emit('disconnect', client[kUrl], [client], err)
client.emit('disconnect', client[kUrl], [client], this[kError])

client[kResume]()
})
Expand Down
41 changes: 9 additions & 32 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ const {
kReset,
kClient,
kRunning,
kPending,
kQueue,
kPendingIdx,
kRunningIdx,
kError,
kSocket,
kStrictContentLength,
kOnError,
// HTTP2
kHTTPContext,
kMaxConcurrentStreams,
kHTTP2Session,
kResume
Expand Down Expand Up @@ -79,24 +78,18 @@ async function connectH2 (client, socket) {
session.on('close', function () {
const { [kClient]: client } = this

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
socket[kError] ??= new SocketError('closed', util.getSocketInfo(socket))

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(client, request, err)
while (client[kRunningIdx] < client[kPendingIdx]) {
errorRequest(client, client[kQueue][client[kRunningIdx]++], socket[kError])
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)
assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)
client.emit('disconnect', client[kUrl], [client], socket[kError])

client[kResume]()
})
Expand Down Expand Up @@ -176,25 +169,9 @@ function onHTTP2GoAway (code) {
client[kSocket] = null
client[kHTTP2Session] = null

if (client.destroyed) {
assert(this[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(this, request, err)
}
} else if (client[kRunning] > 0) {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

errorRequest(client, request, err)
while (client[kRunningIdx] < client[kPendingIdx]) {
errorRequest(client, client[kQueue][client[kRunningIdx]++], err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect',
Expand Down

0 comments on commit 81563f6

Please sign in to comment.