@@ -34,6 +34,8 @@ const kResumeQueue = Symbol('resumeQueue');
3434const kCursorStream = Symbol ( 'cursorStream' ) ;
3535/** @internal */
3636const kClosed = Symbol ( 'closed' ) ;
37+ /** @internal */
38+ const kMode = Symbol ( 'mode' ) ;
3739
3840const CHANGE_STREAM_OPTIONS = [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' , 'fullDocument' ] ;
3941const CURSOR_OPTIONS = [ 'batchSize' , 'maxAwaitTimeMS' , 'collation' , 'readPreference' ] . concat (
@@ -206,6 +208,8 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
206208 [ kCursorStream ] ?: Readable ;
207209 /** @internal */
208210 [ kClosed ] : boolean ;
211+ /** @internal */
212+ [ kMode ] : false | 'iterator' | 'emitter' ;
209213
210214 /** @event */
211215 static readonly RESPONSE = 'response' as const ;
@@ -272,6 +276,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
272276 this . cursor = createChangeStreamCursor ( this , options ) ;
273277
274278 this [ kClosed ] = false ;
279+ this [ kMode ] = false ;
275280
276281 // Listen for any `change` listeners being added to ChangeStream
277282 this . on ( 'newListener' , eventName => {
@@ -299,6 +304,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
299304
300305 /** Check if there is any document still available in the Change Stream */
301306 hasNext ( callback ?: Callback ) : Promise < void > | void {
307+ setIsIterator ( this ) ;
302308 return maybePromise ( callback , cb => {
303309 getCursor ( this , ( err , cursor ) => {
304310 if ( err || ! cursor ) return cb ( err ) ; // failed to resume, raise an error
@@ -313,6 +319,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
313319 next (
314320 callback ?: Callback < ChangeStreamDocument < TSchema > >
315321 ) : Promise < ChangeStreamDocument < TSchema > > | void {
322+ setIsIterator ( this ) ;
316323 return maybePromise ( callback , cb => {
317324 getCursor ( this , ( err , cursor ) => {
318325 if ( err || ! cursor ) return cb ( err ) ; // failed to resume, raise an error
@@ -367,6 +374,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
367374 tryNext ( ) : Promise < Document | null > ;
368375 tryNext ( callback : Callback < Document | null > ) : void ;
369376 tryNext ( callback ?: Callback < Document | null > ) : Promise < Document | null > | void {
377+ setIsIterator ( this ) ;
370378 return maybePromise ( callback , cb => {
371379 getCursor ( this , ( err , cursor ) => {
372380 if ( err || ! cursor ) return cb ( err ) ; // failed to resume, raise an error
@@ -535,6 +543,23 @@ const CHANGE_STREAM_EVENTS = [
535543 ChangeStream . CLOSE
536544] ;
537545
546+ function setIsEmitter < TSchema > ( changeStream : ChangeStream < TSchema > ) : void {
547+ if ( changeStream [ kMode ] === 'iterator' ) {
548+ throw new MongoDriverError (
549+ 'Cannot use ChangeStream as an EventEmitter after using as an iterator'
550+ ) ;
551+ }
552+ changeStream [ kMode ] = 'emitter' ;
553+ }
554+
555+ function setIsIterator < TSchema > ( changeStream : ChangeStream < TSchema > ) : void {
556+ if ( changeStream [ kMode ] === 'emitter' ) {
557+ throw new MongoDriverError (
558+ 'Cannot use ChangeStream as iterator after using as an EventEmitter'
559+ ) ;
560+ }
561+ changeStream [ kMode ] = 'iterator' ;
562+ }
538563/**
539564 * Create a new change stream cursor based on self's configuration
540565 * @internal
@@ -630,6 +655,7 @@ function streamEvents<TSchema>(
630655 changeStream : ChangeStream < TSchema > ,
631656 cursor : ChangeStreamCursor < TSchema >
632657) : void {
658+ setIsEmitter ( changeStream ) ;
633659 const stream = changeStream [ kCursorStream ] || cursor . stream ( ) ;
634660 changeStream [ kCursorStream ] = stream ;
635661 stream . on ( 'data' , change => processNewChange ( changeStream , change ) ) ;
0 commit comments