Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .taprc
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ ts: false
jsx: false
flow: false
coverage: true
check-coverage: false
check-coverage: true
48 changes: 29 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,26 @@ This `fastify` plugin supports _all_ the options of
*Note that this plugin is fully encapsulated, and non-JSON payloads will
be streamed directly to the destination.*

### upstream
### `upstream`

An URL (including protocol) that represents the target server to use for proxying.

### prefix
### `prefix`

The prefix to mount this plugin on. All the requests to the current server starting with the given prefix will be proxied to the provided upstream.

The prefix will be removed from the URL when forwarding the HTTP
request.

### rewritePrefix
### `rewritePrefix`

Rewrite the prefix to the specified string. Default: `''`.

### preHandler
### `preHandler`

A `preHandler` to be applied on all routes. Useful for performing actions before the proxy is executed (e.g. check for authentication).

### proxyPayloads
### `proxyPayloads`

When this option is `false`, you will be able to access the body but it will also disable direct pass through of the payload. As a result, it is left up to the implementation to properly parse and proxy the payload correctly.

Expand All @@ -142,38 +142,47 @@ fastify.addContentTypeParser('application/xml', (req, done) => {
})
```

### config
### `config`

An object accessible within the `preHandler` via `reply.context.config`.
See [Config](https://www.fastify.io/docs/v4.8.x/Reference/Routes/#config) in the Fastify
documentation for information on this option. Note: this is merged with other
configuration passed to the route.

### replyOptions
### `replyOptions`

Object with [reply options](https://github.com/fastify/fastify-reply-from#replyfromsource-opts) for `@fastify/reply-from`.

### httpMethods
### `httpMethods`
An array that contains the types of the methods. Default: `['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']`.

## websocket
### `websocket`

This module has _partial_ support for forwarding websockets by passing a
`websocket` option. All those options are going to be forwarded to
[`@fastify/websocket`](https://github.com/fastify/fastify-websocket).

Multiple websocket proxies may be attached to the same HTTP server at different paths.
In this case, only the first `wsServerOptions` is applied.
`websocket` boolean option.

A few things are missing:

1. forwarding headers as well as `rewriteHeaders`. Note: Only cookie headers are being forwarded
2. request id logging
3. support `ignoreTrailingSlash`
4. forwarding more than one subprotocols. Note: Only the first subprotocol is being forwarded
1. request id logging
2. support `ignoreTrailingSlash`
3. forwarding more than one subprotocols. Note: Only the first subprotocol is being forwarded

Pull requests are welcome to finish this feature.

### `wsServerOptions`

The options passed to [`new ws.Server()`](https://github.com/websockets/ws/blob/HEAD/doc/ws.md#class-websocketserver).

In case multiple websocket proxies are attached to the same HTTP server at different paths.
In this case, only the first `wsServerOptions` is applied.

### `wsClientOptions`

The options passed to the [`WebSocket` constructor](https://github.com/websockets/ws/blob/HEAD/doc/ws.md#class-websocket) for outgoing websockets.

It also supports an additional `rewriteRequestHeaders(headers, request)` function that can be used to write the headers before
opening the WebSocket connection. This function should return an object with the given headers.
The default implementation forwards the `cookie` header.

## Benchmarks

Expand All @@ -189,8 +198,9 @@ The results were gathered on the second run of `autocannon -c 100 -d 5
URL`.

## TODO

* [ ] Perform validations for incoming data
* [ ] Finish implementing websocket (follow TODO)
* [ ] Finish implementing websocket

## License

Expand Down
124 changes: 86 additions & 38 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
'use strict'
const From = require('@fastify/reply-from')
const { ServerResponse } = require('http')
const WebSocket = require('ws')
const { convertUrlToWebSocket } = require('./utils')
const fp = require('fastify-plugin')

const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']
const urlPattern = /^https?:\/\//
const kWs = Symbol('ws')
const kWsHead = Symbol('wsHead')

function liftErrorCode (code) {
/* istanbul ignore next */
if (typeof code !== 'number') {
// Sometimes "close" event emits with a non-numeric value
return 1011
Expand All @@ -33,9 +37,11 @@ function waitConnection (socket, write) {
}
}

function isExternalUrl (url = '') {
function isExternalUrl (url) {
return urlPattern.test(url)
};
}

function noop () {}

function proxyWebSockets (source, target) {
function close (code, reason) {
Expand All @@ -44,18 +50,26 @@ function proxyWebSockets (source, target) {
}

source.on('message', (data, binary) => waitConnection(target, () => target.send(data, { binary })))
/* istanbul ignore next */
source.on('ping', data => waitConnection(target, () => target.ping(data)))
/* istanbul ignore next */
source.on('pong', data => waitConnection(target, () => target.pong(data)))
source.on('close', close)
/* istanbul ignore next */
source.on('error', error => close(1011, error.message))
/* istanbul ignore next */
source.on('unexpected-response', () => close(1011, 'unexpected response'))

// source WebSocket is already connected because it is created by ws server
target.on('message', (data, binary) => source.send(data, { binary }))
/* istanbul ignore next */
target.on('ping', data => source.ping(data))
/* istanbul ignore next */
target.on('pong', data => source.pong(data))
target.on('close', close)
/* istanbul ignore next */
target.on('error', error => close(1011, error.message))
/* istanbul ignore next */
target.on('unexpected-response', () => close(1011, 'unexpected response'))
}

Expand All @@ -64,37 +78,61 @@ class WebSocketProxy {
this.logger = fastify.log

const wss = new WebSocket.Server({
server: fastify.server,
noServer: true,
...wsServerOptions
})

fastify.server.on('upgrade', (rawRequest, socket, head) => {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
rawRequest[kWsHead] = head

const rawResponse = new ServerResponse(rawRequest)
rawResponse.assignSocket(socket)
fastify.routing(rawRequest, rawResponse)

rawResponse.on('finish', () => {
socket.destroy()
})
})

this.handleUpgrade = (request, cb) => {
wss.handleUpgrade(request.raw, request.raw[kWs], request.raw[kWsHead], (socket) => {
this.handleConnection(socket, request)
cb()
})
}

// To be able to close the HTTP server,
// all WebSocket clients need to be disconnected.
// Fastify is missing a pre-close event, or the ability to
// add a hook before the server.close call. We need to resort
// to monkeypatching for now.
const oldClose = fastify.server.close
fastify.server.close = function (done) {
for (const client of wss.clients) {
client.close()
{
const oldClose = fastify.server.close
fastify.server.close = function (done) {
wss.close(() => {
oldClose.call(this, (err) => {
/* istanbul ignore next */
done && done(err)
})
})
for (const client of wss.clients) {
client.close()
}
}
oldClose.call(this, done)
}

/* istanbul ignore next */
wss.on('error', (err) => {
/* istanbul ignore next */
this.logger.error(err)
})

wss.on('connection', this.handleConnection.bind(this))

this.wss = wss
this.prefixList = []
}

close (done) {
this.wss.close(done)
}

addUpstream (prefix, rewritePrefix, upstream, wsClientOptions) {
this.prefixList.push({
prefix: new URL(prefix, 'ws://127.0.0.1').pathname,
Expand All @@ -117,66 +155,64 @@ class WebSocketProxy {
}
}

return undefined
/* istanbul ignore next */
throw new Error(`no upstream found for ${request.url}. this should not happened. Please report to https://github.com/fastify/fastify-http-proxy`)
}

handleConnection (source, request) {
const upstream = this.findUpstream(request)
if (!upstream) {
this.logger.debug({ url: request.url }, 'not matching prefix')
source.close()
return
}
const { target: url, wsClientOptions } = upstream
const rewriteRequestHeaders = wsClientOptions?.rewriteRequestHeaders || defaultWsHeadersRewrite
const headersToRewrite = wsClientOptions?.headers || {}

const subprotocols = []
if (source.protocol) {
subprotocols.push(source.protocol)
}

let optionsWs = {}
if (request.headers.cookie) {
const headers = { cookie: request.headers.cookie }
optionsWs = { ...wsClientOptions, headers }
} else {
optionsWs = wsClientOptions
}
const headers = rewriteRequestHeaders(headersToRewrite, request)
const optionsWs = { ...(wsClientOptions || {}), headers }

const target = new WebSocket(url, subprotocols, optionsWs)
this.logger.debug({ url: url.href }, 'proxy websocket')
proxyWebSockets(source, target)
}
}

function defaultWsHeadersRewrite (headers, request) {
if (request.headers.cookie) {
return { ...headers, cookie: request.headers.cookie }
}
return { ...headers }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return { ...headers }
return headers

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is deliberate to create a new object.

}

const httpWss = new WeakMap() // http.Server => WebSocketProxy

function setupWebSocketProxy (fastify, options, rewritePrefix) {
let wsProxy = httpWss.get(fastify.server)
if (!wsProxy) {
wsProxy = new WebSocketProxy(fastify, options.wsServerOptions)
httpWss.set(fastify.server, wsProxy)

fastify.addHook('onClose', (instance, done) => {
httpWss.delete(fastify.server)
wsProxy.close(done)
})
}

if (options.upstream !== '') {
wsProxy.addUpstream(fastify.prefix, rewritePrefix, options.upstream, options.wsClientOptions)
} else if (typeof options.replyOptions.getUpstream === 'function') {
// The else block is validate earlier in the code
} else {
wsProxy.findUpstream = function (request) {
const source = new URL(request.url, 'ws://127.0.0.1')
const upstream = options.replyOptions.getUpstream(request, '')
const target = new URL(source.pathname, upstream)
/* istanbul ignore next */
target.protocol = upstream.indexOf('http:') === 0 ? 'ws:' : 'wss'
target.search = source.search
return { target, wsClientOptions: options.wsClientOptions }
}
}
return wsProxy
}

function generateRewritePrefix (prefix = '', opts) {
function generateRewritePrefix (prefix, opts) {
let rewritePrefix = opts.rewritePrefix || (opts.upstream ? new URL(opts.upstream).pathname : '/')

if (!prefix.endsWith('/') && rewritePrefix.endsWith('/')) {
Expand Down Expand Up @@ -243,7 +279,23 @@ async function fastifyHttpProxy (fastify, opts) {
handler
})

let wsProxy

if (opts.websocket) {
wsProxy = setupWebSocketProxy(fastify, opts, rewritePrefix)
}

function handler (request, reply) {
if (request.raw[kWs]) {
reply.hijack()
try {
wsProxy.handleUpgrade(request, noop)
} catch (err) {
/* istanbul ignore next */
request.log.warn({ err }, 'websocket proxy error')
}
return
}
const queryParamIndex = request.raw.url.indexOf('?')
let dest = request.raw.url.slice(0, queryParamIndex !== -1 ? queryParamIndex : undefined)

Expand All @@ -256,10 +308,6 @@ async function fastifyHttpProxy (fastify, opts) {
}
reply.from(dest || '/', replyOpts)
}

if (opts.websocket) {
setupWebSocketProxy(fastify, opts, rewritePrefix)
}
}

module.exports = fp(fastifyHttpProxy, {
Expand Down
Loading