@@ -10,6 +10,7 @@ import { promisify } from 'util';
1010import {
1111 AbstractCursor ,
1212 ChangeStream ,
13+ ChangeStreamDocument ,
1314 ChangeStreamOptions ,
1415 Collection ,
1516 CommandStartedEvent ,
@@ -1037,56 +1038,6 @@ describe('Change Streams', function () {
10371038 } ) ;
10381039
10391040 describe ( 'Change Stream Resume Error Tests' , function ( ) {
1040- describe ( 'TODO(NODE-4670): fix consecutive resumes unified tests' , function ( ) {
1041- let client : MongoClient ;
1042- let changeStream : ChangeStream ;
1043-
1044- beforeEach ( async function ( ) {
1045- client = this . configuration . newClient ( ) ;
1046- await client . connect ( ) ;
1047- } ) ;
1048-
1049- afterEach ( async function ( ) {
1050- await changeStream . close ( ) ;
1051- await client . close ( ) ;
1052- } ) ;
1053-
1054- it ( 'should support consecutive resumes' , {
1055- metadata : { requires : { topology : 'replicaset' , mongodb : '>=4.2' } } ,
1056- async test ( ) {
1057- const failCommand : FailPoint = {
1058- configureFailPoint : 'failCommand' ,
1059- mode : {
1060- times : 2
1061- } ,
1062- data : {
1063- failCommands : [ 'getMore' ] ,
1064- closeConnection : true
1065- }
1066- } ;
1067-
1068- await client . db ( 'admin' ) . command ( failCommand ) ;
1069-
1070- const collection = client . db ( 'test_consecutive_resume' ) . collection ( 'collection' ) ;
1071-
1072- changeStream = collection . watch ( [ ] , { batchSize : 1 } ) ;
1073-
1074- await initIteratorMode ( changeStream ) ;
1075-
1076- await collection . insertOne ( { name : 'bumpy' } ) ;
1077- await collection . insertOne ( { name : 'bumpy' } ) ;
1078- await collection . insertOne ( { name : 'bumpy' } ) ;
1079-
1080- await sleep ( 1000 ) ;
1081-
1082- for ( let i = 0 ; i < 3 ; ++ i ) {
1083- const change = await changeStream . next ( ) ;
1084- expect ( change ) . not . to . be . null ;
1085- }
1086- }
1087- } ) ;
1088- } ) ;
1089-
10901041 it . skip ( 'should continue piping changes after a resumable error' , {
10911042 metadata : { requires : { topology : 'replicaset' } } ,
10921043 test : done => {
@@ -1767,7 +1718,44 @@ describe('ChangeStream resumability', function () {
17671718 expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
17681719 }
17691720 ) ;
1721+
1722+ it (
1723+ `supports consecutive resumes on error code ${ code } ${ error } ` ,
1724+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1725+ async function ( ) {
1726+ changeStream = collection . watch ( [ ] ) ;
1727+ await initIteratorMode ( changeStream ) ;
1728+
1729+ await client . db ( 'admin' ) . command ( {
1730+ configureFailPoint : is4_2Server ( this . configuration . version )
1731+ ? 'failCommand'
1732+ : 'failGetMoreAfterCursorCheckout' ,
1733+ mode : { times : 5 } ,
1734+ data : {
1735+ failCommands : [ 'getMore' ] ,
1736+ errorCode : code ,
1737+ errmsg : message
1738+ }
1739+ } as FailPoint ) ;
1740+
1741+ // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
1742+ // resuming a change stream don't return the change event. So we defer the insert until a period of time
1743+ // after the change stream has started listening for a change. 2000ms is long enough for the change
1744+ // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
1745+ const [ , value ] = await Promise . allSettled ( [
1746+ sleep ( 2000 ) . then ( ( ) => collection . insertOne ( { name : 'bailey' } ) ) ,
1747+ changeStream . next ( )
1748+ ] ) ;
1749+
1750+ const change = ( value as PromiseFulfilledResult < ChangeStreamDocument > ) . value ;
1751+
1752+ expect ( change ) . to . have . property ( 'operationType' , 'insert' ) ;
1753+
1754+ expect ( aggregateEvents ) . to . have . lengthOf ( 6 ) ;
1755+ }
1756+ ) ;
17701757 }
1758+
17711759 for ( const { error, code, message } of resumableErrorCodes ) {
17721760 it (
17731761 `resumes on error code ${ code } (${ error } )` ,
@@ -1896,6 +1884,42 @@ describe('ChangeStream resumability', function () {
18961884 expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
18971885 }
18981886 ) ;
1887+
1888+ it (
1889+ `supports consecutive resumes on error code ${ code } ${ error } ` ,
1890+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1891+ async function ( ) {
1892+ changeStream = collection . watch ( [ ] ) ;
1893+ await initIteratorMode ( changeStream ) ;
1894+
1895+ await client . db ( 'admin' ) . command ( {
1896+ configureFailPoint : is4_2Server ( this . configuration . version )
1897+ ? 'failCommand'
1898+ : 'failGetMoreAfterCursorCheckout' ,
1899+ mode : { times : 5 } ,
1900+ data : {
1901+ failCommands : [ 'getMore' ] ,
1902+ errorCode : code ,
1903+ errmsg : message
1904+ }
1905+ } as FailPoint ) ;
1906+
1907+ // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
1908+ // resuming a change stream don't return the change event. So we defer the insert until a period of time
1909+ // after the change stream has started listening for a change. 2000ms is long enough for the change
1910+ // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
1911+ const [ , value ] = await Promise . allSettled ( [
1912+ sleep ( 2000 ) . then ( ( ) => collection . insertOne ( { name : 'bailey' } ) ) ,
1913+ changeStream . hasNext ( )
1914+ ] ) ;
1915+
1916+ const change = ( value as PromiseFulfilledResult < boolean > ) . value ;
1917+
1918+ expect ( change ) . to . be . true ;
1919+
1920+ expect ( aggregateEvents ) . to . have . lengthOf ( 6 ) ;
1921+ }
1922+ ) ;
18991923 }
19001924
19011925 for ( const { error, code, message } of resumableErrorCodes ) {
@@ -2033,6 +2057,42 @@ describe('ChangeStream resumability', function () {
20332057 expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
20342058 }
20352059 ) ;
2060+
2061+ it (
2062+ `supports consecutive resumes on error code ${ code } ${ error } ` ,
2063+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2064+ async function ( ) {
2065+ changeStream = collection . watch ( [ ] ) ;
2066+ await initIteratorMode ( changeStream ) ;
2067+
2068+ await client . db ( 'admin' ) . command ( {
2069+ configureFailPoint : is4_2Server ( this . configuration . version )
2070+ ? 'failCommand'
2071+ : 'failGetMoreAfterCursorCheckout' ,
2072+ mode : { times : 5 } ,
2073+ data : {
2074+ failCommands : [ 'getMore' ] ,
2075+ errorCode : code ,
2076+ errmsg : message
2077+ }
2078+ } as FailPoint ) ;
2079+
2080+ try {
2081+ // tryNext is not blocking and on sharded clusters we don't have control of when
2082+ // the actual change event will be ready on the change stream pipeline. This introduces
2083+ // a race condition, where sometimes we receive the change event and sometimes
2084+ // we don't when we call tryNext, depending on the timing of the sharded cluster.
2085+
2086+ // Since we really only care about the resumability, it's enough for this test to throw
2087+ // if tryNext ever throws and assert on the number of aggregate events.
2088+ await changeStream . tryNext ( ) ;
2089+ } catch ( err ) {
2090+ expect . fail ( `expected tryNext to resume, received error instead: ${ err } ` ) ;
2091+ }
2092+
2093+ expect ( aggregateEvents ) . to . have . lengthOf ( 6 ) ;
2094+ }
2095+ ) ;
20362096 }
20372097
20382098 for ( const { error, code, message } of resumableErrorCodes ) {
@@ -2171,6 +2231,43 @@ describe('ChangeStream resumability', function () {
21712231 expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
21722232 }
21732233 ) ;
2234+
2235+ it (
2236+ `supports consecutive resumes on error code ${ code } (${ error } )` ,
2237+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2238+ async function ( ) {
2239+ changeStream = collection . watch ( [ ] ) ;
2240+
2241+ await client . db ( 'admin' ) . command ( {
2242+ configureFailPoint : is4_2Server ( this . configuration . version )
2243+ ? 'failCommand'
2244+ : 'failGetMoreAfterCursorCheckout' ,
2245+ mode : { times : 5 } ,
2246+ data : {
2247+ failCommands : [ 'getMore' ] ,
2248+ errorCode : code ,
2249+ errmsg : message
2250+ }
2251+ } as FailPoint ) ;
2252+
2253+ const changes = once ( changeStream , 'change' ) ;
2254+ await once ( changeStream . cursor , 'init' ) ;
2255+
2256+ // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
2257+ // resuming a change stream don't return the change event. So we defer the insert until a period of time
2258+ // after the change stream has started listening for a change. 2000ms is long enough for the change
2259+ // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
2260+ const [ , value ] = await Promise . allSettled ( [
2261+ sleep ( 2000 ) . then ( ( ) => collection . insertOne ( { name : 'bailey' } ) ) ,
2262+ changes
2263+ ] ) ;
2264+
2265+ const [ change ] = ( value as PromiseFulfilledResult < ChangeStreamDocument [ ] > ) . value ;
2266+ expect ( change ) . to . have . property ( 'operationType' , 'insert' ) ;
2267+
2268+ expect ( aggregateEvents ) . to . have . lengthOf ( 6 ) ;
2269+ }
2270+ ) ;
21742271 }
21752272
21762273 it (
0 commit comments