Skip to content

Commit bbc13fa

Browse files
[Flight] Add Debug Channel option for stateful connection to the backend in DEV (#33627)
This adds plumbing for opening a stream from the Flight Client to the Flight Server so it can ask for more data on-demand. In this mode, the Flight Server keeps the connection open as long as the client is still alive and there's more objects to load. It retains any depth limited objects so that they can be asked for later. In this first PR it just releases the object when it's discovered on the server and doesn't actually lazy load it yet. That's coming in a follow up. This strategy is built on the model that each request has its own channel for this. Instead of some global registry. That ensures that referential identity is preserved within a Request and the Request can refer to previously written objects by reference. The fixture implements a WebSocket per request but it doesn't have to be done that way. It can be multiplexed through an existing WebSocket for example. The current protocol is just a Readable(Stream) on the server and WritableStream on the client. It could even be sent through a HTTP request body if browsers implemented full duplex (which they don't). This PR only implements the direction of messages from Client to Server. However, I also plan on adding Debug Channel in the other direction to allow debug info (optionally) be sent from Server to Client through this channel instead of through the main RSC request. So the `debugChannel` option will be able to take writable or readable or both. --------- Co-authored-by: Hendrik Liebau <[email protected]>
1 parent 12eaef7 commit bbc13fa

25 files changed

+1385
-68
lines changed

fixtures/flight/server/global.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ async function renderApp(req, res, next) {
104104
if (req.headers['cache-control']) {
105105
proxiedHeaders['Cache-Control'] = req.get('cache-control');
106106
}
107+
if (req.get('rsc-request-id')) {
108+
proxiedHeaders['rsc-request-id'] = req.get('rsc-request-id');
109+
}
107110

108111
const requestsPrerender = req.path === '/prerender';
109112

fixtures/flight/server/region.js

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,27 @@ const {readFile} = require('fs').promises;
5050

5151
const React = require('react');
5252

53-
async function renderApp(res, returnValue, formState, noCache) {
53+
const activeDebugChannels =
54+
process.env.NODE_ENV === 'development' ? new Map() : null;
55+
56+
function getDebugChannel(req) {
57+
if (process.env.NODE_ENV !== 'development') {
58+
return undefined;
59+
}
60+
const requestId = req.get('rsc-request-id');
61+
if (!requestId) {
62+
return undefined;
63+
}
64+
return activeDebugChannels.get(requestId);
65+
}
66+
67+
async function renderApp(
68+
res,
69+
returnValue,
70+
formState,
71+
noCache,
72+
promiseForDebugChannel
73+
) {
5474
const {renderToPipeableStream} = await import(
5575
'react-server-dom-webpack/server'
5676
);
@@ -101,7 +121,9 @@ async function renderApp(res, returnValue, formState, noCache) {
101121
);
102122
// For client-invoked server actions we refresh the tree and return a return value.
103123
const payload = {root, returnValue, formState};
104-
const {pipe} = renderToPipeableStream(payload, moduleMap);
124+
const {pipe} = renderToPipeableStream(payload, moduleMap, {
125+
debugChannel: await promiseForDebugChannel,
126+
});
105127
pipe(res);
106128
}
107129

@@ -166,7 +188,7 @@ app.get('/', async function (req, res) {
166188
if ('prerender' in req.query) {
167189
await prerenderApp(res, null, null, noCache);
168190
} else {
169-
await renderApp(res, null, null, noCache);
191+
await renderApp(res, null, null, noCache, getDebugChannel(req));
170192
}
171193
});
172194

@@ -204,7 +226,7 @@ app.post('/', bodyParser.text(), async function (req, res) {
204226
// We handle the error on the client
205227
}
206228
// Refresh the client and return the value
207-
renderApp(res, result, null, noCache);
229+
renderApp(res, result, null, noCache, getDebugChannel(req));
208230
} else {
209231
// This is the progressive enhancement case
210232
const UndiciRequest = require('undici').Request;
@@ -220,11 +242,11 @@ app.post('/', bodyParser.text(), async function (req, res) {
220242
// Wait for any mutations
221243
const result = await action();
222244
const formState = decodeFormState(result, formData);
223-
renderApp(res, null, formState, noCache);
245+
renderApp(res, null, formState, noCache, undefined);
224246
} catch (x) {
225247
const {setServerState} = await import('../src/ServerState.js');
226248
setServerState('Error: ' + x.message);
227-
renderApp(res, null, null, noCache);
249+
renderApp(res, null, null, noCache, undefined);
228250
}
229251
}
230252
});
@@ -324,7 +346,7 @@ if (process.env.NODE_ENV === 'development') {
324346
});
325347
}
326348

327-
app.listen(3001, () => {
349+
const httpServer = app.listen(3001, () => {
328350
console.log('Regional Flight Server listening on port 3001...');
329351
});
330352

@@ -346,3 +368,27 @@ app.on('error', function (error) {
346368
throw error;
347369
}
348370
});
371+
372+
if (process.env.NODE_ENV === 'development') {
373+
// Open a websocket server for Debug information
374+
const WebSocket = require('ws');
375+
const webSocketServer = new WebSocket.Server({noServer: true});
376+
377+
httpServer.on('upgrade', (request, socket, head) => {
378+
const DEBUG_CHANNEL_PATH = '/debug-channel?';
379+
if (request.url.startsWith(DEBUG_CHANNEL_PATH)) {
380+
const requestId = request.url.slice(DEBUG_CHANNEL_PATH.length);
381+
const promiseForWs = new Promise(resolve => {
382+
webSocketServer.handleUpgrade(request, socket, head, ws => {
383+
ws.on('close', () => {
384+
activeDebugChannels.delete(requestId);
385+
});
386+
resolve(ws);
387+
});
388+
});
389+
activeDebugChannels.set(requestId, promiseForWs);
390+
} else {
391+
socket.destroy();
392+
}
393+
});
394+
}

fixtures/flight/src/App.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ async function ServerComponent({noCache}) {
123123
export default async function App({prerender, noCache}) {
124124
const res = await fetch('http://localhost:3001/todos');
125125
const todos = await res.json();
126-
console.log(res);
127126

128127
const dedupedChild = <ServerComponent noCache={noCache} />;
129128
const message = getServerState();

fixtures/flight/src/index.js

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,43 @@ function Shell({data}) {
4242
}
4343

4444
async function hydrateApp() {
45-
const {root, returnValue, formState} = await createFromFetch(
46-
fetch('/', {
47-
headers: {
48-
Accept: 'text/x-component',
49-
},
50-
}),
51-
{
52-
callServer,
53-
findSourceMapURL,
54-
}
55-
);
45+
let response;
46+
if (
47+
process.env.NODE_ENV === 'development' &&
48+
typeof WebSocketStream === 'function'
49+
) {
50+
const requestId = crypto.randomUUID();
51+
const wss = new WebSocketStream(
52+
'ws://localhost:3001/debug-channel?' + requestId
53+
);
54+
const debugChannel = await wss.opened;
55+
response = createFromFetch(
56+
fetch('/', {
57+
headers: {
58+
Accept: 'text/x-component',
59+
'rsc-request-id': requestId,
60+
},
61+
}),
62+
{
63+
callServer,
64+
debugChannel,
65+
findSourceMapURL,
66+
}
67+
);
68+
} else {
69+
response = createFromFetch(
70+
fetch('/', {
71+
headers: {
72+
Accept: 'text/x-component',
73+
},
74+
}),
75+
{
76+
callServer,
77+
findSourceMapURL,
78+
}
79+
);
80+
}
81+
const {root, returnValue, formState} = await response;
5682

5783
ReactDOM.hydrateRoot(
5884
document,

packages/react-client/src/ReactFlightClient.js

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,8 @@ export type FindSourceMapURLCallback = (
328328
environmentName: string,
329329
) => null | string;
330330

331+
export type DebugChannelCallback = (message: string) => void;
332+
331333
export type Response = {
332334
_bundlerConfig: ServerConsumerModuleMap,
333335
_serverReferenceConfig: null | ServerManifest,
@@ -351,6 +353,7 @@ export type Response = {
351353
_debugRootStack?: null | Error, // DEV-only
352354
_debugRootTask?: null | ConsoleTask, // DEV-only
353355
_debugFindSourceMapURL?: void | FindSourceMapURLCallback, // DEV-only
356+
_debugChannel?: void | DebugChannelCallback, // DEV-only
354357
_replayConsole: boolean, // DEV-only
355358
_rootEnvironmentName: string, // DEV-only, the requested environment name.
356359
};
@@ -687,6 +690,15 @@ export function reportGlobalError(response: Response, error: Error): void {
687690
triggerErrorOnChunk(chunk, error);
688691
}
689692
});
693+
if (__DEV__) {
694+
const debugChannel = response._debugChannel;
695+
if (debugChannel !== undefined) {
696+
// If we don't have any more ways of reading data, we don't have to send any
697+
// more neither. So we close the writable side.
698+
debugChannel('');
699+
response._debugChannel = undefined;
700+
}
701+
}
690702
if (enableProfilerTimer && enableComponentPerformanceTrack) {
691703
markAllTracksInOrder();
692704
flushComponentPerformance(
@@ -1667,6 +1679,14 @@ function parseModelString(
16671679
}
16681680
case 'Y': {
16691681
if (__DEV__) {
1682+
if (value.length > 2) {
1683+
const debugChannel = response._debugChannel;
1684+
if (debugChannel) {
1685+
const ref = value.slice(2);
1686+
debugChannel('R:' + ref); // Release this reference immediately
1687+
}
1688+
}
1689+
16701690
// In DEV mode we encode omitted objects in logs as a getter that throws
16711691
// so that when you try to access it on the client, you know why that
16721692
// happened.
@@ -1730,9 +1750,10 @@ function ResponseInstance(
17301750
encodeFormAction: void | EncodeFormActionCallback,
17311751
nonce: void | string,
17321752
temporaryReferences: void | TemporaryReferenceSet,
1733-
findSourceMapURL: void | FindSourceMapURLCallback,
1734-
replayConsole: boolean,
1735-
environmentName: void | string,
1753+
findSourceMapURL: void | FindSourceMapURLCallback, // DEV-only
1754+
replayConsole: boolean, // DEV-only
1755+
environmentName: void | string, // DEV-only
1756+
debugChannel: void | DebugChannelCallback, // DEV-only
17361757
) {
17371758
const chunks: Map<number, SomeChunk<any>> = new Map();
17381759
this._bundlerConfig = bundlerConfig;
@@ -1787,6 +1808,7 @@ function ResponseInstance(
17871808
);
17881809
}
17891810
this._debugFindSourceMapURL = findSourceMapURL;
1811+
this._debugChannel = debugChannel;
17901812
this._replayConsole = replayConsole;
17911813
this._rootEnvironmentName = rootEnv;
17921814
}
@@ -1802,9 +1824,10 @@ export function createResponse(
18021824
encodeFormAction: void | EncodeFormActionCallback,
18031825
nonce: void | string,
18041826
temporaryReferences: void | TemporaryReferenceSet,
1805-
findSourceMapURL: void | FindSourceMapURLCallback,
1806-
replayConsole: boolean,
1807-
environmentName: void | string,
1827+
findSourceMapURL: void | FindSourceMapURLCallback, // DEV-only
1828+
replayConsole: boolean, // DEV-only
1829+
environmentName: void | string, // DEV-only
1830+
debugChannel: void | DebugChannelCallback, // DEV-only
18081831
): Response {
18091832
// $FlowFixMe[invalid-constructor]: the shapes are exact here but Flow doesn't like constructors
18101833
return new ResponseInstance(
@@ -1818,6 +1841,7 @@ export function createResponse(
18181841
findSourceMapURL,
18191842
replayConsole,
18201843
environmentName,
1844+
debugChannel,
18211845
);
18221846
}
18231847

0 commit comments

Comments
 (0)