From c85cb291c13cc148b56cf968d88a09fbc5cc8171 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 14 Feb 2024 10:25:44 -0500 Subject: [PATCH 1/8] allow streaming hello to be disabled --- src/cmap.rs | 4 ++++ src/cmap/establish/handshake.rs | 4 ++-- src/sdam/monitor.rs | 40 ++++++++++++++++++++++++--------- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/cmap.rs b/src/cmap.rs index c3f981352..4d8e6c9e3 100644 --- a/src/cmap.rs +++ b/src/cmap.rs @@ -184,3 +184,7 @@ impl ConnectionPool { self.manager.broadcast(msg) } } + +pub(crate) fn is_faas() -> bool { + establish::handshake::FaasEnvironmentName::new().is_some() +} diff --git a/src/cmap/establish/handshake.rs b/src/cmap/establish/handshake.rs index e58d50d4d..f5aedc3a5 100644 --- a/src/cmap/establish/handshake.rs +++ b/src/cmap/establish/handshake.rs @@ -67,7 +67,7 @@ struct RuntimeEnvironment { } #[derive(Copy, Clone, Debug, PartialEq)] -enum FaasEnvironmentName { +pub(crate) enum FaasEnvironmentName { AwsLambda, AzureFunc, GcpFunc, @@ -221,7 +221,7 @@ fn var_set(name: &str) -> bool { } impl FaasEnvironmentName { - fn new() -> Option { + pub(crate) fn new() -> Option { use FaasEnvironmentName::*; let mut found: Option = None; let lambda_env = env::var_os("AWS_EXECUTION_ENV") diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index cb5db9d14..306c05a8d 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -48,9 +48,10 @@ pub(crate) struct Monitor { sdam_event_emitter: Option, client_options: ClientOptions, + /// Whether this monitor is allowed to use the streaming protocol. + allow_streaming: bool, + /// The most recent topology version returned by the server in a hello response. - /// If some, indicates that this monitor should use the streaming protocol. If none, it should - /// use the polling protocol. topology_version: Option, /// Handle to the RTT monitor, used to get the latest known round trip time for a given server @@ -63,6 +64,14 @@ pub(crate) struct Monitor { request_receiver: MonitorRequestReceiver, } +// TODO: put this in client options +#[non_exhaustive] +enum ServerMonitoringMode { + Stream, + Poll, + Auto, +} + impl Monitor { pub(crate) fn start( address: ServerAddress, @@ -79,6 +88,12 @@ impl Monitor { connection_establisher.clone(), client_options.clone(), ); + let monitoring_mode = ServerMonitoringMode::Auto; // TODO + let streaming = match monitoring_mode { + ServerMonitoringMode::Stream => true, + ServerMonitoringMode::Poll => false, + ServerMonitoringMode::Auto => !crate::cmap::is_faas(), + }; let monitor = Self { address, client_options, @@ -89,6 +104,7 @@ impl Monitor { rtt_monitor_handle, request_receiver: manager_receiver, connection: None, + allow_streaming: streaming, topology_version: None, }; @@ -108,7 +124,7 @@ impl Monitor { // // We only go to sleep when using the polling protocol (i.e. server never returned a // topologyVersion) or when the most recent check failed. - if self.topology_version.is_none() || !check_succeeded { + if self.topology_version.is_none() || !check_succeeded || !self.allow_streaming { self.request_receiver .wait_for_check_request( self.client_options.min_heartbeat_frequency(), @@ -180,7 +196,7 @@ impl Monitor { self.emit_event(|| { SdamEvent::ServerHeartbeatStarted(ServerHeartbeatStartedEvent { server_address: self.address.clone(), - awaited: self.topology_version.is_some(), + awaited: self.topology_version.is_some() && self.allow_streaming, driver_connection_id, server_connection_id: self.connection.as_ref().and_then(|c| c.server_id), }) @@ -213,10 +229,14 @@ impl Monitor { } else { // If the initial handshake returned a topology version, send it back to the // server to begin streaming responses. - let opts = self.topology_version.map(|tv| AwaitableHelloOptions { - topology_version: tv, - max_await_time: heartbeat_frequency, - }); + let opts = if self.allow_streaming { + self.topology_version.map(|tv| AwaitableHelloOptions { + topology_version: tv, + max_await_time: heartbeat_frequency, + }) + } else { + None + }; let command = hello_command( self.client_options.server_api.as_ref(), @@ -280,7 +300,7 @@ impl Monitor { duration, reply, server_address: self.address.clone(), - awaited: self.topology_version.is_some(), + awaited: self.topology_version.is_some() && self.allow_streaming, driver_connection_id, server_connection_id: self.connection.as_ref().and_then(|c| c.server_id), }) @@ -296,7 +316,7 @@ impl Monitor { duration, failure: e.clone(), server_address: self.address.clone(), - awaited: self.topology_version.is_some(), + awaited: self.topology_version.is_some() && self.allow_streaming, driver_connection_id, server_connection_id: self.connection.as_ref().and_then(|c| c.server_id), }) From dde79b2be6b67a467743b6aabb84957f7b615d30 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 14 Feb 2024 11:00:21 -0500 Subject: [PATCH 2/8] disable dedicated rtt monitor when streaming is disabled --- src/sdam/monitor.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index 306c05a8d..36b659043 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -89,7 +89,7 @@ impl Monitor { client_options.clone(), ); let monitoring_mode = ServerMonitoringMode::Auto; // TODO - let streaming = match monitoring_mode { + let allow_streaming = match monitoring_mode { ServerMonitoringMode::Stream => true, ServerMonitoringMode::Poll => false, ServerMonitoringMode::Auto => !crate::cmap::is_faas(), @@ -104,12 +104,14 @@ impl Monitor { rtt_monitor_handle, request_receiver: manager_receiver, connection: None, - allow_streaming: streaming, + allow_streaming, topology_version: None, }; runtime::execute(monitor.execute()); - runtime::execute(rtt_monitor.execute()); + if allow_streaming { + runtime::execute(rtt_monitor.execute()); + } } async fn execute(mut self) { @@ -285,6 +287,9 @@ impl Monitor { } }; let duration = start.elapsed(); + if !self.allow_streaming && matches!(result, HelloResult::Ok(_)) { + self.rtt_monitor_handle.add_sample(duration); + } match result { HelloResult::Ok(ref r) => { From 64c014006ab3782c32d7a60578eb873f02e1c1b6 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 14 Feb 2024 11:32:52 -0500 Subject: [PATCH 3/8] disable dedicated rtt monitor when streaming is unsupported --- src/sdam/monitor.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index 36b659043..df913db94 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -54,6 +54,9 @@ pub(crate) struct Monitor { /// The most recent topology version returned by the server in a hello response. topology_version: Option, + /// The RTT monitor; once it's started this is None. + pending_rtt_monitor: Option, + /// Handle to the RTT monitor, used to get the latest known round trip time for a given server /// and to reset the RTT when the monitor disconnects from the server. rtt_monitor_handle: RttMonitorHandle, @@ -101,6 +104,7 @@ impl Monitor { topology_updater, topology_watcher, sdam_event_emitter, + pending_rtt_monitor: Some(rtt_monitor), rtt_monitor_handle, request_receiver: manager_receiver, connection: None, @@ -109,9 +113,6 @@ impl Monitor { }; runtime::execute(monitor.execute()); - if allow_streaming { - runtime::execute(rtt_monitor.execute()); - } } async fn execute(mut self) { @@ -120,6 +121,12 @@ impl Monitor { while self.is_alive() { let check_succeeded = self.check_server().await; + if self.topology_version.is_some() && self.allow_streaming { + if let Some(rtt_monitor) = self.pending_rtt_monitor.take() { + runtime::execute(rtt_monitor.execute()); + } + } + // In the streaming protocol, we read from the socket continuously // rather than polling at specific intervals, unless the most recent check // failed. @@ -287,12 +294,13 @@ impl Monitor { } }; let duration = start.elapsed(); - if !self.allow_streaming && matches!(result, HelloResult::Ok(_)) { - self.rtt_monitor_handle.add_sample(duration); - } + let awaited = self.topology_version.is_some() && self.allow_streaming; match result { HelloResult::Ok(ref r) => { + if !awaited { + self.rtt_monitor_handle.add_sample(duration); + } self.emit_event(|| { let mut reply = r .raw_command_response @@ -305,7 +313,7 @@ impl Monitor { duration, reply, server_address: self.address.clone(), - awaited: self.topology_version.is_some() && self.allow_streaming, + awaited, driver_connection_id, server_connection_id: self.connection.as_ref().and_then(|c| c.server_id), }) @@ -321,7 +329,7 @@ impl Monitor { duration, failure: e.clone(), server_address: self.address.clone(), - awaited: self.topology_version.is_some() && self.allow_streaming, + awaited, driver_connection_id, server_connection_id: self.connection.as_ref().and_then(|c| c.server_id), }) From 96aa9ad19deb0665c2e701a2838c95e4eafffe1a Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 15 Feb 2024 10:31:29 -0500 Subject: [PATCH 4/8] expose server monitoring mode in options/uri --- src/client/options.rs | 40 ++++++++++++++++++++++++++++++++++++++++ src/sdam/monitor.rs | 16 ++++++---------- 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/src/client/options.rs b/src/client/options.rs index bc8cd5fc2..6f8921e86 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -68,6 +68,7 @@ const URI_OPTIONS: &[&str] = &[ "replicaset", "retrywrites", "retryreads", + "servermonitoringmode", "serverselectiontimeoutms", "sockettimeoutms", "tls", @@ -512,6 +513,12 @@ pub struct ClientOptions { #[builder(default)] pub retry_writes: Option, + /// Configures which server monitoring protocol to use. + /// + /// The default is [`Auto`](ServerMonitoringMode::Auto). + #[builder(default)] + pub server_monitoring_mode: Option, + /// The handler that should process all Server Discovery and Monitoring events. #[derivative(Debug = "ignore", PartialEq = "ignore")] #[builder(default, setter(strip_option))] @@ -844,6 +851,11 @@ pub struct ConnectionString { /// The default value is true. pub retry_writes: Option, + /// Configures which server monitoring protocol to use. + /// + /// The default is [`Auto`](ServerMonitoringMode::Auto). + pub server_monitoring_mode: Option, + /// Specifies whether the Client should directly connect to a single host rather than /// autodiscover all servers in the cluster. /// @@ -1340,6 +1352,7 @@ impl ClientOptions { connect_timeout: conn_str.connect_timeout, retry_reads: conn_str.retry_reads, retry_writes: conn_str.retry_writes, + server_monitoring_mode: conn_str.server_monitoring_mode, socket_timeout: conn_str.socket_timeout, direct_connection: conn_str.direct_connection, default_database: conn_str.default_database, @@ -2182,6 +2195,19 @@ impl ConnectionString { k @ "retryreads" => { self.retry_reads = Some(get_bool!(value, k)); } + "servermonitoringmode" => { + self.server_monitoring_mode = Some(match value.to_lowercase().as_str() { + "stream" => ServerMonitoringMode::Stream, + "poll" => ServerMonitoringMode::Poll, + "auto" => ServerMonitoringMode::Auto, + other => { + return Err(Error::invalid_argument(format!( + "{:?} is not a valid server monitoring mode", + other + ))); + } + }); + } k @ "serverselectiontimeoutms" => { self.server_selection_timeout = Some(Duration::from_millis(get_duration!(value, k))) } @@ -2875,3 +2901,17 @@ pub struct TransactionOptions { )] pub max_commit_time: Option, } + +/// Which server monitoring protocol to use. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[non_exhaustive] +pub enum ServerMonitoringMode { + /// The client will use the streaming protocol when the server supports it and fall back to the + /// polling protocol otherwise. + Stream, + /// The client will use the polling protocol. + Poll, + /// The client will use the polling protocol when running on a FaaS platform and behave the + /// same as `Stream` otherwise. + Auto, +} diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index df913db94..6ce3429ae 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -16,6 +16,7 @@ use super::{ TopologyWatcher, }; use crate::{ + client::options::ServerMonitoringMode, cmap::{establish::ConnectionEstablisher, Connection}, error::{Error, Result}, event::sdam::{ @@ -67,14 +68,6 @@ pub(crate) struct Monitor { request_receiver: MonitorRequestReceiver, } -// TODO: put this in client options -#[non_exhaustive] -enum ServerMonitoringMode { - Stream, - Poll, - Auto, -} - impl Monitor { pub(crate) fn start( address: ServerAddress, @@ -91,8 +84,11 @@ impl Monitor { connection_establisher.clone(), client_options.clone(), ); - let monitoring_mode = ServerMonitoringMode::Auto; // TODO - let allow_streaming = match monitoring_mode { + let allow_streaming = match client_options + .server_monitoring_mode + .clone() + .unwrap_or(ServerMonitoringMode::Auto) + { ServerMonitoringMode::Stream => true, ServerMonitoringMode::Poll => false, ServerMonitoringMode::Auto => !crate::cmap::is_faas(), From 404e6edb1b1db1493defbb7fa19d4d4f5d28e5b5 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 15 Feb 2024 10:50:56 -0500 Subject: [PATCH 5/8] sdam tests --- .../README.rst | 28 +- .../unified/pool-cleared-error.yml | 2 +- .../unified/serverMonitoringMode.json | 449 ++++++++++++++++++ .../unified/serverMonitoringMode.yml | 173 +++++++ src/test/spec/unified_runner/matcher.rs | 22 +- src/test/spec/unified_runner/test_event.rs | 6 +- src/test/spec/unified_runner/test_file.rs | 1 + src/test/spec/unified_runner/test_runner.rs | 2 +- src/test/util/event.rs | 9 + 9 files changed, 683 insertions(+), 9 deletions(-) create mode 100644 src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.json create mode 100644 src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.yml diff --git a/src/test/spec/json/server-discovery-and-monitoring/README.rst b/src/test/spec/json/server-discovery-and-monitoring/README.rst index 9dce162f3..95b9865b7 100644 --- a/src/test/spec/json/server-discovery-and-monitoring/README.rst +++ b/src/test/spec/json/server-discovery-and-monitoring/README.rst @@ -192,7 +192,7 @@ Integration Tests Integration tests are provided in the "unified" directory and are written in the `Unified Test Format -<../unified-test-format/unified-test-format.rst>`_. +<../../unified-test-format/unified-test-format.md>`_. Prose Tests ----------- @@ -262,6 +262,32 @@ Run the following test(s) on MongoDB 4.4+. mode: "off", }); +Heartbeat Tests +~~~~~~~~~~~~~~~ + +1. Test that ``ServerHeartbeatStartedEvent`` is emitted before the monitoring socket was created + + #. Create a mock TCP server (example shown below) that pushes a ``client connected`` event to a shared array when a client connects and a ``client hello received`` event when the server receives the client hello and then closes the connection:: + + let events = []; + server = createServer(clientSocket => { + events.push('client connected'); + + clientSocket.on('data', () => { + events.push('client hello received'); + clientSocket.destroy(); + }); + }); + server.listen(9999); + + #. Create a client with ``serverSelectionTimeoutMS: 500`` and listen to ``ServerHeartbeatStartedEvent`` and ``ServerHeartbeatFailedEvent``, pushing the event name to the same shared array as the mock TCP server + + #. Attempt to connect client to previously created TCP server, catching the error when the client fails to connect + + #. Assert that the first four elements in the array are: :: + + ['serverHeartbeatStartedEvent', 'client connected', 'client hello received', 'serverHeartbeatFailedEvent'] + .. Section for links. .. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality diff --git a/src/test/spec/json/server-discovery-and-monitoring/unified/pool-cleared-error.yml b/src/test/spec/json/server-discovery-and-monitoring/unified/pool-cleared-error.yml index 7a98e2b32..f3bad7959 100644 --- a/src/test/spec/json/server-discovery-and-monitoring/unified/pool-cleared-error.yml +++ b/src/test/spec/json/server-discovery-and-monitoring/unified/pool-cleared-error.yml @@ -200,7 +200,7 @@ tests: event: poolClearedEvent: {} count: 1 - # Perform an operation to ensure the node still useable. + # Perform an operation to ensure the node still usable. - name: insertOne object: *collection arguments: diff --git a/src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.json b/src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.json new file mode 100644 index 000000000..7d681b4f9 --- /dev/null +++ b/src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.json @@ -0,0 +1,449 @@ +{ + "description": "serverMonitoringMode", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "topologies": [ + "single", + "sharded", + "sharded-replicaset" + ], + "serverless": "forbid" + } + ], + "tests": [ + { + "description": "connect with serverMonitoringMode=auto >=4.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "auto" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": true + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=auto <4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "auto", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=stream >=4.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "stream" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": true + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=stream <4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "stream", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=poll", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "poll", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + } + ] +} diff --git a/src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.yml b/src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.yml new file mode 100644 index 000000000..28c7853d0 --- /dev/null +++ b/src/test/spec/json/server-discovery-and-monitoring/unified/serverMonitoringMode.yml @@ -0,0 +1,173 @@ +description: serverMonitoringMode + +schemaVersion: "1.17" +# These tests cannot run on replica sets because the order of the expected +# SDAM events are non-deterministic when monitoring multiple servers. +# They also cannot run on Serverless or load balanced clusters where SDAM is disabled. +runOnRequirements: + - topologies: [single, sharded, sharded-replicaset] + serverless: forbid +tests: + - description: "connect with serverMonitoringMode=auto >=4.4" + runOnRequirements: + - minServerVersion: "4.4.0" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: client + uriOptions: + serverMonitoringMode: "auto" + useMultipleMongoses: false + observeEvents: + - serverHeartbeatStartedEvent + - serverHeartbeatSucceededEvent + - serverHeartbeatFailedEvent + - database: + id: db + client: client + databaseName: sdam-tests + - &ping + name: runCommand + object: db + arguments: + commandName: ping + command: { ping: 1 } + expectResult: { ok: 1 } + # Wait for the second serverHeartbeatStartedEvent to ensure we start streaming. + - &waitForSecondHeartbeatStarted + name: waitForEvent + object: testRunner + arguments: + client: client + event: + serverHeartbeatStartedEvent: {} + count: 2 + expectEvents: &streamingStartedEvents + - client: client + eventType: sdam + ignoreExtraEvents: true + events: + - serverHeartbeatStartedEvent: + awaited: False + - serverHeartbeatSucceededEvent: + awaited: False + - serverHeartbeatStartedEvent: + awaited: True + + - description: "connect with serverMonitoringMode=auto <4.4" + runOnRequirements: + - maxServerVersion: "4.2.99" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: client + uriOptions: + serverMonitoringMode: "auto" + heartbeatFrequencyMS: 500 + useMultipleMongoses: false + observeEvents: + - serverHeartbeatStartedEvent + - serverHeartbeatSucceededEvent + - serverHeartbeatFailedEvent + - database: + id: db + client: client + databaseName: sdam-tests + - *ping + # Wait for the second serverHeartbeatStartedEvent to ensure we do not stream. + - *waitForSecondHeartbeatStarted + expectEvents: &pollingStartedEvents + - client: client + eventType: sdam + ignoreExtraEvents: true + events: + - serverHeartbeatStartedEvent: + awaited: False + - serverHeartbeatSucceededEvent: + awaited: False + - serverHeartbeatStartedEvent: + awaited: False + + - description: "connect with serverMonitoringMode=stream >=4.4" + runOnRequirements: + - minServerVersion: "4.4.0" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: client + uriOptions: + serverMonitoringMode: "stream" + useMultipleMongoses: false + observeEvents: + - serverHeartbeatStartedEvent + - serverHeartbeatSucceededEvent + - serverHeartbeatFailedEvent + - database: + id: db + client: client + databaseName: sdam-tests + - *ping + # Wait for the second serverHeartbeatStartedEvent to ensure we start streaming. + - *waitForSecondHeartbeatStarted + expectEvents: *streamingStartedEvents + + - description: "connect with serverMonitoringMode=stream <4.4" + runOnRequirements: + - maxServerVersion: "4.2.99" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: client + uriOptions: + serverMonitoringMode: "stream" + heartbeatFrequencyMS: 500 + useMultipleMongoses: false + observeEvents: + - serverHeartbeatStartedEvent + - serverHeartbeatSucceededEvent + - serverHeartbeatFailedEvent + - database: + id: db + client: client + databaseName: sdam-tests + - *ping + # Wait for the second serverHeartbeatStartedEvent to ensure we do not stream. + - *waitForSecondHeartbeatStarted + expectEvents: *pollingStartedEvents + + - description: "connect with serverMonitoringMode=poll" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: client + uriOptions: + serverMonitoringMode: "poll" + heartbeatFrequencyMS: 500 + useMultipleMongoses: false + observeEvents: + - serverHeartbeatStartedEvent + - serverHeartbeatSucceededEvent + - serverHeartbeatFailedEvent + - database: + id: db + client: client + databaseName: sdam-tests + - *ping + # Wait for the second serverHeartbeatStartedEvent to ensure we do not stream. + - *waitForSecondHeartbeatStarted + expectEvents: *pollingStartedEvents diff --git a/src/test/spec/unified_runner/matcher.rs b/src/test/spec/unified_runner/matcher.rs index f37d118ab..86104d039 100644 --- a/src/test/spec/unified_runner/matcher.rs +++ b/src/test/spec/unified_runner/matcher.rs @@ -356,10 +356,24 @@ fn sdam_events_match(actual: &SdamEvent, expected: &ExpectedSdamEvent) -> Result ExpectedSdamEvent::TopologyDescriptionChanged {}, ) => Ok(()), ( - SdamEvent::ServerHeartbeatSucceeded(_), - ExpectedSdamEvent::ServerHeartbeatSucceeded {}, - ) => Ok(()), - (SdamEvent::ServerHeartbeatFailed(_), ExpectedSdamEvent::ServerHeartbeatFailed {}) => { + SdamEvent::ServerHeartbeatStarted(actual), + ExpectedSdamEvent::ServerHeartbeatStarted { awaited }, + ) => { + match_opt(&actual.awaited, awaited)?; + Ok(()) + } + ( + SdamEvent::ServerHeartbeatSucceeded(actual), + ExpectedSdamEvent::ServerHeartbeatSucceeded { awaited }, + ) => { + match_opt(&actual.awaited, awaited)?; + Ok(()) + } + ( + SdamEvent::ServerHeartbeatFailed(actual), + ExpectedSdamEvent::ServerHeartbeatFailed { awaited }, + ) => { + match_opt(&actual.awaited, awaited)?; Ok(()) } _ => expected_err(actual, expected), diff --git a/src/test/spec/unified_runner/test_event.rs b/src/test/spec/unified_runner/test_event.rs index ab27f7c4e..cf8686d22 100644 --- a/src/test/spec/unified_runner/test_event.rs +++ b/src/test/spec/unified_runner/test_event.rs @@ -85,10 +85,12 @@ pub(crate) enum ExpectedSdamEvent { }, #[serde(rename = "topologyDescriptionChangedEvent")] TopologyDescriptionChanged {}, + #[serde(rename = "serverHeartbeatStartedEvent", rename_all = "camelCase")] + ServerHeartbeatStarted { awaited: Option }, #[serde(rename = "serverHeartbeatSucceededEvent", rename_all = "camelCase")] - ServerHeartbeatSucceeded {}, + ServerHeartbeatSucceeded { awaited: Option }, #[serde(rename = "serverHeartbeatFailedEvent", rename_all = "camelCase")] - ServerHeartbeatFailed {}, + ServerHeartbeatFailed { awaited: Option }, } #[derive(Debug, Deserialize)] diff --git a/src/test/spec/unified_runner/test_file.rs b/src/test/spec/unified_runner/test_file.rs index a232db28e..0967dc9fb 100644 --- a/src/test/spec/unified_runner/test_file.rs +++ b/src/test/spec/unified_runner/test_file.rs @@ -425,6 +425,7 @@ pub(crate) enum ExpectedEventType { // TODO RUST-1055 Remove this when connection usage is serialized. #[serde(skip)] CmapWithoutConnectionReady, + Sdam, } #[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize)] diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index 71cdc3f24..c6af32be3 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -75,7 +75,7 @@ const SKIPPED_OPERATIONS: &[&str] = &[ ]; static MIN_SPEC_VERSION: Version = Version::new(1, 0, 0); -static MAX_SPEC_VERSION: Version = Version::new(1, 16, 0); +static MAX_SPEC_VERSION: Version = Version::new(1, 17, 0); pub(crate) type EntityMap = HashMap; diff --git a/src/test/util/event.rs b/src/test/util/event.rs index 5893306d9..29d6919ec 100644 --- a/src/test/util/event.rs +++ b/src/test/util/event.rs @@ -291,6 +291,15 @@ impl EventHandler { events.retain(|ev| !matches!(ev, Event::Cmap(CmapEvent::ConnectionReady(_)))); events } + ExpectedEventType::Sdam => { + let events = self.sdam_events.read().unwrap(); + events + .iter() + .cloned() + .map(|(event, _)| Event::Sdam(event)) + .filter(filter) + .collect() + } } } From ad5026b4daab5be6c92fcc47585f4e0f6d03bc12 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 15 Feb 2024 11:06:31 -0500 Subject: [PATCH 6/8] uri tests --- src/client/options.rs | 3 ++ .../spec/json/uri-options/sdam-options.json | 46 +++++++++++++++++++ .../spec/json/uri-options/sdam-options.yml | 35 ++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 src/test/spec/json/uri-options/sdam-options.json create mode 100644 src/test/spec/json/uri-options/sdam-options.yml diff --git a/src/client/options.rs b/src/client/options.rs index 6f8921e86..f5d5b33e8 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -690,6 +690,8 @@ impl Serialize for ClientOptions { retrywrites: &'a Option, + servermonitoringmode: Option, + #[serde( flatten, serialize_with = "SelectionCriteria::serialize_for_client_options" @@ -730,6 +732,7 @@ impl Serialize for ClientOptions { replicaset: &self.repl_set_name, retryreads: &self.retry_reads, retrywrites: &self.retry_writes, + servermonitoringmode: self.server_monitoring_mode.as_ref().map(|m| format!("{:?}", m).to_lowercase()), selectioncriteria: &self.selection_criteria, serverselectiontimeoutms: &self.server_selection_timeout, sockettimeoutms: &self.socket_timeout, diff --git a/src/test/spec/json/uri-options/sdam-options.json b/src/test/spec/json/uri-options/sdam-options.json new file mode 100644 index 000000000..673f5607e --- /dev/null +++ b/src/test/spec/json/uri-options/sdam-options.json @@ -0,0 +1,46 @@ +{ + "tests": [ + { + "description": "serverMonitoringMode=auto", + "uri": "mongodb://example.com/?serverMonitoringMode=auto", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "auto" + } + }, + { + "description": "serverMonitoringMode=stream", + "uri": "mongodb://example.com/?serverMonitoringMode=stream", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "stream" + } + }, + { + "description": "serverMonitoringMode=poll", + "uri": "mongodb://example.com/?serverMonitoringMode=poll", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "poll" + } + }, + { + "description": "invalid serverMonitoringMode", + "uri": "mongodb://example.com/?serverMonitoringMode=invalid", + "valid": true, + "warning": true, + "hosts": null, + "auth": null, + "options": {} + } + ] +} diff --git a/src/test/spec/json/uri-options/sdam-options.yml b/src/test/spec/json/uri-options/sdam-options.yml new file mode 100644 index 000000000..8f72ff409 --- /dev/null +++ b/src/test/spec/json/uri-options/sdam-options.yml @@ -0,0 +1,35 @@ +tests: + - description: "serverMonitoringMode=auto" + uri: "mongodb://example.com/?serverMonitoringMode=auto" + valid: true + warning: false + hosts: ~ + auth: ~ + options: + serverMonitoringMode: "auto" + + - description: "serverMonitoringMode=stream" + uri: "mongodb://example.com/?serverMonitoringMode=stream" + valid: true + warning: false + hosts: ~ + auth: ~ + options: + serverMonitoringMode: "stream" + + - description: "serverMonitoringMode=poll" + uri: "mongodb://example.com/?serverMonitoringMode=poll" + valid: true + warning: false + hosts: ~ + auth: ~ + options: + serverMonitoringMode: "poll" + + - description: "invalid serverMonitoringMode" + uri: "mongodb://example.com/?serverMonitoringMode=invalid" + valid: true + warning: true + hosts: ~ + auth: ~ + options: {} From 82b77f2548299950588c7404fa653734042b3c58 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 15 Feb 2024 12:07:36 -0500 Subject: [PATCH 7/8] lambda test --- .evergreen/aws-lambda-test/mongodb/src/main.rs | 4 +++- src/client/options.rs | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.evergreen/aws-lambda-test/mongodb/src/main.rs b/.evergreen/aws-lambda-test/mongodb/src/main.rs index 03173e4f0..5c370be24 100644 --- a/.evergreen/aws-lambda-test/mongodb/src/main.rs +++ b/.evergreen/aws-lambda-test/mongodb/src/main.rs @@ -34,10 +34,12 @@ impl Stats { fn handle_sdam(&mut self, event: &SdamEvent) { match event { - SdamEvent::ServerHeartbeatStarted(_) => { + SdamEvent::ServerHeartbeatStarted(ev) => { + assert!(!ev.awaited); self.heartbeats_started += 1; } SdamEvent::ServerHeartbeatFailed(ev) => { + assert!(!ev.awaited); self.failed_heartbeat_durations_millis.push(ev.duration.as_millis()); } _ => (), diff --git a/src/client/options.rs b/src/client/options.rs index f5d5b33e8..b4149c8d8 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -732,7 +732,10 @@ impl Serialize for ClientOptions { replicaset: &self.repl_set_name, retryreads: &self.retry_reads, retrywrites: &self.retry_writes, - servermonitoringmode: self.server_monitoring_mode.as_ref().map(|m| format!("{:?}", m).to_lowercase()), + servermonitoringmode: self + .server_monitoring_mode + .as_ref() + .map(|m| format!("{:?}", m).to_lowercase()), selectioncriteria: &self.selection_criteria, serverselectiontimeoutms: &self.server_selection_timeout, sockettimeoutms: &self.socket_timeout, From d1814815bae7d6ed26dd090c429cb06dbb52322a Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 15 Feb 2024 12:18:17 -0500 Subject: [PATCH 8/8] revert unrelated readme change --- .../README.rst | 28 +------------------ 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/src/test/spec/json/server-discovery-and-monitoring/README.rst b/src/test/spec/json/server-discovery-and-monitoring/README.rst index 95b9865b7..bf65053cb 100644 --- a/src/test/spec/json/server-discovery-and-monitoring/README.rst +++ b/src/test/spec/json/server-discovery-and-monitoring/README.rst @@ -262,32 +262,6 @@ Run the following test(s) on MongoDB 4.4+. mode: "off", }); -Heartbeat Tests -~~~~~~~~~~~~~~~ - -1. Test that ``ServerHeartbeatStartedEvent`` is emitted before the monitoring socket was created - - #. Create a mock TCP server (example shown below) that pushes a ``client connected`` event to a shared array when a client connects and a ``client hello received`` event when the server receives the client hello and then closes the connection:: - - let events = []; - server = createServer(clientSocket => { - events.push('client connected'); - - clientSocket.on('data', () => { - events.push('client hello received'); - clientSocket.destroy(); - }); - }); - server.listen(9999); - - #. Create a client with ``serverSelectionTimeoutMS: 500`` and listen to ``ServerHeartbeatStartedEvent`` and ``ServerHeartbeatFailedEvent``, pushing the event name to the same shared array as the mock TCP server - - #. Attempt to connect client to previously created TCP server, catching the error when the client fails to connect - - #. Assert that the first four elements in the array are: :: - - ['serverHeartbeatStartedEvent', 'client connected', 'client hello received', 'serverHeartbeatFailedEvent'] - .. Section for links. -.. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality +.. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality \ No newline at end of file