@@ -8,6 +8,7 @@ const Connection = require('../../cmap/connection').Connection;
88const common = require ( './common' ) ;
99const makeStateMachine = require ( '../utils' ) . makeStateMachine ;
1010const MongoError = require ( '../error' ) . MongoError ;
11+ const makeInterruptableAsyncInterval = require ( '../../utils' ) . makeInterruptableAsyncInterval ;
1112
1213const sdamEvents = require ( './events' ) ;
1314const ServerHeartbeatStartedEvent = sdamEvents . ServerHeartbeatStartedEvent ;
@@ -18,7 +19,6 @@ const kServer = Symbol('server');
1819const kMonitorId = Symbol ( 'monitorId' ) ;
1920const kConnection = Symbol ( 'connection' ) ;
2021const kCancellationToken = Symbol ( 'cancellationToken' ) ;
21- const kLastCheckTime = Symbol ( 'lastCheckTime' ) ;
2222
2323const STATE_CLOSED = common . STATE_CLOSED ;
2424const STATE_CLOSING = common . STATE_CLOSING ;
@@ -33,6 +33,10 @@ const stateTransition = makeStateMachine({
3333
3434const INVALID_REQUEST_CHECK_STATES = new Set ( [ STATE_CLOSING , STATE_CLOSED , STATE_MONITORING ] ) ;
3535
36+ function isInCloseState ( monitor ) {
37+ return monitor . s . state === STATE_CLOSED || monitor . s . state === STATE_CLOSING ;
38+ }
39+
3640class Monitor extends EventEmitter {
3741 constructor ( server , options ) {
3842 super ( options ) ;
@@ -41,6 +45,7 @@ class Monitor extends EventEmitter {
4145 this [ kConnection ] = undefined ;
4246 this [ kCancellationToken ] = new EventEmitter ( ) ;
4347 this [ kCancellationToken ] . setMaxListeners ( Infinity ) ;
48+ this [ kMonitorId ] = null ;
4449 this . s = {
4550 state : STATE_CLOSED
4651 } ;
@@ -89,39 +94,34 @@ class Monitor extends EventEmitter {
8994 return ;
9095 }
9196
92- monitorServer ( this ) ;
97+ // start
98+ const heartbeatFrequencyMS = this . options . heartbeatFrequencyMS ;
99+ const minHeartbeatFrequencyMS = this . options . minHeartbeatFrequencyMS ;
100+ this [ kMonitorId ] = makeInterruptableAsyncInterval ( monitorServer ( this ) , {
101+ interval : heartbeatFrequencyMS ,
102+ minInterval : minHeartbeatFrequencyMS ,
103+ immediate : true
104+ } ) ;
93105 }
94106
95107 requestCheck ( ) {
96108 if ( INVALID_REQUEST_CHECK_STATES . has ( this . s . state ) ) {
97109 return ;
98110 }
99111
100- const heartbeatFrequencyMS = this . options . heartbeatFrequencyMS ;
101- const minHeartbeatFrequencyMS = this . options . minHeartbeatFrequencyMS ;
102- const remainingTime = heartbeatFrequencyMS - calculateDurationInMs ( this [ kLastCheckTime ] ) ;
103- if ( remainingTime > minHeartbeatFrequencyMS && this [ kMonitorId ] ) {
104- clearTimeout ( this [ kMonitorId ] ) ;
105- rescheduleMonitoring ( this , minHeartbeatFrequencyMS ) ;
106- return ;
107- }
108-
109- if ( this [ kMonitorId ] ) {
110- clearTimeout ( this [ kMonitorId ] ) ;
111- }
112-
113- monitorServer ( this ) ;
112+ this [ kMonitorId ] . wake ( ) ;
114113 }
115114
116115 close ( ) {
117- if ( this . s . state === STATE_CLOSED || this . s . state === STATE_CLOSING ) {
116+ if ( isInCloseState ( this ) ) {
118117 return ;
119118 }
120119
121120 stateTransition ( this , STATE_CLOSING ) ;
122121 this [ kCancellationToken ] . emit ( 'cancel' ) ;
123122 if ( this [ kMonitorId ] ) {
124- clearTimeout ( this [ kMonitorId ] ) ;
123+ this [ kMonitorId ] . stop ( ) ;
124+ this [ kMonitorId ] = null ;
125125 }
126126
127127 if ( this [ kConnection ] ) {
@@ -186,7 +186,7 @@ function checkServer(monitor, callback) {
186186 return ;
187187 }
188188
189- if ( monitor . s . state === STATE_CLOSING || monitor . s . state === STATE_CLOSED ) {
189+ if ( isInCloseState ( monitor ) ) {
190190 conn . destroy ( { force : true } ) ;
191191 failureHandler ( new MongoError ( 'monitor was destroyed' ) ) ;
192192 return ;
@@ -198,52 +198,44 @@ function checkServer(monitor, callback) {
198198}
199199
200200function monitorServer ( monitor ) {
201- stateTransition ( monitor , STATE_MONITORING ) ;
202-
203- // TODO: the next line is a legacy event, remove in v4
204- process . nextTick ( ( ) => monitor . emit ( 'monitoring' , monitor [ kServer ] ) ) ;
201+ return callback => {
202+ stateTransition ( monitor , STATE_MONITORING ) ;
203+ function done ( ) {
204+ if ( ! isInCloseState ( monitor ) ) {
205+ stateTransition ( monitor , STATE_IDLE ) ;
206+ }
205207
206- checkServer ( monitor , e0 => {
207- if ( e0 == null ) {
208- rescheduleMonitoring ( monitor ) ;
209- return ;
208+ callback ( ) ;
210209 }
211210
212- // otherwise an error occured on initial discovery, also bail
213- if ( monitor [ kServer ] . description . type === ServerType . Unknown ) {
214- monitor . emit ( 'resetServer' , e0 ) ;
215- rescheduleMonitoring ( monitor ) ;
216- return ;
217- }
211+ // TODO: the next line is a legacy event, remove in v4
212+ process . nextTick ( ( ) => monitor . emit ( 'monitoring' , monitor [ kServer ] ) ) ;
218213
219- // According to the SDAM specification's "Network error during server check" section, if
220- // an ismaster call fails we reset the server's pool. If a server was once connected,
221- // change its type to `Unknown` only after retrying once.
222- monitor . emit ( 'resetConnectionPool' ) ;
223-
224- checkServer ( monitor , e1 => {
225- if ( e1 ) {
226- monitor . emit ( 'resetServer' , e1 ) ;
214+ checkServer ( monitor , e0 => {
215+ if ( e0 == null ) {
216+ return done ( ) ;
227217 }
228218
229- rescheduleMonitoring ( monitor ) ;
230- } ) ;
231- } ) ;
232- }
219+ // otherwise an error occured on initial discovery, also bail
220+ if ( monitor [ kServer ] . description . type === ServerType . Unknown ) {
221+ monitor . emit ( 'resetServer' , e0 ) ;
222+ return done ( ) ;
223+ }
233224
234- function rescheduleMonitoring ( monitor , ms ) {
235- const heartbeatFrequencyMS = monitor . options . heartbeatFrequencyMS ;
236- if ( monitor . s . state === STATE_CLOSING || monitor . s . state === STATE_CLOSED ) {
237- return ;
238- }
225+ // According to the SDAM specification's "Network error during server check" section, if
226+ // an ismaster call fails we reset the server's pool. If a server was once connected,
227+ // change its type to `Unknown` only after retrying once.
228+ monitor . emit ( 'resetConnectionPool' ) ;
239229
240- stateTransition ( monitor , STATE_IDLE ) ;
230+ checkServer ( monitor , e1 => {
231+ if ( e1 ) {
232+ monitor . emit ( 'resetServer' , e1 ) ;
233+ }
241234
242- monitor [ kLastCheckTime ] = process . hrtime ( ) ;
243- monitor [ kMonitorId ] = setTimeout ( ( ) => {
244- monitor [ kMonitorId ] = undefined ;
245- monitor . requestCheck ( ) ;
246- } , ms || heartbeatFrequencyMS ) ;
235+ done ( ) ;
236+ } ) ;
237+ } ) ;
238+ } ;
247239}
248240
249241module . exports = {
0 commit comments