diff --git a/packages/react-client/src/ReactFlightReplyClient.js b/packages/react-client/src/ReactFlightReplyClient.js index b54f6e4edb0f..a78f4517546b 100644 --- a/packages/react-client/src/ReactFlightReplyClient.js +++ b/packages/react-client/src/ReactFlightReplyClient.js @@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences'; import { enableRenderableContext, enableBinaryFlight, + enableFlightReadableStream, } from 'shared/ReactFeatureFlags'; import { @@ -28,6 +29,7 @@ import { REACT_CONTEXT_TYPE, REACT_PROVIDER_TYPE, getIteratorFn, + ASYNC_ITERATOR, } from 'shared/ReactSymbols'; import { @@ -206,6 +208,123 @@ export function processReply( return '$' + tag + blobId.toString(16); } + function serializeReadableStream(stream: ReadableStream): string { + if (formData === null) { + // Upgrade to use FormData to allow us to stream this value. + formData = new FormData(); + } + const data = formData; + + pendingParts++; + const streamId = nextPartId++; + + // Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the + // receiving side. It also implies that different chunks can be split up or merged as opposed + // to a readable stream that happens to have Uint8Array as the type which might expect it to be + // received in the same slices. + // $FlowFixMe: This is a Node.js extension. + let supportsBYOB: void | boolean = stream.supportsBYOB; + if (supportsBYOB === undefined) { + try { + // $FlowFixMe[extra-arg]: This argument is accepted. + stream.getReader({mode: 'byob'}).releaseLock(); + supportsBYOB = true; + } catch (x) { + supportsBYOB = false; + } + } + + const reader = stream.getReader(); + + function progress(entry: {done: boolean, value: ReactServerValue, ...}) { + if (entry.done) { + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, 'C'); // Close signal + pendingParts--; + if (pendingParts === 0) { + resolve(data); + } + } else { + try { + // $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here. + const partJSON: string = JSON.stringify(entry.value, resolveToJSON); + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, partJSON); + reader.read().then(progress, reject); + } catch (x) { + reject(x); + } + } + } + reader.read().then(progress, reject); + + return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16); + } + + function serializeAsyncIterable( + iterable: $AsyncIterable, + iterator: $AsyncIterator, + ): string { + if (formData === null) { + // Upgrade to use FormData to allow us to stream this value. + formData = new FormData(); + } + const data = formData; + + pendingParts++; + const streamId = nextPartId++; + + // Generators/Iterators are Iterables but they're also their own iterator + // functions. If that's the case, we treat them as single-shot. Otherwise, + // we assume that this iterable might be a multi-shot and allow it to be + // iterated more than once on the receiving server. + const isIterator = iterable === iterator; + + // There's a race condition between when the stream is aborted and when the promise + // resolves so we track whether we already aborted it to avoid writing twice. 
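+      // --- Editor's note (illustrative sketch, not part of this diff) ---
+      // serializeReadableStream above and serializeAsyncIterable here share the
+      // same outlining scheme: bump pendingParts, claim a part id, and append
+      // one FormData row per chunk until a closing row. For example, assuming a
+      // formFieldPrefix of '1_' and a streamId of 2, a stream that enqueues
+      // {hello: 'world'} and 'text1' and then closes serializes as:
+      //   reference embedded in the parent model: '$R2' ('$r2' for a BYOB stream)
+      //   rows appended under the FormData key '1_2':
+      //     '{"hello":"world"}'   // chunk serialized via resolveToJSON
+      //     '"text1"'
+      //     'C'                   // close signal
+      // The hex id after the tag is what ties the reference to its rows.
+      // --- end editor's note ---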
+ function progress( + entry: + | {done: false, +value: ReactServerValue, ...} + | {done: true, +value: ReactServerValue, ...}, + ) { + if (entry.done) { + if (entry.value === undefined) { + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, 'C'); // Close signal + } else { + // Unlike streams, the last value may not be undefined. If it's not + // we outline it and encode a reference to it in the closing instruction. + try { + // $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here. + const partJSON: string = JSON.stringify(entry.value, resolveToJSON); + data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal + } catch (x) { + reject(x); + return; + } + } + pendingParts--; + if (pendingParts === 0) { + resolve(data); + } + } else { + try { + // $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here. + const partJSON: string = JSON.stringify(entry.value, resolveToJSON); + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, partJSON); + iterator.next().then(progress, reject); + } catch (x) { + reject(x); + return; + } + } + } + + iterator.next().then(progress, reject); + return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16); + } + function resolveToJSON( this: | {+[key: string | number]: ReactServerValue} @@ -349,11 +468,9 @@ export function processReply( reject(reason); } }, - reason => { - // In the future we could consider serializing this as an error - // that throws on the server instead. - reject(reason); - }, + // In the future we could consider serializing this as an error + // that throws on the server instead. + reject, ); return serializePromiseID(promiseId); } @@ -486,6 +603,25 @@ export function processReply( return Array.from((iterator: any)); } + if (enableFlightReadableStream) { + // TODO: ReadableStream is not available in old Node. Remove the typeof check later. + if ( + typeof ReadableStream === 'function' && + value instanceof ReadableStream + ) { + return serializeReadableStream(value); + } + const getAsyncIterator: void | (() => $AsyncIterator) = + (value: any)[ASYNC_ITERATOR]; + if (typeof getAsyncIterator === 'function') { + // We treat AsyncIterables as a Fragment and as such we might need to key them. + return serializeAsyncIterable( + (value: any), + getAsyncIterator.call((value: any)), + ); + } + } + // Verify that this is a simple plain object. const proto = getPrototypeOf(value); if ( diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js index 62948f275298..4e38815cad1c 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js @@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => { // This should've been the same reference that we already saw. 
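    // --- Editor's note (illustrative sketch, not part of this diff) ---
    // Summary of the reference tags added by this change; the hex id after the
    // tag points at the outlined FormData rows for that stream:
    //   '$R<id>'  ReadableStream of serialized values
    //   '$r<id>'  BYOB (byte) ReadableStream
    //   '$X<id>'  multi-shot AsyncIterable
    //   '$x<id>'  single-shot AsyncIterator (e.g. an async generator instance)
    // A generator's return value rides along in the closing row, e.g.
    //   async function* gen() { yield {hi: 'there'}; return 'done'; }
    // closes with the row 'C"done"' instead of a bare 'C'.
    // --- end editor's note ---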
expect(response.children).toBe(children); }); + + // @gate enableFlightReadableStream + it('should supports streaming ReadableStream with objects', async () => { + let controller1; + let controller2; + const s1 = new ReadableStream({ + start(c) { + controller1 = c; + }, + }); + const s2 = new ReadableStream({ + start(c) { + controller2 = c; + }, + }); + + const promise = ReactServerDOMClient.encodeReply({s1, s2}); + + controller1.enqueue({hello: 'world'}); + controller2.enqueue({hi: 'there'}); + + controller1.enqueue('text1'); + controller2.enqueue('text2'); + + controller1.close(); + controller2.close(); + + const body = await promise; + + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + const reader1 = result.s1.getReader(); + const reader2 = result.s2.getReader(); + + expect(await reader1.read()).toEqual({ + value: {hello: 'world'}, + done: false, + }); + expect(await reader2.read()).toEqual({ + value: {hi: 'there'}, + done: false, + }); + + expect(await reader1.read()).toEqual({ + value: 'text1', + done: false, + }); + expect(await reader1.read()).toEqual({ + value: undefined, + done: true, + }); + expect(await reader2.read()).toEqual({ + value: 'text2', + done: false, + }); + expect(await reader2.read()).toEqual({ + value: undefined, + done: true, + }); + }); + + // @gate enableFlightReadableStream + it('should supports streaming AsyncIterables with objects', async () => { + let resolve; + const wait = new Promise(r => (resolve = r)); + const multiShotIterable = { + async *[Symbol.asyncIterator]() { + const next = yield {hello: 'A'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'B'}; + return 'C'; + }, + }; + const singleShotIterator = (async function* () { + const next = yield {hello: 'D'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'E'}; + return 'F'; + })(); + + await resolve(); + + const body = await ReactServerDOMClient.encodeReply({ + multiShotIterable, + singleShotIterator, + }); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + const iterator1 = result.multiShotIterable[Symbol.asyncIterator](); + const iterator2 = result.singleShotIterator[Symbol.asyncIterator](); + + expect(iterator1).not.toBe(result.multiShotIterable); + expect(iterator2).toBe(result.singleShotIterator); + + expect(await iterator1.next()).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hello: 'D'}, + done: false, + }); + + expect(await iterator1.next()).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hi: 'E'}, + done: false, + }); + expect(await iterator1.next()).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(await iterator1.next()).toEqual({ + value: undefined, + done: true, + }); + + expect(await iterator2.next()).toEqual({ + value: 'F', // Return value + done: true, + }); + + // Multi-shot iterables should be able to do the same thing again + const iterator3 = result.multiShotIterable[Symbol.asyncIterator](); + + expect(iterator3).not.toBe(iterator1); + + // We should be able to iterate over the iterable again and it should be + // synchronously available using instrumented promises so that React can + // rerender it synchronously. 
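+    // --- Editor's note (illustrative sketch, not part of this diff) ---
+    // "Synchronously available" here means each next() call returns a Chunk,
+    // a promise-like whose internal status and value can be inspected without
+    // awaiting once its row has already been parsed, roughly:
+    //   const chunk = iterator3.next();
+    //   // chunk.status === 'fulfilled'
+    //   // chunk.value  === {done: false, value: {hello: 'A'}}
+    // That is what lets React re-render an already-received multi-shot
+    // iterable without yielding to the microtask queue, and it is why the
+    // assertions below read `.value` directly instead of awaiting.
+    // --- end editor's note ---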
+ expect(iterator3.next().value).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(iterator3.next().value).toEqual({ + value: undefined, + done: true, + }); + + expect(() => iterator3.next('this is not allowed')).toThrow( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + }); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index ab0d54c0bc0f..93f23e22ae93 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -132,4 +132,105 @@ describe('ReactFlightDOMReplyEdge', () => { expect(resultBlob.size).toBe(bytes.length * 2); expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer()); }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should supports ReadableStreams with typed arrays', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + buffer, + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This is not a binary stream, it's a stream that contain binary chunks. + const s = new ReadableStream({ + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const body = await ReactServerDOMClient.encodeReply(s); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + const streamedBuffers = []; + const reader = result.getReader(); + let entry; + while (!(entry = await reader.read()).done) { + streamedBuffers.push(entry.value); + } + + expect(streamedBuffers).toEqual(buffers); + }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should support BYOB binary ReadableStreams', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This a binary stream where each chunk ends up as Uint8Array. 
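+    // --- Editor's note (illustrative sketch, not part of this diff) ---
+    // Contrast with the test above: a plain stream whose chunks happen to be
+    // typed arrays round-trips each chunk exactly (same constructor, same
+    // offset and length), e.g.
+    //   c.enqueue(new Int16Array(buffer, 2)); // received as that same view
+    // A 'bytes' (BYOB) stream, as constructed below, only guarantees the
+    // concatenated byte sequence: chunk boundaries may be split or merged and
+    // every chunk is delivered as a Uint8Array.
+    // --- end editor's note ---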
+ const s = new ReadableStream({ + type: 'bytes', + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const body = await ReactServerDOMClient.encodeReply(s); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + const streamedBuffers = []; + const reader = result.getReader({mode: 'byob'}); + let entry; + while (!(entry = await reader.read(new Uint8Array(10))).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + streamedBuffers.push(entry.value); + } + + // The streamed buffers might be in different chunks and in Uint8Array form but + // the concatenated bytes should be the same. + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( + buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ), + ); + }); }); diff --git a/packages/react-server/src/ReactFlightReplyServer.js b/packages/react-server/src/ReactFlightReplyServer.js index 4a447a5ee18d..1989936afe41 100644 --- a/packages/react-server/src/ReactFlightReplyServer.js +++ b/packages/react-server/src/ReactFlightReplyServer.js @@ -25,7 +25,17 @@ import { } from 'react-client/src/ReactFlightClientConfig'; import {createTemporaryReference} from './ReactFlightServerTemporaryReferences'; -import {enableBinaryFlight} from 'shared/ReactFeatureFlags'; +import { + enableBinaryFlight, + enableFlightReadableStream, +} from 'shared/ReactFeatureFlags'; +import {ASYNC_ITERATOR} from 'shared/ReactSymbols'; + +interface FlightStreamController { + enqueueModel(json: string): void; + close(json: string): void; + error(error: Error): void; +} export type JSONValue = | number @@ -46,35 +56,44 @@ type PendingChunk = { value: null | Array<(T) => mixed>, reason: null | Array<(mixed) => mixed>, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type BlockedChunk = { status: 'blocked', value: null | Array<(T) => mixed>, reason: null | Array<(mixed) => mixed>, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type ResolvedModelChunk = { status: 'resolved_model', value: string, reason: null, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type InitializedChunk = { status: 'fulfilled', value: T, reason: null, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, +}; +type InitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +> = { + status: 'fulfilled', + value: T, + reason: FlightStreamController, + _response: Response, + then(resolve: (ReadableStream) => mixed, reject?: (mixed) => mixed): void, }; type ErroredChunk = { status: 'rejected', value: null, reason: mixed, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type SomeChunk = | PendingChunk @@ -181,7 +200,14 @@ function wakeChunkIfInitialized( function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { if (chunk.status !== PENDING && chunk.status !== BLOCKED) { - // We already resolved. We didn't expect to see this. 
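+    // --- Editor's note (illustrative sketch, not part of this diff) ---
+    // For stream chunks the otherwise-unused `reason` field holds the
+    // FlightStreamController (see createInitializedStreamChunk below), so when
+    // a row or an error arrives for an id that is already 'fulfilled' it is
+    // routed into the live stream instead of being treated as an unexpected
+    // duplicate:
+    //   enqueueModel(json)  // another value row for the stream
+    //   close(json)         // 'C' row, optionally carrying a return value
+    //   error(err)          // abort/error for the stream
+    // --- end editor's note ---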
+ if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + // $FlowFixMe[incompatible-call]: The error method should accept mixed. + controller.error(error); + } return; } const listeners = chunk.reason; @@ -203,7 +229,17 @@ function createResolvedModelChunk( function resolveModelChunk(chunk: SomeChunk, value: string): void { if (chunk.status !== PENDING) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + if (value[0] === 'C') { + controller.close(value === 'C' ? '"$undefined"' : value.slice(1)); + } else { + controller.enqueueModel(value); + } + } return; } const resolveListeners = chunk.value; @@ -221,6 +257,42 @@ function resolveModelChunk(chunk: SomeChunk, value: string): void { } } +function createInitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +>( + response: Response, + value: T, + controller: FlightStreamController, +): InitializedChunk { + // We use the reason field to stash the controller since we already have that + // field. It's a bit of a hack but efficient. + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(INITIALIZED, value, controller, response); +} + +function createResolvedIteratorResultChunk( + response: Response, + value: string, + done: boolean, +): ResolvedModelChunk> { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(RESOLVED_MODEL, iteratorResultJSON, null, response); +} + +function resolveIteratorResultChunk( + chunk: SomeChunk>, + value: string, + done: boolean, +): void { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + resolveModelChunk(chunk, iteratorResultJSON); +} + function bindArgs(fn: any, args: any) { return fn.bind.apply(fn, [null].concat(args)); } @@ -342,11 +414,18 @@ function createModelResolver( } else { blocked = initializingChunkBlockedModel = { deps: 1, - value: null, + value: (null: any), }; } return value => { parentObject[key] = value; + + // If this is the root object for a model reference, where `blocked.value` + // is a stale `null`, the resolved value can be used directly. 
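+    // --- Editor's note (illustrative, not part of this diff) ---
+    // In other words: when the model root is itself a reference (JSON.parse
+    // invokes the reviver with key '' for the root), blocked.value still holds
+    // the stale null placeholder assigned above, so the branch below writes
+    // the freshly resolved value back before the dependency count is
+    // decremented. This can happen for streamed rows whose entire payload is a
+    // single reference, e.g. a row that is just '"$4"'.
+    // --- end editor's note ---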
+ if (key === '' && blocked.value === null) { + blocked.value = parentObject[key]; + } + blocked.deps--; if (blocked.deps === 0) { if (chunk.status !== BLOCKED) { @@ -411,6 +490,221 @@ function parseTypedArray( return null; } +function resolveStream>( + response: Response, + id: number, + stream: T, + controller: FlightStreamController, +): void { + const chunks = response._chunks; + const chunk = createInitializedStreamChunk(response, stream, controller); + chunks.set(id, chunk); + + const prefix = response._prefix; + const key = prefix + id; + const existingEntries = response._formData.getAll(key); + for (let i = 0; i < existingEntries.length; i++) { + // We assume that this is a string entry for now. + const value: string = (existingEntries[i]: any); + if (value[0] === 'C') { + controller.close(value === 'C' ? '"$undefined"' : value.slice(1)); + } else { + controller.enqueueModel(value); + } + } +} + +function parseReadableStream( + response: Response, + reference: string, + type: void | 'bytes', + parentObject: Object, + parentKey: string, +): ReadableStream { + const id = parseInt(reference.slice(2), 16); + + let controller: ReadableStreamController = (null: any); + const stream = new ReadableStream({ + type: type, + start(c) { + controller = c; + }, + }); + let previousBlockedChunk: SomeChunk | null = null; + const flightController = { + enqueueModel(json: string): void { + if (previousBlockedChunk === null) { + // If we're not blocked on any other chunks, we can try to eagerly initialize + // this as a fast-path to avoid awaiting them. + const chunk: ResolvedModelChunk = createResolvedModelChunk( + response, + json, + ); + initializeModelChunk(chunk); + const initializedChunk: SomeChunk = chunk; + if (initializedChunk.status === INITIALIZED) { + controller.enqueue(initializedChunk.value); + } else { + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + } + } else { + // We're still waiting on a previous chunk so we can't enqueue quite yet. + const blockedChunk = previousBlockedChunk; + const chunk: SomeChunk = createPendingChunk(response); + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + blockedChunk.then(function () { + if (previousBlockedChunk === chunk) { + // We were still the last chunk so we can now clear the queue and return + // to synchronous emitting. + previousBlockedChunk = null; + } + resolveModelChunk(chunk, json); + }); + } + }, + close(json: string): void { + if (previousBlockedChunk === null) { + controller.close(); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.close()); + } + }, + error(error: mixed): void { + if (previousBlockedChunk === null) { + // $FlowFixMe[incompatible-call] + controller.error(error); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.error((error: any))); + } + }, + }; + resolveStream(response, id, stream, flightController); + return stream; +} + +function asyncIterator(this: $AsyncIterator) { + // Self referencing iterator. 
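+  // --- Editor's note (illustrative, not part of this diff) ---
+  // Returning `this` makes every iterator produced by createIterator below
+  // also an AsyncIterable, so it works directly in `for await (... of ...)`.
+  // Separately, the previousBlockedChunk chaining in parseReadableStream
+  // above exists purely to preserve enqueue order: if row N references data
+  // that has not resolved yet, row N+1 is parked behind it via
+  // blockedChunk.then(...) and only resolved once row N settles, after which
+  // previousBlockedChunk is reset to null and rows flow synchronously again.
+  // --- end editor's note ---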
+ return this; +} + +function createIterator( + next: (arg: void) => SomeChunk>, +): $AsyncIterator { + const iterator: any = { + next: next, + // TODO: Add return/throw as options for aborting. + }; + // TODO: The iterator could inherit the AsyncIterator prototype which is not exposed as + // a global but exists as a prototype of an AsyncGenerator. However, it's not needed + // to satisfy the iterable protocol. + (iterator: any)[ASYNC_ITERATOR] = asyncIterator; + return iterator; +} + +function parseAsyncIterable( + response: Response, + reference: string, + iterator: boolean, + parentObject: Object, + parentKey: string, +): $AsyncIterable | $AsyncIterator { + const id = parseInt(reference.slice(2), 16); + + const buffer: Array>> = []; + let closed = false; + let nextWriteIndex = 0; + const flightController = { + enqueueModel(value: string): void { + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + false, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, false); + } + nextWriteIndex++; + }, + close(value: string): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + true, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, true); + } + nextWriteIndex++; + while (nextWriteIndex < buffer.length) { + // In generators, any extra reads from the iterator have the value undefined. + resolveIteratorResultChunk( + buffer[nextWriteIndex++], + '"$undefined"', + true, + ); + } + }, + error(error: Error): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = + createPendingChunk>(response); + } + while (nextWriteIndex < buffer.length) { + triggerErrorOnChunk(buffer[nextWriteIndex++], error); + } + }, + }; + const iterable: $AsyncIterable = { + [ASYNC_ITERATOR](): $AsyncIterator { + let nextReadIndex = 0; + return createIterator(arg => { + if (arg !== undefined) { + throw new Error( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + } + if (nextReadIndex === buffer.length) { + if (closed) { + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk( + INITIALIZED, + {done: true, value: undefined}, + null, + response, + ); + } + buffer[nextReadIndex] = + createPendingChunk>(response); + } + return buffer[nextReadIndex++]; + }); + }, + }; + // TODO: If it's a single shot iterator we can optimize memory by cleaning up the buffer after + // reading through the end, but currently we favor code size over this optimization. + const stream = iterator ? iterable[ASYNC_ITERATOR]() : iterable; + resolveStream(response, id, stream, flightController); + return stream; +} + function parseModelString( response: Response, obj: Object, @@ -560,6 +854,22 @@ function parseModelString( } } } + if (enableFlightReadableStream) { + switch (value[1]) { + case 'R': { + return parseReadableStream(response, value, undefined, obj, key); + } + case 'r': { + return parseReadableStream(response, value, 'bytes', obj, key); + } + case 'X': { + return parseAsyncIterable(response, value, false, obj, key); + } + case 'x': { + return parseAsyncIterable(response, value, true, obj, key); + } + } + } // We assume that anything else is a reference ID. const id = parseInt(value.slice(1), 16);
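Editor's note: taken together, the client outlines a ReadableStream or AsyncIterable into numbered FormData rows and the reply server replays those rows into an equivalent stream or iterable. A minimal end-to-end sketch, assuming the same ReactServerDOMClient / ReactServerDOMServer bindings and webpackServerMap wiring used in the tests above, the enableFlightReadableStream flag enabled, and an async execution context:

async function* progressUpdates() {
  yield {progress: 0.5};
  yield {progress: 1};
  return 'done'; // travels in the closing 'C' row
}

// Client side: the generator instance is single-shot, so it is outlined
// with the '$x' tag and each yielded value becomes one FormData row.
const body = await ReactServerDOMClient.encodeReply({
  updates: progressUpdates(),
});

// Server side: decodeReply rebuilds a single-shot async iterator.
const args = await ReactServerDOMServer.decodeReply(body, webpackServerMap);
for await (const update of args.updates) {
  console.log(update); // {progress: 0.5}, then {progress: 1}
}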