@@ -41,7 +41,12 @@ import {
4141 ConnectionPoolReadyEvent ,
4242 ConnectionReadyEvent
4343} from './connection_pool_events' ;
44- import { PoolClearedError , PoolClosedError , WaitQueueTimeoutError } from './errors' ;
44+ import {
45+ PoolClearedError ,
46+ PoolClearedOnNetworkError ,
47+ PoolClosedError ,
48+ WaitQueueTimeoutError
49+ } from './errors' ;
4550import { ConnectionPoolMetrics } from './metrics' ;
4651
4752/** @internal */
@@ -391,10 +396,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
391396 this [ kConnections ] . unshift ( connection ) ;
392397 }
393398
394- this [ kCheckedOut ] . delete ( connection ) ;
399+ const wasConnectionDeleted = this [ kCheckedOut ] . delete ( connection ) ;
395400 this . emit ( ConnectionPool . CONNECTION_CHECKED_IN , new ConnectionCheckedInEvent ( this , connection ) ) ;
396401
397- if ( willDestroy ) {
402+ if ( wasConnectionDeleted && willDestroy ) {
398403 const reason = connection . closed ? 'error' : poolClosed ? 'poolClosed' : 'stale' ;
399404 this . destroyConnection ( connection , reason ) ;
400405 }
@@ -408,8 +413,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
408413 * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
409414 * previous generation will eventually be pruned during subsequent checkouts.
410415 */
411- clear ( options : { serviceId ?: ObjectId } = { } ) : void {
416+ clear ( options : { serviceId ?: ObjectId ; interruptInUseConnections ?: boolean } = { } ) : void {
412417 const { serviceId } = options ;
418+ const interruptInUseConnections = options . interruptInUseConnections ?? false ;
413419 if ( this . closed ) {
414420 return ;
415421 }
@@ -433,18 +439,72 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
433439 return ;
434440 }
435441
442+ const oldGeneration = this [ kGeneration ] ;
443+
436444 // handle non load-balanced case
437445 this [ kGeneration ] += 1 ;
438446 const alreadyPaused = this [ kPoolState ] === PoolState . paused ;
439447 this [ kPoolState ] = PoolState . paused ;
440448
441449 this . clearMinPoolSizeTimer ( ) ;
442450 if ( ! alreadyPaused ) {
443- this . emit ( ConnectionPool . CONNECTION_POOL_CLEARED , new ConnectionPoolClearedEvent ( this ) ) ;
451+ this . emit (
452+ ConnectionPool . CONNECTION_POOL_CLEARED ,
453+ new ConnectionPoolClearedEvent ( this , { interruptInUseConnections } )
454+ ) ;
444455 }
456+
457+ process . nextTick ( ( ) =>
458+ this . pruneConnections ( { minGeneration : oldGeneration , interruptInUseConnections } )
459+ ) ;
460+
445461 this . processWaitQueue ( ) ;
446462 }
447463
464+ /**
465+ * Closes all checked in perished connections in the pool with a resumable PoolClearedOnNetworkError.
466+ *
467+ * If interruptInUseConnections is `true`, this method attempts to kill checked out connections as well.
468+ * Only connections where `connection.generation <= minGeneration` are killed. Connections are closed with a
469+ * resumable PoolClearedOnNetworkTimeoutError.
470+ */
471+ private pruneConnections ( {
472+ interruptInUseConnections,
473+ minGeneration
474+ } : {
475+ interruptInUseConnections : boolean ;
476+ minGeneration : number ;
477+ } ) {
478+ this [ kConnections ] . prune ( connection => {
479+ if ( connection . generation <= minGeneration ) {
480+ connection . onError ( new PoolClearedOnNetworkError ( this ) ) ;
481+ this . emit (
482+ ConnectionPool . CONNECTION_CLOSED ,
483+ new ConnectionClosedEvent ( this , connection , 'stale' )
484+ ) ;
485+
486+ return true ;
487+ }
488+ return false ;
489+ } ) ;
490+
491+ if ( interruptInUseConnections ) {
492+ for ( const connection of this [ kCheckedOut ] ) {
493+ if ( connection . generation <= minGeneration ) {
494+ this [ kCheckedOut ] . delete ( connection ) ;
495+ connection . onError ( new PoolClearedOnNetworkError ( this ) ) ;
496+ this . emit (
497+ ConnectionPool . CONNECTION_CLOSED ,
498+ new ConnectionClosedEvent ( this , connection , 'stale' )
499+ ) ;
500+ }
501+ }
502+
503+ // TODO(NODE-xxxx): track pending connections and cancel
504+ // this[kCancellationToken].emit('cancel');
505+ }
506+ }
507+
448508 /** Close the pool */
449509 close ( callback : Callback < void > ) : void ;
450510 close ( options : CloseOptions , callback : Callback < void > ) : void ;
@@ -573,7 +633,12 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
573633 return ! ! ( this . options . maxIdleTimeMS && connection . idleTime > this . options . maxIdleTimeMS ) ;
574634 }
575635
576- private connectionIsPerished ( connection : Connection ) {
636+ /**
637+ * Destroys a connection if the connection is perished.
638+ *
639+ * @returns `true` if the connection was destroyed, `false` otherwise.
640+ */
641+ private destroyConnectionIfPerished ( connection : Connection ) {
577642 const isStale = this . connectionIsStale ( connection ) ;
578643 const isIdle = this . connectionIsIdle ( connection ) ;
579644 if ( ! isStale && ! isIdle && ! connection . closed ) {
@@ -659,7 +724,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
659724 return ;
660725 }
661726
662- this [ kConnections ] . prune ( connection => this . connectionIsPerished ( connection ) ) ;
727+ this [ kConnections ] . prune ( connection => this . destroyConnectionIfPerished ( connection ) ) ;
663728
664729 if (
665730 this . totalConnectionCount < minPoolSize &&
@@ -735,7 +800,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
735800 break ;
736801 }
737802
738- if ( ! this . connectionIsPerished ( connection ) ) {
803+ if ( ! this . destroyConnectionIfPerished ( connection ) ) {
739804 this [ kCheckedOut ] . add ( connection ) ;
740805 this . emit (
741806 ConnectionPool . CONNECTION_CHECKED_OUT ,
0 commit comments