From 3b13ebada48a9474537cb579bd02200b2ec0af88 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 24 Jun 2021 16:08:39 -0400 Subject: [PATCH 1/4] support isWritablePrimary hello response field --- src/is_master.rs | 3 +++ src/sdam/description/topology/server_selection/test/mod.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/is_master.rs b/src/is_master.rs index b34b881d9..776c0f409 100644 --- a/src/is_master.rs +++ b/src/is_master.rs @@ -19,6 +19,7 @@ pub(crate) struct IsMasterReply { #[derive(Debug, Clone, Default, Deserialize)] #[serde(rename_all = "camelCase")] pub(crate) struct IsMasterCommandResponse { + pub is_writable_primary: Option, #[serde(rename = "ismaster")] pub is_master: Option, pub ok: Option, @@ -72,6 +73,8 @@ impl IsMasterCommandResponse { } else if self.set_name.is_some() { if let Some(true) = self.hidden { ServerType::RsOther + } else if let Some(true) = self.is_writable_primary { + ServerType::RsPrimary } else if let Some(true) = self.is_master { ServerType::RsPrimary } else if let Some(true) = self.secondary { diff --git a/src/sdam/description/topology/server_selection/test/mod.rs b/src/sdam/description/topology/server_selection/test/mod.rs index 588cb9037..fcc563a25 100644 --- a/src/sdam/description/topology/server_selection/test/mod.rs +++ b/src/sdam/description/topology/server_selection/test/mod.rs @@ -164,7 +164,7 @@ fn is_master_response_from_server_type(server_type: ServerType) -> IsMasterComma ServerType::RsPrimary => { response.ok = Some(1.0); response.set_name = Some("foo".into()); - response.is_master = Some(true); + response.is_writable_primary = Some(true); } ServerType::RsOther => { response.ok = Some(1.0); From 5146517b53c867330daf4fb344fd16e89ba34949 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 28 Jun 2021 12:11:47 -0400 Subject: [PATCH 2/4] move master command construction and execution to is_master.rs --- src/cmap/establish/handshake/mod.rs | 58 ++++------------------------- src/cmap/mod.rs | 2 +- src/is_master.rs | 45 ++++++++++++++++++++-- src/sdam/monitor.rs | 13 ++----- 4 files changed, 55 insertions(+), 63 deletions(-) diff --git a/src/cmap/establish/handshake/mod.rs b/src/cmap/establish/handshake/mod.rs index ce19c42fc..cf9d22fc4 100644 --- a/src/cmap/establish/handshake/mod.rs +++ b/src/cmap/establish/handshake/mod.rs @@ -1,8 +1,6 @@ #[cfg(test)] mod test; -use std::time::Instant; - use lazy_static::lazy_static; use os_info::{Type, Version}; @@ -10,8 +8,8 @@ use crate::{ bson::{doc, Bson, Document}, client::auth::{ClientFirst, FirstRound}, cmap::{options::ConnectionPoolOptions, Command, Connection, StreamDescription}, - error::{ErrorKind, Result}, - is_master::{IsMasterCommandResponse, IsMasterReply}, + error::Result, + is_master::{is_master_command, run_is_master, IsMasterReply}, options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi}, }; @@ -148,12 +146,9 @@ impl Handshaker { pub(crate) fn new(options: Option) -> Self { let mut metadata = BASE_CLIENT_METADATA.clone(); let mut credential = None; - let mut db = None; - let mut server_api = None; - let mut body = doc! { - "isMaster": 1, - }; + let mut command = + is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref())); if let Some(options) = options { if let Some(app_name) = options.app_name { @@ -178,25 +173,13 @@ impl Handshaker { } if let Some(cred) = options.credential { - cred.append_needed_mechanism_negotiation(&mut body); - db = Some(cred.resolved_source().to_string()); + cred.append_needed_mechanism_negotiation(&mut command.body); + command.target_db = cred.resolved_source().to_string(); credential = Some(cred); } - - server_api = options.server_api; } - body.insert("client", metadata); - - let mut command = Command::new( - "isMaster".to_string(), - db.unwrap_or_else(|| "admin".to_string()), - body, - ); - - if let Some(server_api) = server_api { - command.set_server_api(&server_api) - } + command.body.insert("client", metadata); Self { command, @@ -210,7 +193,7 @@ impl Handshaker { let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?; - let mut is_master_reply = is_master(command, conn).await?; + let mut is_master_reply = run_is_master(command, conn).await?; conn.stream_description = Some(StreamDescription::from_is_master(is_master_reply.clone())); // Record the client's message and the server's response from speculative authentication if @@ -273,31 +256,6 @@ impl From for HandshakerOptions { } } -/// Run the given isMaster command. -/// -/// If the given command is not an isMaster, this function will return an error. -pub(crate) async fn is_master(command: Command, conn: &mut Connection) -> Result { - if !command.name.eq_ignore_ascii_case("ismaster") { - return Err(ErrorKind::Internal { - message: format!("invalid ismaster command: {}", command.name), - } - .into()); - } - let start_time = Instant::now(); - let response = conn.send_command(command, None).await?; - let end_time = Instant::now(); - - response.validate()?; - let cluster_time = response.cluster_time().cloned(); - let command_response: IsMasterCommandResponse = response.body()?; - - Ok(IsMasterReply { - command_response, - round_trip_time: Some(end_time.duration_since(start_time)), - cluster_time, - }) -} - /// Updates the handshake command document with the speculative authenitication info. fn set_speculative_auth_info( command: &mut Document, diff --git a/src/cmap/mod.rs b/src/cmap/mod.rs index 3bca5014f..cdfb30b9c 100644 --- a/src/cmap/mod.rs +++ b/src/cmap/mod.rs @@ -16,7 +16,7 @@ use derivative::Derivative; pub use self::conn::ConnectionInfo; pub(crate) use self::{ conn::{Command, CommandResponse, Connection, StreamDescription}, - establish::handshake::{is_master, Handshaker}, + establish::handshake::Handshaker, status::PoolGenerationSubscriber, }; use self::{connection_requester::ConnectionRequestResult, options::ConnectionPoolOptions}; diff --git a/src/is_master.rs b/src/is_master.rs index 776c0f409..d622649f1 100644 --- a/src/is_master.rs +++ b/src/is_master.rs @@ -1,14 +1,53 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use serde::Deserialize; use crate::{ - bson::{oid::ObjectId, DateTime, Document, Timestamp}, - client::ClusterTime, + bson::{doc, oid::ObjectId, DateTime, Document, Timestamp}, + client::{options::ServerApi, ClusterTime}, + cmap::{Command, Connection}, + error::{ErrorKind, Result}, sdam::ServerType, selection_criteria::TagSet, }; +/// Construct an isMaster command. +pub(crate) fn is_master_command(api: Option<&ServerApi>) -> Command { + let mut command = Command::new("isMaster".into(), "admin".into(), doc! { "isMaster": 1 }); + if let Some(server_api) = api { + command.set_server_api(server_api); + } + command +} + +/// Run the given isMaster command. +/// +/// If the given command is not an isMaster, this function will return an error. +pub(crate) async fn run_is_master( + command: Command, + conn: &mut Connection, +) -> Result { + if !command.name.eq_ignore_ascii_case("ismaster") { + return Err(ErrorKind::Internal { + message: format!("invalid ismaster command: {}", command.name), + } + .into()); + } + let start_time = Instant::now(); + let response = conn.send_command(command, None).await?; + let end_time = Instant::now(); + + response.validate()?; + let cluster_time = response.cluster_time().cloned(); + let command_response: IsMasterCommandResponse = response.body()?; + + Ok(IsMasterReply { + command_response, + round_trip_time: Some(end_time.duration_since(start_time)), + cluster_time, + }) +} + #[derive(Debug, Clone)] pub(crate) struct IsMasterReply { pub command_response: IsMasterCommandResponse, diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index 2925a1e80..354422bd4 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -10,10 +10,9 @@ use super::{ ServerUpdateReceiver, }; use crate::{ - bson::doc, - cmap::{is_master, Command, Connection, Handshaker}, + cmap::{Connection, Handshaker}, error::{Error, Result}, - is_master::IsMasterReply, + is_master::{is_master_command, run_is_master, IsMasterReply}, options::{ClientOptions, ServerAddress}, RUNTIME, }; @@ -184,12 +183,8 @@ impl HeartbeatMonitor { async fn perform_is_master(&mut self) -> Result { let result = match self.connection { Some(ref mut conn) => { - let mut command = - Command::new("isMaster".into(), "admin".into(), doc! { "isMaster": 1 }); - if let Some(ref server_api) = self.client_options.server_api { - command.set_server_api(server_api); - } - is_master(command, conn).await + let command = is_master_command(self.client_options.server_api.as_ref()); + run_is_master(command, conn).await } None => { let mut connection = Connection::connect_monitoring( From d6f4f78d4d1c5c2fbf36ec984dbb9b830129670d Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 28 Jun 2021 12:26:29 -0400 Subject: [PATCH 3/4] use hello for versioned connections --- src/is_master.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/is_master.rs b/src/is_master.rs index d622649f1..e59d00eed 100644 --- a/src/is_master.rs +++ b/src/is_master.rs @@ -13,7 +13,8 @@ use crate::{ /// Construct an isMaster command. pub(crate) fn is_master_command(api: Option<&ServerApi>) -> Command { - let mut command = Command::new("isMaster".into(), "admin".into(), doc! { "isMaster": 1 }); + let command_name = if api.is_some() { "hello" } else { "isMaster" }; + let mut command = Command::new(command_name.into(), "admin".into(), doc! { command_name: 1 }); if let Some(server_api) = api { command.set_server_api(server_api); } @@ -27,7 +28,8 @@ pub(crate) async fn run_is_master( command: Command, conn: &mut Connection, ) -> Result { - if !command.name.eq_ignore_ascii_case("ismaster") { + if !command.name.eq_ignore_ascii_case("ismaster") && + !command.name.eq_ignore_ascii_case("hello") { return Err(ErrorKind::Internal { message: format!("invalid ismaster command: {}", command.name), } From 995beac1ef844ae6176d49fffd608561fc9bb7a8 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 30 Jun 2021 10:18:37 -0400 Subject: [PATCH 4/4] update TODO to reference the right ticket --- src/test/spec/versioned_api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/spec/versioned_api.rs b/src/test/spec/versioned_api.rs index 7cd1aba1f..16ca90553 100644 --- a/src/test/spec/versioned_api.rs +++ b/src/test/spec/versioned_api.rs @@ -12,7 +12,7 @@ use super::run_unified_format_test; #[cfg_attr(feature = "async-std-runtime", async_std::test)] async fn run() { let _guard: RwLockWriteGuard<_> = LOCK.run_exclusively().await; - // TODO RUST-725 Unskip these tests on 5.0 + // TODO RUST-768 Unskip these tests on 5.0 if TestClient::new().await.server_version_gte(5, 0) { return; }