diff --git a/index.js b/index.js index 57cf199..9c07a06 100644 --- a/index.js +++ b/index.js @@ -80,12 +80,6 @@ function fastifyWebsocket (fastify, opts, next) { } function wsHandle (handle, req, res) { - req[kWs].socket.on('newListener', event => { - if (event === 'message') { - req[kWs].resume() - } - }) - return handle.call(fastify, req[kWs], res) } @@ -97,6 +91,13 @@ function fastifyWebsocket (fastify, opts, next) { const response = new ServerResponse(request) request[kWs] = WebSocket.createWebSocketStream(connection) request[kWs].socket = connection + + request[kWs].socket.on('newListener', event => { + if (event === 'message') { + request[kWs].resume() + } + }) + router.lookup(request, response) } diff --git a/test/base.js b/test/base.js index ccd72de..f565ec9 100644 --- a/test/base.js +++ b/test/base.js @@ -258,3 +258,67 @@ test('Should keep accepting connection', t => { client.on('error', console.error) }) }) + +test('Should keep processing message when many medium sized messages are sent', t => { + t.plan(3) + + const fastify = Fastify() + const total = 200 + let safetyInterval + let sent = 0 + let handled = 0 + + fastify.register(fastifyWebsocket) + + fastify.get('/', { websocket: true }, ({ socket }, req) => { + socket.on('message', message => { + socket.send('handled') + }) + + socket.on('error', err => { + t.error(err) + }) + + /* + This is a safety check - If the socket is stuck, fastify.close will not run. + */ + safetyInterval = setInterval(() => { + if (sent < total) { + return + } + + t.fail('Forcibly closed.') + + clearInterval(safetyInterval) + socket.terminate() + }, 100) + }) + + fastify.listen(0, err => { + t.error(err) + + // Setup a client that sends a lot of messages to the server + const client = new WebSocket('ws://localhost:' + fastify.server.address().port) + + client.on('open', () => { + for (let i = 0; i < total; i++) { + client.send(Buffer.alloc(160, `${i}`).toString('utf-8')) + sent++ + } + }) + + client.on('message', message => { + handled++ + + if (handled === total) { + fastify.close(err => { + clearInterval(safetyInterval) + t.error(err) + t.equal(handled, total) + }) + } + }) + + client.on('error', console.error) + }) +})