@@ -6,6 +6,7 @@ const MongoError = require('./core').MongoError;
66const Cursor = require ( './cursor' ) ;
77const relayEvents = require ( './core/utils' ) . relayEvents ;
88const maxWireVersion = require ( './core/utils' ) . maxWireVersion ;
9+ const maybePromise = require ( './utils' ) . maybePromise ;
910const AggregateOperation = require ( './operations/aggregate' ) ;
1011
1112const CHANGE_STREAM_OPTIONS = [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' , 'fullDocument' ] ;
@@ -124,30 +125,28 @@ class ChangeStream extends EventEmitter {
124125 * @function ChangeStream.prototype.hasNext
125126 * @param {ChangeStream~resultCallback } [callback] The result callback.
126127 * @throws {MongoError }
127- * @return {Promise } returns Promise if no callback passed
128+ * @returns {Promise|void } returns Promise if no callback passed
128129 */
129130 hasNext ( callback ) {
130- return this . cursor . hasNext ( callback ) ;
131+ return maybePromise ( this . parent , callback , cb => this . cursor . hasNext ( cb ) ) ;
131132 }
132133
133134 /**
134135 * Get the next available document from the Change Stream, returns null if no more documents are available.
135136 * @function ChangeStream.prototype.next
136137 * @param {ChangeStream~resultCallback } [callback] The result callback.
137138 * @throws {MongoError }
138- * @return {Promise } returns Promise if no callback passed
139+ * @returns {Promise|void } returns Promise if no callback passed
139140 */
140141 next ( callback ) {
141- var self = this ;
142- if ( this . isClosed ( ) ) {
143- if ( callback ) return callback ( new Error ( 'Change Stream is not open.' ) , null ) ;
144- return self . promiseLibrary . reject ( new Error ( 'Change Stream is not open.' ) ) ;
145- }
146-
147- return this . cursor
148- . next ( )
149- . then ( change => processNewChange ( { changeStream : self , change, callback } ) )
150- . catch ( error => processNewChange ( { changeStream : self , error, callback } ) ) ;
142+ return maybePromise ( this . parent , callback , cb => {
143+ if ( this . isClosed ( ) ) {
144+ return cb ( new Error ( 'Change Stream is not open.' ) ) ;
145+ }
146+ this . cursor . next ( ( error , change ) => {
147+ processNewChange ( { changeStream : this , error, change, callback : cb } ) ;
148+ } ) ;
149+ } ) ;
151150 }
152151
153152 /**
0 commit comments