diff --git a/src/client/executor.rs b/src/client/executor.rs index 2c660c42e..4e3036aa6 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -290,6 +290,7 @@ impl Client { } let stream_description = connection.stream_description()?; + let is_sharded = stream_description.initial_server_type == ServerType::Mongos; let mut cmd = op.build(stream_description)?; self.inner .topology @@ -328,15 +329,22 @@ impl Client { cmd.set_start_transaction(); cmd.set_autocommit(); cmd.set_txn_read_concern(*session)?; - if stream_description.initial_server_type == ServerType::Mongos { + if is_sharded { session.pin_mongos(connection.address().clone()); } session.transaction.state = TransactionState::InProgress; } - TransactionState::InProgress - | TransactionState::Committed { .. } - | TransactionState::Aborted => { + TransactionState::InProgress => cmd.set_autocommit(), + TransactionState::Committed { .. } | TransactionState::Aborted => { cmd.set_autocommit(); + + // Append the recovery token to the command if we are committing or aborting + // on a sharded transaction. + if is_sharded { + if let Some(ref recovery_token) = session.transaction.recovery_token { + cmd.set_recovery_token(recovery_token); + } + } } _ => {} } @@ -403,6 +411,9 @@ impl Client { Ok(r) => { self.update_cluster_time(&r, session).await; if r.is_success() { + // Retrieve recovery token from successful response. + Client::update_recovery_token(is_sharded, &r, session).await; + Ok(CommandResult { raw: response, deserialized: r.into_body(), @@ -447,7 +458,15 @@ impl Client { })) } // for ok: 1 just return the original deserialization error. - _ => Err(deserialize_error), + _ => { + Client::update_recovery_token( + is_sharded, + &error_response, + session, + ) + .await; + Err(deserialize_error) + } } } // We failed to deserialize even that, so just return the original @@ -626,6 +645,18 @@ impl Client { } } } + + async fn update_recovery_token( + is_sharded: bool, + response: &T, + session: &mut Option<&mut ClientSession>, + ) { + if let Some(ref mut session) = session { + if is_sharded && session.in_transaction() { + session.transaction.recovery_token = response.recovery_token().cloned(); + } + } + } } impl Error { diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 513bd08b7..917eff42b 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -118,12 +118,14 @@ pub(crate) struct Transaction { pub(crate) state: TransactionState, pub(crate) options: Option, pub(crate) pinned_mongos: Option, + pub(crate) recovery_token: Option, } impl Transaction { pub(crate) fn start(&mut self, options: Option) { self.state = TransactionState::Starting; self.options = options; + self.recovery_token = None; } pub(crate) fn commit(&mut self, data_committed: bool) { @@ -140,6 +142,7 @@ impl Transaction { self.state = TransactionState::None; self.options = None; self.pinned_mongos = None; + self.recovery_token = None; } } @@ -149,6 +152,7 @@ impl Default for Transaction { state: TransactionState::None, options: None, pinned_mongos: None, + recovery_token: None, } } } diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index 0160ecde8..1aaa37a5a 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -41,6 +41,10 @@ impl Command { } } + pub(crate) fn set_recovery_token(&mut self, recovery_token: &Document) { + self.body.insert("recoveryToken", recovery_token); + } + pub(crate) fn set_txn_number(&mut self, txn_number: i64) { self.body.insert("txnNumber", txn_number); } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index ec530a5b2..018fffab3 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -153,6 +153,9 @@ pub(crate) trait Response: Sized { /// The `atClusterTime` field of the response. fn at_cluster_time(&self) -> Option; + /// The `recoveryToken` field of the response. + fn recovery_token(&self) -> Option<&Document>; + /// Convert into the body of the response. fn into_body(self) -> Self::Body; } @@ -168,6 +171,8 @@ pub(crate) struct CommandResponse { pub(crate) at_cluster_time: Option, + pub(crate) recovery_token: Option, + #[serde(flatten)] pub(crate) body: T, } @@ -197,6 +202,10 @@ impl Response for CommandResponse { self.at_cluster_time } + fn recovery_token(&self) -> Option<&Document> { + self.recovery_token.as_ref() + } + fn into_body(self) -> Self::Body { self.body } @@ -229,6 +238,10 @@ impl Response for CursorResponse { self.response.body.cursor.at_cluster_time } + fn recovery_token(&self) -> Option<&Document> { + self.response.recovery_token() + } + fn into_body(self) -> Self::Body { self.response.body } diff --git a/src/operation/run_command/mod.rs b/src/operation/run_command/mod.rs index fbe8a72dc..a540ccc86 100644 --- a/src/operation/run_command/mod.rs +++ b/src/operation/run_command/mod.rs @@ -96,6 +96,7 @@ impl Operation for RunCommand { pub(crate) struct Response { doc: Document, cluster_time: Option, + recovery_token: Option, } impl super::Response for Response { @@ -109,7 +110,13 @@ impl super::Response for Response { .ok() .and_then(|doc| bson::from_document(doc.clone()).ok()); - Ok(Self { doc, cluster_time }) + let recovery_token = doc.get_document("recoveryToken").ok().cloned(); + + Ok(Self { + doc, + cluster_time, + recovery_token, + }) } fn ok(&self) -> Option<&Bson> { @@ -131,6 +138,10 @@ impl super::Response for Response { .ok() } + fn recovery_token(&self) -> Option<&Document> { + self.recovery_token.as_ref() + } + fn into_body(self) -> Self::Body { self.doc } diff --git a/src/test/spec/json/sharded-transactions/mongos-recovery-token.json b/src/test/spec/json/sharded-transactions/mongos-recovery-token.json new file mode 100644 index 000000000..02c2002f7 --- /dev/null +++ b/src/test/spec/json/sharded-transactions/mongos-recovery-token.json @@ -0,0 +1,511 @@ +{ + "runOn": [ + { + "minServerVersion": "4.1.8", + "topology": [ + "sharded" + ], + "serverless": "forbid" + } + ], + "database_name": "transaction-tests", + "collection_name": "test", + "data": [], + "tests": [ + { + "description": "commitTransaction explicit retries include recoveryToken", + "useMultipleMongoses": true, + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "session": "session0", + "document": { + "_id": 1 + } + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "readConcern": null, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": null, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": { + "w": "majority", + "wtimeout": 10000 + }, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": { + "w": "majority", + "wtimeout": 10000 + }, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + } + ] + } + } + }, + { + "description": "commitTransaction retry succeeds on new mongos", + "useMultipleMongoses": true, + "operations": [ + { + "name": "startTransaction", + "object": "session0", + "arguments": { + "options": { + "writeConcern": { + "w": "majority" + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "session": "session0", + "document": { + "_id": 1 + } + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "commitTransaction" + ], + "writeConcernError": { + "code": 91, + "errmsg": "Replication is being shut down", + "errorLabels": [ + "RetryableWriteError" + ] + } + } + } + } + }, + { + "name": "commitTransaction", + "object": "session0" + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "readConcern": null, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": { + "w": "majority" + }, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": { + "w": "majority", + "wtimeout": 10000 + }, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + } + ] + } + } + }, + { + "description": "commitTransaction retry fails on new mongos", + "useMultipleMongoses": true, + "clientOptions": { + "heartbeatFrequencyMS": 30000 + }, + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "session": "session0", + "document": { + "_id": 1 + } + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 7 + }, + "data": { + "failCommands": [ + "commitTransaction", + "isMaster", + "hello" + ], + "closeConnection": true + } + } + } + }, + { + "name": "commitTransaction", + "object": "session0", + "result": { + "errorLabelsContain": [ + "TransientTransactionError" + ], + "errorLabelsOmit": [ + "UnknownTransactionCommitResult" + ], + "errorCodeName": "NoSuchTransaction" + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "readConcern": null, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": null, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + }, + { + "command_started_event": { + "command": { + "commitTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": { + "w": "majority", + "wtimeout": 10000 + }, + "recoveryToken": 42 + }, + "command_name": "commitTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [] + } + } + }, + { + "description": "abortTransaction sends recoveryToken", + "useMultipleMongoses": true, + "operations": [ + { + "name": "startTransaction", + "object": "session0" + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "session": "session0", + "document": { + "_id": 1 + } + }, + "result": { + "insertedId": 1 + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "abortTransaction" + ], + "closeConnection": true + } + } + } + }, + { + "name": "abortTransaction", + "object": "session0" + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test", + "documents": [ + { + "_id": 1 + } + ], + "ordered": true, + "readConcern": null, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": true, + "autocommit": false, + "writeConcern": null + }, + "command_name": "insert", + "database_name": "transaction-tests" + } + }, + { + "command_started_event": { + "command": { + "abortTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": null, + "recoveryToken": 42 + }, + "command_name": "abortTransaction", + "database_name": "admin" + } + }, + { + "command_started_event": { + "command": { + "abortTransaction": 1, + "lsid": "session0", + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": null, + "autocommit": false, + "writeConcern": null, + "recoveryToken": 42 + }, + "command_name": "abortTransaction", + "database_name": "admin" + } + } + ], + "outcome": { + "collection": { + "data": [] + } + } + } + ] +} diff --git a/src/test/spec/json/sharded-transactions/mongos-recovery-token.yml b/src/test/spec/json/sharded-transactions/mongos-recovery-token.yml new file mode 100644 index 000000000..688f08256 --- /dev/null +++ b/src/test/spec/json/sharded-transactions/mongos-recovery-token.yml @@ -0,0 +1,350 @@ +runOn: + - + minServerVersion: "4.1.8" + topology: ["sharded"] + # serverless proxy doesn't use recovery tokens + serverless: "forbid" + +database_name: &database_name "transaction-tests" +collection_name: &collection_name "test" + +data: [] + +tests: + - description: commitTransaction explicit retries include recoveryToken + useMultipleMongoses: true + operations: + - name: startTransaction + object: session0 + - name: insertOne + object: collection + arguments: + session: session0 + document: + _id: 1 + result: + insertedId: 1 + - name: commitTransaction + object: session0 + - name: commitTransaction + object: session0 + - name: commitTransaction + object: session0 + + expectations: + - command_started_event: + command: + insert: *collection_name + documents: + - _id: 1 + ordered: true + readConcern: + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: true + autocommit: false + writeConcern: + command_name: insert + database_name: *database_name + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + writeConcern: + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + # commitTransaction applies w:majority on retries + writeConcern: { w: majority, wtimeout: 10000 } + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + # commitTransaction applies w:majority on retries + writeConcern: { w: majority, wtimeout: 10000 } + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + + outcome: + collection: + data: + - _id: 1 + + - description: commitTransaction retry succeeds on new mongos + useMultipleMongoses: true + operations: + - name: startTransaction + object: session0 + arguments: + options: + writeConcern: + w: majority + - name: insertOne + object: collection + arguments: + session: session0 + document: + _id: 1 + result: + insertedId: 1 + # Enable the fail point only on the Mongos that session0 is pinned to. + - name: targetedFailPoint + object: testRunner + arguments: + session: session0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["commitTransaction"] + writeConcernError: + code: 91 + errmsg: Replication is being shut down + errorLabels: ["RetryableWriteError"] + # The client sees a retryable writeConcernError on the first + # commitTransaction due to the fail point but it actually succeeds on the + # server (SERVER-39346). The retry will succeed both on a new mongos and + # on the original. + - name: commitTransaction + object: session0 + + expectations: + - command_started_event: + command: + insert: *collection_name + documents: + - _id: 1 + ordered: true + readConcern: + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: true + autocommit: false + writeConcern: + command_name: insert + database_name: *database_name + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + writeConcern: + w: majority + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + # commitTransaction applies w:majority on retries + writeConcern: { w: majority, wtimeout: 10000 } + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + + outcome: + collection: + data: + - _id: 1 + + - description: commitTransaction retry fails on new mongos + useMultipleMongoses: true + clientOptions: + # Increase heartbeatFrequencyMS to avoid the race condition where an in + # flight heartbeat refreshes the first mongoes' SDAM state in between + # the initial commitTransaction and the retry attempt. + heartbeatFrequencyMS: 30000 + operations: + - name: startTransaction + object: session0 + - name: insertOne + object: collection + arguments: + session: session0 + document: + _id: 1 + result: + insertedId: 1 + # Enable the fail point only on the Mongos that session0 is pinned to. + # Fail hello/legacy hello to prevent the heartbeat requested directly after the + # retryable commit error from racing with server selection for the retry. + # Note: times: 7 is slightly artbitrary but it accounts for one failed + # commit and some SDAM heartbeats. A test runner will have multiple + # clients connected to this server so this fail point configuration + # is also racy. + - name: targetedFailPoint + object: testRunner + arguments: + session: session0 + failPoint: + configureFailPoint: failCommand + mode: { times: 7 } + data: + failCommands: ["commitTransaction", "isMaster", "hello"] + closeConnection: true + # The first commitTransaction sees a retryable connection error due to + # the fail point and also fails on the server. The retry attempt on a + # new mongos will wait for the transaction to timeout and will fail + # because the transaction was aborted. Note that the retry attempt should + # not select the original mongos because that server's SDAM state is + # reset by the connection error, heartbeatFrequencyMS is high, and + # subsequent heartbeats should fail. + - name: commitTransaction + object: session0 + result: + errorLabelsContain: ["TransientTransactionError"] + errorLabelsOmit: ["UnknownTransactionCommitResult"] + errorCodeName: NoSuchTransaction + + expectations: + - command_started_event: + command: + insert: *collection_name + documents: + - _id: 1 + ordered: true + readConcern: + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: true + autocommit: false + writeConcern: + command_name: insert + database_name: *database_name + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + writeConcern: + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + - command_started_event: + command: + commitTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + # commitTransaction applies w:majority on retries + writeConcern: { w: majority, wtimeout: 10000 } + recoveryToken: 42 + command_name: commitTransaction + database_name: admin + + outcome: + collection: + data: [] + + - description: abortTransaction sends recoveryToken + useMultipleMongoses: true + operations: + - name: startTransaction + object: session0 + - name: insertOne + object: collection + arguments: + session: session0 + document: + _id: 1 + result: + insertedId: 1 + # Enable the fail point only on the Mongos that session0 is pinned to. + - name: targetedFailPoint + object: testRunner + arguments: + session: session0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["abortTransaction"] + closeConnection: true + # The first abortTransaction sees a retryable connection error due to + # the fail point. The retry attempt on a new mongos will send the + # recoveryToken. Note that the retry attempt will also fail because the + # server does not yet support aborting from a new mongos, however this + # operation should "succeed" since abortTransaction ignores errors. + - name: abortTransaction + object: session0 + + expectations: + - command_started_event: + command: + insert: *collection_name + documents: + - _id: 1 + ordered: true + readConcern: + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: true + autocommit: false + writeConcern: + command_name: insert + database_name: *database_name + - command_started_event: + command: + abortTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + writeConcern: + recoveryToken: 42 + command_name: abortTransaction + database_name: admin + - command_started_event: + command: + abortTransaction: 1 + lsid: session0 + txnNumber: + $numberLong: "1" + startTransaction: + autocommit: false + writeConcern: + recoveryToken: 42 + command_name: abortTransaction + database_name: admin + + outcome: + collection: + data: [] diff --git a/src/test/spec/transactions.rs b/src/test/spec/transactions.rs index a6696e976..04887feb8 100644 --- a/src/test/spec/transactions.rs +++ b/src/test/spec/transactions.rs @@ -5,6 +5,7 @@ use crate::{ bson::{doc, serde_helpers::serialize_u64_as_i32, Document}, client::session::TransactionState, test::{run_spec_test, TestClient, LOCK}, + Collection, }; use super::{run_unified_format_test, run_v2_test}; @@ -92,3 +93,57 @@ async fn client_errors() { assert!(result.is_err()); assert_eq!(session.transaction.state, TransactionState::InProgress); } + +#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +#[function_name::named] +// This test checks that deserializing an operation correctly still retrieves the recovery token. +async fn deserialize_recovery_token() { + let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await; + + #[derive(Debug, Serialize)] + struct A { + num: i32, + } + + #[derive(Debug, Deserialize)] + struct B { + str: String, + } + + let client = TestClient::new().await; + if !client.is_sharded() || client.server_version_lt(4, 2) { + return; + } + + let mut session = client.start_session(None).await.unwrap(); + + // Insert a document with schema A. + client + .database(function_name!()) + .collection::(function_name!()) + .drop(None) + .await + .unwrap(); + client + .database(function_name!()) + .create_collection(function_name!(), None) + .await + .unwrap(); + let coll = client + .database(function_name!()) + .collection(function_name!()); + coll.insert_one(A { num: 4 }, None).await.unwrap(); + + // Attempt to execute Find on a document with schema B. + let coll: Collection = client + .database(function_name!()) + .collection(function_name!()); + session.start_transaction(None).await.unwrap(); + assert!(session.transaction.recovery_token.is_none()); + let result = coll.find_one_with_session(None, None, &mut session).await; + assert!(result.is_err()); // Assert that the deserialization failed. + + // Nevertheless, the recovery token should have been retrieved from the ok: 1 response. + assert!(session.transaction.recovery_token.is_some()); +} diff --git a/src/test/spec/v2_runner/mod.rs b/src/test/spec/v2_runner/mod.rs index 15923f0bc..8156b02a4 100644 --- a/src/test/spec/v2_runner/mod.rs +++ b/src/test/spec/v2_runner/mod.rs @@ -271,7 +271,8 @@ pub async fn run_v2_test(test_file: TestFile) { .unwrap_or_else(|| panic!("ClientSession is not pinned")); fail_point_guards.push( - internal_client + client + .deref() .enable_failpoint(fail_point, Some(selection_criteria)) .await .unwrap(), diff --git a/src/test/util/failpoint.rs b/src/test/util/failpoint.rs index f87dfb031..27a917b29 100644 --- a/src/test/util/failpoint.rs +++ b/src/test/util/failpoint.rs @@ -1,4 +1,4 @@ -use bson::{doc, Document}; +use bson::{doc, Bson, Document}; use serde::{Deserialize, Serialize, Serializer}; use std::time::Duration; use typed_builder::TypedBuilder; @@ -48,10 +48,20 @@ impl FailPoint { client: &Client, criteria: impl Into>, ) -> Result { + // TODO: DRIVERS-1385 remove this logic for moving errorLabels to the top level. + let mut command = self.command.clone(); + if let Some(Bson::Document(data)) = command.get_mut("data") { + if let Some(Bson::Document(wc_error)) = data.get_mut("writeConcernError") { + if let Some(labels) = wc_error.remove("errorLabels") { + data.insert("errorLabels", labels); + } + } + } + let criteria = criteria.into(); client .database("admin") - .run_command(self.command.clone(), criteria.clone()) + .run_command(command, criteria.clone()) .await?; Ok(FailPointGuard { failpoint_name: self.name().to_string(), diff --git a/src/test/util/matchable.rs b/src/test/util/matchable.rs index 0c9046481..e977571da 100644 --- a/src/test/util/matchable.rs +++ b/src/test/util/matchable.rs @@ -67,8 +67,7 @@ impl Matchable for Document { if k == "afterClusterTime" { continue; } - // TODO RUST-97: Remove this logic to bypass recoveryToken - if k == "recoveryToken" { + if k == "recoveryToken" && v.is_placeholder() && self.get_document(k).is_ok() { continue; } if k == "readConcern" {