Skip to content

Commit

Permalink
[Flight Reply] Encode ReadableStream and AsyncIterables (#28893)
Browse files Browse the repository at this point in the history
Same as #28847 but in the other direction.

Like other promises, this doesn't actually stream in the outgoing
direction. It buffers until the stream is done. This is mainly due to
our protocol remains compatible with Safari's lack of outgoing streams
until recently.

However, the stream chunks are encoded as separate fields and so does
support the busboy streaming on the receiving side.
  • Loading branch information
sebmarkbage committed May 3, 2024
1 parent 5fcfd71 commit ec9400d
Show file tree
Hide file tree
Showing 4 changed files with 722 additions and 14 deletions.
146 changes: 141 additions & 5 deletions packages/react-client/src/ReactFlightReplyClient.js
Expand Up @@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';
import {
enableRenderableContext,
enableBinaryFlight,
enableFlightReadableStream,
} from 'shared/ReactFeatureFlags';

import {
Expand All @@ -28,6 +29,7 @@ import {
REACT_CONTEXT_TYPE,
REACT_PROVIDER_TYPE,
getIteratorFn,
ASYNC_ITERATOR,
} from 'shared/ReactSymbols';

import {
Expand Down Expand Up @@ -206,6 +208,123 @@ export function processReply(
return '$' + tag + blobId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. It also implies that different chunks can be split up or merged as opposed
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
// received in the same slices.
// $FlowFixMe: This is a Node.js extension.
let supportsBYOB: void | boolean = stream.supportsBYOB;
if (supportsBYOB === undefined) {
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
stream.getReader({mode: 'byob'}).releaseLock();
supportsBYOB = true;
} catch (x) {
supportsBYOB = false;
}
}

const reader = stream.getReader();

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, partJSON);
reader.read().then(progress, reject);
} catch (x) {
reject(x);
}
}
}
reader.read().then(progress, reject);

return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
}

function serializeAsyncIterable(
iterable: $AsyncIterable<ReactServerValue, ReactServerValue, void>,
iterator: $AsyncIterator<ReactServerValue, ReactServerValue, void>,
): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

// Generators/Iterators are Iterables but they're also their own iterator
// functions. If that's the case, we treat them as single-shot. Otherwise,
// we assume that this iterable might be a multi-shot and allow it to be
// iterated more than once on the receiving server.
const isIterator = iterable === iterator;

// There's a race condition between when the stream is aborted and when the promise
// resolves so we track whether we already aborted it to avoid writing twice.
function progress(
entry:
| {done: false, +value: ReactServerValue, ...}
| {done: true, +value: ReactServerValue, ...},
) {
if (entry.done) {
if (entry.value === undefined) {
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
} else {
// Unlike streams, the last value may not be undefined. If it's not
// we outline it and encode a reference to it in the closing instruction.
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal
} catch (x) {
reject(x);
return;
}
}
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, partJSON);
iterator.next().then(progress, reject);
} catch (x) {
reject(x);
return;
}
}
}

iterator.next().then(progress, reject);
return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16);
}

function resolveToJSON(
this:
| {+[key: string | number]: ReactServerValue}
Expand Down Expand Up @@ -349,11 +468,9 @@ export function processReply(
reject(reason);
}
},
reason => {
// In the future we could consider serializing this as an error
// that throws on the server instead.
reject(reason);
},
// In the future we could consider serializing this as an error
// that throws on the server instead.
reject,
);
return serializePromiseID(promiseId);
}
Expand Down Expand Up @@ -486,6 +603,25 @@ export function processReply(
return Array.from((iterator: any));
}

if (enableFlightReadableStream) {
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}
}

// Verify that this is a simple plain object.
const proto = getPrototypeOf(value);
if (
Expand Down
Expand Up @@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => {
// This should've been the same reference that we already saw.
expect(response.children).toBe(children);
});

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
let controller1;
let controller2;
const s1 = new ReadableStream({
start(c) {
controller1 = c;
},
});
const s2 = new ReadableStream({
start(c) {
controller2 = c;
},
});

const promise = ReactServerDOMClient.encodeReply({s1, s2});

controller1.enqueue({hello: 'world'});
controller2.enqueue({hi: 'there'});

controller1.enqueue('text1');
controller2.enqueue('text2');

controller1.close();
controller2.close();

const body = await promise;

const result = await ReactServerDOMServer.decodeReply(
body,
webpackServerMap,
);
const reader1 = result.s1.getReader();
const reader2 = result.s2.getReader();

expect(await reader1.read()).toEqual({
value: {hello: 'world'},
done: false,
});
expect(await reader2.read()).toEqual({
value: {hi: 'there'},
done: false,
});

expect(await reader1.read()).toEqual({
value: 'text1',
done: false,
});
expect(await reader1.read()).toEqual({
value: undefined,
done: true,
});
expect(await reader2.read()).toEqual({
value: 'text2',
done: false,
});
expect(await reader2.read()).toEqual({
value: undefined,
done: true,
});
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
const multiShotIterable = {
async *[Symbol.asyncIterator]() {
const next = yield {hello: 'A'};
expect(next).toBe(undefined);
await wait;
yield {hi: 'B'};
return 'C';
},
};
const singleShotIterator = (async function* () {
const next = yield {hello: 'D'};
expect(next).toBe(undefined);
await wait;
yield {hi: 'E'};
return 'F';
})();

await resolve();

const body = await ReactServerDOMClient.encodeReply({
multiShotIterable,
singleShotIterator,
});
const result = await ReactServerDOMServer.decodeReply(
body,
webpackServerMap,
);

const iterator1 = result.multiShotIterable[Symbol.asyncIterator]();
const iterator2 = result.singleShotIterator[Symbol.asyncIterator]();

expect(iterator1).not.toBe(result.multiShotIterable);
expect(iterator2).toBe(result.singleShotIterator);

expect(await iterator1.next()).toEqual({
value: {hello: 'A'},
done: false,
});
expect(await iterator2.next()).toEqual({
value: {hello: 'D'},
done: false,
});

expect(await iterator1.next()).toEqual({
value: {hi: 'B'},
done: false,
});
expect(await iterator2.next()).toEqual({
value: {hi: 'E'},
done: false,
});
expect(await iterator1.next()).toEqual({
value: 'C', // Return value
done: true,
});
expect(await iterator1.next()).toEqual({
value: undefined,
done: true,
});

expect(await iterator2.next()).toEqual({
value: 'F', // Return value
done: true,
});

// Multi-shot iterables should be able to do the same thing again
const iterator3 = result.multiShotIterable[Symbol.asyncIterator]();

expect(iterator3).not.toBe(iterator1);

// We should be able to iterate over the iterable again and it should be
// synchronously available using instrumented promises so that React can
// rerender it synchronously.
expect(iterator3.next().value).toEqual({
value: {hello: 'A'},
done: false,
});
expect(iterator3.next().value).toEqual({
value: {hi: 'B'},
done: false,
});
expect(iterator3.next().value).toEqual({
value: 'C', // Return value
done: true,
});
expect(iterator3.next().value).toEqual({
value: undefined,
done: true,
});

expect(() => iterator3.next('this is not allowed')).toThrow(
'Values cannot be passed to next() of AsyncIterables passed to Client Components.',
);
});
});

0 comments on commit ec9400d

Please sign in to comment.