Commit
fix(server) handle ignored body and node:http destroy (#10268)
cirospaciari committed May 1, 2024
1 parent 23fb8ce commit 79c7abe
Showing 9 changed files with 281 additions and 10 deletions.
7 changes: 6 additions & 1 deletion src/bun.js/api/server.zig
@@ -1846,10 +1846,15 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp

if (this.request_body) |body| {
ctxLog("finalizeWithoutDeinit: request_body != null", .{});
// Case 1:
// User called .blob(), .json(), .text(), or .arrayBuffer() on the Request object
// but we received nothing or the connection was aborted
// the promise is pending
- if (body.value == .Locked and body.value.Locked.hasPendingPromise()) {
+ // Case 2:
+ // User ignored the body and the connection was aborted or ended
+ // Case 3:
+ // Stream was not consumed and the connection was aborted or ended
+ if (body.value == .Locked) {
body.value.toErrorInstance(JSC.toTypeError(.ABORT_ERR, "Request aborted", .{}, this.server.globalThis), this.server.globalThis);
}
}
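For context, a minimal repro sketch of the case this hunk fixes (not code from the commit): a handler that never reads the request body. Before this change, a body left in the .Locked state without a pending promise was never resolved when the request finalized, so its buffered data could be retained; now any .Locked body is rejected with ABORT_ERR and freed.

// Hypothetical repro, assuming a plain Bun server; the handler ignores
// req.body entirely, which previously left the locked body value alive.
const server = Bun.serve({
  port: 0,
  fetch(_req) {
    // No .text(), .json(), or stream consumption of the body.
    return new Response("Ok");
  },
});

// Driving it with large POST bodies exercises the leak path.
await fetch(server.url, { method: "POST", body: "1".repeat(32 * 1024) });
server.stop(true);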
3 changes: 2 additions & 1 deletion src/bun.js/webcore/body.zig
@@ -828,6 +828,7 @@ pub const Body = struct {
}

pub fn toErrorInstance(this: *Value, error_instance: JSC.JSValue, global: *JSGlobalObject) void {
+ error_instance.ensureStillAlive();
if (this.* == .Locked) {
var locked = this.Locked;
locked.deinit = true;
@@ -842,7 +843,7 @@
}

if (locked.readable.get()) |readable| {
- readable.done(global);
+ readable.abort(global);
locked.readable.deinit();
}
// will be unprotected by body value deinit
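The observable effect of toErrorInstance on user code (a hedged sketch, matching the ABORT_ERR raised in server.zig above): a body promise that is still pending when the connection goes away now rejects with an abort error instead of staying pending and pinning the body.

// Hypothetical sketch: the client disconnects mid-upload while the
// handler is buffering the body; the pending .text() promise rejects
// with a "Request aborted" ABORT_ERR instead of leaking.
Bun.serve({
  port: 0,
  async fetch(req) {
    try {
      await req.text(); // pending until the full body arrives
    } catch (err) {
      console.error("body aborted:", (err as Error).message);
    }
    return new Response("Ok");
  },
});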
26 changes: 21 additions & 5 deletions src/bun.js/webcore/streams.zig
@@ -142,21 +142,37 @@ pub const ReadableStream = struct {
}

pub fn done(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding(@src());
// done is called when we are done consuming the stream
// cancel actually marks the stream source as done
// this will resolve any pending promises to done: true
switch (this.ptr) {
.Blob => |source| {
source.parent().cancel();
},
.File => |source| {
source.parent().cancel();
},
.Bytes => |source| {
source.parent().cancel();
},
else => {},
}
this.detachIfPossible(globalThis);
}

pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding(@src());

// cancel the stream
ReadableStream__cancel(this.value, globalThis);
this.detachIfPossible(globalThis);
// mark the stream source as done
this.done(globalThis);
}

pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
JSC.markBinding(@src());

ReadableStream__cancel(this.value, globalThis);
this.detachIfPossible(globalThis);
// for now, just calling cancel should be fine
this.cancel(globalThis);
}

pub fn forceDetach(this: *const ReadableStream, globalObject: *JSGlobalObject) void {
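The done/cancel/abort split mirrors standard WHATWG stream semantics: cancelling a stream tells the source to stop producing and settles any pending read with done: true. A small TypeScript sketch of that contract (plain web-standard behavior, not code from this commit):

// A source that never enqueues keeps reads pending until cancelled.
const stream = new ReadableStream({ start() {} });
const reader = stream.getReader();

const pending = reader.read(); // stays pending: nothing was enqueued
await reader.cancel("aborted"); // signal the source and mark the stream done
const { done, value } = await pending;
console.log(done, value); // true undefined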
12 changes: 10 additions & 2 deletions src/js/node/http.ts
@@ -1089,7 +1089,6 @@ Object.defineProperty(OutgoingMessage.prototype, "finished", {

function emitCloseNT(self) {
if (!self._closed) {
- self.destroyed = true;
self._closed = true;
self.emit("close");
}
@@ -1432,6 +1431,15 @@ class ClientRequest extends OutgoingMessage {
this.#bodyChunks.push(...chunks);
callback();
}
+ _destroy(err, callback) {
+   this.destroyed = true;
+   // If the request is destroyed, abort the in-flight response
+   this[kAbortController]?.abort?.();
+   if (err) {
+     this.emit("error", err);
+   }
+   callback();
+ }

_final(callback) {
this.#finished = true;
@@ -1519,7 +1527,7 @@

abort() {
if (this.aborted) return;
- this[kAbortController]!.abort();
+ this[kAbortController]?.abort?.();
// TODO: Close stream if body streaming
}

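A usage sketch of the new _destroy path (it mirrors the node-http.test.ts case added below): destroying an in-flight ClientRequest now fires the internal AbortController, so the server observes the abort and the download stops. The endpoint URL here is hypothetical.

import { request } from "node:http";

const req = request("http://localhost:3000/slow-download", res => {
  res.once("data", () => {
    // Tear the request down after the first chunk. With this change,
    // _destroy() aborts the underlying fetch, so the body stream stops
    // instead of downloading forever.
    req.destroy();
  });
});
req.on("error", err => {
  // destroy(err) with an error argument emits "error".
  console.error("request error:", err.message);
});
req.end();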
38 changes: 38 additions & 0 deletions test/js/bun/http/body-leak-test-fixture.ts

Some generated files are not rendered by default.
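Based on the endpoints the leak test below exercises, a plausible reconstruction of the fixture (the committed code may differ; the memory metric in /report is an assumption):

// Plausible sketch of body-leak-test-fixture.ts: each route exercises a
// different body-consumption pattern, and /report reports memory usage
// after a forced garbage collection.
const server = Bun.serve({
  port: 0,
  async fetch(req) {
    const { pathname } = new URL(req.url);
    if (pathname === "/report") {
      Bun.gc(true);
      return new Response(JSON.stringify(process.memoryUsage.rss())); // assumed metric
    }
    if (pathname === "/buffering") {
      await req.text(); // buffer the whole body
    } else if (pathname === "/streaming") {
      for await (const _chunk of req.body!) {
        // consume the stream to completion
      }
    } else if (pathname === "/incomplete-streaming") {
      await req.body!.getReader().read(); // read one chunk, then drop the stream
    } else if (pathname === "/streaming-echo") {
      return new Response(req.body); // echo the body back
    }
    // default route: ignore the body entirely (#10265)
    return new Response("Ok");
  },
});
console.log(String(server.url)); // the test reads the server URL from stdout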

134 changes: 134 additions & 0 deletions test/js/bun/http/serve-body-leak.test.ts
@@ -0,0 +1,134 @@
import { join } from "path";
import { it, expect, beforeAll, afterAll } from "bun:test";
import { bunExe, bunEnv } from "harness";
import type { Subprocess } from "bun";

const ACCEPTABLE_MEMORY_LEAK = 2; // MB of acceptable memory-leak variance
const payload = "1".repeat(32 * 1024); // decent-size payload to test for memory leaks

let url: URL;
let process: Subprocess<"ignore", "pipe", "inherit"> | null = null;
beforeAll(async () => {
process = Bun.spawn([bunExe(), "--smol", join(import.meta.dirname, "body-leak-test-fixture.ts")], {
env: bunEnv,
stdout: "pipe",
stderr: "inherit",
stdin: "ignore",
});
const { value } = await process.stdout.getReader().read();
url = new URL(new TextDecoder().decode(value));

await warmup();
});
afterAll(() => {
process?.kill();
});

async function getMemoryUsage(): Promise<number> {
return (await fetch(`${url.origin}/report`).then(res => res.json())) as number;
}

async function warmup() {
const batch = new Array(100);
for (let i = 0; i < 100; i++) {
for (let j = 0; j < 100; j++) {
// warm up the server with streaming requests, since they are the most memory-intensive
batch[j] = fetch(`${url.origin}/streaming`, {
method: "POST",
body: payload,
});
}
await Promise.all(batch);
}
// clean up memory before first test
await getMemoryUsage();
}

async function callBuffering() {
const result = await fetch(`${url.origin}/buffering`, {
method: "POST",
body: payload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callStreaming() {
const result = await fetch(`${url.origin}/streaming`, {
method: "POST",
body: payload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callIncompleteStreaming() {
const result = await fetch(`${url.origin}/incomplete-streaming`, {
method: "POST",
body: payload,
}).then(res => res.text());
expect(result).toBe("Ok");
}
async function callStreamingEcho() {
const result = await fetch(`${url.origin}/streaming-echo`, {
method: "POST",
body: payload,
}).then(res => res.text());
expect(result).toBe(payload);
}
async function callIgnore() {
const result = await fetch(url, {
method: "POST",
body: payload,
}).then(res => res.text());
expect(result).toBe("Ok");
}

async function calculateMemoryLeak(fn: () => Promise<void>) {
const start_memory = await getMemoryUsage();
const memory_examples: Array<number> = [];
let peak_memory = start_memory;
const batch = new Array(100);
for (let i = 0; i < 100; i++) {
for (let j = 0; j < 100; j++) {
batch[j] = fn();
}
await Promise.all(batch);
// garbage collect and check memory usage every 1000 requests
if (i > 0 && i % 10 === 0) {
const report = await getMemoryUsage();
if (report > peak_memory) {
peak_memory = report;
}
memory_examples.push(report);
}
}

// wait for the last memory usage to be stable
const end_memory = await getMemoryUsage();
if (end_memory > peak_memory) {
peak_memory = end_memory;
}
// use the first sample as a reference: a real leak keeps increasing instead of stabilizing
const consumption = end_memory - memory_examples[0];
// memory leak in MB
const leak = Math.floor(consumption > 0 ? consumption / 1024 / 1024 : 0);
return { leak, start_memory, peak_memory, end_memory, memory_examples };
}

for (const test_info of [
["#10265 should not leak memory when ignoring the body", callIgnore, false],
["should not leak memory when buffering the body", callBuffering, false],
["should not leak memory when streaming the body", callStreaming, false],
["should not leak memory when streaming the body incompletely", callIncompleteStreaming, true],
["should not leak memory when streaming the body and echoing it back", callStreamingEcho, true],
]) {
const [testName, fn, skip] = test_info as [string, () => Promise<void>, boolean];
it.todoIf(skip)(
testName,
async () => {
const report = await calculateMemoryLeak(fn);
// peak memory should not exceed twice the starting memory
expect(report.peak_memory > report.start_memory * 2).toBe(false);
// the leak must stay within the acceptable variance
expect(report.leak).toBeLessThanOrEqual(ACCEPTABLE_MEMORY_LEAK);
},
30_000,
);
}
29 changes: 29 additions & 0 deletions test/js/bun/http/serve.test.ts
@@ -1484,3 +1484,32 @@ describe("should error with invalid options", async () => {
}).toThrow("Expected lowMemoryMode to be a boolean");
});
});
it("should resolve pending promise if requested ended with pending read", async () => {
let error: Error;
function shouldError(e: Error) {
error = e;
}
let is_done = false;
function shouldMarkDone(result: { done: boolean; value: any }) {
is_done = result.done;
}
await runTest(
{
fetch(req) {
// @ts-ignore
req.body?.getReader().read().catch(shouldError).then(shouldMarkDone);
return new Response("OK");
},
},
async server => {
const response = await fetch(server.url.origin, {
method: "POST",
body: "1".repeat(64 * 1024),
});
const text = await response.text();
expect(text).toContain("OK");
expect(is_done).toBe(true);
expect(error).toBeUndefined();
},
);
});
2 changes: 1 addition & 1 deletion test/js/node/http/fixtures/log-events.mjs
@@ -1,7 +1,7 @@
import * as http from "node:http";

const options = {
hostname: "www.google.com",
hostname: "www.example.com",
port: 80,
path: "/",
method: "GET",
40 changes: 40 additions & 0 deletions test/js/node/http/node-http.test.ts
@@ -1904,3 +1904,43 @@ it.skipIf(!process.env.TEST_INFO_STRIPE)("should be able to connect to stripe",
err = await new Response(stderr).text();
expect(err).toContain(`error: No such charge: '${charge_id}'\n`);
});

it("destroy should end download", async () => {
// simulate a file that takes forever to download
const payload = Buffer.from("X".repeat(16 * 1024));

const server = Bun.serve({
port: 0,
async fetch(req) {
let running = true;
req.signal.onabort = () => (running = false);
return new Response(async function* () {
while (running) {
yield payload;
await Bun.sleep(10);
}
});
},
});

try {
let chunks = 0;

const { promise, resolve } = Promise.withResolvers();
const req = request(server.url, res => {
res.on("data", () => {
process.nextTick(resolve);
chunks++;
});
});
req.end();
// wait for the first chunk
await promise;
// should stop the download
req.destroy();
await Bun.sleep(200);
expect(chunks).toBeLessThanOrEqual(3);
} finally {
server.stop(true);
}
});