From 08a23b74ff0b2ef399a5be837535856f2554db4b Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:33:27 +0200 Subject: [PATCH 01/13] feat: pipeline mode --- packages/pg/lib/client.js | 187 +++++++++++++++++- packages/pg/lib/connection.js | 10 + packages/pg/lib/query.js | 35 ++++ .../test/integration/client/pipeline-tests.js | 116 +++++++++++ 4 files changed, 341 insertions(+), 7 deletions(-) create mode 100644 packages/pg/test/integration/client/pipeline-tests.js diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 903db6c66..44dfe2fc7 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -83,6 +83,9 @@ class Client extends EventEmitter { this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null + this._pipelining = false + this._pipelineQueue = [] + this._pipelineActive = false this.ssl = this.connectionParameters.ssl || false // As with Password, make SSL->Key (the private key) non-enumerable. // It won't show up in stack traces @@ -125,6 +128,12 @@ class Client extends EventEmitter { this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 + + // Also error all pipeline queries + if (this._pipelineQueue) { + this._pipelineQueue.forEach(enqueueError) + this._pipelineQueue.length = 0 + } } _connect(callback) { @@ -354,6 +363,11 @@ class Client extends EventEmitter { } this.emit('connect') } + + if (this._pipelining) { + return this._handlePipelineReadyForQuery(msg) + } + const activeQuery = this._getActiveQuery() this._activeQuery = null this.readyForQuery = true @@ -363,6 +377,22 @@ class Client extends EventEmitter { this._pulseQueryQueue() } + _handlePipelineReadyForQuery(msg) { + // In pipeline mode, handle completed queries + if (this._pipelineQueue.length > 0) { + const completedQuery = this._pipelineQueue.shift() + if (completedQuery) { + completedQuery.handleReadyForQuery(this.connection) + } + } + + // If no more queries in pipeline, we're ready for more + if (this._pipelineQueue.length === 0) { + this.readyForQuery = true + this.emit('drain') + } + } + // if we receive an error event or error message // during the connection process we handle it here _handleErrorWhileConnecting(err) { @@ -408,33 +438,53 @@ class Client extends EventEmitter { _handleRowDescription(msg) { // delegate rowDescription to active query - this._getActiveQuery().handleRowDescription(msg) + const query = this._getCurrentQuery() + if (query) { + query.handleRowDescription(msg) + } } _handleDataRow(msg) { // delegate dataRow to active query - this._getActiveQuery().handleDataRow(msg) + const query = this._getCurrentQuery() + if (query) { + query.handleDataRow(msg) + } } _handlePortalSuspended(msg) { // delegate portalSuspended to active query - this._getActiveQuery().handlePortalSuspended(this.connection) + const query = this._getCurrentQuery() + if (query) { + query.handlePortalSuspended(this.connection) + } } _handleEmptyQuery(msg) { // delegate emptyQuery to active query - this._getActiveQuery().handleEmptyQuery(this.connection) + const query = this._getCurrentQuery() + if (query) { + query.handleEmptyQuery(this.connection) + } } _handleCommandComplete(msg) { - const activeQuery = this._getActiveQuery() - if (activeQuery == null) { + const query = this._getCurrentQuery() + if (query == null) { const error = new Error('Received unexpected commandComplete message from backend.') this._handleErrorEvent(error) return } // delegate commandComplete to active query - activeQuery.handleCommandComplete(msg, this.connection) + query.handleCommandComplete(msg, this.connection) + } + + _getCurrentQuery() { + if (this._pipelining) { + // In pipeline mode, return the first query in the pipeline queue + return this._pipelineQueue.length > 0 ? this._pipelineQueue[0] : null + } + return this._getActiveQuery() } _handleParseComplete() { @@ -644,6 +694,10 @@ class Client extends EventEmitter { return result } + if (this._pipelining) { + return this._pipelineQuery(query, result) + } + this._queryQueue.push(query) this._pulseQueryQueue() return result @@ -689,6 +743,125 @@ class Client extends EventEmitter { queryQueueDeprecationNotice() return this._queryQueue } + + get pipelining() { + return this._pipelining + } + + set pipelining(value) { + if (typeof value !== 'boolean') { + throw new TypeError('pipelining must be a boolean') + } + + if (this._pipelining === value) { + return + } + + if (value && !this._connected) { + throw new Error('Cannot enable pipelining mode before connection is established') + } + + if (value && this._getActiveQuery()) { + throw new Error('Cannot enable pipelining mode while a query is active') + } + + if (value) { + this._enterPipelineMode() + } else { + this._exitPipelineMode() + } + } + + _enterPipelineMode() { + if (this._pipelining) { + return + } + + this._pipelining = true + this._pipelineActive = true + this._pipelineQueue = [] + + // Send pipeline mode command to server + this.connection.enterPipelineMode() + } + + _exitPipelineMode() { + if (!this._pipelining) { + return + } + + // Process any remaining queries in pipeline + if (this._pipelineQueue.length > 0) { + throw new Error('Cannot exit pipeline mode with pending queries') + } + + this._pipelining = false + this._pipelineActive = false + + // Send exit pipeline mode command to server + this.connection.exitPipelineMode() + } + + _pipelineQuery(query, result) { + // Validate query for pipeline mode + if (query.text && typeof query.text === 'string' && query.text.includes(';')) { + const error = new Error('Multiple SQL commands in a single query are not allowed in pipeline mode') + process.nextTick(() => { + query.handleError(error, this.connection) + }) + return result + } + + // Disallow simple query protocol in pipeline mode + if (!query.requiresPreparation()) { + const error = new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') + process.nextTick(() => { + query.handleError(error, this.connection) + }) + return result + } + + // Add query to pipeline queue + this._pipelineQueue.push(query) + + // Submit query using pipeline-specific method + const queryError = this._submitPipelineQuery(query) + if (queryError) { + process.nextTick(() => { + query.handleError(queryError, this.connection) + // Remove from pipeline queue on error + const index = this._pipelineQueue.indexOf(query) + if (index > -1) { + this._pipelineQueue.splice(index, 1) + } + }) + } + + return result + } + + _submitPipelineQuery(query) { + if (typeof query.text !== 'string' && typeof query.name !== 'string') { + return new Error('A query must have either text or a name. Supplying neither is unsupported.') + } + const previous = this.connection.parsedStatements[query.name] + if (query.text && previous && query.text !== previous) { + return new Error(`Prepared statements must be unique - '${query.name}' was used for a different statement`) + } + if (query.values && !Array.isArray(query.values)) { + return new Error('Query values must be an array') + } + + // In pipeline mode, we always use extended query protocol + this.connection.stream.cork && this.connection.stream.cork() + try { + query.preparePipeline(this.connection) + } finally { + this.connection.stream.uncork && this.connection.stream.uncork() + } + + return null + } } // expose a Query constructor diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 8045af858..9bf32d52e 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -217,6 +217,16 @@ class Connection extends EventEmitter { sendCopyFail(msg) { this._send(serialize.copyFail(msg)) } + + enterPipelineMode() { + this._pipelineMode = true + } + + exitPipelineMode() { + // Send sync to end pipeline mode + this.sync() + this._pipelineMode = false + } } module.exports = Connection diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 64aab5ff2..9c58d06ac 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -247,6 +247,41 @@ class Query extends EventEmitter { handleCopyData(msg, connection) { // noop } + + preparePipeline(connection) { + if (!this.hasBeenParsed(connection)) { + connection.parse({ + text: this.text, + name: this.name, + types: this.types, + }) + } + + try { + connection.bind({ + portal: this.portal, + statement: this.name, + values: this.values, + binary: this.binary, + valueMapper: utils.prepareValue, + }) + } catch (err) { + this.handleError(err, connection) + return + } + + connection.describe({ + type: 'P', + name: this.portal || '', + }) + + connection.execute({ + portal: this.portal, + rows: this.rows, + }) + + connection.flush() + } } module.exports = Query diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js new file mode 100644 index 000000000..01ab9fdcf --- /dev/null +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -0,0 +1,116 @@ +'use strict' + +const helper = require('../test-helper') +const { Client } = require('../../../') + +const suite = new helper.Suite() + +suite.test('pipelining property can be set and retrieved', (cb) => { + const client = new Client(helper.config) + + // Initially false + helper.assert.equal(client.pipelining, false) + + client.connect((err) => { + if (err) return cb(err) + + // Can be set to true after connection + client.pipelining = true + helper.assert.equal(client.pipelining, true) + + // Can be set back to false + client.pipelining = false + helper.assert.equal(client.pipelining, false) + + client.end(cb) + }) +}) + +suite.test('cannot enable pipelining before connection', (cb) => { + const client = new Client(helper.config) + + try { + client.pipelining = true + helper.assert.fail('Should have thrown error') + } catch (err) { + helper.assert.equal(err.message, 'Cannot enable pipelining mode before connection is established') + cb() + } +}) + +suite.test('pipelining mode allows multiple parameterized queries', (cb) => { + const client = new Client(helper.config) + + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + let completed = 0 + const results = [] + + // Send multiple queries in pipeline mode + client.query('SELECT $1::text as message', ['Hello'], (err, result) => { + if (err) return cb(err) + results[0] = result + completed++ + if (completed === 3) checkResults() + }) + + client.query('SELECT $1::int as number', [42], (err, result) => { + if (err) return cb(err) + results[1] = result + completed++ + if (completed === 3) checkResults() + }) + + client.query('SELECT $1::text as greeting', ['World'], (err, result) => { + if (err) return cb(err) + results[2] = result + completed++ + if (completed === 3) checkResults() + }) + + function checkResults() { + helper.assert.equal(results[0].rows[0].message, 'Hello') + helper.assert.equal(results[1].rows[0].number, 42) + helper.assert.equal(results[2].rows[0].greeting, 'World') + + client.end(cb) + } + }) +}) + +suite.test('pipelining mode rejects simple queries', (cb) => { + const client = new Client(helper.config) + + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + client.query('SELECT 1', (err, result) => { + helper.assert(err) + helper.assert.equal(err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') + client.end(cb) + }) + }) +}) + +suite.test('pipelining mode rejects multi-statement queries', (cb) => { + const client = new Client(helper.config) + + client.connect((err) => { + if (err) return cb(err) + + client.pipelining = true + + client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { + helper.assert(err) + helper.assert.equal(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') + client.end(cb) + }) + }) +}) + +module.exports = suite \ No newline at end of file From 38c7e9d1d1053d2ea5ba6d1d1f9ebfbcf3f905e8 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:40:47 +0200 Subject: [PATCH 02/13] fix: lint --- packages/pg/lib/client.js | 12 +++--- packages/pg/lib/query.js | 2 +- .../test/integration/client/pipeline-tests.js | 38 +++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 44dfe2fc7..1d9b77e14 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -752,7 +752,7 @@ class Client extends EventEmitter { if (typeof value !== 'boolean') { throw new TypeError('pipelining must be a boolean') } - + if (this._pipelining === value) { return } @@ -776,11 +776,11 @@ class Client extends EventEmitter { if (this._pipelining) { return } - + this._pipelining = true this._pipelineActive = true this._pipelineQueue = [] - + // Send pipeline mode command to server this.connection.enterPipelineMode() } @@ -797,7 +797,7 @@ class Client extends EventEmitter { this._pipelining = false this._pipelineActive = false - + // Send exit pipeline mode command to server this.connection.exitPipelineMode() } @@ -823,7 +823,7 @@ class Client extends EventEmitter { // Add query to pipeline queue this._pipelineQueue.push(query) - + // Submit query using pipeline-specific method const queryError = this._submitPipelineQuery(query) if (queryError) { @@ -859,7 +859,7 @@ class Client extends EventEmitter { } finally { this.connection.stream.uncork && this.connection.stream.uncork() } - + return null } } diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 9c58d06ac..364c5e06a 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -279,7 +279,7 @@ class Query extends EventEmitter { portal: this.portal, rows: this.rows, }) - + connection.flush() } } diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index 01ab9fdcf..83bce7ff0 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -10,25 +10,25 @@ suite.test('pipelining property can be set and retrieved', (cb) => { // Initially false helper.assert.equal(client.pipelining, false) - + client.connect((err) => { if (err) return cb(err) - + // Can be set to true after connection client.pipelining = true helper.assert.equal(client.pipelining, true) - + // Can be set back to false client.pipelining = false helper.assert.equal(client.pipelining, false) - + client.end(cb) }) }) suite.test('cannot enable pipelining before connection', (cb) => { const client = new Client(helper.config) - + try { client.pipelining = true helper.assert.fail('Should have thrown error') @@ -40,15 +40,15 @@ suite.test('cannot enable pipelining before connection', (cb) => { suite.test('pipelining mode allows multiple parameterized queries', (cb) => { const client = new Client(helper.config) - + client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + let completed = 0 const results = [] - + // Send multiple queries in pipeline mode client.query('SELECT $1::text as message', ['Hello'], (err, result) => { if (err) return cb(err) @@ -56,26 +56,26 @@ suite.test('pipelining mode allows multiple parameterized queries', (cb) => { completed++ if (completed === 3) checkResults() }) - + client.query('SELECT $1::int as number', [42], (err, result) => { if (err) return cb(err) results[1] = result completed++ if (completed === 3) checkResults() }) - + client.query('SELECT $1::text as greeting', ['World'], (err, result) => { if (err) return cb(err) results[2] = result completed++ if (completed === 3) checkResults() }) - + function checkResults() { helper.assert.equal(results[0].rows[0].message, 'Hello') helper.assert.equal(results[1].rows[0].number, 42) helper.assert.equal(results[2].rows[0].greeting, 'World') - + client.end(cb) } }) @@ -83,12 +83,12 @@ suite.test('pipelining mode allows multiple parameterized queries', (cb) => { suite.test('pipelining mode rejects simple queries', (cb) => { const client = new Client(helper.config) - + client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + client.query('SELECT 1', (err, result) => { helper.assert(err) helper.assert.equal(err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') @@ -99,12 +99,12 @@ suite.test('pipelining mode rejects simple queries', (cb) => { suite.test('pipelining mode rejects multi-statement queries', (cb) => { const client = new Client(helper.config) - + client.connect((err) => { if (err) return cb(err) - + client.pipelining = true - + client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { helper.assert(err) helper.assert.equal(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') From 967d7b7c9db22f673c7ccfb6fe6c5c4e96d315f4 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:44:43 +0200 Subject: [PATCH 03/13] fix: lint --- .../test/integration/client/pipeline-tests.js | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index 83bce7ff0..b53dd1f05 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -7,28 +7,22 @@ const suite = new helper.Suite() suite.test('pipelining property can be set and retrieved', (cb) => { const client = new Client(helper.config) - // Initially false helper.assert.equal(client.pipelining, false) - client.connect((err) => { if (err) return cb(err) - // Can be set to true after connection client.pipelining = true helper.assert.equal(client.pipelining, true) - // Can be set back to false client.pipelining = false helper.assert.equal(client.pipelining, false) - client.end(cb) }) }) suite.test('cannot enable pipelining before connection', (cb) => { const client = new Client(helper.config) - try { client.pipelining = true helper.assert.fail('Should have thrown error') @@ -40,15 +34,11 @@ suite.test('cannot enable pipelining before connection', (cb) => { suite.test('pipelining mode allows multiple parameterized queries', (cb) => { const client = new Client(helper.config) - client.connect((err) => { if (err) return cb(err) - client.pipelining = true - let completed = 0 const results = [] - // Send multiple queries in pipeline mode client.query('SELECT $1::text as message', ['Hello'], (err, result) => { if (err) return cb(err) @@ -56,26 +46,22 @@ suite.test('pipelining mode allows multiple parameterized queries', (cb) => { completed++ if (completed === 3) checkResults() }) - client.query('SELECT $1::int as number', [42], (err, result) => { if (err) return cb(err) results[1] = result completed++ if (completed === 3) checkResults() }) - client.query('SELECT $1::text as greeting', ['World'], (err, result) => { if (err) return cb(err) results[2] = result completed++ if (completed === 3) checkResults() }) - function checkResults() { helper.assert.equal(results[0].rows[0].message, 'Hello') helper.assert.equal(results[1].rows[0].number, 42) helper.assert.equal(results[2].rows[0].greeting, 'World') - client.end(cb) } }) @@ -83,12 +69,9 @@ suite.test('pipelining mode allows multiple parameterized queries', (cb) => { suite.test('pipelining mode rejects simple queries', (cb) => { const client = new Client(helper.config) - client.connect((err) => { if (err) return cb(err) - client.pipelining = true - client.query('SELECT 1', (err, result) => { helper.assert(err) helper.assert.equal(err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') @@ -99,12 +82,9 @@ suite.test('pipelining mode rejects simple queries', (cb) => { suite.test('pipelining mode rejects multi-statement queries', (cb) => { const client = new Client(helper.config) - client.connect((err) => { if (err) return cb(err) - client.pipelining = true - client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { helper.assert(err) helper.assert.equal(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') @@ -112,5 +92,4 @@ suite.test('pipelining mode rejects multi-statement queries', (cb) => { }) }) }) - -module.exports = suite \ No newline at end of file +module.exports = suite From 4d795cf45df6ef2d34580fad003c006e17182c8f Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:48:22 +0200 Subject: [PATCH 04/13] chore: lf --- .../test/integration/client/pipeline-tests.js | 190 +++++++++--------- 1 file changed, 95 insertions(+), 95 deletions(-) diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index b53dd1f05..51cea1390 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -1,95 +1,95 @@ -'use strict' - -const helper = require('../test-helper') -const { Client } = require('../../../') - -const suite = new helper.Suite() - -suite.test('pipelining property can be set and retrieved', (cb) => { - const client = new Client(helper.config) - // Initially false - helper.assert.equal(client.pipelining, false) - client.connect((err) => { - if (err) return cb(err) - // Can be set to true after connection - client.pipelining = true - helper.assert.equal(client.pipelining, true) - // Can be set back to false - client.pipelining = false - helper.assert.equal(client.pipelining, false) - client.end(cb) - }) -}) - -suite.test('cannot enable pipelining before connection', (cb) => { - const client = new Client(helper.config) - try { - client.pipelining = true - helper.assert.fail('Should have thrown error') - } catch (err) { - helper.assert.equal(err.message, 'Cannot enable pipelining mode before connection is established') - cb() - } -}) - -suite.test('pipelining mode allows multiple parameterized queries', (cb) => { - const client = new Client(helper.config) - client.connect((err) => { - if (err) return cb(err) - client.pipelining = true - let completed = 0 - const results = [] - // Send multiple queries in pipeline mode - client.query('SELECT $1::text as message', ['Hello'], (err, result) => { - if (err) return cb(err) - results[0] = result - completed++ - if (completed === 3) checkResults() - }) - client.query('SELECT $1::int as number', [42], (err, result) => { - if (err) return cb(err) - results[1] = result - completed++ - if (completed === 3) checkResults() - }) - client.query('SELECT $1::text as greeting', ['World'], (err, result) => { - if (err) return cb(err) - results[2] = result - completed++ - if (completed === 3) checkResults() - }) - function checkResults() { - helper.assert.equal(results[0].rows[0].message, 'Hello') - helper.assert.equal(results[1].rows[0].number, 42) - helper.assert.equal(results[2].rows[0].greeting, 'World') - client.end(cb) - } - }) -}) - -suite.test('pipelining mode rejects simple queries', (cb) => { - const client = new Client(helper.config) - client.connect((err) => { - if (err) return cb(err) - client.pipelining = true - client.query('SELECT 1', (err, result) => { - helper.assert(err) - helper.assert.equal(err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') - client.end(cb) - }) - }) -}) - -suite.test('pipelining mode rejects multi-statement queries', (cb) => { - const client = new Client(helper.config) - client.connect((err) => { - if (err) return cb(err) - client.pipelining = true - client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { - helper.assert(err) - helper.assert.equal(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') - client.end(cb) - }) - }) -}) -module.exports = suite +'use strict' + +const helper = require('../test-helper') +const { Client } = require('../../../') + +const suite = new helper.Suite() + +suite.test('pipelining property can be set and retrieved', (cb) => { + const client = new Client(helper.config) + // Initially false + helper.assert.equal(client.pipelining, false) + client.connect((err) => { + if (err) return cb(err) + // Can be set to true after connection + client.pipelining = true + helper.assert.equal(client.pipelining, true) + // Can be set back to false + client.pipelining = false + helper.assert.equal(client.pipelining, false) + client.end(cb) + }) +}) + +suite.test('cannot enable pipelining before connection', (cb) => { + const client = new Client(helper.config) + try { + client.pipelining = true + helper.assert.fail('Should have thrown error') + } catch (err) { + helper.assert.equal(err.message, 'Cannot enable pipelining mode before connection is established') + cb() + } +}) + +suite.test('pipelining mode allows multiple parameterized queries', (cb) => { + const client = new Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + client.pipelining = true + let completed = 0 + const results = [] + // Send multiple queries in pipeline mode + client.query('SELECT $1::text as message', ['Hello'], (err, result) => { + if (err) return cb(err) + results[0] = result + completed++ + if (completed === 3) checkResults() + }) + client.query('SELECT $1::int as number', [42], (err, result) => { + if (err) return cb(err) + results[1] = result + completed++ + if (completed === 3) checkResults() + }) + client.query('SELECT $1::text as greeting', ['World'], (err, result) => { + if (err) return cb(err) + results[2] = result + completed++ + if (completed === 3) checkResults() + }) + function checkResults() { + helper.assert.equal(results[0].rows[0].message, 'Hello') + helper.assert.equal(results[1].rows[0].number, 42) + helper.assert.equal(results[2].rows[0].greeting, 'World') + client.end(cb) + } + }) +}) + +suite.test('pipelining mode rejects simple queries', (cb) => { + const client = new Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + client.pipelining = true + client.query('SELECT 1', (err, result) => { + helper.assert(err) + helper.assert.equal(err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') + client.end(cb) + }) + }) +}) + +suite.test('pipelining mode rejects multi-statement queries', (cb) => { + const client = new Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + client.pipelining = true + client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { + helper.assert(err) + helper.assert.equal(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') + client.end(cb) + }) + }) +}) +module.exports = suite From 014afcc7e6e6b4a7936b598c18e8c2d99d3a66c8 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:50:43 +0200 Subject: [PATCH 05/13] fix: lint --- packages/pg/test/integration/client/pipeline-tests.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index 51cea1390..a63c94341 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -74,7 +74,10 @@ suite.test('pipelining mode rejects simple queries', (cb) => { client.pipelining = true client.query('SELECT 1', (err, result) => { helper.assert(err) - helper.assert.equal(err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') + helper.assert.equal( + err.message, + 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.' + ) client.end(cb) }) }) From b92a970b4773975ff269a2c591eedafd363243d0 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:52:01 +0200 Subject: [PATCH 06/13] fix: lint --- packages/pg/test/integration/client/pipeline-tests.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index a63c94341..aaea28998 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -75,7 +75,7 @@ suite.test('pipelining mode rejects simple queries', (cb) => { client.query('SELECT 1', (err, result) => { helper.assert(err) helper.assert.equal( - err.message, + err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.' ) client.end(cb) From 8bb43bc83c31b01c2956965602372862c6d8ec1e Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 12:57:50 +0200 Subject: [PATCH 07/13] fix: assert --- .../test/integration/client/pipeline-tests.js | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index aaea28998..767aa8e9e 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -2,21 +2,22 @@ const helper = require('../test-helper') const { Client } = require('../../../') +const assert = require('assert') const suite = new helper.Suite() suite.test('pipelining property can be set and retrieved', (cb) => { const client = new Client(helper.config) // Initially false - helper.assert.equal(client.pipelining, false) + assert.strictEqual(client.pipelining, false) client.connect((err) => { if (err) return cb(err) // Can be set to true after connection client.pipelining = true - helper.assert.equal(client.pipelining, true) + assert.strictEqual(client.pipelining, true) // Can be set back to false client.pipelining = false - helper.assert.equal(client.pipelining, false) + assert.strictEqual(client.pipelining, false) client.end(cb) }) }) @@ -25,9 +26,9 @@ suite.test('cannot enable pipelining before connection', (cb) => { const client = new Client(helper.config) try { client.pipelining = true - helper.assert.fail('Should have thrown error') + assert.fail('Should have thrown error') } catch (err) { - helper.assert.equal(err.message, 'Cannot enable pipelining mode before connection is established') + assert.strictEqual(err.message, 'Cannot enable pipelining mode before connection is established') cb() } }) @@ -59,9 +60,9 @@ suite.test('pipelining mode allows multiple parameterized queries', (cb) => { if (completed === 3) checkResults() }) function checkResults() { - helper.assert.equal(results[0].rows[0].message, 'Hello') - helper.assert.equal(results[1].rows[0].number, 42) - helper.assert.equal(results[2].rows[0].greeting, 'World') + assert.strictEqual(results[0].rows[0].message, 'Hello') + assert.strictEqual(results[1].rows[0].number, 42) + assert.strictEqual(results[2].rows[0].greeting, 'World') client.end(cb) } }) @@ -73,8 +74,7 @@ suite.test('pipelining mode rejects simple queries', (cb) => { if (err) return cb(err) client.pipelining = true client.query('SELECT 1', (err, result) => { - helper.assert(err) - helper.assert.equal( + assert.strictEqual( err.message, 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.' ) @@ -89,8 +89,7 @@ suite.test('pipelining mode rejects multi-statement queries', (cb) => { if (err) return cb(err) client.pipelining = true client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { - helper.assert(err) - helper.assert.equal(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') + assert.strictEqual(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') client.end(cb) }) }) From 40df1bb939a2f8ad9105270793b469c206325ac7 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 13:53:13 +0200 Subject: [PATCH 08/13] fix: _getCurrentQuery --- packages/pg/lib/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 1d9b77e14..dddd65caf 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -488,7 +488,7 @@ class Client extends EventEmitter { } _handleParseComplete() { - const activeQuery = this._getActiveQuery() + const activeQuery = this._getCurrentQuery() if (activeQuery == null) { const error = new Error('Received unexpected parseComplete message from backend.') this._handleErrorEvent(error) From e4131240e5cb17c4c8098000c12119d341b6ce48 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 13:59:47 +0200 Subject: [PATCH 09/13] fix: preparePipeline --- packages/pg/lib/query.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 364c5e06a..a2a21be26 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -280,7 +280,11 @@ class Query extends EventEmitter { rows: this.rows, }) - connection.flush() + if (!this.rows) { + connection.sync() + } else { + connection.flush() + } } } From d9940000bf3f8f7e9ddafd2029c95aa31f9f3cb5 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 14:05:51 +0200 Subject: [PATCH 10/13] fix: ending conection --- packages/pg/lib/connection.js | 5 +++++ packages/pg/lib/query.js | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 9bf32d52e..458071e82 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -178,6 +178,11 @@ class Connection extends EventEmitter { this._send(syncBuffer) } + pipelineSync() { + // Send sync without marking connection as ending (for pipeline mode) + this._send(syncBuffer) + } + ref() { this.stream.ref() } diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index a2a21be26..bec72ec08 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -281,7 +281,7 @@ class Query extends EventEmitter { }) if (!this.rows) { - connection.sync() + connection.pipelineSync() } else { connection.flush() } From fd6621beaef1c3d6ea1e1cae0bb9c0bbfad4f369 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 14:15:50 +0200 Subject: [PATCH 11/13] fix: parseComplete --- packages/pg/lib/client.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index dddd65caf..6917f35ab 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -490,6 +490,11 @@ class Client extends EventEmitter { _handleParseComplete() { const activeQuery = this._getCurrentQuery() if (activeQuery == null) { + // In pipeline mode, parseComplete might be received before queries are fully processed + // This is normal behavior, so we can safely ignore it + if (this._pipelining) { + return + } const error = new Error('Received unexpected parseComplete message from backend.') this._handleErrorEvent(error) return From e58ca2297bebbd8ded71ad597add0805ad5eb01b Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 14:25:07 +0200 Subject: [PATCH 12/13] fix: command complete --- packages/pg/lib/client.js | 5 +++++ packages/pg/lib/query.js | 6 +----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 6917f35ab..a50db2498 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -471,6 +471,11 @@ class Client extends EventEmitter { _handleCommandComplete(msg) { const query = this._getCurrentQuery() if (query == null) { + // In pipeline mode, commandComplete might be received after query is processed + // This can happen due to message timing, so we can safely ignore it + if (this._pipelining) { + return + } const error = new Error('Received unexpected commandComplete message from backend.') this._handleErrorEvent(error) return diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index bec72ec08..364c5e06a 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -280,11 +280,7 @@ class Query extends EventEmitter { rows: this.rows, }) - if (!this.rows) { - connection.pipelineSync() - } else { - connection.flush() - } + connection.flush() } } From fd9d73e0b59185cfdd940adbdfa86fa3ecceaab3 Mon Sep 17 00:00:00 2001 From: francesco Date: Thu, 28 Aug 2025 14:58:51 +0200 Subject: [PATCH 13/13] fix: flush --- packages/pg/lib/client.js | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index a50db2498..9a4c9602c 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -383,6 +383,11 @@ class Client extends EventEmitter { const completedQuery = this._pipelineQueue.shift() if (completedQuery) { completedQuery.handleReadyForQuery(this.connection) + } else { + // No queries in pipeline queue, but we received readyForQuery + // This might happen due to message timing in pipeline mode + // Just mark as ready for more queries + this.readyForQuery = true } } @@ -808,6 +813,12 @@ class Client extends EventEmitter { this._pipelining = false this._pipelineActive = false + // Clear any pending sync timer + if (this._pipelineSyncTimer) { + clearTimeout(this._pipelineSyncTimer) + this._pipelineSyncTimer = null + } + // Send exit pipeline mode command to server this.connection.exitPipelineMode() } @@ -845,11 +856,29 @@ class Client extends EventEmitter { this._pipelineQueue.splice(index, 1) } }) + } else { + // Schedule a sync after a short delay to allow batching + this._schedulePipelineSync() } return result } + _schedulePipelineSync() { + // Clear any existing sync timer + if (this._pipelineSyncTimer) { + clearTimeout(this._pipelineSyncTimer) + } + + // Schedule a sync after a short delay to allow multiple queries to batch + this._pipelineSyncTimer = setTimeout(() => { + if (this._pipelining && this._pipelineQueue.length > 0) { + this.connection.pipelineSync() + } + this._pipelineSyncTimer = null + }, 0) // Use 0 delay to sync on next tick + } + _submitPipelineQuery(query) { if (typeof query.text !== 'string' && typeof query.name !== 'string') { return new Error('A query must have either text or a name. Supplying neither is unsupported.')