diff --git a/.taprc b/.taprc index 6e22785..530de00 100644 --- a/.taprc +++ b/.taprc @@ -2,4 +2,4 @@ ts: false jsx: false flow: false coverage: true -check-coverage: false +check-coverage: true diff --git a/README.md b/README.md index 40a3ec2..9b075c7 100644 --- a/README.md +++ b/README.md @@ -110,26 +110,26 @@ This `fastify` plugin supports _all_ the options of *Note that this plugin is fully encapsulated, and non-JSON payloads will be streamed directly to the destination.* -### upstream +### `upstream` An URL (including protocol) that represents the target server to use for proxying. -### prefix +### `prefix` The prefix to mount this plugin on. All the requests to the current server starting with the given prefix will be proxied to the provided upstream. The prefix will be removed from the URL when forwarding the HTTP request. -### rewritePrefix +### `rewritePrefix` Rewrite the prefix to the specified string. Default: `''`. -### preHandler +### `preHandler` A `preHandler` to be applied on all routes. Useful for performing actions before the proxy is executed (e.g. check for authentication). -### proxyPayloads +### `proxyPayloads` When this option is `false`, you will be able to access the body but it will also disable direct pass through of the payload. As a result, it is left up to the implementation to properly parse and proxy the payload correctly. @@ -142,38 +142,47 @@ fastify.addContentTypeParser('application/xml', (req, done) => { }) ``` -### config +### `config` An object accessible within the `preHandler` via `reply.context.config`. See [Config](https://www.fastify.io/docs/v4.8.x/Reference/Routes/#config) in the Fastify documentation for information on this option. Note: this is merged with other configuration passed to the route. -### replyOptions +### `replyOptions` Object with [reply options](https://github.com/fastify/fastify-reply-from#replyfromsource-opts) for `@fastify/reply-from`. -### httpMethods +### `httpMethods` An array that contains the types of the methods. Default: `['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']`. -## websocket +### `websocket` This module has _partial_ support for forwarding websockets by passing a -`websocket` option. All those options are going to be forwarded to -[`@fastify/websocket`](https://github.com/fastify/fastify-websocket). - -Multiple websocket proxies may be attached to the same HTTP server at different paths. -In this case, only the first `wsServerOptions` is applied. +`websocket` boolean option. A few things are missing: -1. forwarding headers as well as `rewriteHeaders`. Note: Only cookie headers are being forwarded -2. request id logging -3. support `ignoreTrailingSlash` -4. forwarding more than one subprotocols. Note: Only the first subprotocol is being forwarded +1. request id logging +2. support `ignoreTrailingSlash` +3. forwarding more than one subprotocols. Note: Only the first subprotocol is being forwarded Pull requests are welcome to finish this feature. +### `wsServerOptions` + +The options passed to [`new ws.Server()`](https://github.com/websockets/ws/blob/HEAD/doc/ws.md#class-websocketserver). + +In case multiple websocket proxies are attached to the same HTTP server at different paths. +In this case, only the first `wsServerOptions` is applied. + +### `wsClientOptions` + +The options passed to the [`WebSocket` constructor](https://github.com/websockets/ws/blob/HEAD/doc/ws.md#class-websocket) for outgoing websockets. + +It also supports an additional `rewriteRequestHeaders(headers, request)` function that can be used to write the headers before +opening the WebSocket connection. This function should return an object with the given headers. +The default implementation forwards the `cookie` header. ## Benchmarks @@ -189,8 +198,9 @@ The results were gathered on the second run of `autocannon -c 100 -d 5 URL`. ## TODO + * [ ] Perform validations for incoming data -* [ ] Finish implementing websocket (follow TODO) +* [ ] Finish implementing websocket ## License diff --git a/index.js b/index.js index 93400fe..2bfb995 100644 --- a/index.js +++ b/index.js @@ -1,13 +1,17 @@ 'use strict' const From = require('@fastify/reply-from') +const { ServerResponse } = require('http') const WebSocket = require('ws') const { convertUrlToWebSocket } = require('./utils') const fp = require('fastify-plugin') const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS'] const urlPattern = /^https?:\/\// +const kWs = Symbol('ws') +const kWsHead = Symbol('wsHead') function liftErrorCode (code) { + /* istanbul ignore next */ if (typeof code !== 'number') { // Sometimes "close" event emits with a non-numeric value return 1011 @@ -33,9 +37,11 @@ function waitConnection (socket, write) { } } -function isExternalUrl (url = '') { +function isExternalUrl (url) { return urlPattern.test(url) -}; +} + +function noop () {} function proxyWebSockets (source, target) { function close (code, reason) { @@ -44,18 +50,26 @@ function proxyWebSockets (source, target) { } source.on('message', (data, binary) => waitConnection(target, () => target.send(data, { binary }))) + /* istanbul ignore next */ source.on('ping', data => waitConnection(target, () => target.ping(data))) + /* istanbul ignore next */ source.on('pong', data => waitConnection(target, () => target.pong(data))) source.on('close', close) + /* istanbul ignore next */ source.on('error', error => close(1011, error.message)) + /* istanbul ignore next */ source.on('unexpected-response', () => close(1011, 'unexpected response')) // source WebSocket is already connected because it is created by ws server target.on('message', (data, binary) => source.send(data, { binary })) + /* istanbul ignore next */ target.on('ping', data => source.ping(data)) + /* istanbul ignore next */ target.on('pong', data => source.pong(data)) target.on('close', close) + /* istanbul ignore next */ target.on('error', error => close(1011, error.message)) + /* istanbul ignore next */ target.on('unexpected-response', () => close(1011, 'unexpected response')) } @@ -64,37 +78,61 @@ class WebSocketProxy { this.logger = fastify.log const wss = new WebSocket.Server({ - server: fastify.server, + noServer: true, ...wsServerOptions }) + fastify.server.on('upgrade', (rawRequest, socket, head) => { + // Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket. + rawRequest[kWs] = socket + rawRequest[kWsHead] = head + + const rawResponse = new ServerResponse(rawRequest) + rawResponse.assignSocket(socket) + fastify.routing(rawRequest, rawResponse) + + rawResponse.on('finish', () => { + socket.destroy() + }) + }) + + this.handleUpgrade = (request, cb) => { + wss.handleUpgrade(request.raw, request.raw[kWs], request.raw[kWsHead], (socket) => { + this.handleConnection(socket, request) + cb() + }) + } + // To be able to close the HTTP server, // all WebSocket clients need to be disconnected. // Fastify is missing a pre-close event, or the ability to // add a hook before the server.close call. We need to resort // to monkeypatching for now. - const oldClose = fastify.server.close - fastify.server.close = function (done) { - for (const client of wss.clients) { - client.close() + { + const oldClose = fastify.server.close + fastify.server.close = function (done) { + wss.close(() => { + oldClose.call(this, (err) => { + /* istanbul ignore next */ + done && done(err) + }) + }) + for (const client of wss.clients) { + client.close() + } } - oldClose.call(this, done) } + /* istanbul ignore next */ wss.on('error', (err) => { + /* istanbul ignore next */ this.logger.error(err) }) - wss.on('connection', this.handleConnection.bind(this)) - this.wss = wss this.prefixList = [] } - close (done) { - this.wss.close(done) - } - addUpstream (prefix, rewritePrefix, upstream, wsClientOptions) { this.prefixList.push({ prefix: new URL(prefix, 'ws://127.0.0.1').pathname, @@ -117,30 +155,23 @@ class WebSocketProxy { } } - return undefined + /* istanbul ignore next */ + throw new Error(`no upstream found for ${request.url}. this should not happened. Please report to https://github.com/fastify/fastify-http-proxy`) } handleConnection (source, request) { const upstream = this.findUpstream(request) - if (!upstream) { - this.logger.debug({ url: request.url }, 'not matching prefix') - source.close() - return - } const { target: url, wsClientOptions } = upstream + const rewriteRequestHeaders = wsClientOptions?.rewriteRequestHeaders || defaultWsHeadersRewrite + const headersToRewrite = wsClientOptions?.headers || {} const subprotocols = [] if (source.protocol) { subprotocols.push(source.protocol) } - let optionsWs = {} - if (request.headers.cookie) { - const headers = { cookie: request.headers.cookie } - optionsWs = { ...wsClientOptions, headers } - } else { - optionsWs = wsClientOptions - } + const headers = rewriteRequestHeaders(headersToRewrite, request) + const optionsWs = { ...(wsClientOptions || {}), headers } const target = new WebSocket(url, subprotocols, optionsWs) this.logger.debug({ url: url.href }, 'proxy websocket') @@ -148,6 +179,13 @@ class WebSocketProxy { } } +function defaultWsHeadersRewrite (headers, request) { + if (request.headers.cookie) { + return { ...headers, cookie: request.headers.cookie } + } + return { ...headers } +} + const httpWss = new WeakMap() // http.Server => WebSocketProxy function setupWebSocketProxy (fastify, options, rewritePrefix) { @@ -155,28 +193,26 @@ function setupWebSocketProxy (fastify, options, rewritePrefix) { if (!wsProxy) { wsProxy = new WebSocketProxy(fastify, options.wsServerOptions) httpWss.set(fastify.server, wsProxy) - - fastify.addHook('onClose', (instance, done) => { - httpWss.delete(fastify.server) - wsProxy.close(done) - }) } if (options.upstream !== '') { wsProxy.addUpstream(fastify.prefix, rewritePrefix, options.upstream, options.wsClientOptions) - } else if (typeof options.replyOptions.getUpstream === 'function') { + // The else block is validate earlier in the code + } else { wsProxy.findUpstream = function (request) { const source = new URL(request.url, 'ws://127.0.0.1') const upstream = options.replyOptions.getUpstream(request, '') const target = new URL(source.pathname, upstream) + /* istanbul ignore next */ target.protocol = upstream.indexOf('http:') === 0 ? 'ws:' : 'wss' target.search = source.search return { target, wsClientOptions: options.wsClientOptions } } } + return wsProxy } -function generateRewritePrefix (prefix = '', opts) { +function generateRewritePrefix (prefix, opts) { let rewritePrefix = opts.rewritePrefix || (opts.upstream ? new URL(opts.upstream).pathname : '/') if (!prefix.endsWith('/') && rewritePrefix.endsWith('/')) { @@ -243,7 +279,23 @@ async function fastifyHttpProxy (fastify, opts) { handler }) + let wsProxy + + if (opts.websocket) { + wsProxy = setupWebSocketProxy(fastify, opts, rewritePrefix) + } + function handler (request, reply) { + if (request.raw[kWs]) { + reply.hijack() + try { + wsProxy.handleUpgrade(request, noop) + } catch (err) { + /* istanbul ignore next */ + request.log.warn({ err }, 'websocket proxy error') + } + return + } const queryParamIndex = request.raw.url.indexOf('?') let dest = request.raw.url.slice(0, queryParamIndex !== -1 ? queryParamIndex : undefined) @@ -256,10 +308,6 @@ async function fastifyHttpProxy (fastify, opts) { } reply.from(dest || '/', replyOpts) } - - if (opts.websocket) { - setupWebSocketProxy(fastify, opts, rewritePrefix) - } } module.exports = fp(fastifyHttpProxy, { diff --git a/test/websocket.js b/test/websocket.js index a2e4824..c5cf8a5 100644 --- a/test/websocket.js +++ b/test/websocket.js @@ -126,8 +126,8 @@ test('captures errors on start', async (t) => { t.teardown(app2.close.bind(app2)) }) -test('getWebSocketStream', async (t) => { - t.plan(7) +test('getUpstream', async (t) => { + t.plan(9) const origin = createServer() const wss = new WebSocket.Server({ server: origin }) @@ -148,10 +148,18 @@ test('getWebSocketStream', async (t) => { await promisify(origin.listen.bind(origin))({ port: 0 }) const server = Fastify() + + let _req + + server.server.on('upgrade', (req) => { + _req = req + }) server.register(proxy, { upstream: '', replyOptions: { - getUpstream: function (original, base) { + getUpstream: function (original) { + t.not(original, _req) + t.equal(original.raw, _req) return `http://localhost:${origin.address().port}` } }, @@ -185,3 +193,217 @@ test('getWebSocketStream', async (t) => { server.close() ]) }) + +test('websocket proxy trigger hooks', async (t) => { + t.plan(8) + + const origin = createServer() + const wss = new WebSocket.Server({ server: origin }) + t.teardown(wss.close.bind(wss)) + t.teardown(origin.close.bind(origin)) + + const serverMessages = [] + wss.on('connection', (ws, request) => { + t.equal(ws.protocol, subprotocolValue) + t.equal(request.headers.cookie, cookieValue) + ws.on('message', (message, binary) => { + serverMessages.push([message.toString(), binary]) + // echo + ws.send(message, { binary }) + }) + }) + + await promisify(origin.listen.bind(origin))({ port: 0 }) + + const server = Fastify() + server.addHook('onRequest', (request, reply, done) => { + t.pass('onRequest') + done() + }) + server.register(proxy, { + upstream: `ws://localhost:${origin.address().port}`, + websocket: true + }) + + await server.listen({ port: 0 }) + t.teardown(server.close.bind(server)) + + const options = { headers: { cookie: cookieValue } } + const ws = new WebSocket(`ws://localhost:${server.server.address().port}`, [subprotocolValue], options) + await once(ws, 'open') + + ws.send('hello', { binary: false }) + const [reply0, binary0] = await once(ws, 'message') + t.equal(reply0.toString(), 'hello') + t.equal(binary0, false) + + ws.send(Buffer.from('fastify'), { binary: true }) + const [reply1, binary1] = await once(ws, 'message') + t.equal(reply1.toString(), 'fastify') + t.equal(binary1, true) + + t.strictSame(serverMessages, [ + ['hello', false], + ['fastify', true] + ]) + + await Promise.all([ + once(ws, 'close'), + server.close() + ]) +}) + +test('websocket proxy with rewriteRequestHeaders', async (t) => { + t.plan(7) + + const origin = createServer() + const wss = new WebSocket.Server({ server: origin }) + t.teardown(wss.close.bind(wss)) + t.teardown(origin.close.bind(origin)) + + const serverMessages = [] + wss.on('connection', (ws, request) => { + t.equal(ws.protocol, subprotocolValue) + t.equal(request.headers.myauth, 'myauth') + ws.on('message', (message, binary) => { + serverMessages.push([message.toString(), binary]) + // echo + ws.send(message, { binary }) + }) + }) + + await promisify(origin.listen.bind(origin))({ port: 0 }) + + const server = Fastify() + server.register(proxy, { + upstream: `ws://localhost:${origin.address().port}`, + websocket: true, + wsClientOptions: { + rewriteRequestHeaders: (headers, request) => { + return { + ...headers, + myauth: 'myauth' + } + } + } + }) + + await server.listen({ port: 0 }) + t.teardown(server.close.bind(server)) + + const ws = new WebSocket(`ws://localhost:${server.server.address().port}`, [subprotocolValue]) + await once(ws, 'open') + + ws.send('hello', { binary: false }) + const [reply0, binary0] = await once(ws, 'message') + t.equal(reply0.toString(), 'hello') + t.equal(binary0, false) + + ws.send(Buffer.from('fastify'), { binary: true }) + const [reply1, binary1] = await once(ws, 'message') + t.equal(reply1.toString(), 'fastify') + t.equal(binary1, true) + + t.strictSame(serverMessages, [ + ['hello', false], + ['fastify', true] + ]) + + await Promise.all([ + once(ws, 'close'), + server.close() + ]) +}) + +test('websocket proxy custom headers', async (t) => { + t.plan(7) + + const origin = createServer() + const wss = new WebSocket.Server({ server: origin }) + t.teardown(wss.close.bind(wss)) + t.teardown(origin.close.bind(origin)) + + const serverMessages = [] + wss.on('connection', (ws, request) => { + t.equal(ws.protocol, subprotocolValue) + t.equal(request.headers.myauth, 'myauth') + ws.on('message', (message, binary) => { + serverMessages.push([message.toString(), binary]) + // echo + ws.send(message, { binary }) + }) + }) + + await promisify(origin.listen.bind(origin))({ port: 0 }) + + const server = Fastify() + server.register(proxy, { + upstream: `ws://localhost:${origin.address().port}`, + websocket: true, + wsClientOptions: { + headers: { + myauth: 'myauth' + } + } + }) + + await server.listen({ port: 0 }) + t.teardown(server.close.bind(server)) + + const ws = new WebSocket(`ws://localhost:${server.server.address().port}`, [subprotocolValue]) + await once(ws, 'open') + + ws.send('hello', { binary: false }) + const [reply0, binary0] = await once(ws, 'message') + t.equal(reply0.toString(), 'hello') + t.equal(binary0, false) + + ws.send(Buffer.from('fastify'), { binary: true }) + const [reply1, binary1] = await once(ws, 'message') + t.equal(reply1.toString(), 'fastify') + t.equal(binary1, true) + + t.strictSame(serverMessages, [ + ['hello', false], + ['fastify', true] + ]) + + await Promise.all([ + once(ws, 'close'), + server.close() + ]) +}) + +test('Should gracefully close when clients attempt to connect after calling close', async (t) => { + const origin = createServer() + const wss = new WebSocket.Server({ server: origin }) + t.teardown(wss.close.bind(wss)) + t.teardown(origin.close.bind(origin)) + + await promisify(origin.listen.bind(origin))({ port: 0 }) + + const server = Fastify({ logger: false }) + await server.register(proxy, { + upstream: `ws://localhost:${origin.address().port}`, + websocket: true + }) + + const oldClose = server.server.close + let p + server.server.close = function (cb) { + const ws = new WebSocket('ws://localhost:' + server.server.address().port) + + p = once(ws, 'unexpected-response').then(([req, res]) => { + t.equal(res.statusCode, 503) + oldClose.call(this, cb) + }) + } + + await server.listen({ port: 0 }) + + const ws = new WebSocket('ws://localhost:' + server.server.address().port) + + await once(ws, 'open') + await server.close() + await p +}) diff --git a/test/ws-prefix-rewrite-core.js b/test/ws-prefix-rewrite-core.js index 0b7a138..b6712ad 100644 --- a/test/ws-prefix-rewrite-core.js +++ b/test/ws-prefix-rewrite-core.js @@ -82,13 +82,19 @@ async function handleProxy (info, { backendPath, proxyOptions, wrapperOptions }, } }) - t.teardown(() => backend.close()) + t.teardown(async () => { + await backend.close() + t.comment('backend closed') + }) const backendURL = await backend.listen({ port: 0, host: '127.0.0.1' }) const [frontend, frontendURL] = await proxyServer(t, backendURL, backendPath, proxyOptions, wrapperOptions) - t.teardown(() => frontend.close()) + t.teardown(async () => { + await frontend.close() + t.comment('frontend closed') + }) for (const path of paths) { await processRequest(t, frontendURL, path, expected(path))