diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 00f55b4da..97728b366 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -331,7 +331,7 @@ class Pool extends EventEmitter { client._poolUseCount = (client._poolUseCount || 0) + 1 // TODO(bmc): expose a proper, public interface _queryable and _ending - if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { + if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses || (client && client.activeQuery && client.activeQuery.cursor)) { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } diff --git a/packages/pg-query-stream/test/error.ts b/packages/pg-query-stream/test/error.ts index 220a52485..580272d74 100644 --- a/packages/pg-query-stream/test/error.ts +++ b/packages/pg-query-stream/test/error.ts @@ -89,4 +89,97 @@ describe('error recovery', () => { await client.end() }) }) + + it.only('should work if used after timeout error', async () => { + const pool = new Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }); + + const res1 = await pool.query('SELECT 1 AS a'); + assert.deepStrictEqual(res1.rows, [ { a:1 } ]); + + const query = new QueryStream('SELECT 2 AS b'); + const client = await pool.connect(); + const stream = await client.query(query); + + await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); + + await stream.destroy(); + if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100)); + + if(process.env.WAIT_AFTER_CLOSE === '2') { + await new Promise(resolve => stream.once('close', resolve)); + } + + await client.release(); + + const res2 = await pool.query('SELECT 4 AS d'); + assert.deepStrictEqual(res2.rows, [ { d:4 } ]); + + await pool.end(); + }) + + it.only('should work if used after syntax error', async () => { + const pool = new Pool({ max: 1, statement_timeout: 100 }); // statement_timeout is required here, so maybe this is just another timeout error? + + const res1 = await pool.query('SELECT 1 AS a'); + assert.deepStrictEqual(res1.rows, [ { a:1 } ]); + + const query = new QueryStream('SELECT 2 AS b'); + const client = await pool.connect(); + const stream = await client.query(query); + + await new Promise(resolve => setTimeout(resolve, 10)); + + await stream.destroy(); + if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100)); + + if(process.env.WAIT_AFTER_CLOSE === '2') { + await new Promise(resolve => stream.once('close', resolve)); + } + + await client.release(); + + const res2 = await pool.query('SELECT 4 AS d'); + assert.deepStrictEqual(res2.rows, [ { d:4 } ]); + + await pool.end(); + }) + + it.only('should work after cancelling query', async () => { + const pool = new Pool(); + const conn = await pool.connect(); + + // Get connection PID for sake of pg_cancel_backend() call + const result = await conn.query('SELECT pg_backend_pid() AS pid;'); + const { pid } = result.rows[0]; + + const stream = conn.query(new QueryStream('SELECT pg_sleep(10);')); + stream.on('data', (chunk) => { + // Switches stream into readableFlowing === true mode + }); + stream.on('error', (err) => { + // Errors are expected due to pg_cancel_backend() call + }); + + // Create a promise that is resolved when the stream is closed + const closed = new Promise((res) => { + stream.on('close', res); + }); + + // Wait 100ms before cancelling the query + await new Promise((res) => setTimeout(res, 100)); + + // Cancel pg_sleep(10) query + await pool.query('SELECT pg_cancel_backend($1);', [pid]); + + // Destroy stream and wait for it to be closed + stream.destroy(); + await closed; + + // Subsequent query on same connection should succeed + const res = await conn.query('SELECT 1 AS a;'); + assert.deepStrictEqual(res.rows, [ { a:1 } ]); + + conn.release() + await pool.end() + }) })