From 493e0cd503d2619f5bc9897429db3880919c677b Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Fri, 3 Dec 2021 17:03:39 -0500 Subject: [PATCH 1/6] add some debug print --- .../server_selection/test/in_window.rs | 50 ++++++++++++++++--- src/test/util/event.rs | 9 +++- src/test/util/mod.rs | 2 +- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 6e6a9d668..72eb027e0 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -13,14 +13,17 @@ use crate::{ selection_criteria::ReadPreference, test::{ run_spec_test, - EventClient, + Event, + EventHandler, FailCommandOptions, FailPoint, FailPointMode, + SdamEvent, TestClient, CLIENT_OPTIONS, LOCK, }, + ServerType, RUNTIME, }; @@ -158,8 +161,14 @@ async fn load_balancing_test() { /// min_share is the lower bound for the % of times the the less selected server /// was selected. max_share is the upper bound. - async fn do_test(client: &mut EventClient, min_share: f64, max_share: f64, iterations: usize) { - client.clear_cached_events(); + async fn do_test( + client: &TestClient, + handler: &mut EventHandler, + min_share: f64, + max_share: f64, + iterations: usize, + ) { + handler.clear_cached_events(); let mut handles: Vec> = Vec::new(); for _ in 0..10 { @@ -180,10 +189,15 @@ async fn load_balancing_test() { futures::future::join_all(handles).await; let mut tallies: HashMap = HashMap::new(); - for event in client.get_command_started_events(&["find"]) { + for event in handler.get_command_started_events(&["find"]) { *tallies.entry(event.connection.address.clone()).or_insert(0) += 1; } + if tallies.len() < 2 { + println!("{:#?}", tallies); + println!("{:#?}", client.topology_description().await); + } + assert_eq!(tallies.len(), 2); let mut counts: Vec<_> = tallies.values().collect(); counts.sort(); @@ -203,10 +217,30 @@ async fn load_balancing_test() { ); } - let mut client = EventClient::new().await; + let mut handler = EventHandler::new(); + let mut subscriber = handler.subscribe(); + let client = TestClient::with_handler(Some(Arc::new(handler.clone())), None).await; + + subscriber + .wait_for_event(Duration::from_secs(30), |event| { + if let Event::Sdam(SdamEvent::TopologyDescriptionChanged(event)) = event { + event + .new_description + .servers() + .into_iter() + .filter(|s| matches!(s.1.server_type(), ServerType::Mongos)) + .count() + == 2 + } else { + false + } + }) + .await + .expect("timed out waiting for both mongoses to be discovered"); + drop(subscriber); // saturate pools - do_test(&mut client, 0.0, 0.50, 100).await; + do_test(&client, &mut handler, 0.0, 0.50, 100).await; // enable a failpoint on one of the mongoses to slow it down let options = FailCommandOptions::builder() @@ -220,9 +254,9 @@ async fn load_balancing_test() { .expect("enabling failpoint should succeed"); // verify that the lesser picked server (slower one) was picked less than 25% of the time. - do_test(&mut client, 0.05, 0.25, 10).await; + do_test(&client, &mut handler, 0.05, 0.25, 10).await; // disable failpoint and rerun, should be back to even split drop(fp_guard); - do_test(&mut client, 0.40, 0.50, 100).await; + do_test(&client, &mut handler, 0.40, 0.50, 100).await; } diff --git a/src/test/util/event.rs b/src/test/util/event.rs index 3355b57a9..5195cd564 100644 --- a/src/test/util/event.rs +++ b/src/test/util/event.rs @@ -219,6 +219,12 @@ impl EventHandler { pub fn connections_checked_out(&self) -> u32 { *self.connections_checked_out.lock().unwrap() } + + pub fn clear_cached_events(&self) { + self.command_events.write().unwrap().clear(); + self.cmap_events.write().unwrap().clear(); + self.sdam_events.write().unwrap().clear(); + } } impl CmapEventHandler for EventHandler { @@ -538,8 +544,7 @@ impl EventClient { } pub fn clear_cached_events(&self) { - self.handler.command_events.write().unwrap().clear(); - self.handler.cmap_events.write().unwrap().clear(); + self.handler.clear_cached_events() } } diff --git a/src/test/util/mod.rs b/src/test/util/mod.rs index 7ba8ed82a..26a1baf70 100644 --- a/src/test/util/mod.rs +++ b/src/test/util/mod.rs @@ -61,7 +61,7 @@ impl TestClient { Self::with_handler(None, options).await } - async fn with_handler( + pub async fn with_handler( event_handler: Option>, options: impl Into>, ) -> Self { From 450f17bdae25b4a582a9cac427e6663e8c4ed2e9 Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Mon, 13 Dec 2021 12:10:34 -0500 Subject: [PATCH 2/6] unskip on auth variants --- .../topology/server_selection/test/in_window.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 72eb027e0..5cea4472b 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -7,6 +7,7 @@ use serde::Deserialize; use tokio::sync::RwLockWriteGuard; use crate::{ + client::options::ClientOptions, options::ServerAddress, runtime::AsyncJoinHandle, sdam::{description::topology::server_selection, Server}, @@ -117,12 +118,6 @@ async fn load_balancing_test() { let mut setup_client_options = CLIENT_OPTIONS.clone(); - // TODO: RUST-1004 unskip on auth variants - if setup_client_options.credential.is_some() { - println!("skipping load_balancing_test test due to auth being enabled"); - return; - } - if setup_client_options.load_balanced.unwrap_or(false) { println!("skipping load_balancing_test test due to load-balanced topology"); return; @@ -219,8 +214,12 @@ async fn load_balancing_test() { let mut handler = EventHandler::new(); let mut subscriber = handler.subscribe(); - let client = TestClient::with_handler(Some(Arc::new(handler.clone())), None).await; + let options = ClientOptions::builder() + .local_threshold(Duration::from_secs(30)) + .build(); + let client = TestClient::with_handler(Some(Arc::new(handler.clone())), options).await; + // wait for both servers to be discovered. subscriber .wait_for_event(Duration::from_secs(30), |event| { if let Event::Sdam(SdamEvent::TopologyDescriptionChanged(event)) = event { From ab7e05648fd26f26366e3431d566e1c4b1f69d2f Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Fri, 14 Jan 2022 17:27:51 -0500 Subject: [PATCH 3/6] use proper options as base --- src/bson_util/mod.rs | 1 - .../description/topology/server_selection/test/in_window.rs | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/bson_util/mod.rs b/src/bson_util/mod.rs index cc24e8022..c1057e297 100644 --- a/src/bson_util/mod.rs +++ b/src/bson_util/mod.rs @@ -191,7 +191,6 @@ where /// The size in bytes of the provided document's entry in a BSON array at the given index. pub(crate) fn array_entry_size_bytes(index: usize, doc_len: usize) -> u64 { - // // * type (1 byte) // * number of decimal digits in key // * null terminator for the key (1 byte) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 5cea4472b..7b6a5d142 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -214,9 +214,8 @@ async fn load_balancing_test() { let mut handler = EventHandler::new(); let mut subscriber = handler.subscribe(); - let options = ClientOptions::builder() - .local_threshold(Duration::from_secs(30)) - .build(); + let mut options = CLIENT_OPTIONS.clone(); + options.local_threshold = Duration::from_secs(30).into(); let client = TestClient::with_handler(Some(Arc::new(handler.clone())), options).await; // wait for both servers to be discovered. From 7059a1002d6d18b1a9f39a5e367dd0b479aee120 Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Wed, 19 Jan 2022 14:57:51 -0500 Subject: [PATCH 4/6] skip on auth variants again --- .../description/topology/server_selection/test/in_window.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 7b6a5d142..b921972f7 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -123,6 +123,11 @@ async fn load_balancing_test() { return; } + if setup_client_options.credential.is_some() { + println!("skipping load_balancing_test test due to auth being enabled"); + return; + } + setup_client_options.hosts.drain(1..); setup_client_options.direct_connection = Some(true); let setup_client = TestClient::with_options(Some(setup_client_options)).await; From 6026a1f971736ed350b653148ee9311d475c5814 Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Wed, 26 Jan 2022 18:13:46 -0500 Subject: [PATCH 5/6] remove import --- src/sdam/description/topology/server_selection/test/in_window.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index b921972f7..289d14ab7 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -7,7 +7,6 @@ use serde::Deserialize; use tokio::sync::RwLockWriteGuard; use crate::{ - client::options::ClientOptions, options::ServerAddress, runtime::AsyncJoinHandle, sdam::{description::topology::server_selection, Server}, From 8bb71719d3e8dfcf09cf23bfb5a39afde4efd55c Mon Sep 17 00:00:00 2001 From: Patrick Freed Date: Fri, 28 Jan 2022 16:03:39 -0500 Subject: [PATCH 6/6] remove debug print --- .../description/topology/server_selection/test/in_window.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 289d14ab7..813f79d8f 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -192,11 +192,6 @@ async fn load_balancing_test() { *tallies.entry(event.connection.address.clone()).or_insert(0) += 1; } - if tallies.len() < 2 { - println!("{:#?}", tallies); - println!("{:#?}", client.topology_description().await); - } - assert_eq!(tallies.len(), 2); let mut counts: Vec<_> = tallies.values().collect(); counts.sort();