Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 220 additions & 8 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class Client extends EventEmitter {
this.binary = c.binary || defaults.binary
this.processID = null
this.secretKey = null
this._pipelining = false
this._pipelineQueue = []
this._pipelineActive = false
this.ssl = this.connectionParameters.ssl || false
// As with Password, make SSL->Key (the private key) non-enumerable.
// It won't show up in stack traces
Expand Down Expand Up @@ -125,6 +128,12 @@ class Client extends EventEmitter {

this._queryQueue.forEach(enqueueError)
this._queryQueue.length = 0

// Also error all pipeline queries
if (this._pipelineQueue) {
this._pipelineQueue.forEach(enqueueError)
this._pipelineQueue.length = 0
}
}

_connect(callback) {
Expand Down Expand Up @@ -354,6 +363,11 @@ class Client extends EventEmitter {
}
this.emit('connect')
}

if (this._pipelining) {
return this._handlePipelineReadyForQuery(msg)
}

const activeQuery = this._getActiveQuery()
this._activeQuery = null
this.readyForQuery = true
Expand All @@ -363,6 +377,27 @@ class Client extends EventEmitter {
this._pulseQueryQueue()
}

_handlePipelineReadyForQuery(msg) {
// In pipeline mode, handle completed queries
if (this._pipelineQueue.length > 0) {
const completedQuery = this._pipelineQueue.shift()
if (completedQuery) {
completedQuery.handleReadyForQuery(this.connection)
} else {
// No queries in pipeline queue, but we received readyForQuery
// This might happen due to message timing in pipeline mode
// Just mark as ready for more queries
this.readyForQuery = true
}
}

// If no more queries in pipeline, we're ready for more
if (this._pipelineQueue.length === 0) {
this.readyForQuery = true
this.emit('drain')
}
}

// if we receive an error event or error message
// during the connection process we handle it here
_handleErrorWhileConnecting(err) {
Expand Down Expand Up @@ -408,38 +443,68 @@ class Client extends EventEmitter {

_handleRowDescription(msg) {
// delegate rowDescription to active query
this._getActiveQuery().handleRowDescription(msg)
const query = this._getCurrentQuery()
if (query) {
query.handleRowDescription(msg)
}
}

_handleDataRow(msg) {
// delegate dataRow to active query
this._getActiveQuery().handleDataRow(msg)
const query = this._getCurrentQuery()
if (query) {
query.handleDataRow(msg)
}
}

_handlePortalSuspended(msg) {
// delegate portalSuspended to active query
this._getActiveQuery().handlePortalSuspended(this.connection)
const query = this._getCurrentQuery()
if (query) {
query.handlePortalSuspended(this.connection)
}
}

_handleEmptyQuery(msg) {
// delegate emptyQuery to active query
this._getActiveQuery().handleEmptyQuery(this.connection)
const query = this._getCurrentQuery()
if (query) {
query.handleEmptyQuery(this.connection)
}
}

_handleCommandComplete(msg) {
const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const query = this._getCurrentQuery()
if (query == null) {
// In pipeline mode, commandComplete might be received after query is processed
// This can happen due to message timing, so we can safely ignore it
if (this._pipelining) {
return
}
const error = new Error('Received unexpected commandComplete message from backend.')
this._handleErrorEvent(error)
return
}
// delegate commandComplete to active query
activeQuery.handleCommandComplete(msg, this.connection)
query.handleCommandComplete(msg, this.connection)
}

_getCurrentQuery() {
if (this._pipelining) {
// In pipeline mode, return the first query in the pipeline queue
return this._pipelineQueue.length > 0 ? this._pipelineQueue[0] : null
}
return this._getActiveQuery()
}

_handleParseComplete() {
const activeQuery = this._getActiveQuery()
const activeQuery = this._getCurrentQuery()
if (activeQuery == null) {
// In pipeline mode, parseComplete might be received before queries are fully processed
// This is normal behavior, so we can safely ignore it
if (this._pipelining) {
return
}
const error = new Error('Received unexpected parseComplete message from backend.')
this._handleErrorEvent(error)
return
Expand Down Expand Up @@ -644,6 +709,10 @@ class Client extends EventEmitter {
return result
}

if (this._pipelining) {
return this._pipelineQuery(query, result)
}

this._queryQueue.push(query)
this._pulseQueryQueue()
return result
Expand Down Expand Up @@ -689,6 +758,149 @@ class Client extends EventEmitter {
queryQueueDeprecationNotice()
return this._queryQueue
}

get pipelining() {
return this._pipelining
}

set pipelining(value) {
if (typeof value !== 'boolean') {
throw new TypeError('pipelining must be a boolean')
}

if (this._pipelining === value) {
return
}

if (value && !this._connected) {
throw new Error('Cannot enable pipelining mode before connection is established')
}

if (value && this._getActiveQuery()) {
throw new Error('Cannot enable pipelining mode while a query is active')
}

if (value) {
this._enterPipelineMode()
} else {
this._exitPipelineMode()
}
}

_enterPipelineMode() {
if (this._pipelining) {
return
}

this._pipelining = true
this._pipelineActive = true
this._pipelineQueue = []

// Send pipeline mode command to server
this.connection.enterPipelineMode()
}

_exitPipelineMode() {
if (!this._pipelining) {
return
}

// Process any remaining queries in pipeline
if (this._pipelineQueue.length > 0) {
throw new Error('Cannot exit pipeline mode with pending queries')
}

this._pipelining = false
this._pipelineActive = false

// Clear any pending sync timer
if (this._pipelineSyncTimer) {
clearTimeout(this._pipelineSyncTimer)
this._pipelineSyncTimer = null
}

// Send exit pipeline mode command to server
this.connection.exitPipelineMode()
}

_pipelineQuery(query, result) {
// Validate query for pipeline mode
if (query.text && typeof query.text === 'string' && query.text.includes(';')) {
const error = new Error('Multiple SQL commands in a single query are not allowed in pipeline mode')
process.nextTick(() => {
query.handleError(error, this.connection)
})
return result
}

// Disallow simple query protocol in pipeline mode
if (!query.requiresPreparation()) {
const error = new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries.')
process.nextTick(() => {
query.handleError(error, this.connection)
})
return result
}

// Add query to pipeline queue
this._pipelineQueue.push(query)

// Submit query using pipeline-specific method
const queryError = this._submitPipelineQuery(query)
if (queryError) {
process.nextTick(() => {
query.handleError(queryError, this.connection)
// Remove from pipeline queue on error
const index = this._pipelineQueue.indexOf(query)
if (index > -1) {
this._pipelineQueue.splice(index, 1)
}
})
} else {
// Schedule a sync after a short delay to allow batching
this._schedulePipelineSync()
}

return result
}

_schedulePipelineSync() {
// Clear any existing sync timer
if (this._pipelineSyncTimer) {
clearTimeout(this._pipelineSyncTimer)
}

// Schedule a sync after a short delay to allow multiple queries to batch
this._pipelineSyncTimer = setTimeout(() => {
if (this._pipelining && this._pipelineQueue.length > 0) {
this.connection.pipelineSync()
}
this._pipelineSyncTimer = null
}, 0) // Use 0 delay to sync on next tick
}

_submitPipelineQuery(query) {
if (typeof query.text !== 'string' && typeof query.name !== 'string') {
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
}
const previous = this.connection.parsedStatements[query.name]
if (query.text && previous && query.text !== previous) {
return new Error(`Prepared statements must be unique - '${query.name}' was used for a different statement`)
}
if (query.values && !Array.isArray(query.values)) {
return new Error('Query values must be an array')
}

// In pipeline mode, we always use extended query protocol
this.connection.stream.cork && this.connection.stream.cork()
try {
query.preparePipeline(this.connection)
} finally {
this.connection.stream.uncork && this.connection.stream.uncork()
}

return null
}
}

// expose a Query constructor
Expand Down
15 changes: 15 additions & 0 deletions packages/pg/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ class Connection extends EventEmitter {
this._send(syncBuffer)
}

pipelineSync() {
// Send sync without marking connection as ending (for pipeline mode)
this._send(syncBuffer)
}

ref() {
this.stream.ref()
}
Expand Down Expand Up @@ -217,6 +222,16 @@ class Connection extends EventEmitter {
sendCopyFail(msg) {
this._send(serialize.copyFail(msg))
}

enterPipelineMode() {
this._pipelineMode = true
}

exitPipelineMode() {
// Send sync to end pipeline mode
this.sync()
this._pipelineMode = false
}
}

module.exports = Connection
35 changes: 35 additions & 0 deletions packages/pg/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,41 @@ class Query extends EventEmitter {
handleCopyData(msg, connection) {
// noop
}

preparePipeline(connection) {
if (!this.hasBeenParsed(connection)) {
connection.parse({
text: this.text,
name: this.name,
types: this.types,
})
}

try {
connection.bind({
portal: this.portal,
statement: this.name,
values: this.values,
binary: this.binary,
valueMapper: utils.prepareValue,
})
} catch (err) {
this.handleError(err, connection)
return
}

connection.describe({
type: 'P',
name: this.portal || '',
})

connection.execute({
portal: this.portal,
rows: this.rows,
})

connection.flush()
}
}

module.exports = Query
Loading
Loading