@@ -10,6 +10,9 @@ import { isStream } from 'is-stream'
1010import lambdaLocal from 'lambda-local'
1111import sourceMapSupport from 'source-map-support'
1212
13+ // https://github.com/nodejs/undici/blob/a36e299d544863c5ade17d4090181be894366024/lib/web/fetch/constants.js#L6
14+ const nullBodyStatus = new Set ( [ 101 , 204 , 205 , 304 ] )
15+
1316/**
1417 * @typedef HandlerResponse
1518 * @type {import('../../../src/function/handler_response.js').HandlerResponse }
@@ -46,17 +49,33 @@ const invocationResult = /** @type {HandlerResponse} */ (
4649 } )
4750)
4851
49- /** @type {number | undefined } */
50- let streamPort
52+ /**
53+ * When the result body is a stream and result status code allow to have a body,
54+ * open up a http server that proxies back to the main thread and resolve with server port.
55+ * Otherwise, resolve with undefined.
56+ *
57+ * @param {HandlerResponse } invocationResult
58+ * @returns {Promise<number | undefined> }
59+ */
60+ async function getStreamPortForStreamingResponse ( invocationResult ) {
61+ // if we don't have result or result's body is not a stream, we do not need a stream port
62+ if ( ! invocationResult || ! isStream ( invocationResult . body ) ) {
63+ return undefined
64+ }
5165
52- // When the result body is a StreamResponse
53- // we open up a http server that proxies back to the main thread.
54- if ( invocationResult && isStream ( invocationResult . body ) ) {
5566 const { body } = invocationResult
5667
5768 delete invocationResult . body
5869
59- await new Promise ( ( resolve , reject ) => {
70+ // For streaming responses, lambda-local always returns a result with body stream.
71+ // We need to discard it if result's status code does not allow response to have a body.
72+ const shouldNotHaveABody = nullBodyStatus . has ( invocationResult . statusCode )
73+ if ( shouldNotHaveABody ) {
74+ return undefined
75+ }
76+
77+ // create a server that will proxy the body stream back to the main thread
78+ return await new Promise ( ( resolve , reject ) => {
6079 const server = createServer ( ( socket ) => {
6180 body . pipe ( socket ) . on ( 'end' , ( ) => server . close ( ) )
6281 } )
@@ -66,15 +85,19 @@ if (invocationResult && isStream(invocationResult.body)) {
6685 server . listen ( { port : 0 , host : 'localhost' } , ( ) => {
6786 const address = server . address ( )
6887
88+ /** @type {number | undefined } */
89+ let streamPort
6990 if ( address && typeof address !== 'string' ) {
7091 streamPort = address . port
7192 }
7293
73- resolve ( undefined )
94+ resolve ( streamPort )
7495 } )
7596 } )
7697}
7798
99+ const streamPort = await getStreamPortForStreamingResponse ( invocationResult )
100+
78101if ( parentPort ) {
79102 /** @type {WorkerResult } */
80103 const message = { ...invocationResult , streamPort }
0 commit comments