Skip to content

Commit ae2b7be

Browse files
authored
RUST-97 Support sharded transactions recovery token (#398)
1 parent b9a7c53 commit ae2b7be

File tree

11 files changed

+1000
-11
lines changed

11 files changed

+1000
-11
lines changed

src/client/executor.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ impl Client {
290290
}
291291

292292
let stream_description = connection.stream_description()?;
293+
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
293294
let mut cmd = op.build(stream_description)?;
294295
self.inner
295296
.topology
@@ -328,15 +329,22 @@ impl Client {
328329
cmd.set_start_transaction();
329330
cmd.set_autocommit();
330331
cmd.set_txn_read_concern(*session)?;
331-
if stream_description.initial_server_type == ServerType::Mongos {
332+
if is_sharded {
332333
session.pin_mongos(connection.address().clone());
333334
}
334335
session.transaction.state = TransactionState::InProgress;
335336
}
336-
TransactionState::InProgress
337-
| TransactionState::Committed { .. }
338-
| TransactionState::Aborted => {
337+
TransactionState::InProgress => cmd.set_autocommit(),
338+
TransactionState::Committed { .. } | TransactionState::Aborted => {
339339
cmd.set_autocommit();
340+
341+
// Append the recovery token to the command if we are committing or aborting
342+
// on a sharded transaction.
343+
if is_sharded {
344+
if let Some(ref recovery_token) = session.transaction.recovery_token {
345+
cmd.set_recovery_token(recovery_token);
346+
}
347+
}
340348
}
341349
_ => {}
342350
}
@@ -403,6 +411,9 @@ impl Client {
403411
Ok(r) => {
404412
self.update_cluster_time(&r, session).await;
405413
if r.is_success() {
414+
// Retrieve recovery token from successful response.
415+
Client::update_recovery_token(is_sharded, &r, session).await;
416+
406417
Ok(CommandResult {
407418
raw: response,
408419
deserialized: r.into_body(),
@@ -447,7 +458,15 @@ impl Client {
447458
}))
448459
}
449460
// for ok: 1 just return the original deserialization error.
450-
_ => Err(deserialize_error),
461+
_ => {
462+
Client::update_recovery_token(
463+
is_sharded,
464+
&error_response,
465+
session,
466+
)
467+
.await;
468+
Err(deserialize_error)
469+
}
451470
}
452471
}
453472
// We failed to deserialize even that, so just return the original
@@ -626,6 +645,18 @@ impl Client {
626645
}
627646
}
628647
}
648+
649+
async fn update_recovery_token<T: Response>(
650+
is_sharded: bool,
651+
response: &T,
652+
session: &mut Option<&mut ClientSession>,
653+
) {
654+
if let Some(ref mut session) = session {
655+
if is_sharded && session.in_transaction() {
656+
session.transaction.recovery_token = response.recovery_token().cloned();
657+
}
658+
}
659+
}
629660
}
630661

631662
impl Error {

src/client/session/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,14 @@ pub(crate) struct Transaction {
118118
pub(crate) state: TransactionState,
119119
pub(crate) options: Option<TransactionOptions>,
120120
pub(crate) pinned_mongos: Option<SelectionCriteria>,
121+
pub(crate) recovery_token: Option<Document>,
121122
}
122123

123124
impl Transaction {
124125
pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
125126
self.state = TransactionState::Starting;
126127
self.options = options;
128+
self.recovery_token = None;
127129
}
128130

129131
pub(crate) fn commit(&mut self, data_committed: bool) {
@@ -140,6 +142,7 @@ impl Transaction {
140142
self.state = TransactionState::None;
141143
self.options = None;
142144
self.pinned_mongos = None;
145+
self.recovery_token = None;
143146
}
144147
}
145148

@@ -149,6 +152,7 @@ impl Default for Transaction {
149152
state: TransactionState::None,
150153
options: None,
151154
pinned_mongos: None,
155+
recovery_token: None,
152156
}
153157
}
154158
}

src/cmap/conn/command.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ impl Command {
4141
}
4242
}
4343

44+
pub(crate) fn set_recovery_token(&mut self, recovery_token: &Document) {
45+
self.body.insert("recoveryToken", recovery_token);
46+
}
47+
4448
pub(crate) fn set_txn_number(&mut self, txn_number: i64) {
4549
self.body.insert("txnNumber", txn_number);
4650
}

src/operation/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ pub(crate) trait Response: Sized {
153153
/// The `atClusterTime` field of the response.
154154
fn at_cluster_time(&self) -> Option<Timestamp>;
155155

156+
/// The `recoveryToken` field of the response.
157+
fn recovery_token(&self) -> Option<&Document>;
158+
156159
/// Convert into the body of the response.
157160
fn into_body(self) -> Self::Body;
158161
}
@@ -168,6 +171,8 @@ pub(crate) struct CommandResponse<T> {
168171

169172
pub(crate) at_cluster_time: Option<Timestamp>,
170173

174+
pub(crate) recovery_token: Option<Document>,
175+
171176
#[serde(flatten)]
172177
pub(crate) body: T,
173178
}
@@ -197,6 +202,10 @@ impl<T: DeserializeOwned> Response for CommandResponse<T> {
197202
self.at_cluster_time
198203
}
199204

205+
fn recovery_token(&self) -> Option<&Document> {
206+
self.recovery_token.as_ref()
207+
}
208+
200209
fn into_body(self) -> Self::Body {
201210
self.body
202211
}
@@ -229,6 +238,10 @@ impl<T: DeserializeOwned> Response for CursorResponse<T> {
229238
self.response.body.cursor.at_cluster_time
230239
}
231240

241+
fn recovery_token(&self) -> Option<&Document> {
242+
self.response.recovery_token()
243+
}
244+
232245
fn into_body(self) -> Self::Body {
233246
self.response.body
234247
}

src/operation/run_command/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl Operation for RunCommand {
9696
pub(crate) struct Response {
9797
doc: Document,
9898
cluster_time: Option<ClusterTime>,
99+
recovery_token: Option<Document>,
99100
}
100101

101102
impl super::Response for Response {
@@ -109,7 +110,13 @@ impl super::Response for Response {
109110
.ok()
110111
.and_then(|doc| bson::from_document(doc.clone()).ok());
111112

112-
Ok(Self { doc, cluster_time })
113+
let recovery_token = doc.get_document("recoveryToken").ok().cloned();
114+
115+
Ok(Self {
116+
doc,
117+
cluster_time,
118+
recovery_token,
119+
})
113120
}
114121

115122
fn ok(&self) -> Option<&Bson> {
@@ -131,6 +138,10 @@ impl super::Response for Response {
131138
.ok()
132139
}
133140

141+
fn recovery_token(&self) -> Option<&Document> {
142+
self.recovery_token.as_ref()
143+
}
144+
134145
fn into_body(self) -> Self::Body {
135146
self.doc
136147
}

0 commit comments

Comments
 (0)