Skip to content

Commit a638699

Browse files
committed
Support Streams/Async Iterable in Flight Reply
1 parent 5fcfd71 commit a638699

File tree

4 files changed

+722
-14
lines changed

4 files changed

+722
-14
lines changed

packages/react-client/src/ReactFlightReplyClient.js

Lines changed: 141 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';
2020
import {
2121
enableRenderableContext,
2222
enableBinaryFlight,
23+
enableFlightReadableStream,
2324
} from 'shared/ReactFeatureFlags';
2425

2526
import {
@@ -28,6 +29,7 @@ import {
2829
REACT_CONTEXT_TYPE,
2930
REACT_PROVIDER_TYPE,
3031
getIteratorFn,
32+
ASYNC_ITERATOR,
3133
} from 'shared/ReactSymbols';
3234

3335
import {
@@ -206,6 +208,123 @@ export function processReply(
206208
return '$' + tag + blobId.toString(16);
207209
}
208210

211+
function serializeReadableStream(stream: ReadableStream): string {
212+
if (formData === null) {
213+
// Upgrade to use FormData to allow us to stream this value.
214+
formData = new FormData();
215+
}
216+
const data = formData;
217+
218+
pendingParts++;
219+
const streamId = nextPartId++;
220+
221+
// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
222+
// receiving side. It also implies that different chunks can be split up or merged as opposed
223+
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
224+
// received in the same slices.
225+
// $FlowFixMe: This is a Node.js extension.
226+
let supportsBYOB: void | boolean = stream.supportsBYOB;
227+
if (supportsBYOB === undefined) {
228+
try {
229+
// $FlowFixMe[extra-arg]: This argument is accepted.
230+
stream.getReader({mode: 'byob'}).releaseLock();
231+
supportsBYOB = true;
232+
} catch (x) {
233+
supportsBYOB = false;
234+
}
235+
}
236+
237+
const reader = stream.getReader();
238+
239+
function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
240+
if (entry.done) {
241+
// eslint-disable-next-line react-internal/safe-string-coercion
242+
data.append(formFieldPrefix + streamId, 'C'); // Close signal
243+
pendingParts--;
244+
if (pendingParts === 0) {
245+
resolve(data);
246+
}
247+
} else {
248+
try {
249+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
250+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
251+
// eslint-disable-next-line react-internal/safe-string-coercion
252+
data.append(formFieldPrefix + streamId, partJSON);
253+
reader.read().then(progress, reject);
254+
} catch (x) {
255+
reject(x);
256+
}
257+
}
258+
}
259+
reader.read().then(progress, reject);
260+
261+
return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
262+
}
263+
264+
function serializeAsyncIterable(
265+
iterable: $AsyncIterable<ReactServerValue, ReactServerValue, void>,
266+
iterator: $AsyncIterator<ReactServerValue, ReactServerValue, void>,
267+
): string {
268+
if (formData === null) {
269+
// Upgrade to use FormData to allow us to stream this value.
270+
formData = new FormData();
271+
}
272+
const data = formData;
273+
274+
pendingParts++;
275+
const streamId = nextPartId++;
276+
277+
// Generators/Iterators are Iterables but they're also their own iterator
278+
// functions. If that's the case, we treat them as single-shot. Otherwise,
279+
// we assume that this iterable might be a multi-shot and allow it to be
280+
// iterated more than once on the receiving server.
281+
const isIterator = iterable === iterator;
282+
283+
// There's a race condition between when the stream is aborted and when the promise
284+
// resolves so we track whether we already aborted it to avoid writing twice.
285+
function progress(
286+
entry:
287+
| {done: false, +value: ReactServerValue, ...}
288+
| {done: true, +value: ReactServerValue, ...},
289+
) {
290+
if (entry.done) {
291+
if (entry.value === undefined) {
292+
// eslint-disable-next-line react-internal/safe-string-coercion
293+
data.append(formFieldPrefix + streamId, 'C'); // Close signal
294+
} else {
295+
// Unlike streams, the last value may not be undefined. If it's not
296+
// we outline it and encode a reference to it in the closing instruction.
297+
try {
298+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
299+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
300+
data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal
301+
} catch (x) {
302+
reject(x);
303+
return;
304+
}
305+
}
306+
pendingParts--;
307+
if (pendingParts === 0) {
308+
resolve(data);
309+
}
310+
} else {
311+
try {
312+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
313+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
314+
// eslint-disable-next-line react-internal/safe-string-coercion
315+
data.append(formFieldPrefix + streamId, partJSON);
316+
iterator.next().then(progress, reject);
317+
} catch (x) {
318+
reject(x);
319+
return;
320+
}
321+
}
322+
}
323+
324+
iterator.next().then(progress, reject);
325+
return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16);
326+
}
327+
209328
function resolveToJSON(
210329
this:
211330
| {+[key: string | number]: ReactServerValue}
@@ -349,11 +468,9 @@ export function processReply(
349468
reject(reason);
350469
}
351470
},
352-
reason => {
353-
// In the future we could consider serializing this as an error
354-
// that throws on the server instead.
355-
reject(reason);
356-
},
471+
// In the future we could consider serializing this as an error
472+
// that throws on the server instead.
473+
reject,
357474
);
358475
return serializePromiseID(promiseId);
359476
}
@@ -486,6 +603,25 @@ export function processReply(
486603
return Array.from((iterator: any));
487604
}
488605

606+
if (enableFlightReadableStream) {
607+
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
608+
if (
609+
typeof ReadableStream === 'function' &&
610+
value instanceof ReadableStream
611+
) {
612+
return serializeReadableStream(value);
613+
}
614+
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
615+
(value: any)[ASYNC_ITERATOR];
616+
if (typeof getAsyncIterator === 'function') {
617+
// We treat AsyncIterables as a Fragment and as such we might need to key them.
618+
return serializeAsyncIterable(
619+
(value: any),
620+
getAsyncIterator.call((value: any)),
621+
);
622+
}
623+
}
624+
489625
// Verify that this is a simple plain object.
490626
const proto = getPrototypeOf(value);
491627
if (

packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => {
376376
// This should've been the same reference that we already saw.
377377
expect(response.children).toBe(children);
378378
});
379+
380+
// @gate enableFlightReadableStream
381+
it('should supports streaming ReadableStream with objects', async () => {
382+
let controller1;
383+
let controller2;
384+
const s1 = new ReadableStream({
385+
start(c) {
386+
controller1 = c;
387+
},
388+
});
389+
const s2 = new ReadableStream({
390+
start(c) {
391+
controller2 = c;
392+
},
393+
});
394+
395+
const promise = ReactServerDOMClient.encodeReply({s1, s2});
396+
397+
controller1.enqueue({hello: 'world'});
398+
controller2.enqueue({hi: 'there'});
399+
400+
controller1.enqueue('text1');
401+
controller2.enqueue('text2');
402+
403+
controller1.close();
404+
controller2.close();
405+
406+
const body = await promise;
407+
408+
const result = await ReactServerDOMServer.decodeReply(
409+
body,
410+
webpackServerMap,
411+
);
412+
const reader1 = result.s1.getReader();
413+
const reader2 = result.s2.getReader();
414+
415+
expect(await reader1.read()).toEqual({
416+
value: {hello: 'world'},
417+
done: false,
418+
});
419+
expect(await reader2.read()).toEqual({
420+
value: {hi: 'there'},
421+
done: false,
422+
});
423+
424+
expect(await reader1.read()).toEqual({
425+
value: 'text1',
426+
done: false,
427+
});
428+
expect(await reader1.read()).toEqual({
429+
value: undefined,
430+
done: true,
431+
});
432+
expect(await reader2.read()).toEqual({
433+
value: 'text2',
434+
done: false,
435+
});
436+
expect(await reader2.read()).toEqual({
437+
value: undefined,
438+
done: true,
439+
});
440+
});
441+
442+
// @gate enableFlightReadableStream
443+
it('should supports streaming AsyncIterables with objects', async () => {
444+
let resolve;
445+
const wait = new Promise(r => (resolve = r));
446+
const multiShotIterable = {
447+
async *[Symbol.asyncIterator]() {
448+
const next = yield {hello: 'A'};
449+
expect(next).toBe(undefined);
450+
await wait;
451+
yield {hi: 'B'};
452+
return 'C';
453+
},
454+
};
455+
const singleShotIterator = (async function* () {
456+
const next = yield {hello: 'D'};
457+
expect(next).toBe(undefined);
458+
await wait;
459+
yield {hi: 'E'};
460+
return 'F';
461+
})();
462+
463+
await resolve();
464+
465+
const body = await ReactServerDOMClient.encodeReply({
466+
multiShotIterable,
467+
singleShotIterator,
468+
});
469+
const result = await ReactServerDOMServer.decodeReply(
470+
body,
471+
webpackServerMap,
472+
);
473+
474+
const iterator1 = result.multiShotIterable[Symbol.asyncIterator]();
475+
const iterator2 = result.singleShotIterator[Symbol.asyncIterator]();
476+
477+
expect(iterator1).not.toBe(result.multiShotIterable);
478+
expect(iterator2).toBe(result.singleShotIterator);
479+
480+
expect(await iterator1.next()).toEqual({
481+
value: {hello: 'A'},
482+
done: false,
483+
});
484+
expect(await iterator2.next()).toEqual({
485+
value: {hello: 'D'},
486+
done: false,
487+
});
488+
489+
expect(await iterator1.next()).toEqual({
490+
value: {hi: 'B'},
491+
done: false,
492+
});
493+
expect(await iterator2.next()).toEqual({
494+
value: {hi: 'E'},
495+
done: false,
496+
});
497+
expect(await iterator1.next()).toEqual({
498+
value: 'C', // Return value
499+
done: true,
500+
});
501+
expect(await iterator1.next()).toEqual({
502+
value: undefined,
503+
done: true,
504+
});
505+
506+
expect(await iterator2.next()).toEqual({
507+
value: 'F', // Return value
508+
done: true,
509+
});
510+
511+
// Multi-shot iterables should be able to do the same thing again
512+
const iterator3 = result.multiShotIterable[Symbol.asyncIterator]();
513+
514+
expect(iterator3).not.toBe(iterator1);
515+
516+
// We should be able to iterate over the iterable again and it should be
517+
// synchronously available using instrumented promises so that React can
518+
// rerender it synchronously.
519+
expect(iterator3.next().value).toEqual({
520+
value: {hello: 'A'},
521+
done: false,
522+
});
523+
expect(iterator3.next().value).toEqual({
524+
value: {hi: 'B'},
525+
done: false,
526+
});
527+
expect(iterator3.next().value).toEqual({
528+
value: 'C', // Return value
529+
done: true,
530+
});
531+
expect(iterator3.next().value).toEqual({
532+
value: undefined,
533+
done: true,
534+
});
535+
536+
expect(() => iterator3.next('this is not allowed')).toThrow(
537+
'Values cannot be passed to next() of AsyncIterables passed to Client Components.',
538+
);
539+
});
379540
});

0 commit comments

Comments
 (0)