From 4d7773183af9de4e3473a55b74a2646c8e1c2120 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 11:17:37 +0300 Subject: [PATCH 1/7] stream-use-after-error: failing test case --- packages/pg-query-stream/test/close.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index 97e4627d9..db2f8c404 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -1,5 +1,6 @@ import assert from 'assert' import concat from 'concat-stream' +import pg from 'pg' import QueryStream from '../src' import helper from './helper' @@ -90,4 +91,25 @@ if (process.version.startsWith('v8.')) { stream.on('close', done) }) }) + + describe('use after error', () => { + it('should work if used after error', async () => { + const pool = new pg.Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }); + + const res1 = await pool.query('SELECT TRUE'); + assert.deepStrictEqual(res1.rows, [ { bool:true } ]); + + const query = new QueryStream('SELECT TRUE'); + const client = await pool.connect(); + const stream = await client.query(query); + + await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); + + await stream.destroy(); + await client.release(); + + const res2 = await pool.query('SELECT TRUE'); + assert.deepStrictEqual(res2.rows, [ { bool:true } ]); + }) + }) } From 5181059a4cbccaafcb5d16af94e4ee1d32d3461a Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 12:58:59 +0300 Subject: [PATCH 2/7] Close the pool at the end of the test --- packages/pg-query-stream/test/close.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index db2f8c404..1e2378332 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -110,6 +110,8 @@ if (process.version.startsWith('v8.')) { const res2 = await pool.query('SELECT TRUE'); assert.deepStrictEqual(res2.rows, [ { bool:true } ]); + + await pool.end(); }) }) } From a3e601b5154e5bdede9629f0a5fa2533de68a0b1 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Wed, 5 Oct 2022 18:11:58 +0300 Subject: [PATCH 3/7] pg-pool: don't allow connection re-use after streaming --- packages/pg-pool/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 00f55b4da..c8b22d968 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -331,7 +331,7 @@ class Pool extends EventEmitter { client._poolUseCount = (client._poolUseCount || 0) + 1 // TODO(bmc): expose a proper, public interface _queryable and _ending - if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { + if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses || client?.activeQuery?.cursor) { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } From e06f8f1e7ad88d34146950e005dfdd7520b6d49b Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Thu, 6 Oct 2022 19:33:13 +0300 Subject: [PATCH 4/7] Add second test case; move tests to error.ts --- packages/pg-query-stream/test/close.ts | 24 ------------ packages/pg-query-stream/test/error.ts | 54 ++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 24 deletions(-) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index 1e2378332..97e4627d9 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -1,6 +1,5 @@ import assert from 'assert' import concat from 'concat-stream' -import pg from 'pg' import QueryStream from '../src' import helper from './helper' @@ -91,27 +90,4 @@ if (process.version.startsWith('v8.')) { stream.on('close', done) }) }) - - describe('use after error', () => { - it('should work if used after error', async () => { - const pool = new pg.Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }); - - const res1 = await pool.query('SELECT TRUE'); - assert.deepStrictEqual(res1.rows, [ { bool:true } ]); - - const query = new QueryStream('SELECT TRUE'); - const client = await pool.connect(); - const stream = await client.query(query); - - await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); - - await stream.destroy(); - await client.release(); - - const res2 = await pool.query('SELECT TRUE'); - assert.deepStrictEqual(res2.rows, [ { bool:true } ]); - - await pool.end(); - }) - }) } diff --git a/packages/pg-query-stream/test/error.ts b/packages/pg-query-stream/test/error.ts index 220a52485..22ff0bb41 100644 --- a/packages/pg-query-stream/test/error.ts +++ b/packages/pg-query-stream/test/error.ts @@ -89,4 +89,58 @@ describe('error recovery', () => { await client.end() }) }) + + it.only('should work if used after timeout error', async () => { + const pool = new Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }); + + const res1 = await pool.query('SELECT 1 AS a'); + assert.deepStrictEqual(res1.rows, [ { a:1 } ]); + + const query = new QueryStream('SELECT 2 AS b'); + const client = await pool.connect(); + const stream = await client.query(query); + + await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); + + await stream.destroy(); + if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100)); + + if(process.env.WAIT_AFTER_CLOSE === '2') { + await new Promise(resolve => stream.once('close', resolve)); + } + + await client.release(); + + const res2 = await pool.query('SELECT 4 AS d'); + assert.deepStrictEqual(res2.rows, [ { d:4 } ]); + + await pool.end(); + }) + + it.only('should work if used after syntax error', async () => { + const pool = new Pool({ max: 1, statement_timeout: 1 }); // statement_timeout is required here, so maybe this is just another timeout error? + + const res1 = await pool.query('SELECT 1 AS a'); + assert.deepStrictEqual(res1.rows, [ { a:1 } ]); + + const query = new QueryStream('SELECT 2 AS b'); + const client = await pool.connect(); + const stream = await client.query(query); + + await new Promise(resolve => setTimeout(resolve, 10)); + + await stream.destroy(); + if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100)); + + if(process.env.WAIT_AFTER_CLOSE === '2') { + await new Promise(resolve => stream.once('close', resolve)); + } + + await client.release(); + + const res2 = await pool.query('SELECT 4 AS d'); + assert.deepStrictEqual(res2.rows, [ { d:4 } ]); + + await pool.end(); + }) }) From e2a1c1df2faaf89a82586a8109d69771d76df3e0 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Thu, 6 Oct 2022 19:36:44 +0300 Subject: [PATCH 5/7] Increase statement timeout --- packages/pg-query-stream/test/error.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg-query-stream/test/error.ts b/packages/pg-query-stream/test/error.ts index 22ff0bb41..fba137e2a 100644 --- a/packages/pg-query-stream/test/error.ts +++ b/packages/pg-query-stream/test/error.ts @@ -118,7 +118,7 @@ describe('error recovery', () => { }) it.only('should work if used after syntax error', async () => { - const pool = new Pool({ max: 1, statement_timeout: 1 }); // statement_timeout is required here, so maybe this is just another timeout error? + const pool = new Pool({ max: 1, statement_timeout: 100 }); // statement_timeout is required here, so maybe this is just another timeout error? const res1 = await pool.query('SELECT 1 AS a'); assert.deepStrictEqual(res1.rows, [ { a:1 } ]); From 19d66e582479c7a6493a7372a6a8d0fd6deef89d Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Fri, 2 Jun 2023 06:45:23 +0000 Subject: [PATCH 6/7] Introduce test from @nathanjcochran --- packages/pg-query-stream/test/error.ts | 39 ++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/packages/pg-query-stream/test/error.ts b/packages/pg-query-stream/test/error.ts index fba137e2a..580272d74 100644 --- a/packages/pg-query-stream/test/error.ts +++ b/packages/pg-query-stream/test/error.ts @@ -143,4 +143,43 @@ describe('error recovery', () => { await pool.end(); }) + + it.only('should work after cancelling query', async () => { + const pool = new Pool(); + const conn = await pool.connect(); + + // Get connection PID for sake of pg_cancel_backend() call + const result = await conn.query('SELECT pg_backend_pid() AS pid;'); + const { pid } = result.rows[0]; + + const stream = conn.query(new QueryStream('SELECT pg_sleep(10);')); + stream.on('data', (chunk) => { + // Switches stream into readableFlowing === true mode + }); + stream.on('error', (err) => { + // Errors are expected due to pg_cancel_backend() call + }); + + // Create a promise that is resolved when the stream is closed + const closed = new Promise((res) => { + stream.on('close', res); + }); + + // Wait 100ms before cancelling the query + await new Promise((res) => setTimeout(res, 100)); + + // Cancel pg_sleep(10) query + await pool.query('SELECT pg_cancel_backend($1);', [pid]); + + // Destroy stream and wait for it to be closed + stream.destroy(); + await closed; + + // Subsequent query on same connection should succeed + const res = await conn.query('SELECT 1 AS a;'); + assert.deepStrictEqual(res.rows, [ { a:1 } ]); + + conn.release() + await pool.end() + }) }) From 41dd1ba1023eee00c2bc8393fa8847c1ef56c21c Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Fri, 2 Jun 2023 06:54:31 +0000 Subject: [PATCH 7/7] Fix null-safe dereference --- packages/pg-pool/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index c8b22d968..97728b366 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -331,7 +331,7 @@ class Pool extends EventEmitter { client._poolUseCount = (client._poolUseCount || 0) + 1 // TODO(bmc): expose a proper, public interface _queryable and _ending - if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses || client?.activeQuery?.cursor) { + if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses || (client && client.activeQuery && client.activeQuery.cursor)) { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') }