diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 903db6c66..9a4c9602c 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -83,6 +83,9 @@ class Client extends EventEmitter { this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null + this._pipelining = false + this._pipelineQueue = [] + this._pipelineActive = false this.ssl = this.connectionParameters.ssl || false // As with Password, make SSL->Key (the private key) non-enumerable. // It won't show up in stack traces @@ -125,6 +128,12 @@ class Client extends EventEmitter { this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 + + // Also error all pipeline queries + if (this._pipelineQueue) { + this._pipelineQueue.forEach(enqueueError) + this._pipelineQueue.length = 0 + } } _connect(callback) { @@ -354,6 +363,11 @@ class Client extends EventEmitter { } this.emit('connect') } + + if (this._pipelining) { + return this._handlePipelineReadyForQuery(msg) + } + const activeQuery = this._getActiveQuery() this._activeQuery = null this.readyForQuery = true @@ -363,6 +377,27 @@ class Client extends EventEmitter { this._pulseQueryQueue() } + _handlePipelineReadyForQuery(msg) { + // In pipeline mode, handle completed queries + if (this._pipelineQueue.length > 0) { + const completedQuery = this._pipelineQueue.shift() + if (completedQuery) { + completedQuery.handleReadyForQuery(this.connection) + } else { + // No queries in pipeline queue, but we received readyForQuery + // This might happen due to message timing in pipeline mode + // Just mark as ready for more queries + this.readyForQuery = true + } + } + + // If no more queries in pipeline, we're ready for more + if (this._pipelineQueue.length === 0) { + this.readyForQuery = true + this.emit('drain') + } + } + // if we receive an error event or error message // during the connection process we handle it here _handleErrorWhileConnecting(err) { @@ -408,38 +443,68 @@ class Client extends EventEmitter { _handleRowDescription(msg) { // delegate rowDescription to active query - this._getActiveQuery().handleRowDescription(msg) + const query = this._getCurrentQuery() + if (query) { + query.handleRowDescription(msg) + } } _handleDataRow(msg) { // delegate dataRow to active query - this._getActiveQuery().handleDataRow(msg) + const query = this._getCurrentQuery() + if (query) { + query.handleDataRow(msg) + } } _handlePortalSuspended(msg) { // delegate portalSuspended to active query - this._getActiveQuery().handlePortalSuspended(this.connection) + const query = this._getCurrentQuery() + if (query) { + query.handlePortalSuspended(this.connection) + } } _handleEmptyQuery(msg) { // delegate emptyQuery to active query - this._getActiveQuery().handleEmptyQuery(this.connection) + const query = this._getCurrentQuery() + if (query) { + query.handleEmptyQuery(this.connection) + } } _handleCommandComplete(msg) { - const activeQuery = this._getActiveQuery() - if (activeQuery == null) { + const query = this._getCurrentQuery() + if (query == null) { + // In pipeline mode, commandComplete might be received after query is processed + // This can happen due to message timing, so we can safely ignore it + if (this._pipelining) { + return + } const error = new Error('Received unexpected commandComplete message from backend.') this._handleErrorEvent(error) return } // delegate commandComplete to active query - activeQuery.handleCommandComplete(msg, this.connection) + query.handleCommandComplete(msg, this.connection) + } + + _getCurrentQuery() { + if (this._pipelining) { + // In pipeline mode, return the first query in the pipeline queue + return this._pipelineQueue.length > 0 ? this._pipelineQueue[0] : null + } + return this._getActiveQuery() } _handleParseComplete() { - const activeQuery = this._getActiveQuery() + const activeQuery = this._getCurrentQuery() if (activeQuery == null) { + // In pipeline mode, parseComplete might be received before queries are fully processed + // This is normal behavior, so we can safely ignore it + if (this._pipelining) { + return + } const error = new Error('Received unexpected parseComplete message from backend.') this._handleErrorEvent(error) return @@ -644,6 +709,10 @@ class Client extends EventEmitter { return result } + if (this._pipelining) { + return this._pipelineQuery(query, result) + } + this._queryQueue.push(query) this._pulseQueryQueue() return result @@ -689,6 +758,149 @@ class Client extends EventEmitter { queryQueueDeprecationNotice() return this._queryQueue } + + get pipelining() { + return this._pipelining + } + + set pipelining(value) { + if (typeof value !== 'boolean') { + throw new TypeError('pipelining must be a boolean') + } + + if (this._pipelining === value) { + return + } + + if (value && !this._connected) { + throw new Error('Cannot enable pipelining mode before connection is established') + } + + if (value && this._getActiveQuery()) { + throw new Error('Cannot enable pipelining mode while a query is active') + } + + if (value) { + this._enterPipelineMode() + } else { + this._exitPipelineMode() + } + } + + _enterPipelineMode() { + if (this._pipelining) { + return + } + + this._pipelining = true + this._pipelineActive = true + this._pipelineQueue = [] + + // Send pipeline mode command to server + this.connection.enterPipelineMode() + } + + _exitPipelineMode() { + if (!this._pipelining) { + return + } + + // Process any remaining queries in pipeline + if (this._pipelineQueue.length > 0) { + throw new Error('Cannot exit pipeline mode with pending queries') + } + + this._pipelining = false + this._pipelineActive = false + + // Clear any pending sync timer + if (this._pipelineSyncTimer) { + clearTimeout(this._pipelineSyncTimer) + this._pipelineSyncTimer = null + } + + // Send exit pipeline mode command to server + this.connection.exitPipelineMode() + } + + _pipelineQuery(query, result) { + // Validate query for pipeline mode + if (query.text && typeof query.text === 'string' && query.text.includes(';')) { + const error = new Error('Multiple SQL commands in a single query are not allowed in pipeline mode') + process.nextTick(() => { + query.handleError(error, this.connection) + }) + return result + } + + // Disallow simple query protocol in pipeline mode + if (!query.requiresPreparation()) { + const error = new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries.') + process.nextTick(() => { + query.handleError(error, this.connection) + }) + return result + } + + // Add query to pipeline queue + this._pipelineQueue.push(query) + + // Submit query using pipeline-specific method + const queryError = this._submitPipelineQuery(query) + if (queryError) { + process.nextTick(() => { + query.handleError(queryError, this.connection) + // Remove from pipeline queue on error + const index = this._pipelineQueue.indexOf(query) + if (index > -1) { + this._pipelineQueue.splice(index, 1) + } + }) + } else { + // Schedule a sync after a short delay to allow batching + this._schedulePipelineSync() + } + + return result + } + + _schedulePipelineSync() { + // Clear any existing sync timer + if (this._pipelineSyncTimer) { + clearTimeout(this._pipelineSyncTimer) + } + + // Schedule a sync after a short delay to allow multiple queries to batch + this._pipelineSyncTimer = setTimeout(() => { + if (this._pipelining && this._pipelineQueue.length > 0) { + this.connection.pipelineSync() + } + this._pipelineSyncTimer = null + }, 0) // Use 0 delay to sync on next tick + } + + _submitPipelineQuery(query) { + if (typeof query.text !== 'string' && typeof query.name !== 'string') { + return new Error('A query must have either text or a name. Supplying neither is unsupported.') + } + const previous = this.connection.parsedStatements[query.name] + if (query.text && previous && query.text !== previous) { + return new Error(`Prepared statements must be unique - '${query.name}' was used for a different statement`) + } + if (query.values && !Array.isArray(query.values)) { + return new Error('Query values must be an array') + } + + // In pipeline mode, we always use extended query protocol + this.connection.stream.cork && this.connection.stream.cork() + try { + query.preparePipeline(this.connection) + } finally { + this.connection.stream.uncork && this.connection.stream.uncork() + } + + return null + } } // expose a Query constructor diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 8045af858..458071e82 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -178,6 +178,11 @@ class Connection extends EventEmitter { this._send(syncBuffer) } + pipelineSync() { + // Send sync without marking connection as ending (for pipeline mode) + this._send(syncBuffer) + } + ref() { this.stream.ref() } @@ -217,6 +222,16 @@ class Connection extends EventEmitter { sendCopyFail(msg) { this._send(serialize.copyFail(msg)) } + + enterPipelineMode() { + this._pipelineMode = true + } + + exitPipelineMode() { + // Send sync to end pipeline mode + this.sync() + this._pipelineMode = false + } } module.exports = Connection diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 64aab5ff2..364c5e06a 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -247,6 +247,41 @@ class Query extends EventEmitter { handleCopyData(msg, connection) { // noop } + + preparePipeline(connection) { + if (!this.hasBeenParsed(connection)) { + connection.parse({ + text: this.text, + name: this.name, + types: this.types, + }) + } + + try { + connection.bind({ + portal: this.portal, + statement: this.name, + values: this.values, + binary: this.binary, + valueMapper: utils.prepareValue, + }) + } catch (err) { + this.handleError(err, connection) + return + } + + connection.describe({ + type: 'P', + name: this.portal || '', + }) + + connection.execute({ + portal: this.portal, + rows: this.rows, + }) + + connection.flush() + } } module.exports = Query diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js new file mode 100644 index 000000000..767aa8e9e --- /dev/null +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -0,0 +1,97 @@ +'use strict' + +const helper = require('../test-helper') +const { Client } = require('../../../') +const assert = require('assert') + +const suite = new helper.Suite() + +suite.test('pipelining property can be set and retrieved', (cb) => { + const client = new Client(helper.config) + // Initially false + assert.strictEqual(client.pipelining, false) + client.connect((err) => { + if (err) return cb(err) + // Can be set to true after connection + client.pipelining = true + assert.strictEqual(client.pipelining, true) + // Can be set back to false + client.pipelining = false + assert.strictEqual(client.pipelining, false) + client.end(cb) + }) +}) + +suite.test('cannot enable pipelining before connection', (cb) => { + const client = new Client(helper.config) + try { + client.pipelining = true + assert.fail('Should have thrown error') + } catch (err) { + assert.strictEqual(err.message, 'Cannot enable pipelining mode before connection is established') + cb() + } +}) + +suite.test('pipelining mode allows multiple parameterized queries', (cb) => { + const client = new Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + client.pipelining = true + let completed = 0 + const results = [] + // Send multiple queries in pipeline mode + client.query('SELECT $1::text as message', ['Hello'], (err, result) => { + if (err) return cb(err) + results[0] = result + completed++ + if (completed === 3) checkResults() + }) + client.query('SELECT $1::int as number', [42], (err, result) => { + if (err) return cb(err) + results[1] = result + completed++ + if (completed === 3) checkResults() + }) + client.query('SELECT $1::text as greeting', ['World'], (err, result) => { + if (err) return cb(err) + results[2] = result + completed++ + if (completed === 3) checkResults() + }) + function checkResults() { + assert.strictEqual(results[0].rows[0].message, 'Hello') + assert.strictEqual(results[1].rows[0].number, 42) + assert.strictEqual(results[2].rows[0].greeting, 'World') + client.end(cb) + } + }) +}) + +suite.test('pipelining mode rejects simple queries', (cb) => { + const client = new Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + client.pipelining = true + client.query('SELECT 1', (err, result) => { + assert.strictEqual( + err.message, + 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries.' + ) + client.end(cb) + }) + }) +}) + +suite.test('pipelining mode rejects multi-statement queries', (cb) => { + const client = new Client(helper.config) + client.connect((err) => { + if (err) return cb(err) + client.pipelining = true + client.query('SELECT $1; SELECT $2', [1, 2], (err, result) => { + assert.strictEqual(err.message, 'Multiple SQL commands in a single query are not allowed in pipeline mode') + client.end(cb) + }) + }) +}) +module.exports = suite