From 67ffa337403e6188763a7418104b3caf444d06db Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Wed, 7 Oct 2020 17:16:36 +0200 Subject: [PATCH 1/3] fix: Resume connection stream before continuing. [#80] --- index.js | 7 +++++++ test/base.js | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/index.js b/index.js index 57cf199..7086ef1 100644 --- a/index.js +++ b/index.js @@ -97,6 +97,13 @@ function fastifyWebsocket (fastify, opts, next) { const response = new ServerResponse(request) request[kWs] = WebSocket.createWebSocketStream(connection) request[kWs].socket = connection + + request[kWs].socket.on('newListener', event => { + if (event === 'message') { + request[kWs].resume() + } + }) + router.lookup(request, response) } diff --git a/test/base.js b/test/base.js index ccd72de..3d2f3ea 100644 --- a/test/base.js +++ b/test/base.js @@ -258,3 +258,56 @@ test('Should keep accepting connection', t => { client.on('error', console.error) }) }) + +test('Should keep processing message when many medium sized messages are sent', t => { + t.plan(3) + + const fastify = Fastify() + const total = 200 + let handled = 0 + + fastify.register(fastifyWebsocket, { handle }) + + function handle ({ socket }) { + socket.on('message', message => { + socket.send('handled') + }) + + socket.on('error', err => { + t.error(err) + }) + } + + fastify.listen(0, err => { + t.error(err) + + // Setup a client that sends a lot of messages to the server + const client = new WebSocket('ws://localhost:' + fastify.server.address().port) + + let i = 0 + function send () { + client.send(Buffer.alloc(160, `${i}`).toString('utf-8'), () => { + i++ + + if (i < total) { + send() + } else { + setTimeout(() => { + fastify.close(err => { + t.error(err) + t.equal(handled, total) + }) + }, 1000) + } + }) + } + + client.on('open', send) + + client.on('message', () => { + handled++ + }) + + client.on('error', console.error) + }) +}) From 1dccad5a631e1d5ac81e0b0ed25212e47cc2f826 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Thu, 8 Oct 2020 09:50:57 +0200 Subject: [PATCH 2/3] chore: Remove redundant listener. --- index.js | 6 ------ test/base.js | 47 +++++++++++++++++++++++++++++------------------ 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/index.js b/index.js index 7086ef1..9c07a06 100644 --- a/index.js +++ b/index.js @@ -80,12 +80,6 @@ function fastifyWebsocket (fastify, opts, next) { } function wsHandle (handle, req, res) { - req[kWs].socket.on('newListener', event => { - if (event === 'message') { - req[kWs].resume() - } - }) - return handle.call(fastify, req[kWs], res) } diff --git a/test/base.js b/test/base.js index 3d2f3ea..5c219fd 100644 --- a/test/base.js +++ b/test/base.js @@ -264,11 +264,13 @@ test('Should keep processing message when many medium sized messages are sent', const fastify = Fastify() const total = 200 + let safetyInterval + let sent = 0 let handled = 0 - fastify.register(fastifyWebsocket, { handle }) + fastify.register(fastifyWebsocket) - function handle ({ socket }) { + fastify.get('/', { websocket: true }, ({ socket }, req) => { socket.on('message', message => { socket.send('handled') }) @@ -276,36 +278,45 @@ test('Should keep processing message when many medium sized messages are sent', socket.on('error', err => { t.error(err) }) + + /* + This is a safety check - If the socket is stuck, fastify.close will not run. + */ + safetyInterval = setInterval(() => { + if (sent < total) { + return } + t.fail('Forcibly closed.') + + clearInterval(safetyInterval) + socket.terminate() + }, 100) + }) + fastify.listen(0, err => { t.error(err) // Setup a client that sends a lot of messages to the server const client = new WebSocket('ws://localhost:' + fastify.server.address().port) - let i = 0 - function send () { - client.send(Buffer.alloc(160, `${i}`).toString('utf-8'), () => { - i++ + client.on('open', () => { + for (let i = 0; i < total; i++) { + client.send(Buffer.alloc(160, `${i}`).toString('utf-8')) + sent++ + } + }) + + client.on('message', message => { + handled++ - if (i < total) { - send() - } else { - setTimeout(() => { + if (handled === total) { fastify.close(err => { + clearInterval(safetyInterval) t.error(err) t.equal(handled, total) }) - }, 1000) - } - }) } - - client.on('open', send) - - client.on('message', () => { - handled++ }) client.on('error', console.error) From 635763ce27cb1e9b40f2535321fca8e1bd7a78f8 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Thu, 8 Oct 2020 09:53:23 +0200 Subject: [PATCH 3/3] chore: Linted code --- test/base.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/base.js b/test/base.js index 5c219fd..f565ec9 100644 --- a/test/base.js +++ b/test/base.js @@ -285,7 +285,7 @@ test('Should keep processing message when many medium sized messages are sent', safetyInterval = setInterval(() => { if (sent < total) { return - } + } t.fail('Forcibly closed.') @@ -311,12 +311,12 @@ test('Should keep processing message when many medium sized messages are sent', handled++ if (handled === total) { - fastify.close(err => { + fastify.close(err => { clearInterval(safetyInterval) - t.error(err) - t.equal(handled, total) - }) - } + t.error(err) + t.equal(handled, total) + }) + } }) client.on('error', console.error)