Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/pg-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ class Pool extends EventEmitter {
client._poolUseCount = (client._poolUseCount || 0) + 1

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses || (client && client.activeQuery && client.activeQuery.cursor)) {
if (client._poolUseCount >= this.options.maxUses) {
this.log('remove expended client')
}
Expand Down
93 changes: 93 additions & 0 deletions packages/pg-query-stream/test/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,97 @@ describe('error recovery', () => {
await client.end()
})
})

it.only('should work if used after timeout error', async () => {
const pool = new Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 });

const res1 = await pool.query('SELECT 1 AS a');
assert.deepStrictEqual(res1.rows, [ { a:1 } ]);

const query = new QueryStream('SELECT 2 AS b');
const client = await pool.connect();
const stream = await client.query(query);

await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' });

await stream.destroy();
if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100));

if(process.env.WAIT_AFTER_CLOSE === '2') {
await new Promise(resolve => stream.once('close', resolve));
}

await client.release();

const res2 = await pool.query('SELECT 4 AS d');
assert.deepStrictEqual(res2.rows, [ { d:4 } ]);

await pool.end();
})

it.only('should work if used after syntax error', async () => {
const pool = new Pool({ max: 1, statement_timeout: 100 }); // statement_timeout is required here, so maybe this is just another timeout error?

const res1 = await pool.query('SELECT 1 AS a');
assert.deepStrictEqual(res1.rows, [ { a:1 } ]);

const query = new QueryStream('SELECT 2 AS b');
const client = await pool.connect();
const stream = await client.query(query);

await new Promise(resolve => setTimeout(resolve, 10));

await stream.destroy();
if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100));

if(process.env.WAIT_AFTER_CLOSE === '2') {
await new Promise(resolve => stream.once('close', resolve));
}

await client.release();

const res2 = await pool.query('SELECT 4 AS d');
assert.deepStrictEqual(res2.rows, [ { d:4 } ]);

await pool.end();
})

it.only('should work after cancelling query', async () => {
const pool = new Pool();
const conn = await pool.connect();

// Get connection PID for sake of pg_cancel_backend() call
const result = await conn.query('SELECT pg_backend_pid() AS pid;');
const { pid } = result.rows[0];

const stream = conn.query(new QueryStream('SELECT pg_sleep(10);'));
stream.on('data', (chunk) => {
// Switches stream into readableFlowing === true mode
});
stream.on('error', (err) => {
// Errors are expected due to pg_cancel_backend() call
});

// Create a promise that is resolved when the stream is closed
const closed = new Promise((res) => {
stream.on('close', res);
});

// Wait 100ms before cancelling the query
await new Promise((res) => setTimeout(res, 100));

// Cancel pg_sleep(10) query
await pool.query('SELECT pg_cancel_backend($1);', [pid]);

// Destroy stream and wait for it to be closed
stream.destroy();
await closed;

// Subsequent query on same connection should succeed
const res = await conn.query('SELECT 1 AS a;');
assert.deepStrictEqual(res.rows, [ { a:1 } ]);

conn.release()
await pool.end()
})
})