@@ -94,6 +94,8 @@ class ChangeStream extends EventEmitter {
9494 // Create contained Change Stream cursor
9595 this . cursor = createChangeStreamCursor ( this , options ) ;
9696
97+ this . closed = false ;
98+
9799 // Listen for any `change` listeners being added to ChangeStream
98100 this . on ( 'newListener' , eventName => {
99101 if ( eventName === 'change' && this . cursor && this . listenerCount ( 'change' ) === 0 ) {
@@ -141,7 +143,7 @@ class ChangeStream extends EventEmitter {
141143 next ( callback ) {
142144 return maybePromise ( this . parent , callback , cb => {
143145 if ( this . isClosed ( ) ) {
144- return cb ( new Error ( 'Change Stream is not open. ') ) ;
146+ return cb ( new MongoError ( 'ChangeStream is closed ') ) ;
145147 }
146148 this . cursor . next ( ( error , change ) => {
147149 processNewChange ( { changeStream : this , error, change, callback : cb } ) ;
@@ -155,10 +157,7 @@ class ChangeStream extends EventEmitter {
155157 * @return {boolean }
156158 */
157159 isClosed ( ) {
158- if ( this . cursor ) {
159- return this . cursor . isClosed ( ) ;
160- }
161- return true ;
160+ return this . closed || ( this . cursor && this . cursor . isClosed ( ) ) ;
162161 }
163162
164163 /**
@@ -168,31 +167,20 @@ class ChangeStream extends EventEmitter {
168167 * @return {Promise } returns Promise if no callback passed
169168 */
170169 close ( callback ) {
171- if ( ! this . cursor ) {
172- if ( callback ) return callback ( ) ;
173- return this . promiseLibrary . resolve ( ) ;
174- }
170+ return maybePromise ( this . parent , callback , cb => {
171+ if ( this . closed ) return cb ( ) ;
175172
176- // Tidy up the existing cursor
177- const cursor = this . cursor ;
173+ // flag the change stream as explicitly closed
174+ this . closed = true ;
178175
179- if ( callback ) {
180- return cursor . close ( err => {
181- [ 'data' , 'close' , 'end' , 'error' ] . forEach ( event => cursor . removeAllListeners ( event ) ) ;
182- delete this . cursor ;
176+ // Tidy up the existing cursor
177+ const cursor = this . cursor ;
183178
184- return callback ( err ) ;
185- } ) ;
186- }
187-
188- const PromiseCtor = this . promiseLibrary || Promise ;
189- return new PromiseCtor ( ( resolve , reject ) => {
190- cursor . close ( err => {
179+ return cursor . close ( err => {
191180 [ 'data' , 'close' , 'end' , 'error' ] . forEach ( event => cursor . removeAllListeners ( event ) ) ;
192- delete this . cursor ;
181+ this . cursor = undefined ;
193182
194- if ( err ) return reject ( err ) ;
195- resolve ( ) ;
183+ return cb ( err ) ;
196184 } ) ;
197185 } ) ;
198186 }
@@ -321,7 +309,7 @@ class ChangeStreamCursor extends Cursor {
321309 _initializeCursor ( callback ) {
322310 super . _initializeCursor ( ( err , result ) => {
323311 if ( err ) {
324- callback ( err , null ) ;
312+ callback ( err ) ;
325313 return ;
326314 }
327315
@@ -347,7 +335,7 @@ class ChangeStreamCursor extends Cursor {
347335 _getMore ( callback ) {
348336 super . _getMore ( ( err , response ) => {
349337 if ( err ) {
350- callback ( err , null ) ;
338+ callback ( err ) ;
351339 return ;
352340 }
353341
@@ -460,12 +448,12 @@ function waitForTopologyConnected(topology, options, callback) {
460448 const timeout = options . timeout || SELECTION_TIMEOUT ;
461449 const readPreference = options . readPreference ;
462450
463- if ( topology . isConnected ( { readPreference } ) ) return callback ( null , null ) ;
451+ if ( topology . isConnected ( { readPreference } ) ) return callback ( ) ;
464452 const hrElapsed = process . hrtime ( start ) ;
465453 const elapsed = ( hrElapsed [ 0 ] * 1e9 + hrElapsed [ 1 ] ) / 1e6 ;
466454 if ( elapsed > timeout ) return callback ( new MongoError ( 'Timed out waiting for connection' ) ) ;
467455 waitForTopologyConnected ( topology , options , callback ) ;
468- } , 3000 ) ; // this is an arbitrary wait time to allow SDAM to transition
456+ } , 500 ) ; // this is an arbitrary wait time to allow SDAM to transition
469457}
470458
471459// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
@@ -477,17 +465,15 @@ function processNewChange(args) {
477465 const eventEmitter = args . eventEmitter || false ;
478466 const cursor = changeStream . cursor ;
479467
480- // If the cursor is null, then it should not process a change.
481- if ( cursor == null ) {
468+ // If the cursor is null or the change stream has been closed explictly, do not process a change.
469+ if ( cursor == null || changeStream . closed ) {
482470 // We do not error in the eventEmitter case.
471+ changeStream . closed = true ;
483472 if ( eventEmitter ) {
484473 return ;
485474 }
486-
487- const error = new MongoError ( 'ChangeStream is closed' ) ;
488- return typeof callback === 'function'
489- ? callback ( error , null )
490- : changeStream . promiseLibrary . reject ( error ) ;
475+ callback ( new MongoError ( 'ChangeStream is closed' ) ) ;
476+ return ;
491477 }
492478
493479 const topology = changeStream . topology ;
@@ -506,46 +492,27 @@ function processNewChange(args) {
506492 // close internal cursor, ignore errors
507493 changeStream . cursor . close ( ) ;
508494
509- // attempt recreating the cursor
510- if ( eventEmitter ) {
511- waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
512- if ( err ) {
495+ waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
496+ if ( err ) {
497+ // if there's an error reconnecting, close the change stream
498+ changeStream . closed = true ;
499+ if ( eventEmitter ) {
513500 changeStream . emit ( 'error' , err ) ;
514501 changeStream . emit ( 'close' ) ;
515502 return ;
516503 }
517- changeStream . cursor = createChangeStreamCursor ( changeStream , cursor . resumeOptions ) ;
518- } ) ;
504+ return callback ( err ) ;
505+ }
519506
520- return ;
521- }
522-
523- if ( callback ) {
524- waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
525- if ( err ) return callback ( err , null ) ;
526-
527- changeStream . cursor = createChangeStreamCursor ( changeStream , cursor . resumeOptions ) ;
528- changeStream . next ( callback ) ;
529- } ) ;
530-
531- return ;
532- }
533-
534- return new Promise ( ( resolve , reject ) => {
535- waitForTopologyConnected ( topology , { readPreference : options . readPreference } , err => {
536- if ( err ) return reject ( err ) ;
537- resolve ( ) ;
538- } ) ;
539- } )
540- . then (
541- ( ) => ( changeStream . cursor = createChangeStreamCursor ( changeStream , cursor . resumeOptions ) )
542- )
543- . then ( ( ) => changeStream . next ( ) ) ;
507+ changeStream . cursor = createChangeStreamCursor ( changeStream , cursor . resumeOptions ) ;
508+ if ( eventEmitter ) return ;
509+ changeStream . next ( callback ) ;
510+ } ) ;
511+ return ;
544512 }
545513
546514 if ( eventEmitter ) return changeStream . emit ( 'error' , error ) ;
547- if ( typeof callback === 'function' ) return callback ( error , null ) ;
548- return changeStream . promiseLibrary . reject ( error ) ;
515+ return callback ( error ) ;
549516 }
550517
551518 changeStream . attemptingResume = false ;
@@ -556,8 +523,7 @@ function processNewChange(args) {
556523 ) ;
557524
558525 if ( eventEmitter ) return changeStream . emit ( 'error' , noResumeTokenError ) ;
559- if ( typeof callback === 'function' ) return callback ( noResumeTokenError , null ) ;
560- return changeStream . promiseLibrary . reject ( noResumeTokenError ) ;
526+ return callback ( noResumeTokenError ) ;
561527 }
562528
563529 // cache the resume token
@@ -569,8 +535,7 @@ function processNewChange(args) {
569535
570536 // Return the change
571537 if ( eventEmitter ) return changeStream . emit ( 'change' , change ) ;
572- if ( typeof callback === 'function' ) return callback ( error , change ) ;
573- return changeStream . promiseLibrary . resolve ( change ) ;
538+ return callback ( error , change ) ;
574539}
575540
576541/**
0 commit comments