diff --git a/packages/next/src/server/base-http/web.ts b/packages/next/src/server/base-http/web.ts index c8ac6a35306a..76c6207209fa 100644 --- a/packages/next/src/server/base-http/web.ts +++ b/packages/next/src/server/base-http/web.ts @@ -5,6 +5,7 @@ import { toNodeOutgoingHttpHeaders } from '../web/utils' import { BaseNextRequest, BaseNextResponse } from './index' import { DetachedPromise } from '../../lib/detached-promise' import type { NextRequestHint } from '../web/adapter' +import { CloseController, trackBodyConsumed } from '../web/web-on-close' export class WebNextRequest extends BaseNextRequest { public request: Request @@ -37,8 +38,7 @@ export class WebNextResponse extends BaseNextResponse { private headers = new Headers() private textBody: string | undefined = undefined - private listeners = 0 - private target = new EventTarget() + private closeController = new CloseController() public statusCode: number | undefined public statusMessage: string | undefined @@ -108,9 +108,9 @@ export class WebNextResponse extends BaseNextResponse { const body = this.textBody ?? this.transformStream.readable let bodyInit: BodyInit = body - if (this.listeners > 0) { + if (this.closeController.listeners > 0) { bodyInit = trackBodyConsumed(body, () => { - this.target.dispatchEvent(new Event('close')) + this.closeController.dispatchClose() }) } @@ -125,36 +125,6 @@ export class WebNextResponse extends BaseNextResponse { if (this.sent) { throw new Error('Cannot call onClose on a response that is already sent') } - this.target.addEventListener('close', callback) - this.listeners++ - return () => { - this.target.removeEventListener('close', callback) - this.listeners-- - } - } -} - -function trackBodyConsumed( - body: string | ReadableStream, - onEnd: () => void -): BodyInit { - // monitor when the consumer finishes reading the response body. - // that's as close as we can get to `res.on('close')` using web APIs. - - if (typeof body === 'string') { - const generator = async function* generate() { - const encoder = new TextEncoder() - yield encoder.encode(body) - onEnd() - } - // @ts-expect-error BodyInit typings doesn't seem to include AsyncIterables even though it's supported in practice - return generator() - } else { - const closePassThrough = new TransformStream({ - flush: () => { - return onEnd() - }, - }) - return body.pipeThrough(closePassThrough) + return this.closeController.onClose(callback) } } diff --git a/packages/next/src/server/web/adapter.ts b/packages/next/src/server/web/adapter.ts index 52086c93ef48..6dc29eb3af4f 100644 --- a/packages/next/src/server/web/adapter.ts +++ b/packages/next/src/server/web/adapter.ts @@ -22,6 +22,7 @@ import { getTracer } from '../lib/trace/tracer' import type { TextMapGetter } from 'next/dist/compiled/@opentelemetry/api' import { MiddlewareSpan } from '../lib/trace/constants' import type { RenderOptsPartial } from '../app-render/types' +import { CloseController } from './web-on-close' export class NextRequestHint extends NextRequest { sourcePage: string @@ -222,19 +223,11 @@ export async function adapter( !!process.env.__NEXT_AFTER let waitUntil: WrapperRenderOpts['waitUntil'] = undefined - let onClose: WrapperRenderOpts['onClose'] = undefined - let dispatchClose: (() => void) | undefined = undefined + let closeController: CloseController | undefined = undefined if (isAfterEnabled) { waitUntil = event.waitUntil.bind(event) - - const requestClosedTarget = new EventTarget() - onClose = (callback: () => void) => { - requestClosedTarget.addEventListener('close', callback) - } - dispatchClose = () => { - requestClosedTarget.dispatchEvent(new Event('close')) - } + closeController = new CloseController() } return getTracer().trace( @@ -262,7 +255,9 @@ export async function adapter( previewModeSigningKey: '', }, waitUntil, - onClose, + onClose: closeController + ? closeController.onClose.bind(closeController) + : undefined, experimental: { after: isAfterEnabled, } as RenderOptsPartial['experimental'], @@ -274,10 +269,10 @@ export async function adapter( } finally { // middleware cannot stream, so we can consider the response closed // as soon as the handler returns. - if (dispatchClose) { + if (closeController && closeController.listeners > 0) { // we can delay running it until a bit later -- // if it's needed, we'll have a `waitUntil` lock anyway. - setTimeout(dispatchClose, 0) + setTimeout(() => closeController!.dispatchClose(), 0) } } } diff --git a/packages/next/src/server/web/edge-route-module-wrapper.ts b/packages/next/src/server/web/edge-route-module-wrapper.ts index 3cf51bc3b638..7792519ea37b 100644 --- a/packages/next/src/server/web/edge-route-module-wrapper.ts +++ b/packages/next/src/server/web/edge-route-module-wrapper.ts @@ -15,6 +15,7 @@ import { internal_getCurrentFunctionWaitUntil } from './internal-edge-wait-until import { getUtils } from '../server-utils' import { searchParamsToUrlQuery } from '../../shared/lib/router/utils/querystring' import type { RequestLifecycleOpts } from '../base-server' +import { CloseController, trackStreamConsumed } from './web-on-close' type WrapOptions = Partial> @@ -91,18 +92,11 @@ export class EdgeRouteModuleWrapper { const isAfterEnabled = !!process.env.__NEXT_AFTER let waitUntil: RequestLifecycleOpts['waitUntil'] | undefined = undefined - let onClose: RequestLifecycleOpts['onClose'] | undefined = undefined - let dispatchClose: (() => void) | undefined = undefined + let closeController: CloseController | undefined + if (isAfterEnabled) { waitUntil = evt.waitUntil.bind(evt) - - const requestClosedTarget = new EventTarget() - onClose = (callback: () => void) => { - requestClosedTarget.addEventListener('close', callback) - } - dispatchClose = () => { - requestClosedTarget.dispatchEvent(new Event('close')) - } + closeController = new CloseController() } // Create the context for the handler. This contains the params from the @@ -123,30 +117,41 @@ export class EdgeRouteModuleWrapper { renderOpts: { supportsDynamicHTML: true, waitUntil, - onClose, + onClose: closeController + ? closeController.onClose.bind(closeController) + : undefined, experimental: { after: isAfterEnabled, }, }, } - try { - // Get the response from the handler. - const res = await this.routeModule.handle(request, context) + // Get the response from the handler. + let res = await this.routeModule.handle(request, context) - const waitUntilPromises = [internal_getCurrentFunctionWaitUntil()] - if (context.renderOpts.pendingWaitUntil) { - waitUntilPromises.push(context.renderOpts.pendingWaitUntil) - } - evt.waitUntil(Promise.all(waitUntilPromises)) - - return res - } finally { - // TODO(after): this might be a streaming response, in which case this'll run too early. - // we should probably do the same thing as `WebNextResponse#onClose` here - if (dispatchClose) { - setTimeout(dispatchClose, 0) + const waitUntilPromises = [internal_getCurrentFunctionWaitUntil()] + if (context.renderOpts.pendingWaitUntil) { + waitUntilPromises.push(context.renderOpts.pendingWaitUntil) + } + evt.waitUntil(Promise.all(waitUntilPromises)) + + if (closeController && closeController.listeners > 0) { + if (!res.body) { + // we can delay running it until a bit later -- + // if it's needed, we'll have a `waitUntil` lock anyway. + setTimeout(() => closeController!.dispatchClose(), 0) + } else { + const trackedBody = trackStreamConsumed(res.body, () => + closeController!.dispatchClose() + ) + res = new Response(trackedBody, { + status: res.status, + statusText: res.statusText, + headers: res.headers, + }) } } + + return res } } diff --git a/packages/next/src/server/web/web-on-close.ts b/packages/next/src/server/web/web-on-close.ts new file mode 100644 index 000000000000..4255d4fa1ab3 --- /dev/null +++ b/packages/next/src/server/web/web-on-close.ts @@ -0,0 +1,60 @@ +/** Monitor when the consumer finishes reading the response body. +that's as close as we can get to `res.on('close')` using web APIs. +*/ +export function trackBodyConsumed( + body: string | ReadableStream, + onEnd: () => void +): BodyInit { + if (typeof body === 'string') { + const generator = async function* generate() { + const encoder = new TextEncoder() + yield encoder.encode(body) + onEnd() + } + // @ts-expect-error BodyInit typings doesn't seem to include AsyncIterables even though it's supported in practice + return generator() + } else { + return trackStreamConsumed(body, onEnd) + } +} + +export function trackStreamConsumed( + stream: ReadableStream, + onEnd: () => void +): ReadableStream { + const closePassThrough = new TransformStream({ + flush: () => { + return onEnd() + }, + }) + return stream.pipeThrough(closePassThrough) +} + +export function createCloseController() { + return new CloseController() +} + +export class CloseController { + target = new EventTarget() + listeners = 0 + isClosed = false + + onClose(callback: () => void) { + if (this.isClosed) { + throw new Error('Cannot subscribe to a closed CloseController') + } + + this.target.addEventListener('close', callback) + this.listeners++ + } + + dispatchClose() { + if (this.isClosed) { + throw new Error('Cannot close multiple times') + } + if (this.listeners > 0) { + this.target.dispatchEvent(new Event('close')) + } + this.isClosed = true + } +}