Skip to content

Commit

Permalink
deduplicate onClose implementations for web handlers, wait for stream…
Browse files Browse the repository at this point in the history
… end in edge route modules
  • Loading branch information
lubieowoce committed May 9, 2024
1 parent fa82174 commit c83ae52
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 74 deletions.
40 changes: 5 additions & 35 deletions packages/next/src/server/base-http/web.ts
Expand Up @@ -5,6 +5,7 @@ import { toNodeOutgoingHttpHeaders } from '../web/utils'
import { BaseNextRequest, BaseNextResponse } from './index'
import { DetachedPromise } from '../../lib/detached-promise'
import type { NextRequestHint } from '../web/adapter'
import { CloseController, trackBodyConsumed } from '../web/web-on-close'

export class WebNextRequest extends BaseNextRequest<ReadableStream | null> {
public request: Request
Expand Down Expand Up @@ -37,8 +38,7 @@ export class WebNextResponse extends BaseNextResponse<WritableStream> {
private headers = new Headers()
private textBody: string | undefined = undefined

private listeners = 0
private target = new EventTarget()
private closeController = new CloseController()

public statusCode: number | undefined
public statusMessage: string | undefined
Expand Down Expand Up @@ -108,9 +108,9 @@ export class WebNextResponse extends BaseNextResponse<WritableStream> {
const body = this.textBody ?? this.transformStream.readable

let bodyInit: BodyInit = body
if (this.listeners > 0) {
if (this.closeController.listeners > 0) {
bodyInit = trackBodyConsumed(body, () => {
this.target.dispatchEvent(new Event('close'))
this.closeController.dispatchClose()
})
}

Expand All @@ -125,36 +125,6 @@ export class WebNextResponse extends BaseNextResponse<WritableStream> {
if (this.sent) {
throw new Error('Cannot call onClose on a response that is already sent')
}
this.target.addEventListener('close', callback)
this.listeners++
return () => {
this.target.removeEventListener('close', callback)
this.listeners--
}
}
}

function trackBodyConsumed(
body: string | ReadableStream,
onEnd: () => void
): BodyInit {
// monitor when the consumer finishes reading the response body.
// that's as close as we can get to `res.on('close')` using web APIs.

if (typeof body === 'string') {
const generator = async function* generate() {
const encoder = new TextEncoder()
yield encoder.encode(body)
onEnd()
}
// @ts-expect-error BodyInit typings doesn't seem to include AsyncIterables even though it's supported in practice
return generator()
} else {
const closePassThrough = new TransformStream({
flush: () => {
return onEnd()
},
})
return body.pipeThrough(closePassThrough)
return this.closeController.onClose(callback)
}
}
21 changes: 8 additions & 13 deletions packages/next/src/server/web/adapter.ts
Expand Up @@ -22,6 +22,7 @@ import { getTracer } from '../lib/trace/tracer'
import type { TextMapGetter } from 'next/dist/compiled/@opentelemetry/api'
import { MiddlewareSpan } from '../lib/trace/constants'
import type { RenderOptsPartial } from '../app-render/types'
import { CloseController } from './web-on-close'

export class NextRequestHint extends NextRequest {
sourcePage: string
Expand Down Expand Up @@ -222,19 +223,11 @@ export async function adapter(
!!process.env.__NEXT_AFTER

let waitUntil: WrapperRenderOpts['waitUntil'] = undefined
let onClose: WrapperRenderOpts['onClose'] = undefined
let dispatchClose: (() => void) | undefined = undefined
let closeController: CloseController | undefined = undefined

if (isAfterEnabled) {
waitUntil = event.waitUntil.bind(event)

const requestClosedTarget = new EventTarget()
onClose = (callback: () => void) => {
requestClosedTarget.addEventListener('close', callback)
}
dispatchClose = () => {
requestClosedTarget.dispatchEvent(new Event('close'))
}
closeController = new CloseController()
}

return getTracer().trace(
Expand Down Expand Up @@ -262,7 +255,9 @@ export async function adapter(
previewModeSigningKey: '',
},
waitUntil,
onClose,
onClose: closeController
? closeController.onClose.bind(closeController)
: undefined,
experimental: {
after: isAfterEnabled,
} as RenderOptsPartial['experimental'],
Expand All @@ -274,10 +269,10 @@ export async function adapter(
} finally {
// middleware cannot stream, so we can consider the response closed
// as soon as the handler returns.
if (dispatchClose) {
if (closeController && closeController.listeners > 0) {
// we can delay running it until a bit later --
// if it's needed, we'll have a `waitUntil` lock anyway.
setTimeout(dispatchClose, 0)
setTimeout(() => closeController!.dispatchClose(), 0)
}
}
}
Expand Down
57 changes: 31 additions & 26 deletions packages/next/src/server/web/edge-route-module-wrapper.ts
Expand Up @@ -15,6 +15,7 @@ import { internal_getCurrentFunctionWaitUntil } from './internal-edge-wait-until
import { getUtils } from '../server-utils'
import { searchParamsToUrlQuery } from '../../shared/lib/router/utils/querystring'
import type { RequestLifecycleOpts } from '../base-server'
import { CloseController, trackStreamConsumed } from './web-on-close'

type WrapOptions = Partial<Pick<AdapterOptions, 'page'>>

Expand Down Expand Up @@ -91,18 +92,11 @@ export class EdgeRouteModuleWrapper {
const isAfterEnabled = !!process.env.__NEXT_AFTER

let waitUntil: RequestLifecycleOpts['waitUntil'] | undefined = undefined
let onClose: RequestLifecycleOpts['onClose'] | undefined = undefined
let dispatchClose: (() => void) | undefined = undefined
let closeController: CloseController | undefined

if (isAfterEnabled) {
waitUntil = evt.waitUntil.bind(evt)

const requestClosedTarget = new EventTarget()
onClose = (callback: () => void) => {
requestClosedTarget.addEventListener('close', callback)
}
dispatchClose = () => {
requestClosedTarget.dispatchEvent(new Event('close'))
}
closeController = new CloseController()
}

// Create the context for the handler. This contains the params from the
Expand All @@ -123,30 +117,41 @@ export class EdgeRouteModuleWrapper {
renderOpts: {
supportsDynamicHTML: true,
waitUntil,
onClose,
onClose: closeController
? closeController.onClose.bind(closeController)
: undefined,
experimental: {
after: isAfterEnabled,
},
},
}

try {
// Get the response from the handler.
const res = await this.routeModule.handle(request, context)
// Get the response from the handler.
let res = await this.routeModule.handle(request, context)

const waitUntilPromises = [internal_getCurrentFunctionWaitUntil()]
if (context.renderOpts.pendingWaitUntil) {
waitUntilPromises.push(context.renderOpts.pendingWaitUntil)
}
evt.waitUntil(Promise.all(waitUntilPromises))

return res
} finally {
// TODO(after): this might be a streaming response, in which case this'll run too early.
// we should probably do the same thing as `WebNextResponse#onClose` here
if (dispatchClose) {
setTimeout(dispatchClose, 0)
const waitUntilPromises = [internal_getCurrentFunctionWaitUntil()]
if (context.renderOpts.pendingWaitUntil) {
waitUntilPromises.push(context.renderOpts.pendingWaitUntil)
}
evt.waitUntil(Promise.all(waitUntilPromises))

if (closeController && closeController.listeners > 0) {
if (!res.body) {
// we can delay running it until a bit later --
// if it's needed, we'll have a `waitUntil` lock anyway.
setTimeout(() => closeController!.dispatchClose(), 0)
} else {
const trackedBody = trackStreamConsumed(res.body, () =>
closeController!.dispatchClose()
)
res = new Response(trackedBody, {
status: res.status,
statusText: res.statusText,
headers: res.headers,
})
}
}

return res
}
}
60 changes: 60 additions & 0 deletions packages/next/src/server/web/web-on-close.ts
@@ -0,0 +1,60 @@
/** Monitor when the consumer finishes reading the response body.
that's as close as we can get to `res.on('close')` using web APIs.
*/
export function trackBodyConsumed(
body: string | ReadableStream,
onEnd: () => void
): BodyInit {
if (typeof body === 'string') {
const generator = async function* generate() {
const encoder = new TextEncoder()
yield encoder.encode(body)
onEnd()
}
// @ts-expect-error BodyInit typings doesn't seem to include AsyncIterables even though it's supported in practice
return generator()
} else {
return trackStreamConsumed(body, onEnd)
}
}

export function trackStreamConsumed<TChunk>(
stream: ReadableStream<TChunk>,
onEnd: () => void
): ReadableStream<TChunk> {
const closePassThrough = new TransformStream<TChunk, TChunk>({
flush: () => {
return onEnd()
},
})
return stream.pipeThrough(closePassThrough)
}

export function createCloseController() {
return new CloseController()
}

export class CloseController {
target = new EventTarget()
listeners = 0
isClosed = false

onClose(callback: () => void) {
if (this.isClosed) {
throw new Error('Cannot subscribe to a closed CloseController')
}

this.target.addEventListener('close', callback)
this.listeners++
}

dispatchClose() {
if (this.isClosed) {
throw new Error('Cannot close multiple times')
}
if (this.listeners > 0) {
this.target.dispatchEvent(new Event('close'))
}
this.isClosed = true
}
}

0 comments on commit c83ae52

Please sign in to comment.