diff --git a/src/client/executor.rs b/src/client/executor.rs index 234135f41..9ffa36a65 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -26,7 +26,13 @@ use crate::{ Retryability, }, options::SelectionCriteria, - sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus}, + sdam::{ + HandshakePhase, + SelectedServer, + ServerType, + SessionSupportStatus, + TransactionSupportStatus, + }, selection_criteria::ReadPreference, }; @@ -135,10 +141,15 @@ impl Client { } } - let server = match self.select_server(op.selection_criteria()).await { + let selection_criteria = session + .as_ref() + .and_then(|s| s.transaction.pinned_mongos.as_ref()) + .or_else(|| op.selection_criteria()); + + let server = match self.select_server(selection_criteria).await { Ok(server) => server, Err(mut err) => { - err.add_labels(None, &session, None)?; + err.add_labels_and_update_pin(None, &mut session, None)?; return Err(err); } }; @@ -146,7 +157,7 @@ impl Client { let mut conn = match server.pool.check_out().await { Ok(conn) => conn, Err(mut err) => { - err.add_labels(None, &session, None)?; + err.add_labels_and_update_pin(None, &mut session, None)?; if err.is_pool_cleared() { return self.execute_retry(&mut op, &mut session, None, err).await; @@ -229,6 +240,8 @@ impl Client { txn_number: Option, first_error: Error, ) -> Result { + op.update_for_retry(); + let server = match self.select_server(op.selection_criteria()).await { Ok(server) => server, Err(_) => { @@ -246,8 +259,6 @@ impl Client { return Err(first_error); } - op.update_for_retry(); - match self .execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability) .await @@ -286,7 +297,9 @@ impl Client { wc.validate()?; } - let mut cmd = op.build(connection.stream_description()?)?; + let stream_description = connection.stream_description()?; + let is_sharded = stream_description.initial_server_type == ServerType::Mongos; + let mut cmd = op.build(stream_description)?; self.inner .topology .update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria()) @@ -324,12 +337,22 @@ impl Client { cmd.set_start_transaction(); cmd.set_autocommit(); cmd.set_txn_read_concern(*session)?; + if is_sharded { + session.pin_mongos(connection.address().clone()); + } session.transaction.state = TransactionState::InProgress; } - TransactionState::InProgress - | TransactionState::Committed { .. } - | TransactionState::Aborted => { + TransactionState::InProgress => cmd.set_autocommit(), + TransactionState::Committed { .. } | TransactionState::Aborted => { cmd.set_autocommit(); + + // Append the recovery token to the command if we are committing or aborting + // on a sharded transaction. + if is_sharded { + if let Some(ref recovery_token) = session.transaction.recovery_token { + cmd.set_recovery_token(recovery_token); + } + } } _ => {} } @@ -398,6 +421,9 @@ impl Client { Ok(r) => { self.update_cluster_time(&r, session).await; if r.is_success() { + // Retrieve recovery token from successful response. + Client::update_recovery_token(is_sharded, &r, session).await; + Ok(CommandResult { raw: response, deserialized: r.into_body(), @@ -442,7 +468,15 @@ impl Client { })) } // for ok: 1 just return the original deserialization error. - _ => Err(deserialize_error), + _ => { + Client::update_recovery_token( + is_sharded, + &error_response, + session, + ) + .await; + Err(deserialize_error) + } } } // We failed to deserialize even that, so just return the original @@ -471,13 +505,13 @@ impl Client { handler.handle_command_failed_event(command_failed_event); }); - if let Some(session) = session { + if let Some(ref mut session) = session { if err.is_network_error() { session.mark_dirty(); } } - err.add_labels(Some(connection), session, Some(retryability))?; + err.add_labels_and_update_pin(Some(connection), session, Some(retryability))?; op.handle_error(err) } Ok(response) => { @@ -504,7 +538,11 @@ impl Client { match op.handle_response(response.deserialized, connection.stream_description()?) { Ok(response) => Ok(response), Err(mut err) => { - err.add_labels(Some(connection), session, Some(retryability))?; + err.add_labels_and_update_pin( + Some(connection), + session, + Some(retryability), + )?; Err(err) } } @@ -615,10 +653,22 @@ impl Client { } } } + + async fn update_recovery_token( + is_sharded: bool, + response: &T, + session: &mut Option<&mut ClientSession>, + ) { + if let Some(ref mut session) = session { + if is_sharded && session.in_transaction() { + session.transaction.recovery_token = response.recovery_token().cloned(); + } + } + } } impl Error { - /// Adds the necessary labels to this Error. + /// Adds the necessary labels to this Error, and unpins the session if needed. /// /// A TransientTransactionError label should be added if a transaction is in progress and the /// error is a network or server selection error. @@ -628,10 +678,13 @@ impl Error { /// server version, a label should only be added if the `retry_writes` client option is not set /// to `false`, the operation during which the error occured is write-retryable, and a /// TransientTransactionError label has not already been added. - fn add_labels( + /// + /// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the + /// ClientSession should be unpinned. + fn add_labels_and_update_pin( &mut self, conn: Option<&Connection>, - session: &Option<&mut ClientSession>, + session: &mut Option<&mut ClientSession>, retryability: Option<&Retryability>, ) -> Result<()> { let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| { @@ -675,6 +728,15 @@ impl Error { } } } + + if let Some(ref mut session) = session { + if self.contains_label(TRANSIENT_TRANSACTION_ERROR) + || self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) + { + session.unpin_mongos(); + } + } + Ok(()) } } diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 9d0d2e47f..a0dfb2666 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -5,6 +5,7 @@ mod test; use std::{ collections::HashSet, + sync::Arc, time::{Duration, Instant}, }; @@ -16,13 +17,16 @@ use crate::{ error::{ErrorKind, Result}, operation::{AbortTransaction, CommitTransaction, Operation}, options::{SessionOptions, TransactionOptions}, - sdam::TransactionSupportStatus, + sdam::{ServerInfo, TransactionSupportStatus}, + selection_criteria::SelectionCriteria, Client, RUNTIME, }; pub(crate) use cluster_time::ClusterTime; pub(super) use pool::ServerSessionPool; +use super::options::ServerAddress; + lazy_static! { pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = { let mut hash_set = HashSet::new(); @@ -43,10 +47,10 @@ lazy_static! { /// collections atomically. For more information about when and how to use transactions in MongoDB, /// see the [manual](https://docs.mongodb.com/manual/core/transactions/). /// -/// Replica set transactions are supported on MongoDB 4.0+. Transactions are associated with a -/// `ClientSession`. To begin a transaction, call [`ClientSession::start_transaction`] on a -/// `ClientSession`. The `ClientSession` must be passed to operations to be executed within the -/// transaction. +/// Replica set transactions are supported on MongoDB 4.0+. Sharded transactions are supported on +/// MongoDDB 4.2+. Transactions are associated with a `ClientSession`. To begin a transaction, call +/// [`ClientSession::start_transaction`] on a `ClientSession`. The `ClientSession` must be passed to +/// operations to be executed within the transaction. /// /// ```rust /// use mongodb::{ @@ -95,9 +99,6 @@ lazy_static! { /// } /// } /// ``` -// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded -// transactions are supported on 4.2+ -/// Note: the driver does not currently support transactions on sharded clusters. #[derive(Clone, Debug)] pub struct ClientSession { cluster_time: Option, @@ -113,12 +114,15 @@ pub struct ClientSession { pub(crate) struct Transaction { pub(crate) state: TransactionState, pub(crate) options: Option, + pub(crate) pinned_mongos: Option, + pub(crate) recovery_token: Option, } impl Transaction { pub(crate) fn start(&mut self, options: Option) { self.state = TransactionState::Starting; self.options = options; + self.recovery_token = None; } pub(crate) fn commit(&mut self, data_committed: bool) { @@ -128,11 +132,14 @@ impl Transaction { pub(crate) fn abort(&mut self) { self.state = TransactionState::Aborted; self.options = None; + self.pinned_mongos = None; } pub(crate) fn reset(&mut self) { self.state = TransactionState::None; self.options = None; + self.pinned_mongos = None; + self.recovery_token = None; } } @@ -141,6 +148,8 @@ impl Default for Transaction { Self { state: TransactionState::None, options: None, + pinned_mongos: None, + recovery_token: None, } } } @@ -245,6 +254,17 @@ impl ClientSession { self.server_session.txn_number } + /// Pin mongos to session. + pub(crate) fn pin_mongos(&mut self, address: ServerAddress) { + self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new( + move |server_info: &ServerInfo| *server_info.address() == address, + ))); + } + + pub(crate) fn unpin_mongos(&mut self) { + self.transaction.pinned_mongos = None; + } + /// Whether this session is dirty. #[cfg(test)] pub(crate) fn is_dirty(&self) -> bool { @@ -298,6 +318,9 @@ impl ClientSession { } .into()); } + TransactionState::Committed { .. } => { + self.unpin_mongos(); // Unpin session if previous transaction is committed. + } _ => {} } match self.client.transaction_support_status().await? { @@ -472,7 +495,8 @@ impl ClientSession { .as_ref() .and_then(|options| options.write_concern.as_ref()) .cloned(); - let abort_transaction = AbortTransaction::new(write_concern); + let selection_criteria = self.transaction.pinned_mongos.clone(); + let abort_transaction = AbortTransaction::new(write_concern, selection_criteria); self.transaction.abort(); // Errors returned from running an abortTransaction command should be ignored. let _result = self diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index 0160ecde8..1aaa37a5a 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -41,6 +41,10 @@ impl Command { } } + pub(crate) fn set_recovery_token(&mut self, recovery_token: &Document) { + self.body.insert("recoveryToken", recovery_token); + } + pub(crate) fn set_txn_number(&mut self, txn_number: i64) { self.body.insert("txnNumber", txn_number); } diff --git a/src/concern/test.rs b/src/concern/test.rs index 0f9c0d3af..d5e094cc9 100644 --- a/src/concern/test.rs +++ b/src/concern/test.rs @@ -179,8 +179,7 @@ async fn snapshot_read_concern() { .database(function_name!()) .collection::(function_name!()); - // TODO RUST-122 run this test on sharded clusters - if client.is_replica_set() && client.server_version_gte(4, 0) { + if client.supports_transactions() { let mut session = client.start_session(None).await.unwrap(); let options = TransactionOptions::builder() .read_concern(ReadConcern::snapshot()) diff --git a/src/operation/abort_transaction/mod.rs b/src/operation/abort_transaction/mod.rs index 5c6206edc..60510f301 100644 --- a/src/operation/abort_transaction/mod.rs +++ b/src/operation/abort_transaction/mod.rs @@ -4,17 +4,25 @@ use crate::{ error::Result, operation::{Operation, Retryability}, options::WriteConcern, + selection_criteria::SelectionCriteria, }; use super::{CommandResponse, Response, WriteConcernOnlyBody}; pub(crate) struct AbortTransaction { write_concern: Option, + selection_criteria: Option, } impl AbortTransaction { - pub(crate) fn new(write_concern: Option) -> Self { - Self { write_concern } + pub(crate) fn new( + write_concern: Option, + selection_criteria: Option, + ) -> Self { + Self { + write_concern, + selection_criteria, + } } } @@ -47,6 +55,10 @@ impl Operation for AbortTransaction { response.validate() } + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.selection_criteria.as_ref() + } + fn write_concern(&self) -> Option<&WriteConcern> { self.write_concern.as_ref() } @@ -54,4 +66,9 @@ impl Operation for AbortTransaction { fn retryability(&self) -> Retryability { Retryability::Write } + + fn update_for_retry(&mut self) { + // The session must be "unpinned" before server selection for a retry. + self.selection_criteria = None; + } } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 6c981cbe1..b7f0fa237 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -153,6 +153,9 @@ pub(crate) trait Response: Sized { /// The `atClusterTime` field of the response. fn at_cluster_time(&self) -> Option; + /// The `recoveryToken` field of the response. + fn recovery_token(&self) -> Option<&Document>; + /// Convert into the body of the response. fn into_body(self) -> Self::Body; } @@ -168,6 +171,8 @@ pub(crate) struct CommandResponse { pub(crate) at_cluster_time: Option, + pub(crate) recovery_token: Option, + #[serde(flatten)] pub(crate) body: T, } @@ -197,6 +202,10 @@ impl Response for CommandResponse { self.at_cluster_time } + fn recovery_token(&self) -> Option<&Document> { + self.recovery_token.as_ref() + } + fn into_body(self) -> Self::Body { self.body } @@ -229,6 +238,10 @@ impl Response for CursorResponse { self.response.body.cursor.at_cluster_time } + fn recovery_token(&self) -> Option<&Document> { + self.response.recovery_token() + } + fn into_body(self) -> Self::Body { self.response.body } diff --git a/src/operation/run_command/mod.rs b/src/operation/run_command/mod.rs index fbe8a72dc..a540ccc86 100644 --- a/src/operation/run_command/mod.rs +++ b/src/operation/run_command/mod.rs @@ -96,6 +96,7 @@ impl Operation for RunCommand { pub(crate) struct Response { doc: Document, cluster_time: Option, + recovery_token: Option, } impl super::Response for Response { @@ -109,7 +110,13 @@ impl super::Response for Response { .ok() .and_then(|doc| bson::from_document(doc.clone()).ok()); - Ok(Self { doc, cluster_time }) + let recovery_token = doc.get_document("recoveryToken").ok().cloned(); + + Ok(Self { + doc, + cluster_time, + recovery_token, + }) } fn ok(&self) -> Option<&Bson> { @@ -131,6 +138,10 @@ impl super::Response for Response { .ok() } + fn recovery_token(&self) -> Option<&Document> { + self.recovery_token.as_ref() + } + fn into_body(self) -> Self::Body { self.doc } diff --git a/src/sdam/description/topology/mod.rs b/src/sdam/description/topology/mod.rs index 7e98b5d74..3870aef47 100644 --- a/src/sdam/description/topology/mod.rs +++ b/src/sdam/description/topology/mod.rs @@ -326,21 +326,13 @@ impl TopologyDescription { self.transaction_support_status = TransactionSupportStatus::Unsupported; } if let Ok(Some(max_wire_version)) = server_description.max_wire_version() { - match self.topology_type { - TopologyType::Sharded => { - // TODO RUST-122: support transactions on sharded clusters - self.transaction_support_status = TransactionSupportStatus::Unsupported; - } - _ => { - if max_wire_version < 7 { - self.transaction_support_status = TransactionSupportStatus::Unsupported; - } else { - self.transaction_support_status = TransactionSupportStatus::Supported; - } - } + self.transaction_support_status = if max_wire_version < 7 + || (max_wire_version < 8 && self.topology_type == TopologyType::Sharded) + { + TransactionSupportStatus::Unsupported + } else { + TransactionSupportStatus::Supported } - } else { - self.transaction_support_status = TransactionSupportStatus::Unsupported; } } @@ -748,8 +740,8 @@ pub(crate) enum TransactionSupportStatus { Unsupported, /// Transactions are supported by this topology. A topology supports transactions if it - /// supports sessions and its maxWireVersion >= 7. Transactions are not currently supported - /// on sharded clusters (TODO RUST-122). + /// supports sessions and its maxWireVersion >= 7. If the topology is sharded, maxWireVersion + /// must be >= 8 for transactions to be supported. /// /// Note that meeting these conditions does not guarantee that a deployment /// supports transactions; any other missing qualification will be reported by the server. diff --git a/src/sync/test.rs b/src/sync/test.rs index ab8e1c4bf..3236044d9 100644 --- a/src/sync/test.rs +++ b/src/sync/test.rs @@ -202,8 +202,7 @@ fn transactions() { let should_skip = RUNTIME.block_on(async { let test_client = AsyncTestClient::new().await; - // TODO RUST-122: Unskip this test on sharded clusters - !test_client.is_replica_set() || test_client.server_version_lt(4, 0) + !test_client.supports_transactions() }); if should_skip { return; diff --git a/src/test/spec/json/transactions/abort.json b/src/test/spec/json/transactions/legacy/abort.json similarity index 100% rename from src/test/spec/json/transactions/abort.json rename to src/test/spec/json/transactions/legacy/abort.json diff --git a/src/test/spec/json/transactions/abort.yml b/src/test/spec/json/transactions/legacy/abort.yml similarity index 100% rename from src/test/spec/json/transactions/abort.yml rename to src/test/spec/json/transactions/legacy/abort.yml diff --git a/src/test/spec/json/transactions/bulk.json b/src/test/spec/json/transactions/legacy/bulk.json similarity index 100% rename from src/test/spec/json/transactions/bulk.json rename to src/test/spec/json/transactions/legacy/bulk.json diff --git a/src/test/spec/json/transactions/bulk.yml b/src/test/spec/json/transactions/legacy/bulk.yml similarity index 100% rename from src/test/spec/json/transactions/bulk.yml rename to src/test/spec/json/transactions/legacy/bulk.yml diff --git a/src/test/spec/json/transactions/causal-consistency.json b/src/test/spec/json/transactions/legacy/causal-consistency.json similarity index 100% rename from src/test/spec/json/transactions/causal-consistency.json rename to src/test/spec/json/transactions/legacy/causal-consistency.json diff --git a/src/test/spec/json/transactions/causal-consistency.yml b/src/test/spec/json/transactions/legacy/causal-consistency.yml similarity index 100% rename from src/test/spec/json/transactions/causal-consistency.yml rename to src/test/spec/json/transactions/legacy/causal-consistency.yml diff --git a/src/test/spec/json/transactions/commit.json b/src/test/spec/json/transactions/legacy/commit.json similarity index 100% rename from src/test/spec/json/transactions/commit.json rename to src/test/spec/json/transactions/legacy/commit.json diff --git a/src/test/spec/json/transactions/commit.yml b/src/test/spec/json/transactions/legacy/commit.yml similarity index 100% rename from src/test/spec/json/transactions/commit.yml rename to src/test/spec/json/transactions/legacy/commit.yml diff --git a/src/test/spec/json/transactions/count.json b/src/test/spec/json/transactions/legacy/count.json similarity index 100% rename from src/test/spec/json/transactions/count.json rename to src/test/spec/json/transactions/legacy/count.json diff --git a/src/test/spec/json/transactions/count.yml b/src/test/spec/json/transactions/legacy/count.yml similarity index 100% rename from src/test/spec/json/transactions/count.yml rename to src/test/spec/json/transactions/legacy/count.yml diff --git a/src/test/spec/json/transactions/create-collection.json b/src/test/spec/json/transactions/legacy/create-collection.json similarity index 100% rename from src/test/spec/json/transactions/create-collection.json rename to src/test/spec/json/transactions/legacy/create-collection.json diff --git a/src/test/spec/json/transactions/create-collection.yml b/src/test/spec/json/transactions/legacy/create-collection.yml similarity index 100% rename from src/test/spec/json/transactions/create-collection.yml rename to src/test/spec/json/transactions/legacy/create-collection.yml diff --git a/src/test/spec/json/transactions/create-index.json b/src/test/spec/json/transactions/legacy/create-index.json similarity index 100% rename from src/test/spec/json/transactions/create-index.json rename to src/test/spec/json/transactions/legacy/create-index.json diff --git a/src/test/spec/json/transactions/create-index.yml b/src/test/spec/json/transactions/legacy/create-index.yml similarity index 100% rename from src/test/spec/json/transactions/create-index.yml rename to src/test/spec/json/transactions/legacy/create-index.yml diff --git a/src/test/spec/json/transactions/delete.json b/src/test/spec/json/transactions/legacy/delete.json similarity index 100% rename from src/test/spec/json/transactions/delete.json rename to src/test/spec/json/transactions/legacy/delete.json diff --git a/src/test/spec/json/transactions/delete.yml b/src/test/spec/json/transactions/legacy/delete.yml similarity index 100% rename from src/test/spec/json/transactions/delete.yml rename to src/test/spec/json/transactions/legacy/delete.yml diff --git a/src/test/spec/json/transactions/error-labels.json b/src/test/spec/json/transactions/legacy/error-labels.json similarity index 100% rename from src/test/spec/json/transactions/error-labels.json rename to src/test/spec/json/transactions/legacy/error-labels.json diff --git a/src/test/spec/json/transactions/error-labels.yml b/src/test/spec/json/transactions/legacy/error-labels.yml similarity index 100% rename from src/test/spec/json/transactions/error-labels.yml rename to src/test/spec/json/transactions/legacy/error-labels.yml diff --git a/src/test/spec/json/transactions/errors.json b/src/test/spec/json/transactions/legacy/errors.json similarity index 100% rename from src/test/spec/json/transactions/errors.json rename to src/test/spec/json/transactions/legacy/errors.json diff --git a/src/test/spec/json/transactions/errors.yml b/src/test/spec/json/transactions/legacy/errors.yml similarity index 100% rename from src/test/spec/json/transactions/errors.yml rename to src/test/spec/json/transactions/legacy/errors.yml diff --git a/src/test/spec/json/transactions/findOneAndDelete.json b/src/test/spec/json/transactions/legacy/findOneAndDelete.json similarity index 100% rename from src/test/spec/json/transactions/findOneAndDelete.json rename to src/test/spec/json/transactions/legacy/findOneAndDelete.json diff --git a/src/test/spec/json/transactions/findOneAndDelete.yml b/src/test/spec/json/transactions/legacy/findOneAndDelete.yml similarity index 100% rename from src/test/spec/json/transactions/findOneAndDelete.yml rename to src/test/spec/json/transactions/legacy/findOneAndDelete.yml diff --git a/src/test/spec/json/transactions/findOneAndReplace.json b/src/test/spec/json/transactions/legacy/findOneAndReplace.json similarity index 100% rename from src/test/spec/json/transactions/findOneAndReplace.json rename to src/test/spec/json/transactions/legacy/findOneAndReplace.json diff --git a/src/test/spec/json/transactions/findOneAndReplace.yml b/src/test/spec/json/transactions/legacy/findOneAndReplace.yml similarity index 100% rename from src/test/spec/json/transactions/findOneAndReplace.yml rename to src/test/spec/json/transactions/legacy/findOneAndReplace.yml diff --git a/src/test/spec/json/transactions/findOneAndUpdate.json b/src/test/spec/json/transactions/legacy/findOneAndUpdate.json similarity index 100% rename from src/test/spec/json/transactions/findOneAndUpdate.json rename to src/test/spec/json/transactions/legacy/findOneAndUpdate.json diff --git a/src/test/spec/json/transactions/findOneAndUpdate.yml b/src/test/spec/json/transactions/legacy/findOneAndUpdate.yml similarity index 100% rename from src/test/spec/json/transactions/findOneAndUpdate.yml rename to src/test/spec/json/transactions/legacy/findOneAndUpdate.yml diff --git a/src/test/spec/json/transactions/insert.json b/src/test/spec/json/transactions/legacy/insert.json similarity index 100% rename from src/test/spec/json/transactions/insert.json rename to src/test/spec/json/transactions/legacy/insert.json diff --git a/src/test/spec/json/transactions/insert.yml b/src/test/spec/json/transactions/legacy/insert.yml similarity index 100% rename from src/test/spec/json/transactions/insert.yml rename to src/test/spec/json/transactions/legacy/insert.yml diff --git a/src/test/spec/json/transactions/isolation.json b/src/test/spec/json/transactions/legacy/isolation.json similarity index 100% rename from src/test/spec/json/transactions/isolation.json rename to src/test/spec/json/transactions/legacy/isolation.json diff --git a/src/test/spec/json/transactions/isolation.yml b/src/test/spec/json/transactions/legacy/isolation.yml similarity index 100% rename from src/test/spec/json/transactions/isolation.yml rename to src/test/spec/json/transactions/legacy/isolation.yml diff --git a/src/test/spec/json/transactions/mongos-pin-auto.json b/src/test/spec/json/transactions/legacy/mongos-pin-auto.json similarity index 100% rename from src/test/spec/json/transactions/mongos-pin-auto.json rename to src/test/spec/json/transactions/legacy/mongos-pin-auto.json diff --git a/src/test/spec/json/transactions/mongos-pin-auto.yml b/src/test/spec/json/transactions/legacy/mongos-pin-auto.yml similarity index 100% rename from src/test/spec/json/transactions/mongos-pin-auto.yml rename to src/test/spec/json/transactions/legacy/mongos-pin-auto.yml diff --git a/src/test/spec/json/transactions/mongos-recovery-token.json b/src/test/spec/json/transactions/legacy/mongos-recovery-token.json similarity index 100% rename from src/test/spec/json/transactions/mongos-recovery-token.json rename to src/test/spec/json/transactions/legacy/mongos-recovery-token.json diff --git a/src/test/spec/json/transactions/mongos-recovery-token.yml b/src/test/spec/json/transactions/legacy/mongos-recovery-token.yml similarity index 100% rename from src/test/spec/json/transactions/mongos-recovery-token.yml rename to src/test/spec/json/transactions/legacy/mongos-recovery-token.yml diff --git a/src/test/spec/json/transactions/pin-mongos.json b/src/test/spec/json/transactions/legacy/pin-mongos.json similarity index 100% rename from src/test/spec/json/transactions/pin-mongos.json rename to src/test/spec/json/transactions/legacy/pin-mongos.json diff --git a/src/test/spec/json/transactions/pin-mongos.yml b/src/test/spec/json/transactions/legacy/pin-mongos.yml similarity index 100% rename from src/test/spec/json/transactions/pin-mongos.yml rename to src/test/spec/json/transactions/legacy/pin-mongos.yml diff --git a/src/test/spec/json/transactions/read-concern.json b/src/test/spec/json/transactions/legacy/read-concern.json similarity index 100% rename from src/test/spec/json/transactions/read-concern.json rename to src/test/spec/json/transactions/legacy/read-concern.json diff --git a/src/test/spec/json/transactions/read-concern.yml b/src/test/spec/json/transactions/legacy/read-concern.yml similarity index 100% rename from src/test/spec/json/transactions/read-concern.yml rename to src/test/spec/json/transactions/legacy/read-concern.yml diff --git a/src/test/spec/json/transactions/read-pref.json b/src/test/spec/json/transactions/legacy/read-pref.json similarity index 100% rename from src/test/spec/json/transactions/read-pref.json rename to src/test/spec/json/transactions/legacy/read-pref.json diff --git a/src/test/spec/json/transactions/read-pref.yml b/src/test/spec/json/transactions/legacy/read-pref.yml similarity index 100% rename from src/test/spec/json/transactions/read-pref.yml rename to src/test/spec/json/transactions/legacy/read-pref.yml diff --git a/src/test/spec/json/transactions/reads.json b/src/test/spec/json/transactions/legacy/reads.json similarity index 100% rename from src/test/spec/json/transactions/reads.json rename to src/test/spec/json/transactions/legacy/reads.json diff --git a/src/test/spec/json/transactions/reads.yml b/src/test/spec/json/transactions/legacy/reads.yml similarity index 100% rename from src/test/spec/json/transactions/reads.yml rename to src/test/spec/json/transactions/legacy/reads.yml diff --git a/src/test/spec/json/transactions/retryable-abort-errorLabels.json b/src/test/spec/json/transactions/legacy/retryable-abort-errorLabels.json similarity index 100% rename from src/test/spec/json/transactions/retryable-abort-errorLabels.json rename to src/test/spec/json/transactions/legacy/retryable-abort-errorLabels.json diff --git a/src/test/spec/json/transactions/retryable-abort-errorLabels.yml b/src/test/spec/json/transactions/legacy/retryable-abort-errorLabels.yml similarity index 100% rename from src/test/spec/json/transactions/retryable-abort-errorLabels.yml rename to src/test/spec/json/transactions/legacy/retryable-abort-errorLabels.yml diff --git a/src/test/spec/json/transactions/retryable-abort.json b/src/test/spec/json/transactions/legacy/retryable-abort.json similarity index 100% rename from src/test/spec/json/transactions/retryable-abort.json rename to src/test/spec/json/transactions/legacy/retryable-abort.json diff --git a/src/test/spec/json/transactions/retryable-abort.yml b/src/test/spec/json/transactions/legacy/retryable-abort.yml similarity index 100% rename from src/test/spec/json/transactions/retryable-abort.yml rename to src/test/spec/json/transactions/legacy/retryable-abort.yml diff --git a/src/test/spec/json/transactions/retryable-commit-errorLabels.json b/src/test/spec/json/transactions/legacy/retryable-commit-errorLabels.json similarity index 100% rename from src/test/spec/json/transactions/retryable-commit-errorLabels.json rename to src/test/spec/json/transactions/legacy/retryable-commit-errorLabels.json diff --git a/src/test/spec/json/transactions/retryable-commit-errorLabels.yml b/src/test/spec/json/transactions/legacy/retryable-commit-errorLabels.yml similarity index 100% rename from src/test/spec/json/transactions/retryable-commit-errorLabels.yml rename to src/test/spec/json/transactions/legacy/retryable-commit-errorLabels.yml diff --git a/src/test/spec/json/transactions/retryable-commit.json b/src/test/spec/json/transactions/legacy/retryable-commit.json similarity index 100% rename from src/test/spec/json/transactions/retryable-commit.json rename to src/test/spec/json/transactions/legacy/retryable-commit.json diff --git a/src/test/spec/json/transactions/retryable-commit.yml b/src/test/spec/json/transactions/legacy/retryable-commit.yml similarity index 100% rename from src/test/spec/json/transactions/retryable-commit.yml rename to src/test/spec/json/transactions/legacy/retryable-commit.yml diff --git a/src/test/spec/json/transactions/retryable-writes.json b/src/test/spec/json/transactions/legacy/retryable-writes.json similarity index 100% rename from src/test/spec/json/transactions/retryable-writes.json rename to src/test/spec/json/transactions/legacy/retryable-writes.json diff --git a/src/test/spec/json/transactions/retryable-writes.yml b/src/test/spec/json/transactions/legacy/retryable-writes.yml similarity index 100% rename from src/test/spec/json/transactions/retryable-writes.yml rename to src/test/spec/json/transactions/legacy/retryable-writes.yml diff --git a/src/test/spec/json/transactions/run-command.json b/src/test/spec/json/transactions/legacy/run-command.json similarity index 100% rename from src/test/spec/json/transactions/run-command.json rename to src/test/spec/json/transactions/legacy/run-command.json diff --git a/src/test/spec/json/transactions/run-command.yml b/src/test/spec/json/transactions/legacy/run-command.yml similarity index 100% rename from src/test/spec/json/transactions/run-command.yml rename to src/test/spec/json/transactions/legacy/run-command.yml diff --git a/src/test/spec/json/transactions/transaction-options-repl.json b/src/test/spec/json/transactions/legacy/transaction-options-repl.json similarity index 100% rename from src/test/spec/json/transactions/transaction-options-repl.json rename to src/test/spec/json/transactions/legacy/transaction-options-repl.json diff --git a/src/test/spec/json/transactions/transaction-options-repl.yml b/src/test/spec/json/transactions/legacy/transaction-options-repl.yml similarity index 100% rename from src/test/spec/json/transactions/transaction-options-repl.yml rename to src/test/spec/json/transactions/legacy/transaction-options-repl.yml diff --git a/src/test/spec/json/transactions/transaction-options.json b/src/test/spec/json/transactions/legacy/transaction-options.json similarity index 100% rename from src/test/spec/json/transactions/transaction-options.json rename to src/test/spec/json/transactions/legacy/transaction-options.json diff --git a/src/test/spec/json/transactions/transaction-options.yml b/src/test/spec/json/transactions/legacy/transaction-options.yml similarity index 100% rename from src/test/spec/json/transactions/transaction-options.yml rename to src/test/spec/json/transactions/legacy/transaction-options.yml diff --git a/src/test/spec/json/transactions/update.json b/src/test/spec/json/transactions/legacy/update.json similarity index 100% rename from src/test/spec/json/transactions/update.json rename to src/test/spec/json/transactions/legacy/update.json diff --git a/src/test/spec/json/transactions/update.yml b/src/test/spec/json/transactions/legacy/update.yml similarity index 100% rename from src/test/spec/json/transactions/update.yml rename to src/test/spec/json/transactions/legacy/update.yml diff --git a/src/test/spec/json/transactions/write-concern.json b/src/test/spec/json/transactions/legacy/write-concern.json similarity index 100% rename from src/test/spec/json/transactions/write-concern.json rename to src/test/spec/json/transactions/legacy/write-concern.json diff --git a/src/test/spec/json/transactions/write-concern.yml b/src/test/spec/json/transactions/legacy/write-concern.yml similarity index 100% rename from src/test/spec/json/transactions/write-concern.yml rename to src/test/spec/json/transactions/legacy/write-concern.yml diff --git a/src/test/spec/json/transactions/mongos-pin-auto-tests.py b/src/test/spec/json/transactions/mongos-pin-auto-tests.py deleted file mode 100644 index b035c4368..000000000 --- a/src/test/spec/json/transactions/mongos-pin-auto-tests.py +++ /dev/null @@ -1,337 +0,0 @@ -import itertools -import sys - -# Require Python 3.7+ for ordered dictionaries so that the order of the -# generated tests remain the same. -# Usage: -# python3.7 mongos-pin-auto-tests.py > mongos-pin-auto.yml -if sys.version_info[:2] < (3, 7): - print('ERROR: This script requires Python >= 3.7, not:') - print(sys.version) - print('Usage: python3.7 mongos-pin-auto-tests.py > mongos-pin-auto.yml') - exit(1) - -HEADER = '''# Autogenerated tests that transient errors in a transaction unpin the session. -# See mongos-pin-auto-tests.py -runOn: - - - minServerVersion: "4.1.8" - topology: ["sharded"] - -database_name: &database_name "transaction-tests" -collection_name: &collection_name "test" - -data: &data - - {_id: 1} - - {_id: 2} - -tests: - - description: remain pinned after non-transient Interrupted error on insertOne - useMultipleMongoses: true - operations: - - &startTransaction - name: startTransaction - object: session0 - - &initialCommand - name: insertOne - object: collection - arguments: - session: session0 - document: {_id: 3} - result: - insertedId: 3 - - name: targetedFailPoint - object: testRunner - arguments: - session: session0 - failPoint: - configureFailPoint: failCommand - mode: {times: 1} - data: - failCommands: ["insert"] - errorCode: 11601 - - name: insertOne - object: collection - arguments: - session: session0 - document: - _id: 4 - result: - errorLabelsOmit: ["TransientTransactionError", "UnknownTransactionCommitResult"] - errorCodeName: Interrupted - - &assertSessionPinned - name: assertSessionPinned - object: testRunner - arguments: - session: session0 - - &commitTransaction - name: commitTransaction - object: session0 - - expectations: - - command_started_event: - command: - insert: *collection_name - documents: - - _id: 3 - ordered: true - readConcern: - lsid: session0 - txnNumber: - $numberLong: "1" - startTransaction: true - autocommit: false - writeConcern: - command_name: insert - database_name: *database_name - - command_started_event: - command: - insert: *collection_name - documents: - - _id: 4 - ordered: true - readConcern: - lsid: session0 - txnNumber: - $numberLong: "1" - startTransaction: - autocommit: false - writeConcern: - command_name: insert - database_name: *database_name - - command_started_event: - command: - commitTransaction: 1 - lsid: session0 - txnNumber: - $numberLong: "1" - startTransaction: - autocommit: false - writeConcern: - recoveryToken: 42 - command_name: commitTransaction - database_name: admin - - outcome: &outcome - collection: - data: - - {_id: 1} - - {_id: 2} - - {_id: 3} - - - description: unpin after transient error within a transaction - useMultipleMongoses: true - operations: - - &startTransaction - name: startTransaction - object: session0 - - &initialCommand - name: insertOne - object: collection - arguments: - session: session0 - document: - _id: 3 - result: - insertedId: 3 - - name: targetedFailPoint - object: testRunner - arguments: - session: session0 - failPoint: - configureFailPoint: failCommand - mode: { times: 1 } - data: - failCommands: ["insert"] - closeConnection: true - - name: insertOne - object: collection - arguments: - session: session0 - document: - _id: 4 - result: - errorLabelsContain: ["TransientTransactionError"] - errorLabelsOmit: ["UnknownTransactionCommitResult"] - # Session unpins from the first mongos after the insert error and - # abortTransaction succeeds immediately on any mongos. - - &assertSessionUnpinned - name: assertSessionUnpinned - object: testRunner - arguments: - session: session0 - - &abortTransaction - name: abortTransaction - object: session0 - - expectations: - - command_started_event: - command: - insert: *collection_name - documents: - - _id: 3 - ordered: true - readConcern: - lsid: session0 - txnNumber: - $numberLong: "1" - startTransaction: true - autocommit: false - writeConcern: - command_name: insert - database_name: *database_name - - command_started_event: - command: - insert: *collection_name - documents: - - _id: 4 - ordered: true - readConcern: - lsid: session0 - txnNumber: - $numberLong: "1" - startTransaction: - autocommit: false - writeConcern: - command_name: insert - database_name: *database_name - - command_started_event: - command: - abortTransaction: 1 - lsid: session0 - txnNumber: - $numberLong: "1" - startTransaction: - autocommit: false - writeConcern: - recoveryToken: 42 - command_name: abortTransaction - database_name: admin - - outcome: &outcome - collection: - data: *data - - # The rest of the tests in this file test every operation type against - # multiple types of transient errors (connection and error code).''' - -TEMPLATE = ''' - - description: {test_name} {error_name} error on {op_name} {command_name} - useMultipleMongoses: true - operations: - - *startTransaction - - *initialCommand - - name: targetedFailPoint - object: testRunner - arguments: - session: session0 - failPoint: - configureFailPoint: failCommand - mode: {{times: 1}} - data: - failCommands: ["{command_name}"] - {error_data} - - name: {op_name} - object: {object_name} - arguments: - session: session0 - {op_args} - result: - {error_labels}: ["TransientTransactionError"] - - *{assertion} - - *abortTransaction - outcome: *outcome -''' - - -# Maps from op_name to (command_name, object_name, op_args) -OPS = { - # Write ops: - 'insertOne': ('insert', 'collection', r'document: {_id: 4}'), - 'insertMany': ('insert', 'collection', r'documents: [{_id: 4}, {_id: 5}]'), - 'updateOne': ('update', 'collection', r'''filter: {_id: 1} - update: {$inc: {x: 1}}'''), - 'replaceOne': ('update', 'collection', r'''filter: {_id: 1} - replacement: {y: 1}'''), - 'updateMany': ('update', 'collection', r'''filter: {_id: {$gte: 1}} - update: {$set: {z: 1}}'''), - 'deleteOne': ('delete', 'collection', r'filter: {_id: 1}'), - 'deleteMany': ('delete', 'collection', r'filter: {_id: {$gte: 1}}'), - 'findOneAndDelete': ('findAndModify', 'collection', r'filter: {_id: 1}'), - 'findOneAndUpdate': ('findAndModify', 'collection', r'''filter: {_id: 1} - update: {$inc: {x: 1}} - returnDocument: Before'''), - 'findOneAndReplace': ('findAndModify', 'collection', r'''filter: {_id: 1} - replacement: {y: 1} - returnDocument: Before'''), - # Bulk write insert/update/delete: - 'bulkWrite insert': ('insert', 'collection', r'''requests: - - name: insertOne - arguments: - document: {_id: 1}'''), - 'bulkWrite update': ('update', 'collection', r'''requests: - - name: updateOne - arguments: - filter: {_id: 1} - update: {$set: {x: 1}}'''), - 'bulkWrite delete': ('delete', 'collection', r'''requests: - - name: deleteOne - arguments: - filter: {_id: 1}'''), - # Read ops: - 'find': ('find', 'collection', r'filter: {_id: 1}'), - 'countDocuments': ('aggregate', 'collection', r'filter: {}'), - 'aggregate': ('aggregate', 'collection', r'pipeline: []'), - 'distinct': ('distinct', 'collection', r'fieldName: _id'), - # runCommand: - 'runCommand': ( - 'insert', - r'''database - command_name: insert''', # runCommand requires command_name. - r'''command: - insert: *collection_name - documents: - - _id : 1'''), -} - -# Maps from error_name to error_data. -NON_TRANSIENT_ERRORS = { - 'Interrupted': 'errorCode: 11601', -} - -# Maps from error_name to error_data. -TRANSIENT_ERRORS = { - 'connection': 'closeConnection: true', - 'ShutdownInProgress': 'errorCode: 91', -} - - -def create_pin_test(op_name, error_name): - test_name = 'remain pinned after non-transient' - assertion = 'assertSessionPinned' - error_labels = 'errorLabelsOmit' - command_name, object_name, op_args = OPS[op_name] - error_data = NON_TRANSIENT_ERRORS[error_name] - if op_name.startswith('bulkWrite'): - op_name = 'bulkWrite' - return TEMPLATE.format(**locals()) - - -def create_unpin_test(op_name, error_name): - test_name = 'unpin after transient' - assertion = 'assertSessionUnpinned' - error_labels = 'errorLabelsContain' - command_name, object_name, op_args = OPS[op_name] - error_data = TRANSIENT_ERRORS[error_name] - if op_name.startswith('bulkWrite'): - op_name = 'bulkWrite' - return TEMPLATE.format(**locals()) - -tests = [] -for op_name, error_name in itertools.product(OPS, NON_TRANSIENT_ERRORS): - tests.append(create_pin_test(op_name, error_name)) -for op_name, error_name in itertools.product(OPS, TRANSIENT_ERRORS): - tests.append(create_unpin_test(op_name, error_name)) - -print(HEADER) -print(''.join(tests)) diff --git a/src/test/spec/json/transactions/unified/mongos-unpin.json b/src/test/spec/json/transactions/unified/mongos-unpin.json new file mode 100644 index 000000000..4f7ae4379 --- /dev/null +++ b/src/test/spec/json/transactions/unified/mongos-unpin.json @@ -0,0 +1,437 @@ +{ + "description": "mongos-unpin", + "schemaVersion": "1.4", + "runOnRequirements": [ + { + "minServerVersion": "4.2", + "topologies": [ + "sharded-replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "useMultipleMongoses": true + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "mongos-unpin-db" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "test" + } + }, + { + "session": { + "id": "session0", + "client": "client0" + } + } + ], + "initialData": [ + { + "collectionName": "test", + "databaseName": "mongos-unpin-db", + "documents": [] + } + ], + "_yamlAnchors": { + "anchors": 24 + }, + "tests": [ + { + "description": "unpin after TransientTransactionError error on commit", + "runOnRequirements": [ + { + "serverless": "forbid" + } + ], + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "commitTransaction" + ], + "errorCode": 24 + } + } + } + }, + { + "name": "commitTransaction", + "object": "session0", + "expectError": { + "errorCode": 24, + "errorLabelsContain": [ + "TransientTransactionError" + ], + "errorLabelsOmit": [ + "UnknownTransactionCommitResult" + ] + } + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + }, + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "abortTransaction", + "object": "session0" + } + ] + }, + { + "description": "unpin on successful abort", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "abortTransaction", + "object": "session0" + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + } + ] + }, + { + "description": "unpin after non-transient error on abort", + "runOnRequirements": [ + { + "serverless": "forbid" + } + ], + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "abortTransaction" + ], + "errorCode": 24 + } + } + } + }, + { + "name": "abortTransaction", + "object": "session0" + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + }, + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "abortTransaction", + "object": "session0" + } + ] + }, + { + "description": "unpin after TransientTransactionError error on abort", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "abortTransaction" + ], + "errorCode": 91 + } + } + } + }, + { + "name": "abortTransaction", + "object": "session0" + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + }, + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "abortTransaction", + "object": "session0" + } + ] + }, + { + "description": "unpin when a new transaction is started", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "assertSessionPinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + }, + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + } + ] + }, + { + "description": "unpin when a non-transaction write operation uses a session", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "assertSessionPinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + } + ] + }, + { + "description": "unpin when a non-transaction read operation uses a session", + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "assertSessionPinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "x": 1 + }, + "session": "session0" + } + }, + { + "name": "assertSessionUnpinned", + "object": "testRunner", + "arguments": { + "session": "session0" + } + } + ] + } + ] +} diff --git a/src/test/spec/json/transactions/unified/mongos-unpin.yml b/src/test/spec/json/transactions/unified/mongos-unpin.yml new file mode 100644 index 000000000..c13798aca --- /dev/null +++ b/src/test/spec/json/transactions/unified/mongos-unpin.yml @@ -0,0 +1,172 @@ +description: mongos-unpin + +schemaVersion: '1.4' + +runOnRequirements: + - minServerVersion: '4.2' + topologies: [ sharded-replicaset ] + +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: true + - database: + id: &database0 database0 + client: *client0 + databaseName: &database0Name mongos-unpin-db + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: &collection0Name test + - session: + id: &session0 session0 + client: *client0 + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: [] + +_yamlAnchors: + anchors: + # LockTimeout will cause the server to add a TransientTransactionError label. It is not retryable. + &lockTimeoutErrorCode 24 + +tests: + - description: unpin after TransientTransactionError error on commit + runOnRequirements: + # serverless proxy doesn't append error labels to errors in transactions + # caused by failpoints (CLOUDP-88216) + - serverless: "forbid" + operations: + - &startTransaction + name: startTransaction + object: *session0 + - &insertOne + name: insertOne + object: *collection0 + arguments: + document: { x: 1 } + session: *session0 + - name: targetedFailPoint + object: testRunner + arguments: + session: *session0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ commitTransaction ] + errorCode: *lockTimeoutErrorCode + - name: commitTransaction + object: *session0 + expectError: + # LockTimeout is not retryable, so the commit fails. + errorCode: *lockTimeoutErrorCode + errorLabelsContain: [ TransientTransactionError ] + errorLabelsOmit: [ UnknownTransactionCommitResult ] + - &assertNoPinnedServer + name: assertSessionUnpinned + object: testRunner + arguments: + session: *session0 + # Cleanup the potentionally open server transaction by starting and + # aborting a new transaction on the same session. + - *startTransaction + - *insertOne + - &abortTransaction + name: abortTransaction + object: *session0 + + - description: unpin on successful abort + operations: + - *startTransaction + - *insertOne + - *abortTransaction + - *assertNoPinnedServer + + - description: unpin after non-transient error on abort + runOnRequirements: + # serverless proxy doesn't append error labels to errors in transactions + # caused by failpoints (CLOUDP-88216) + - serverless: "forbid" + operations: + - *startTransaction + - *insertOne + - name: targetedFailPoint + object: testRunner + arguments: + session: *session0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ abortTransaction ] + errorCode: *lockTimeoutErrorCode + - *abortTransaction + - *assertNoPinnedServer + # Cleanup the potentionally open server transaction by starting and + # aborting a new transaction on the same session. + - *startTransaction + - *insertOne + - *abortTransaction + + - description: unpin after TransientTransactionError error on abort + operations: + - *startTransaction + - *insertOne + - name: targetedFailPoint + object: testRunner + arguments: + session: *session0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ abortTransaction ] + errorCode: 91 # ShutdownInProgress + - *abortTransaction + - *assertNoPinnedServer + # Cleanup the potentionally open server transaction by starting and + # aborting a new transaction on the same session. + - *startTransaction + - *insertOne + - *abortTransaction + + - description: unpin when a new transaction is started + operations: + - *startTransaction + - *insertOne + - name: commitTransaction + object: *session0 + - &assertPinnedServer + name: assertSessionPinned + object: testRunner + arguments: + session: *session0 + - *startTransaction + - *assertNoPinnedServer + + - description: unpin when a non-transaction write operation uses a session + operations: + - *startTransaction + - *insertOne + - name: commitTransaction + object: *session0 + - *assertPinnedServer + - *insertOne + - *assertNoPinnedServer + + - description: unpin when a non-transaction read operation uses a session + operations: + - *startTransaction + - *insertOne + - name: commitTransaction + object: *session0 + - *assertPinnedServer + - name: find + object: *collection0 + arguments: + filter: { x: 1 } + session: *session0 + - *assertNoPinnedServer diff --git a/src/test/spec/transactions.rs b/src/test/spec/transactions.rs index d015e9774..4735d6d5b 100644 --- a/src/test/spec/transactions.rs +++ b/src/test/spec/transactions.rs @@ -5,19 +5,26 @@ use crate::{ bson::{doc, serde_helpers::serialize_u64_as_i32, Document}, client::session::TransactionState, test::{run_spec_test, TestClient, LOCK}, + Collection, }; -use super::run_v2_test; +use super::{run_unified_format_test, run_v2_test}; #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] #[cfg_attr(feature = "async-std-runtime", async_std::test)] -async fn run() { +async fn run_legacy() { let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await; - // TODO RUST-122: Unskip tests on sharded clusters - if TestClient::new().await.is_sharded() { - return; - } - run_spec_test(&["transactions"], run_v2_test).await; + + run_spec_test(&["transactions", "legacy"], run_v2_test).await; +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn run_unified() { + let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await; + + // TODO RUST-902: Reduce transactionLifetimeLimitSeconds. + run_spec_test(&["transactions", "unified"], run_unified_format_test).await; } #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] @@ -78,3 +85,57 @@ async fn client_errors() { assert!(result.is_err()); assert_eq!(session.transaction.state, TransactionState::InProgress); } + +#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +#[function_name::named] +// This test checks that deserializing an operation correctly still retrieves the recovery token. +async fn deserialize_recovery_token() { + let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await; + + #[derive(Debug, Serialize)] + struct A { + num: i32, + } + + #[derive(Debug, Deserialize)] + struct B { + str: String, + } + + let client = TestClient::new().await; + if !client.is_sharded() || client.server_version_lt(4, 2) { + return; + } + + let mut session = client.start_session(None).await.unwrap(); + + // Insert a document with schema A. + client + .database(function_name!()) + .collection::(function_name!()) + .drop(None) + .await + .unwrap(); + client + .database(function_name!()) + .create_collection(function_name!(), None) + .await + .unwrap(); + let coll = client + .database(function_name!()) + .collection(function_name!()); + coll.insert_one(A { num: 4 }, None).await.unwrap(); + + // Attempt to execute Find on a document with schema B. + let coll: Collection = client + .database(function_name!()) + .collection(function_name!()); + session.start_transaction(None).await.unwrap(); + assert!(session.transaction.recovery_token.is_none()); + let result = coll.find_one_with_session(None, None, &mut session).await; + assert!(result.is_err()); // Assert that the deserialization failed. + + // Nevertheless, the recovery token should have been retrieved from the ok: 1 response. + assert!(session.transaction.recovery_token.is_some()); +} diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index c2da33cde..7743733f0 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -78,15 +78,6 @@ pub async fn run_unified_format_test(test_file: TestFile) { let mut test_runner = TestRunner::new().await; if let Some(requirements) = test_file.run_on_requirements { - // TODO RUST-122: Unskip this test on sharded clusters - if test_runner.internal_client.is_sharded() && test_file.description == "poc-transactions" { - println!( - "Skipping {}: sharded transactions not supported", - &test_file.description - ); - return; - } - let mut can_run_on = false; for requirement in requirements { if requirement.can_run_on(&test_runner.internal_client).await { diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index c93532f7d..104e723cd 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -169,6 +169,10 @@ impl<'de> Deserialize<'de> for Operation { FailPointCommand::deserialize(BsonDeserializer::new(definition.arguments)) .map(|op| Box::new(op) as Box) } + "targetedFailPoint" => { + TargetedFailPoint::deserialize(BsonDeserializer::new(definition.arguments)) + .map(|op| Box::new(op) as Box) + } "assertCollectionExists" => { AssertCollectionExists::deserialize(BsonDeserializer::new(definition.arguments)) .map(|op| Box::new(op) as Box) @@ -193,6 +197,14 @@ impl<'de> Deserialize<'de> for Operation { BsonDeserializer::new(definition.arguments), ) .map(|op| Box::new(op) as Box), + "assertSessionPinned" => { + AssertSessionPinned::deserialize(BsonDeserializer::new(definition.arguments)) + .map(|op| Box::new(op) as Box) + } + "assertSessionUnpinned" => { + AssertSessionUnpinned::deserialize(BsonDeserializer::new(definition.arguments)) + .map(|op| Box::new(op) as Box) + } "assertDifferentLsidOnLastTwoCommands" => { AssertDifferentLsidOnLastTwoCommands::deserialize(BsonDeserializer::new( definition.arguments, @@ -1017,6 +1029,36 @@ impl TestOperation for FailPointCommand { } } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub(super) struct TargetedFailPoint { + fail_point: FailPoint, + session: String, +} + +impl TestOperation for TargetedFailPoint { + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let session = test_runner.get_session(&self.session); + let selection_criteria = session + .transaction + .pinned_mongos + .clone() + .unwrap_or_else(|| panic!("ClientSession not pinned")); + let fail_point_guard = test_runner + .internal_client + .enable_failpoint(self.fail_point.clone(), Some(selection_criteria)) + .await + .unwrap(); + test_runner.fail_point_guards.push(fail_point_guard); + } + .boxed() + } +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct AssertCollectionExists { @@ -1223,6 +1265,50 @@ impl TestOperation for AssertSessionTransactionState { } } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub(super) struct AssertSessionPinned { + session: String, +} + +impl TestOperation for AssertSessionPinned { + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + assert!(test_runner + .get_session(&self.session) + .transaction + .pinned_mongos + .is_some()); + } + .boxed() + } +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub(super) struct AssertSessionUnpinned { + session: String, +} + +impl TestOperation for AssertSessionUnpinned { + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + assert!(test_runner + .get_session(&self.session) + .transaction + .pinned_mongos + .is_none()); + } + .boxed() + } +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct AssertDifferentLsidOnLastTwoCommands { diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index 47eaa682c..d8402f020 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -4,6 +4,7 @@ use crate::{ bson::Document, client::{options::ClientOptions, REDACTED_COMMANDS}, concern::{Acknowledgment, WriteConcern}, + db::options::CreateCollectionOptions, options::CollectionOptions, test::{util::FailPointGuard, EventHandler, TestClient, SERVER_API}, Client, @@ -32,22 +33,33 @@ impl TestRunner { pub async fn insert_initial_data(&self, data: &CollectionData) { let write_concern = WriteConcern::builder().w(Acknowledgment::Majority).build(); - let collection_options = CollectionOptions::builder() - .write_concern(write_concern) - .build(); - let coll = self - .internal_client - .init_db_and_coll_with_options( - &data.database_name, - &data.collection_name, - collection_options, - ) - .await; if !data.documents.is_empty() { + let collection_options = CollectionOptions::builder() + .write_concern(write_concern) + .build(); + let coll = self + .internal_client + .init_db_and_coll_with_options( + &data.database_name, + &data.collection_name, + collection_options, + ) + .await; coll.insert_many(data.documents.clone(), None) .await .unwrap(); + } else { + let collection_options = CreateCollectionOptions::builder() + .write_concern(write_concern) + .build(); + self.internal_client + .create_fresh_collection( + &data.database_name, + &data.collection_name, + collection_options, + ) + .await; } } diff --git a/src/test/spec/v2_runner/mod.rs b/src/test/spec/v2_runner/mod.rs index d5b075ca4..588472e9f 100644 --- a/src/test/spec/v2_runner/mod.rs +++ b/src/test/spec/v2_runner/mod.rs @@ -2,16 +2,23 @@ pub mod operation; pub mod test_event; pub mod test_file; -use std::{ops::Deref, time::Duration}; +use std::{ops::Deref, sync::Arc, time::Duration}; use semver::VersionReq; use crate::{ - bson::doc, - coll::options::DropCollectionOptions, + bson::{doc, from_bson}, + coll::options::{DistinctOptions, DropCollectionOptions}, concern::{Acknowledgment, WriteConcern}, options::{ClientOptions, CreateCollectionOptions, InsertManyOptions}, - test::{assert_matches, util::get_default_name, EventClient, TestClient}, + sdam::ServerInfo, + selection_criteria::SelectionCriteria, + test::{ + assert_matches, + util::{get_default_name, FailPointGuard}, + EventClient, + TestClient, + }, RUNTIME, }; @@ -126,10 +133,25 @@ pub async fn run_v2_test(test_file: TestFile) { EventClient::with_additional_options(options, None, test.use_multiple_mongoses, None) .await; - let _fp_guard = match test.fail_point { - Some(fail_point) => Some(fail_point.enable(client.deref(), None).await.unwrap()), - None => None, - }; + // TODO RUST-900: Remove this extraneous call. + if internal_client.is_sharded() + && internal_client.server_version_lte(4, 2) + && test.operations.iter().any(|op| op.name == "distinct") + { + for server_address in internal_client.options.hosts.clone() { + let options = DistinctOptions::builder() + .selection_criteria(Some(SelectionCriteria::Predicate(Arc::new( + move |server_info: &ServerInfo| *server_info.address() == server_address, + )))) + .build(); + coll.distinct("_id", None, options).await.unwrap(); + } + } + + let mut fail_point_guards: Vec = Vec::new(); + if let Some(fail_point) = test.fail_point { + fail_point_guards.push(fail_point.enable(client.deref(), None).await.unwrap()); + } let options = match test.session_options { Some(ref options) => options.get("session0").cloned(), @@ -217,7 +239,9 @@ pub async fn run_v2_test(test_file: TestFile) { "assertSessionNotDirty" => { assert!(!session.unwrap().is_dirty()) } - "assertSessionTransactionState" => { + "assertSessionTransactionState" + | "assertSessionPinned" + | "assertSessionUnpinned" => { operation .execute_on_session(session.unwrap()) .await @@ -229,6 +253,31 @@ pub async fn run_v2_test(test_file: TestFile) { "assertCollectionNotExists" => { operation.execute_on_client(&internal_client).await.unwrap(); } + "targetedFailPoint" => { + let fail_point = from_bson( + operation + .execute_on_client(&internal_client) + .await + .unwrap() + .unwrap(), + ) + .unwrap(); + + let selection_criteria = session + .unwrap() + .transaction + .pinned_mongos + .clone() + .unwrap_or_else(|| panic!("ClientSession is not pinned")); + + fail_point_guards.push( + client + .deref() + .enable_failpoint(fail_point, Some(selection_criteria)) + .await + .unwrap(), + ); + } other => panic!("unknown operation: {}", other), } continue; @@ -258,6 +307,10 @@ pub async fn run_v2_test(test_file: TestFile) { let code_name = error.code_name().unwrap(); assert_eq!(error_code_name, code_name); } + if let Some(error_code) = operation_error.error_code { + let code = error.code().unwrap(); + assert_eq!(error_code, code); + } if let Some(error_labels_contain) = operation_error.error_labels_contain { let labels = error.labels(); error_labels_contain diff --git a/src/test/spec/v2_runner/operation.rs b/src/test/spec/v2_runner/operation.rs index a3628035a..65ac96d28 100644 --- a/src/test/spec/v2_runner/operation.rs +++ b/src/test/spec/v2_runner/operation.rs @@ -4,7 +4,7 @@ use futures::{future::BoxFuture, stream::TryStreamExt, FutureExt}; use serde::{de::Deserializer, Deserialize}; use crate::{ - bson::{doc, Bson, Deserializer as BsonDeserializer, Document}, + bson::{doc, to_bson, Bson, Deserializer as BsonDeserializer, Document}, client::session::TransactionState, coll::options::CollectionOptions, db::options::DatabaseOptions, @@ -33,7 +33,7 @@ use crate::{ WriteConcern, }, selection_criteria::{ReadPreference, SelectionCriteria}, - test::TestClient, + test::{FailPoint, TestClient}, ClientSession, Collection, Database, @@ -108,6 +108,7 @@ pub enum OperationResult { pub struct OperationError { pub error_contains: Option, pub error_code_name: Option, + pub error_code: Option, pub error_labels_contain: Option>, pub error_labels_omit: Option>, } @@ -230,6 +231,18 @@ impl<'de> Deserialize<'de> for Operation { definition.arguments, ))) .map(|op| Box::new(op) as Box), + "targetedFailPoint" => TargetedFailPoint::deserialize(BsonDeserializer::new( + Bson::Document(definition.arguments), + )) + .map(|op| Box::new(op) as Box), + "assertSessionPinned" => AssertSessionPinned::deserialize(BsonDeserializer::new( + Bson::Document(definition.arguments), + )) + .map(|op| Box::new(op) as Box), + "assertSessionUnpinned" => AssertSessionUnpinned::deserialize(BsonDeserializer::new( + Bson::Document(definition.arguments), + )) + .map(|op| Box::new(op) as Box), "listDatabaseNames" => ListDatabaseNames::deserialize(BsonDeserializer::new( Bson::Document(definition.arguments), )) @@ -994,6 +1007,50 @@ impl TestOperation for FindOneAndDelete { } } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub(super) struct TargetedFailPoint { + fail_point: FailPoint, +} + +impl TestOperation for TargetedFailPoint { + fn execute_on_client<'a>(&'a self, _client: &'a TestClient) -> BoxFuture>> { + async move { Ok(Some(to_bson(&self.fail_point)?)) }.boxed() + } +} + +#[derive(Debug, Deserialize)] +pub(super) struct AssertSessionPinned {} + +impl TestOperation for AssertSessionPinned { + fn execute_on_session<'a>( + &'a self, + session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + async move { + assert!(session.transaction.pinned_mongos.is_some()); + Ok(None) + } + .boxed() + } +} + +#[derive(Debug, Deserialize)] +pub(super) struct AssertSessionUnpinned {} + +impl TestOperation for AssertSessionUnpinned { + fn execute_on_session<'a>( + &'a self, + session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + async move { + assert!(session.transaction.pinned_mongos.is_none()); + Ok(None) + } + .boxed() + } +} + #[derive(Debug, Deserialize)] pub(super) struct ListDatabases { filter: Option, diff --git a/src/test/spec/versioned_api.rs b/src/test/spec/versioned_api.rs index c38ad8b6f..919b18f39 100644 --- a/src/test/spec/versioned_api.rs +++ b/src/test/spec/versioned_api.rs @@ -10,7 +10,6 @@ use crate::{ run_spec_test_with_path, spec::unified_runner::TestFile, EventClient, - TestClient, CLIENT_OPTIONS, LOCK, }, @@ -29,10 +28,6 @@ async fn run() { #[cfg_attr(feature = "async-std-runtime", async_std::test)] async fn run_transaction_handling_spec_test() { let _guard: RwLockWriteGuard<_> = LOCK.run_exclusively().await; - if TestClient::new().await.is_sharded() { - // TODO RUST-734 Unskip these tests on sharded deployments. - return; - } let path: PathBuf = [ env!("CARGO_MANIFEST_DIR"), "src", diff --git a/src/test/util/failpoint.rs b/src/test/util/failpoint.rs index 661702a12..27a917b29 100644 --- a/src/test/util/failpoint.rs +++ b/src/test/util/failpoint.rs @@ -1,4 +1,4 @@ -use bson::{doc, Document}; +use bson::{doc, Bson, Document}; use serde::{Deserialize, Serialize, Serializer}; use std::time::Duration; use typed_builder::TypedBuilder; @@ -11,7 +11,7 @@ use crate::{ RUNTIME, }; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct FailPoint { #[serde(flatten)] command: Document, @@ -48,10 +48,20 @@ impl FailPoint { client: &Client, criteria: impl Into>, ) -> Result { + // TODO: DRIVERS-1385 remove this logic for moving errorLabels to the top level. + let mut command = self.command.clone(); + if let Some(Bson::Document(data)) = command.get_mut("data") { + if let Some(Bson::Document(wc_error)) = data.get_mut("writeConcernError") { + if let Some(labels) = wc_error.remove("errorLabels") { + data.insert("errorLabels", labels); + } + } + } + let criteria = criteria.into(); client .database("admin") - .run_command(self.command.clone(), criteria.clone()) + .run_command(command, criteria.clone()) .await?; Ok(FailPointGuard { failpoint_name: self.name().to_string(), diff --git a/src/test/util/matchable.rs b/src/test/util/matchable.rs index d8e66511f..e977571da 100644 --- a/src/test/util/matchable.rs +++ b/src/test/util/matchable.rs @@ -67,6 +67,9 @@ impl Matchable for Document { if k == "afterClusterTime" { continue; } + if k == "recoveryToken" && v.is_placeholder() && self.get_document(k).is_ok() { + continue; + } if k == "readConcern" { if let Some(doc) = v.as_document() { if doc.len() == 1 && doc.get_i32("afterClusterTime") == Ok(42) { diff --git a/src/test/util/mod.rs b/src/test/util/mod.rs index edc64afe7..85ac1c380 100644 --- a/src/test/util/mod.rs +++ b/src/test/util/mod.rs @@ -247,6 +247,11 @@ impl TestClient { version.matches(&self.server_version) } + pub fn supports_transactions(&self) -> bool { + self.is_replica_set() && self.server_version_gte(4, 0) + || self.is_sharded() && self.server_version_gte(4, 2) + } + pub async fn enable_failpoint( &self, fp: FailPoint,