From e68b371fdfe2ba4ae767ad4c7256ed50bdd42aeb Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Tue, 6 Jul 2021 15:37:25 -0400 Subject: [PATCH 01/22] add snapshot option and client state --- src/client/options/mod.rs | 5 +++++ src/client/session/mod.rs | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/client/options/mod.rs b/src/client/options/mod.rs index 5ad92f518..4a592a834 100644 --- a/src/client/options/mod.rs +++ b/src/client/options/mod.rs @@ -2307,6 +2307,11 @@ pub struct SessionOptions { /// on the [`Database`](../struct.Database.html) or [`Collection`](../struct.Collection.html) /// associated with the operations within the transaction. pub default_transaction_options: Option, + + /// If true, all read operations performed using this client session will share the same + /// snapshot. Defaults to false. + // TODO RUST-18 enforce snapshot exclusivity with causalConsistency. + pub snapshot: Option, } /// Contains the options that can be used for a transaction. diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 619d1f823..37082d547 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -12,7 +12,7 @@ use lazy_static::lazy_static; use uuid::Uuid; use crate::{ - bson::{doc, spec::BinarySubtype, Binary, Bson, Document}, + bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp}, error::{ErrorKind, Result}, operation::{AbortTransaction, CommitTransaction, Operation}, options::{SessionOptions, TransactionOptions}, @@ -106,6 +106,7 @@ pub struct ClientSession { is_implicit: bool, options: Option, pub(crate) transaction: Transaction, + snapshot_time: Option, } #[derive(Clone, Debug)] @@ -173,6 +174,7 @@ impl ClientSession { is_implicit, options, transaction: Default::default(), + snapshot_time: None, } } @@ -278,6 +280,11 @@ impl ClientSession { &mut self, options: impl Into>, ) -> Result<()> { + if self.options.as_ref().and_then(|o| o.snapshot).unwrap_or(false) { + return Err(ErrorKind::Transaction { + message: "transactions are not allowed on snapshot sessions".into(), + }.into()) + } match self.transaction.state { TransactionState::Starting | TransactionState::InProgress => { return Err(ErrorKind::Transaction { @@ -486,6 +493,7 @@ struct DroppedClientSession { is_implicit: bool, options: Option, transaction: Transaction, + snapshot_time: Option, } impl From for ClientSession { @@ -497,6 +505,7 @@ impl From for ClientSession { is_implicit: dropped_session.is_implicit, options: dropped_session.options, transaction: dropped_session.transaction, + snapshot_time: dropped_session.snapshot_time, } } } @@ -511,6 +520,7 @@ impl Drop for ClientSession { is_implicit: self.is_implicit, options: self.options.clone(), transaction: self.transaction.clone(), + snapshot_time: self.snapshot_time.clone(), }; RUNTIME.execute(async move { let mut session: ClientSession = dropped_session.into(); From fec46afc1fb1dbcf6cf6d19c34b62f9eb1ade53c Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Tue, 6 Jul 2021 16:02:41 -0400 Subject: [PATCH 02/22] allow atClusterTime in relevant command responses --- src/operation/distinct/mod.rs | 3 ++- src/operation/mod.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/operation/distinct/mod.rs b/src/operation/distinct/mod.rs index 4bdf51a18..0ae0ea3ef 100644 --- a/src/operation/distinct/mod.rs +++ b/src/operation/distinct/mod.rs @@ -4,7 +4,7 @@ mod test; use serde::Deserialize; use crate::{ - bson::{doc, Bson, Document}, + bson::{doc, Bson, Document, Timestamp}, cmap::{Command, CommandResponse, StreamDescription}, coll::{options::DistinctOptions, Namespace}, error::Result, @@ -93,4 +93,5 @@ impl Operation for Distinct { #[derive(Debug, Deserialize)] struct ResponseBody { values: Vec, + at_cluster_time: Option, } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 13a8f66eb..66e3d453c 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -22,7 +22,7 @@ use std::{collections::VecDeque, fmt::Debug, ops::Deref}; use serde::{Deserialize, Serialize}; use crate::{ - bson::{self, Bson, Document}, + bson::{self, Bson, Document, Timestamp}, cmap::{Command, CommandResponse, StreamDescription}, error::{ BulkWriteError, @@ -221,6 +221,7 @@ struct CursorInfo { ns: Namespace, #[serde(rename = "firstBatch")] first_batch: VecDeque, + at_cluster_time: Option, } #[derive(Debug, PartialEq)] From 96f4c27b0385ca7e82e95d7d3167fd65f2cbf062 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 7 Jul 2021 12:15:30 -0400 Subject: [PATCH 03/22] add an OperationResult trait --- src/client/executor.rs | 14 +++++++++++++- src/client/session/mod.rs | 2 +- src/cmap/conn/command.rs | 10 ++++++++++ src/concern/mod.rs | 7 +++++-- src/cursor/common.rs | 23 +++++++++++++++-------- src/operation/aggregate/mod.rs | 4 +--- src/operation/find/mod.rs | 7 +++---- src/operation/list_collections/mod.rs | 4 +--- src/operation/mod.rs | 13 +++++++------ src/results.rs | 23 ++++++++++++++++++++++- 10 files changed, 78 insertions(+), 29 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 5ec9645aa..531c7c501 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -19,6 +19,7 @@ use crate::{ event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent}, operation::{AbortTransaction, CommitTransaction, Operation, Retryability}, options::SelectionCriteria, + results::OperationResult, sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus}, selection_criteria::ReadPreference, }; @@ -282,6 +283,9 @@ impl Client { if let Some(txn_number) = txn_number { cmd.set_txn_number(txn_number); } + if session.options().and_then(|opts| opts.snapshot).unwrap_or(false) { + cmd.set_snapshot_read_concern(session)?; + } match session.transaction.state { TransactionState::Starting => { cmd.set_start_transaction(); @@ -412,7 +416,15 @@ impl Client { }); match op.handle_response(response, connection.stream_description()?) { - Ok(response) => Ok(response), + Ok(response) => { + match (response.snapshot_timestamp(), session.as_mut()) { + (Some(timestamp), Some(session)) => { + session.snapshot_time = Some(*timestamp); + } + _ => (), + } + Ok(response) + } Err(mut err) => { err.add_labels(Some(connection), session, Some(retryability))?; Err(err) diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 37082d547..04316ec86 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -106,7 +106,7 @@ pub struct ClientSession { is_implicit: bool, options: Option, pub(crate) transaction: Transaction, - snapshot_time: Option, + pub(crate) snapshot_time: Option, } #[derive(Clone, Debug)] diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index badf7991d..086a671a3 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -5,6 +5,7 @@ use crate::{ bson::{Bson, Document}, bson_util, client::{options::ServerApi, ClusterTime}, + concern::ReadConcern, error::{CommandError, Error, ErrorKind, Result}, options::ServerAddress, selection_criteria::ReadPreference, @@ -84,6 +85,15 @@ impl Command { } Ok(()) } + + pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> { + let mut concern_doc = bson::to_document(&ReadConcern::snapshot())?; + if let Some(timestamp) = session.snapshot_time { + concern_doc.insert("atClusterTime", timestamp); + } + self.body.insert("readConcern", concern_doc); + Ok(()) + } } #[derive(Debug, Clone)] diff --git a/src/concern/mod.rs b/src/concern/mod.rs index 48ebd120b..46fc1eaa3 100644 --- a/src/concern/mod.rs +++ b/src/concern/mod.rs @@ -10,7 +10,7 @@ use serde_with::skip_serializing_none; use typed_builder::TypedBuilder; use crate::{ - bson::{doc, serde_helpers}, + bson::{doc, serde_helpers, Timestamp}, bson_util, error::{ErrorKind, Result}, }; @@ -25,6 +25,9 @@ use crate::{ pub struct ReadConcern { /// The level of the read concern. pub level: ReadConcernLevel, + + /// The snapshot read timestamp. + at_cluster_time: Option, } impl ReadConcern { @@ -87,7 +90,7 @@ impl ReadConcern { impl From for ReadConcern { fn from(level: ReadConcernLevel) -> Self { - Self { level } + Self { level, at_cluster_time: None } } } diff --git a/src/cursor/common.rs b/src/cursor/common.rs index b862fb71f..d6591b782 100644 --- a/src/cursor/common.rs +++ b/src/cursor/common.rs @@ -9,10 +9,11 @@ use derivative::Derivative; use futures_core::{Future, Stream}; use crate::{ - bson::Document, + bson::{Document, Timestamp}, error::{Error, ErrorKind, Result}, + operation, options::ServerAddress, - results::GetMoreResult, + results::{GetMoreResult, OperationResult}, Client, Namespace, }; @@ -152,22 +153,21 @@ pub(crate) struct CursorSpecification { impl CursorSpecification { pub(crate) fn new( - ns: Namespace, + info: operation::CursorInfo, address: ServerAddress, - id: i64, batch_size: impl Into>, max_time: impl Into>, - initial_buffer: VecDeque, ) -> Self { Self { info: CursorInformation { - ns, - id, + ns: info.ns, + id: info.id, address, batch_size: batch_size.into(), max_time: max_time.into(), + at_cluster_time: info.at_cluster_time, }, - initial_buffer, + initial_buffer: info.first_batch, } } @@ -191,6 +191,12 @@ impl CursorSpecification { } } +impl OperationResult for CursorSpecification { + fn snapshot_timestamp(&self) -> Option<&Timestamp> { + self.info.at_cluster_time.as_ref() + } +} + /// Static information about a cursor. #[derive(Clone, Debug)] pub(crate) struct CursorInformation { @@ -199,4 +205,5 @@ pub(crate) struct CursorInformation { pub(crate) id: i64, pub(crate) batch_size: Option, pub(crate) max_time: Option, + pub(crate) at_cluster_time: Option, } diff --git a/src/operation/aggregate/mod.rs b/src/operation/aggregate/mod.rs index c7dd7efba..1ac9265a5 100644 --- a/src/operation/aggregate/mod.rs +++ b/src/operation/aggregate/mod.rs @@ -78,12 +78,10 @@ impl Operation for Aggregate { let body: CursorBody = response.body()?; Ok(CursorSpecification::new( - body.cursor.ns, + body.cursor, source_address, - body.cursor.id, self.options.as_ref().and_then(|opts| opts.batch_size), self.options.as_ref().and_then(|opts| opts.max_await_time), - body.cursor.first_batch, )) } diff --git a/src/operation/find/mod.rs b/src/operation/find/mod.rs index 3d7828e39..b94593e0a 100644 --- a/src/operation/find/mod.rs +++ b/src/operation/find/mod.rs @@ -101,15 +101,14 @@ impl Operation for Find { _description: &StreamDescription, ) -> Result { let source_address = response.source_address().clone(); - let body: CursorBody = response.body()?; + let mut body: CursorBody = response.body()?; + body.cursor.ns = self.ns.clone(); Ok(CursorSpecification::new( - self.ns.clone(), + body.cursor, source_address, - body.cursor.id, self.options.as_ref().and_then(|opts| opts.batch_size), self.options.as_ref().and_then(|opts| opts.max_await_time), - body.cursor.first_batch, )) } diff --git a/src/operation/list_collections/mod.rs b/src/operation/list_collections/mod.rs index db1ce3c6f..1b4b5d3bb 100644 --- a/src/operation/list_collections/mod.rs +++ b/src/operation/list_collections/mod.rs @@ -72,12 +72,10 @@ impl Operation for ListCollections { let body: CursorBody = response.body()?; Ok(CursorSpecification::new( - body.cursor.ns, + body.cursor, source_address, - body.cursor.id, self.options.as_ref().and_then(|opts| opts.batch_size), None, - body.cursor.first_batch, )) } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 66e3d453c..0f2169ab0 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -34,6 +34,7 @@ use crate::{ WriteFailure, }, options::WriteConcern, + results::OperationResult, selection_criteria::SelectionCriteria, Namespace, }; @@ -60,7 +61,7 @@ pub(crate) use update::Update; /// A trait modeling the behavior of a server side operation. pub(crate) trait Operation { /// The output type of this operation. - type O; + type O: OperationResult; /// The name of the server side command associated with this operation. const NAME: &'static str; @@ -216,12 +217,12 @@ struct CursorBody { } #[derive(Debug, Deserialize)] -struct CursorInfo { - id: i64, - ns: Namespace, +pub(crate) struct CursorInfo { + pub(crate) id: i64, + pub(crate) ns: Namespace, #[serde(rename = "firstBatch")] - first_batch: VecDeque, - at_cluster_time: Option, + pub(crate) first_batch: VecDeque, + pub(crate) at_cluster_time: Option, } #[derive(Debug, PartialEq)] diff --git a/src/results.rs b/src/results.rs index e2d7292d9..50bcfbe66 100644 --- a/src/results.rs +++ b/src/results.rs @@ -3,13 +3,26 @@ use std::collections::{HashMap, VecDeque}; use crate::{ - bson::{Bson, Document}, + bson::{Bson, Document, Timestamp}, db::options::CreateCollectionOptions, }; use bson::Binary; use serde::{Deserialize, Serialize}; +pub(crate) trait OperationResult { + /// Extracts the "atClusterTime" timestamp, if any. + fn snapshot_timestamp(&self) -> Option<&Timestamp> { + None + } +} + +impl OperationResult for () {} +impl OperationResult for u64 {} +impl OperationResult for Option {} +impl OperationResult for Vec {} +impl OperationResult for Document {} + /// The result of a [`Collection::insert_one`](../struct.Collection.html#method.insert_one) /// operation. #[derive(Debug, Serialize)] @@ -46,6 +59,8 @@ impl InsertManyResult { } } +impl OperationResult for InsertManyResult {} + /// The result of a [`Collection::update_one`](../struct.Collection.html#method.update_one) or /// [`Collection::update_many`](../struct.Collection.html#method.update_many) operation. #[derive(Debug, Serialize)] @@ -64,6 +79,8 @@ pub struct UpdateResult { pub upserted_id: Option, } +impl OperationResult for UpdateResult {} + /// The result of a [`Collection::delete_one`](../struct.Collection.html#method.delete_one) or /// [`Collection::delete_many`](../struct.Collection.html#method.delete_many) operation. #[derive(Debug, Serialize)] @@ -75,12 +92,16 @@ pub struct DeleteResult { pub deleted_count: u64, } +impl OperationResult for DeleteResult {} + #[derive(Debug, Clone)] pub(crate) struct GetMoreResult { pub(crate) batch: VecDeque, pub(crate) exhausted: bool, } +impl OperationResult for GetMoreResult {} + /// Describes the type of data store returned when executing /// [`Database::list_collections`](../struct.Database.html#method.list_collections). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] From bf6acfab4c3332988e8105b77af013cb75398264 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 7 Jul 2021 12:18:42 -0400 Subject: [PATCH 04/22] use ReadConcern struct rather than document construction --- src/cmap/conn/command.rs | 8 +++----- src/concern/mod.rs | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index 086a671a3..891b7cfee 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -87,11 +87,9 @@ impl Command { } pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> { - let mut concern_doc = bson::to_document(&ReadConcern::snapshot())?; - if let Some(timestamp) = session.snapshot_time { - concern_doc.insert("atClusterTime", timestamp); - } - self.body.insert("readConcern", concern_doc); + let mut concern = ReadConcern::snapshot(); + concern.at_cluster_time = session.snapshot_time; + self.body.insert("readConcern", bson::to_document(&concern)?); Ok(()) } } diff --git a/src/concern/mod.rs b/src/concern/mod.rs index 46fc1eaa3..e08e20657 100644 --- a/src/concern/mod.rs +++ b/src/concern/mod.rs @@ -27,7 +27,7 @@ pub struct ReadConcern { pub level: ReadConcernLevel, /// The snapshot read timestamp. - at_cluster_time: Option, + pub(crate) at_cluster_time: Option, } impl ReadConcern { From 0519bf3520a8162a10dd3b0978fcb514c678fcc1 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 7 Jul 2021 12:26:38 -0400 Subject: [PATCH 05/22] capture timestamp in Distinct result --- src/coll/mod.rs | 2 +- src/operation/distinct/mod.rs | 15 ++++----------- src/results.rs | 13 +++++++++++++ 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/coll/mod.rs b/src/coll/mod.rs index dfdf2bc8d..02da070f9 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -396,7 +396,7 @@ impl Collection { filter.into(), options, ); - self.client().execute_operation(op, session).await + self.client().execute_operation(op, session).await.map(|r| r.values) } /// Finds the distinct values of the field specified by `field_name` across the collection. diff --git a/src/operation/distinct/mod.rs b/src/operation/distinct/mod.rs index 0ae0ea3ef..2fb2715cd 100644 --- a/src/operation/distinct/mod.rs +++ b/src/operation/distinct/mod.rs @@ -1,14 +1,13 @@ #[cfg(test)] mod test; -use serde::Deserialize; - use crate::{ - bson::{doc, Bson, Document, Timestamp}, + bson::{doc, Document}, cmap::{Command, CommandResponse, StreamDescription}, coll::{options::DistinctOptions, Namespace}, error::Result, operation::{append_options, Operation, Retryability}, + results::DistinctResult, selection_criteria::SelectionCriteria, }; @@ -49,7 +48,7 @@ impl Distinct { } impl Operation for Distinct { - type O = Vec; + type O = DistinctResult; const NAME: &'static str = "distinct"; fn build(&mut self, _description: &StreamDescription) -> Result { @@ -75,7 +74,7 @@ impl Operation for Distinct { response: CommandResponse, _description: &StreamDescription, ) -> Result { - response.body::().map(|body| body.values) + response.body::() } fn selection_criteria(&self) -> Option<&SelectionCriteria> { @@ -89,9 +88,3 @@ impl Operation for Distinct { Retryability::Read } } - -#[derive(Debug, Deserialize)] -struct ResponseBody { - values: Vec, - at_cluster_time: Option, -} diff --git a/src/results.rs b/src/results.rs index 50bcfbe66..9f940be1d 100644 --- a/src/results.rs +++ b/src/results.rs @@ -102,6 +102,19 @@ pub(crate) struct GetMoreResult { impl OperationResult for GetMoreResult {} + +#[derive(Debug, Deserialize)] +pub(crate) struct DistinctResult { + pub(crate) values: Vec, + pub(crate) at_cluster_time: Option, +} + +impl OperationResult for DistinctResult { + fn snapshot_timestamp(&self) -> Option<&Timestamp> { + self.at_cluster_time.as_ref() + } +} + /// Describes the type of data store returned when executing /// [`Database::list_collections`](../struct.Database.html#method.list_collections). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] From c2a6619a1fddbcbc487f66a468ce50e4ead509d2 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 7 Jul 2021 13:43:37 -0400 Subject: [PATCH 06/22] test fixes --- src/operation/distinct/test.rs | 23 +++++++++++++++-------- src/operation/get_more/test.rs | 4 ++++ src/results.rs | 2 +- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/operation/distinct/test.rs b/src/operation/distinct/test.rs index 3d45e97d2..24d8595e9 100644 --- a/src/operation/distinct/test.rs +++ b/src/operation/distinct/test.rs @@ -6,6 +6,7 @@ use crate::{ coll::{options::DistinctOptions, Namespace}, error::ErrorKind, operation::{test, Distinct, Operation}, + results::DistinctResult, }; #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -97,19 +98,22 @@ async fn op_selection_criteria() { async fn handle_success() { let distinct_op = Distinct::empty(); - let expected_values: Vec = - vec![Bson::String("A".to_string()), Bson::String("B".to_string())]; + let expected_values = vec![Bson::String("A".to_string()), Bson::String("B".to_string())]; + let expected_result = DistinctResult { + values: expected_values.clone(), + at_cluster_time: None, + }; let response = CommandResponse::with_document(doc! { - "values" : expected_values.clone(), + "values" : expected_values, "ok" : 1 }); - let actual_values = distinct_op + let actual_result = distinct_op .handle_response(response, &Default::default()) .expect("supposed to succeed"); - assert_eq!(actual_values, expected_values); + assert_eq!(actual_result, expected_result); } #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -122,13 +126,16 @@ async fn handle_response_with_empty_values() { "ok" : 1 }); - let expected_values: Vec = Vec::new(); + let expected_result = DistinctResult { + values: Vec::new(), + at_cluster_time: None, + }; - let actual_values = distinct_op + let actual_result = distinct_op .handle_response(response, &Default::default()) .expect("supposed to succeed"); - assert_eq!(actual_values, expected_values); + assert_eq!(actual_result, expected_result); } #[cfg_attr(feature = "tokio-runtime", tokio::test)] diff --git a/src/operation/get_more/test.rs b/src/operation/get_more/test.rs index 4960a56e7..8dba36fd1 100644 --- a/src/operation/get_more/test.rs +++ b/src/operation/get_more/test.rs @@ -25,6 +25,7 @@ fn build_test( address, batch_size, max_time, + at_cluster_time: None, }; let mut get_more = GetMore::new(info); @@ -116,6 +117,7 @@ async fn build_batch_size() { id: cursor_id, batch_size: Some((std::i32::MAX as u32) + 1), max_time: None, + at_cluster_time: None, }; let mut op = GetMore::new(info); assert!(op.build(&StreamDescription::new_testing()).is_err()) @@ -135,6 +137,7 @@ async fn op_selection_criteria() { id: 123, batch_size: None, max_time: None, + at_cluster_time: None, }; let get_more = GetMore::new(info); let server_description = ServerDescription { @@ -180,6 +183,7 @@ async fn handle_success() { id: cursor_id, batch_size: None, max_time: None, + at_cluster_time: None, }; let get_more = GetMore::new(info); diff --git a/src/results.rs b/src/results.rs index 9f940be1d..02f616f9f 100644 --- a/src/results.rs +++ b/src/results.rs @@ -103,7 +103,7 @@ pub(crate) struct GetMoreResult { impl OperationResult for GetMoreResult {} -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, PartialEq)] pub(crate) struct DistinctResult { pub(crate) values: Vec, pub(crate) at_cluster_time: Option, From a0f487a3386407e3ec75e7a372c82f6fb5f38941 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 7 Jul 2021 14:04:43 -0400 Subject: [PATCH 07/22] more test fixes --- src/concern/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/concern/mod.rs b/src/concern/mod.rs index e08e20657..2086ffc21 100644 --- a/src/concern/mod.rs +++ b/src/concern/mod.rs @@ -21,12 +21,14 @@ use crate::{ /// See the documentation [here](https://docs.mongodb.com/manual/reference/read-concern/) for more /// information about read concerns. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] #[non_exhaustive] pub struct ReadConcern { /// The level of the read concern. pub level: ReadConcernLevel, /// The snapshot read timestamp. + #[serde(skip_serializing_if = "Option::is_none")] pub(crate) at_cluster_time: Option, } From 9524df4d22e757e308ccb056002005466cf13e45 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Wed, 7 Jul 2021 14:27:47 -0400 Subject: [PATCH 08/22] pull in spec test updates --- src/test/spec/json/sessions/README.rst | 26 +- .../{ => legacy}/dirty-session-errors.json | 0 .../{ => legacy}/dirty-session-errors.yml | 0 .../sessions/{ => legacy}/server-support.json | 0 .../sessions/{ => legacy}/server-support.yml | 0 ...t-sessions-not-supported-client-error.json | 113 ++ ...ot-sessions-not-supported-client-error.yml | 69 ++ ...t-sessions-not-supported-server-error.json | 187 ++++ ...ot-sessions-not-supported-server-error.yml | 102 ++ .../snapshot-sessions-unsupported-ops.json | 493 +++++++++ .../snapshot-sessions-unsupported-ops.yml | 258 +++++ .../sessions/unified/snapshot-sessions.json | 993 ++++++++++++++++++ .../sessions/unified/snapshot-sessions.yml | 482 +++++++++ src/test/spec/sessions.rs | 13 +- 14 files changed, 2730 insertions(+), 6 deletions(-) rename src/test/spec/json/sessions/{ => legacy}/dirty-session-errors.json (100%) rename src/test/spec/json/sessions/{ => legacy}/dirty-session-errors.yml (100%) rename src/test/spec/json/sessions/{ => legacy}/server-support.json (100%) rename src/test/spec/json/sessions/{ => legacy}/server-support.yml (100%) create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions.json create mode 100644 src/test/spec/json/sessions/unified/snapshot-sessions.yml diff --git a/src/test/spec/json/sessions/README.rst b/src/test/spec/json/sessions/README.rst index 3ed7eea96..d88b5c7ba 100644 --- a/src/test/spec/json/sessions/README.rst +++ b/src/test/spec/json/sessions/README.rst @@ -9,10 +9,11 @@ Driver Session Tests Introduction ============ -The YAML and JSON files in this directory are platform-independent tests that -drivers can use to prove their conformance to the Driver Sessions Spec. They are +The YAML and JSON files in the ``legacy`` and ``unified`` sub-directories are platform-independent tests +that drivers can use to prove their conformance to the Driver Sessions Spec. They are designed with the intention of sharing most test-runner code with the -Transactions spec tests. +`Transactions Spec tests <../../transactions/tests/README.rst#test-format>`_.. Tests in the +``unified`` directory are written using the `Unified Test Format <../../unified-test-format/unified-test-format.rst>`_. Several prose tests, which are not easily expressed in YAML, are also presented in the Driver Sessions Spec. Those tests will need to be manually implemented @@ -78,7 +79,26 @@ the given session is *not* marked dirty:: arguments: session: session0 +Snapshot session tests +====================== +Snapshot sessions tests require server of version 5.0 or higher and +replica set or a sharded cluster deployment. +Default snapshot history window on the server is 5 minutes. Running the test in debug mode, or in any other slow configuration +may lead to `SnapshotTooOld` errors. Drivers can work around this issue by increasing the server's `minSnapshotHistoryWindowInSeconds` parameter, for example: + +.. code:: python + + client.admin.command('setParameter', 1, minSnapshotHistoryWindowInSeconds=60) + +Prose tests +``````````` +- Setting both ``snapshot`` and ``causalConsistency`` is not allowed + + * ``client.startSession(snapshot = true, causalConsistency = true)`` + * Assert that an error was raised by driver + Changelog ========= :2019-05-15: Initial version. +:2021-06-15: Added snapshot-session tests. Introduced legacy and unified folders. diff --git a/src/test/spec/json/sessions/dirty-session-errors.json b/src/test/spec/json/sessions/legacy/dirty-session-errors.json similarity index 100% rename from src/test/spec/json/sessions/dirty-session-errors.json rename to src/test/spec/json/sessions/legacy/dirty-session-errors.json diff --git a/src/test/spec/json/sessions/dirty-session-errors.yml b/src/test/spec/json/sessions/legacy/dirty-session-errors.yml similarity index 100% rename from src/test/spec/json/sessions/dirty-session-errors.yml rename to src/test/spec/json/sessions/legacy/dirty-session-errors.yml diff --git a/src/test/spec/json/sessions/server-support.json b/src/test/spec/json/sessions/legacy/server-support.json similarity index 100% rename from src/test/spec/json/sessions/server-support.json rename to src/test/spec/json/sessions/legacy/server-support.json diff --git a/src/test/spec/json/sessions/server-support.yml b/src/test/spec/json/sessions/legacy/server-support.yml similarity index 100% rename from src/test/spec/json/sessions/server-support.yml rename to src/test/spec/json/sessions/legacy/server-support.yml diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json new file mode 100644 index 000000000..129aa8d74 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json @@ -0,0 +1,113 @@ +{ + "description": "snapshot-sessions-not-supported-client-error", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "3.6", + "maxServerVersion": "4.4.99" + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Client error on find with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + }, + { + "description": "Client error on aggregate with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [] + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + }, + { + "description": "Client error on distinct with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + } + ] +} diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml new file mode 100644 index 000000000..b57344ce9 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml @@ -0,0 +1,69 @@ +description: snapshot-sessions-not-supported-client-error + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "3.6" + maxServerVersion: "4.4.99" + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent, commandFailedEvent ] + - database: + id: &database0Name database0 + client: *client0 + databaseName: *database0Name + - collection: + id: &collection0Name collection0 + database: *database0Name + collectionName: *collection0Name + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + +tests: +- description: Client error on find with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: {} + expectError: + isClientError: true + errorContains: Snapshot reads require MongoDB 5.0 or later + expectEvents: [] + +- description: Client error on aggregate with snapshot + operations: + - name: aggregate + object: collection0 + arguments: + session: session0 + pipeline: [] + expectError: + isClientError: true + errorContains: Snapshot reads require MongoDB 5.0 or later + expectEvents: [] + +- description: Client error on distinct with snapshot + operations: + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectError: + isClientError: true + errorContains: Snapshot reads require MongoDB 5.0 or later + expectEvents: [] diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json new file mode 100644 index 000000000..79213f314 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json @@ -0,0 +1,187 @@ +{ + "description": "snapshot-sessions-not-supported-server-error", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Server returns an error on find with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "find" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on aggregate with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [] + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "aggregate" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on distinct with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "distinct" + } + } + ] + } + ] + } + ] +} diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml new file mode 100644 index 000000000..4953dbcbe --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml @@ -0,0 +1,102 @@ +description: snapshot-sessions-not-supported-server-error + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "5.0" + topologies: [ single ] + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent, commandFailedEvent ] + - database: + id: &database0Name database0 + client: *client0 + databaseName: *database0Name + - collection: + id: &collection0Name collection0 + database: *database0Name + collectionName: *collection0Name + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + +tests: +- description: Server returns an error on find with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: {} + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: find + +- description: Server returns an error on aggregate with snapshot + operations: + - name: aggregate + object: collection0 + arguments: + session: session0 + pipeline: [] + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: aggregate + +- description: Server returns an error on distinct with snapshot + operations: + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: distinct diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json new file mode 100644 index 000000000..1021b7f26 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json @@ -0,0 +1,493 @@ +{ + "description": "snapshot-sessions-unsupported-ops", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "replicaset", + "sharded-replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Server returns an error on insertOne with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "session": "session0", + "document": { + "_id": 22, + "x": 22 + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on insertMany with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "insertMany", + "object": "collection0", + "arguments": { + "session": "session0", + "documents": [ + { + "_id": 22, + "x": 22 + }, + { + "_id": 33, + "x": 33 + } + ] + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on deleteOne with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "deleteOne", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "delete": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "delete" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on updateOne with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "update": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "update" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on findOneAndUpdate with snapshot", + "operations": [ + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "findAndModify": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "findAndModify" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on listDatabases with snapshot", + "operations": [ + { + "name": "listDatabases", + "object": "client0", + "arguments": { + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listDatabases": 1, + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listDatabases" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on listCollections with snapshot", + "operations": [ + { + "name": "listCollections", + "object": "database0", + "arguments": { + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listCollections": 1, + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listCollections" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on listIndexes with snapshot", + "operations": [ + { + "name": "listIndexes", + "object": "collection0", + "arguments": { + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listIndexes": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listIndexes" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on runCommand with snapshot", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "session": "session0", + "commandName": "listCollections", + "command": { + "listCollections": 1 + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listCollections": 1, + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listCollections" + } + } + ] + } + ] + } + ] +} diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml new file mode 100644 index 000000000..1d5dce893 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml @@ -0,0 +1,258 @@ +description: snapshot-sessions-unsupported-ops + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "5.0" + topologies: [replicaset, sharded-replicaset] + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent, commandFailedEvent ] + - database: + id: &database0Name database0 + client: *client0 + databaseName: *database0Name + - collection: + id: &collection0Name collection0 + database: *database0Name + collectionName: *collection0Name + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + +tests: +- description: Server returns an error on insertOne with snapshot + # Skip on sharded clusters due to SERVER-58176. + runOnRequirements: + - topologies: [replicaset] + operations: + - name: insertOne + object: collection0 + arguments: + session: session0 + document: + _id: 22 + x: 22 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + insert: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: insert + +- description: Server returns an error on insertMany with snapshot + # Skip on sharded clusters due to SERVER-58176. + runOnRequirements: + - topologies: [replicaset] + operations: + - name: insertMany + object: collection0 + arguments: + session: session0 + documents: + - _id: 22 + x: 22 + - _id: 33 + x: 33 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + insert: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: insert + +- description: Server returns an error on deleteOne with snapshot + # Skip on sharded clusters due to SERVER-58176. + runOnRequirements: + - topologies: [replicaset] + operations: + - name: deleteOne + object: collection0 + arguments: + session: session0 + filter: {} + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + delete: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: delete + +- description: Server returns an error on updateOne with snapshot + # Skip on sharded clusters due to SERVER-58176. + runOnRequirements: + - topologies: [replicaset] + operations: + - name: updateOne + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + update: { $inc: { x: 1 } } + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + update: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: update + +- description: Server returns an error on findOneAndUpdate with snapshot + operations: + - name: findOneAndUpdate + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + update: { $inc: { x: 1 } } + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + findAndModify: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: findAndModify + +- description: Server returns an error on listDatabases with snapshot + operations: + - name: listDatabases + object: client0 + arguments: + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listDatabases: 1 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listDatabases + +- description: Server returns an error on listCollections with snapshot + operations: + - name: listCollections + object: database0 + arguments: + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listCollections: 1 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listCollections + +- description: Server returns an error on listIndexes with snapshot + operations: + - name: listIndexes + object: collection0 + arguments: + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listIndexes: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listIndexes + +- description: Server returns an error on runCommand with snapshot + operations: + - name: runCommand + object: database0 + arguments: + session: session0 + commandName: listCollections + command: + listCollections: 1 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listCollections: 1 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listCollections diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions.json b/src/test/spec/json/sessions/unified/snapshot-sessions.json new file mode 100644 index 000000000..75b577b03 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions.json @@ -0,0 +1,993 @@ +{ + "description": "snapshot-sessions", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "replicaset", + "sharded-replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "ignoreCommandMonitoringEvents": [ + "findAndModify", + "insert", + "update" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0", + "collectionOptions": { + "writeConcern": { + "w": "majority" + } + } + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + }, + { + "session": { + "id": "session1", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Find operation with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session1", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 13 + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 13 + } + ] + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session1", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Distinct operation with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 2, + "x": 12 + } + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session1" + }, + "expectResult": [ + 11, + 12 + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 2, + "x": 13 + } + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {} + }, + "expectResult": [ + 11, + 13 + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session1" + }, + "expectResult": [ + 11, + 12 + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Aggregate operation with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session1" + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 13 + } + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ] + }, + "expectResult": [ + { + "_id": 1, + "x": 13 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session1" + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "countDocuments operation with snapshot", + "operations": [ + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + }, + "expectResult": 2 + }, + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + }, + "expectResult": 2 + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Mixed operation with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectResult": [ + 11 + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Write commands with snapshot session do not affect snapshot reads", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + } + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 22, + "x": 33 + } + } + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "First snapshot read does not send atClusterTime", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + }, + "commandName": "find", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "StartTransaction fails in snapshot session", + "operations": [ + { + "name": "startTransaction", + "object": "session0", + "expectError": { + "isError": true, + "isClientError": true, + "errorContains": "Transactions are not supported in snapshot sessions" + } + } + ] + } + ] +} diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions.yml b/src/test/spec/json/sessions/unified/snapshot-sessions.yml new file mode 100644 index 000000000..2f5fc2312 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions.yml @@ -0,0 +1,482 @@ +description: snapshot-sessions + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "5.0" + topologies: [replicaset, sharded-replicaset] + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent] + ignoreCommandMonitoringEvents: [ findAndModify, insert, update ] + - database: + id: &database0 database0 + client: *client0 + databaseName: &database0Name database0 + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: &collection0Name collection0 + collectionOptions: + writeConcern: { w: majority } + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + - session: + id: session1 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + - { _id: 2, x: 11 } + +tests: +- description: Find operation with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + expectResult: + - {_id: 1, x: 11} + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: find + object: collection0 + arguments: + session: session1 + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 12 } + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 13 } + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 13 } + - name: find + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + expectResult: + - {_id: 1, x: 11} + - name: find + object: collection0 + arguments: + session: session1 + filter: { _id: 1 } + expectResult: + - {_id: 1, x: 12} + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Distinct operation with snapshot + operations: + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectResult: + - 11 + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 2 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 2, x: 12 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session1 + expectResult: [11, 12] + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 2 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 2, x: 13 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + expectResult: [ 11, 13 ] + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectResult: [ 11 ] + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session1 + expectResult: [ 11, 12 ] + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Aggregate operation with snapshot + operations: + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + session: session0 + expectResult: + - { _id: 1, x: 11 } + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session1 + expectResult: + - {_id: 1, x: 12} + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 13 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + expectResult: + - { _id: 1, x: 13 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session0 + expectResult: + - { _id: 1, x: 11 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + session: session1 + expectResult: + - { _id: 1, x: 12 } + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: countDocuments operation with snapshot + operations: + - name: countDocuments + object: collection0 + arguments: + filter: {} + session: session0 + expectResult: 2 + - name: countDocuments + object: collection0 + arguments: + filter: {} + session: session0 + expectResult: 2 + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Mixed operation with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 11 } + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 12 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session0 + expectResult: + - { _id: 1, x: 11 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectResult: [ 11 ] + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Write commands with snapshot session do not affect snapshot reads + operations: + - name: find + object: collection0 + arguments: + filter: {} + session: session0 + - name: insertOne + object: collection0 + arguments: + document: + _id: 22 + x: 33 + - name: updateOne + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + session: session0 + expectResult: + - {_id: 1, x: 11} + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: First snapshot read does not send atClusterTime + operations: + - name: find + object: collection0 + arguments: + filter: {} + session: session0 + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + commandName: find + databaseName: database0 + +- description: StartTransaction fails in snapshot session + operations: + - name: startTransaction + object: session0 + expectError: + isError: true + isClientError: true + errorContains: Transactions are not supported in snapshot sessions diff --git a/src/test/spec/sessions.rs b/src/test/spec/sessions.rs index 1a8ad3219..d8d60e378 100644 --- a/src/test/spec/sessions.rs +++ b/src/test/spec/sessions.rs @@ -2,11 +2,18 @@ use tokio::sync::RwLockWriteGuard; use crate::test::{run_spec_test, LOCK}; -use super::run_v2_test; +use super::{run_v2_test, run_unified_format_test}; #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] #[cfg_attr(feature = "async-std-runtime", async_std::test)] -async fn run() { +async fn run_unified() { let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await; - run_spec_test(&["sessions"], run_v2_test).await; + run_spec_test(&["sessions", "unified"], run_unified_format_test).await; +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn run_legacy() { + let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await; + run_spec_test(&["sessions", "legacy"], run_v2_test).await; } From e178ab7c6a44ed9ab8952a5500dc5f5de8b065ed Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Thu, 8 Jul 2021 11:40:33 -0400 Subject: [PATCH 09/22] support sessions for more unified runner operations --- src/client/mod.rs | 25 ++- src/test/spec/unified_runner/operation.rs | 205 +++++++++++++++++----- 2 files changed, 179 insertions(+), 51 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index f5575b6ed..6b26c8c48 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -158,20 +158,39 @@ impl Client { Database::new(self.clone(), name, Some(options)) } - /// Gets information about each database present in the cluster the Client is connected to. - pub async fn list_databases( + async fn list_databases_common( &self, filter: impl Into>, options: impl Into>, + session: Option<&mut ClientSession>, ) -> Result> { let op = ListDatabases::new(filter.into(), false, options.into()); - self.execute_operation(op, None).await.and_then(|dbs| { + self.execute_operation(op, session).await.and_then(|dbs| { dbs.into_iter() .map(|db_spec| bson::from_document(db_spec).map_err(crate::error::Error::from)) .collect() }) } + /// Gets information about each database present in the cluster the Client is connected to. + pub async fn list_databases( + &self, + filter: impl Into>, + options: impl Into>, + ) -> Result> { + self.list_databases_common(filter, options, None).await + } + + /// Gets information about each database present in the cluster the Client is connected to. + pub async fn list_databases_with_session( + &self, + filter: impl Into>, + options: impl Into>, + session: &mut ClientSession, + ) -> Result> { + self.list_databases_common(filter, options, Some(session)).await + } + /// Gets the names of the databases present in the cluster the Client is connected to. pub async fn list_database_names( &self, diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index ae667d4ba..49a6bdfa6 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -273,6 +273,7 @@ impl TestOperation for DeleteMany { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct DeleteOne { filter: Document, + session: Option, #[serde(flatten)] options: DeleteOptions, } @@ -284,10 +285,18 @@ impl TestOperation for DeleteOne { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .delete_one(self.filter.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .delete_one_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => collection + .delete_one(self.filter.clone(), self.options.clone()) + .await? + }; let result = to_bson(&result)?; Ok(Some(result.into())) } @@ -301,6 +310,7 @@ impl TestOperation for DeleteOne { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct Find { filter: Option, + session: Option, #[serde(flatten)] options: FindOptions, } @@ -312,11 +322,26 @@ impl TestOperation for Find { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let cursor = collection - .find(self.filter.clone(), self.options.clone()) - .await?; - let result = cursor.try_collect::>().await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + let mut cursor = collection + .find_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await?; + cursor.stream(session).try_collect::>().await? + } + None => { + let cursor = collection + .find(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; Ok(Some(Bson::from(result).into())) } @@ -333,6 +358,7 @@ impl TestOperation for Find { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct InsertMany { documents: Vec, + session: Option, #[serde(flatten)] options: InsertManyOptions, } @@ -344,10 +370,18 @@ impl TestOperation for InsertMany { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .insert_many(self.documents.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .insert_many_with_session(self.documents.clone(), self.options.clone(), session) + .await? + }, + None => collection + .insert_many(self.documents.clone(), self.options.clone()) + .await? + }; let ids: HashMap = result .inserted_ids .into_iter() @@ -489,6 +523,7 @@ impl TestOperation for UpdateOne { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct Aggregate { pipeline: Vec, + session: Option, #[serde(flatten)] options: AggregateOptions, } @@ -500,19 +535,40 @@ impl TestOperation for Aggregate { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let cursor = match test_runner.entities.get(id).unwrap() { - Entity::Collection(collection) => { - collection - .aggregate(self.pipeline.clone(), self.options.clone()) - .await? + let result = match &self.session { + Some(session_id) => { + let entity = test_runner.entities.get(id).unwrap().clone(); + let session = test_runner.get_mut_session(session_id); + let mut cursor = match entity { + Entity::Collection(collection) => { + collection + .aggregate_with_session(self.pipeline.clone(), self.options.clone(), session) + .await? + } + Entity::Database(db) => { + db.aggregate_with_session(self.pipeline.clone(), self.options.clone(), session) + .await? + } + other => panic!("Cannot execute aggregate on {:?}", &other), + }; + cursor.stream(session).try_collect::>().await? } - Entity::Database(db) => { - db.aggregate(self.pipeline.clone(), self.options.clone()) - .await? + None => { + let cursor = match test_runner.entities.get(id).unwrap() { + Entity::Collection(collection) => { + collection + .aggregate(self.pipeline.clone(), self.options.clone()) + .await? + } + Entity::Database(db) => { + db.aggregate(self.pipeline.clone(), self.options.clone()) + .await? + } + other => panic!("Cannot execute aggregate on {:?}", &other), + }; + cursor.try_collect::>().await? } - other => panic!("Cannot execute aggregate on {:?}", &other), }; - let result = cursor.try_collect::>().await?; Ok(Some(Bson::from(result).into())) } @@ -530,6 +586,7 @@ impl TestOperation for Aggregate { pub(super) struct Distinct { field_name: String, filter: Option, + session: Option, #[serde(flatten)] options: DistinctOptions, } @@ -541,10 +598,18 @@ impl TestOperation for Distinct { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .distinct(&self.field_name, self.filter.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .distinct_with_session(&self.field_name, self.filter.clone(), self.options.clone(), session) + .await? + } + None => collection + .distinct(&self.field_name, self.filter.clone(), self.options.clone()) + .await? + }; Ok(Some(Bson::Array(result).into())) } @@ -557,6 +622,7 @@ impl TestOperation for Distinct { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct CountDocuments { filter: Document, + session: Option, #[serde(flatten)] options: CountOptions, } @@ -568,10 +634,18 @@ impl TestOperation for CountDocuments { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .count_documents(self.filter.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .count_documents_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => collection + .count_documents(self.filter.clone(), self.options.clone()) + .await? + }; Ok(Some(Bson::from(result).into())) } @@ -640,6 +714,7 @@ impl TestOperation for FindOne { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct ListDatabases { filter: Option, + session: Option, #[serde(flatten)] options: ListDatabasesOptions, } @@ -651,10 +726,18 @@ impl TestOperation for ListDatabases { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let client = test_runner.get_client(id); - let result = client - .list_databases(self.filter.clone(), self.options.clone()) - .await?; + let client = test_runner.get_client(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + client + .list_databases_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => client + .list_databases(self.filter.clone(), self.options.clone()) + .await? + }; Ok(Some(bson::to_bson(&result)?.into())) } @@ -695,6 +778,7 @@ impl TestOperation for ListDatabaseNames { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct ListCollections { filter: Option, + session: Option, #[serde(flatten)] options: ListCollectionsOptions, } @@ -706,11 +790,22 @@ impl TestOperation for ListCollections { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let db = test_runner.get_database(id); - let cursor = db - .list_collections(self.filter.clone(), self.options.clone()) - .await?; - let result = cursor.try_collect::>().await?; + let db = test_runner.get_database(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + let mut cursor = db + .list_collections_with_session(self.filter.clone(), self.options.clone(), session) + .await?; + cursor.stream(session).try_collect::>().await? + } + None => { + let cursor = db + .list_collections(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; Ok(Some(bson::to_bson(&result)?.into())) } @@ -785,6 +880,7 @@ impl TestOperation for ReplaceOne { pub(super) struct FindOneAndUpdate { filter: Document, update: UpdateModifications, + session: Option, #[serde(flatten)] options: FindOneAndUpdateOptions, } @@ -796,14 +892,27 @@ impl TestOperation for FindOneAndUpdate { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .find_one_and_update( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .find_one_and_update_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + session, + ) + .await? + } + None => collection + .find_one_and_update( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + }; let result = to_bson(&result)?; Ok(Some(result.into())) } From 7c571b126b1bc9a2b41a72830172269acd0d8275 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 10:05:01 -0400 Subject: [PATCH 10/22] show result mismatch --- src/test/spec/unified_runner/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index 4c3277624..a32e5e6a6 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -189,7 +189,7 @@ pub async fn run_unified_format_test(test_file: TestFile) { operation.returns_root_documents(), Some(&test_runner.entities), ), - "result mismatch, expected = {:#?} actual = {:#?}", + "result mismatch, expected = {:#?}, actual = {:#?}", expect_result, result ); From e65ecff21ff671e91ec69765bd47921dc1571126 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 11:11:42 -0400 Subject: [PATCH 11/22] fix casing of at_cluster_time --- src/operation/mod.rs | 1 + src/results.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 0f2169ab0..c5480d0fd 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -217,6 +217,7 @@ struct CursorBody { } #[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] pub(crate) struct CursorInfo { pub(crate) id: i64, pub(crate) ns: Namespace, diff --git a/src/results.rs b/src/results.rs index 02f616f9f..4c264cc95 100644 --- a/src/results.rs +++ b/src/results.rs @@ -104,6 +104,7 @@ impl OperationResult for GetMoreResult {} #[derive(Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] pub(crate) struct DistinctResult { pub(crate) values: Vec, pub(crate) at_cluster_time: Option, From 45fb940618b391c10c0f572fcf30b5dbfee05463 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 12:25:24 -0400 Subject: [PATCH 12/22] print event mismatch --- src/test/spec/unified_runner/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index a32e5e6a6..c86abf7ef 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -231,7 +231,10 @@ pub async fn run_unified_format_test(test_file: TestFile) { assert_eq!(actual_events.len(), expected_events.len()); for (actual, expected) in actual_events.iter().zip(expected_events) { - assert!(events_match(actual, expected, Some(&test_runner.entities))); + assert!( + events_match(actual, expected, Some(&test_runner.entities)), + "event mismatch: expected = {:#?}, actual = {:#?}", expected, actual, + ); } } } From c46d37826287e17b012033f4379b15dc4e20f6f1 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 12:25:56 -0400 Subject: [PATCH 13/22] directly parse snapshot timestamp from response bson --- src/client/executor.rs | 12 ++++++++++++ src/cmap/conn/command.rs | 20 +++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 531c7c501..d37b7820e 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -357,14 +357,23 @@ impl Client { let start_time = Instant::now(); let cmd_name = cmd.name.clone(); + //println!("===> command:\n\t{:?}", cmd.body); let response_result = match connection.send_command(cmd, request_id).await { Ok(response) => { + //println!("===> response:\n\t{:?}", response.raw_response); if let Some(cluster_time) = response.cluster_time() { self.inner.topology.advance_cluster_time(cluster_time).await; if let Some(ref mut session) = session { session.advance_cluster_time(cluster_time) } } + match (response.snapshot_time(), session.as_mut()) { + (Some(timestamp), Some(session)) => { + println!("===> updating snapshot timestamp to {:?}", timestamp); + session.snapshot_time = Some(*timestamp); + } + _ => (), + } response.validate().map(|_| response) } err => err, @@ -417,12 +426,15 @@ impl Client { match op.handle_response(response, connection.stream_description()?) { Ok(response) => { + /* match (response.snapshot_timestamp(), session.as_mut()) { (Some(timestamp), Some(session)) => { + println!("===> updating snapshot timestamp to {:?}", timestamp); session.snapshot_time = Some(*timestamp); } _ => (), } + */ Ok(response) } Err(mut err) => { diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index 891b7cfee..a0df934ad 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -2,7 +2,7 @@ use serde::{de::DeserializeOwned, Deserialize}; use super::wire::Message; use crate::{ - bson::{Bson, Document}, + bson::{Bson, Document, Timestamp}, bson_util, client::{options::ServerApi, ClusterTime}, concern::ReadConcern, @@ -89,6 +89,7 @@ impl Command { pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> { let mut concern = ReadConcern::snapshot(); concern.at_cluster_time = session.snapshot_time; + println!("===> snapshot read concern: {:?}", concern); self.body.insert("readConcern", bson::to_document(&concern)?); Ok(()) } @@ -99,6 +100,7 @@ pub(crate) struct CommandResponse { source: ServerAddress, pub(crate) raw_response: Document, cluster_time: Option, + snapshot_time: Option, } impl CommandResponse { @@ -108,6 +110,7 @@ impl CommandResponse { source, raw_response: doc, cluster_time: None, + snapshot_time: None, } } @@ -128,11 +131,21 @@ impl CommandResponse { let cluster_time = raw_response .get("$clusterTime") .and_then(|subdoc| bson::from_bson(subdoc.clone()).ok()); + let snapshot_time = raw_response + .get("atClusterTime") + .or_else(|| { + raw_response + .get("cursor") + .and_then(|b| b.as_document()) + .and_then(|subdoc| subdoc.get("atClusterTime")) + }) + .and_then(|subdoc| bson::from_bson(subdoc.clone()).ok()); Ok(Self { source, raw_response, cluster_time, + snapshot_time, }) } @@ -178,6 +191,11 @@ impl CommandResponse { self.cluster_time.as_ref() } + /// Gets the snapshot time from the response, if any. + pub(crate) fn snapshot_time(&self) -> Option<&Timestamp> { + self.snapshot_time.as_ref() + } + /// The address of the server that sent this response. pub(crate) fn source_address(&self) -> &ServerAddress { &self.source From 5227c3a4a67c3cf56fa71d3fddfc284c44123b33 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 13:13:39 -0400 Subject: [PATCH 14/22] undo OperationResult --- src/client/executor.rs | 1 - src/coll/mod.rs | 2 +- src/cursor/common.rs | 12 ++--------- src/operation/distinct/mod.rs | 14 +++++++++---- src/operation/distinct/test.rs | 20 ++++++------------ src/operation/get_more/test.rs | 4 ---- src/operation/mod.rs | 6 ++---- src/results.rs | 37 +--------------------------------- 8 files changed, 22 insertions(+), 74 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index d37b7820e..958e37757 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -19,7 +19,6 @@ use crate::{ event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent}, operation::{AbortTransaction, CommitTransaction, Operation, Retryability}, options::SelectionCriteria, - results::OperationResult, sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus}, selection_criteria::ReadPreference, }; diff --git a/src/coll/mod.rs b/src/coll/mod.rs index 02da070f9..dfdf2bc8d 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -396,7 +396,7 @@ impl Collection { filter.into(), options, ); - self.client().execute_operation(op, session).await.map(|r| r.values) + self.client().execute_operation(op, session).await } /// Finds the distinct values of the field specified by `field_name` across the collection. diff --git a/src/cursor/common.rs b/src/cursor/common.rs index d6591b782..27106a9dc 100644 --- a/src/cursor/common.rs +++ b/src/cursor/common.rs @@ -9,11 +9,11 @@ use derivative::Derivative; use futures_core::{Future, Stream}; use crate::{ - bson::{Document, Timestamp}, + bson::{Document}, error::{Error, ErrorKind, Result}, operation, options::ServerAddress, - results::{GetMoreResult, OperationResult}, + results::{GetMoreResult}, Client, Namespace, }; @@ -165,7 +165,6 @@ impl CursorSpecification { address, batch_size: batch_size.into(), max_time: max_time.into(), - at_cluster_time: info.at_cluster_time, }, initial_buffer: info.first_batch, } @@ -191,12 +190,6 @@ impl CursorSpecification { } } -impl OperationResult for CursorSpecification { - fn snapshot_timestamp(&self) -> Option<&Timestamp> { - self.info.at_cluster_time.as_ref() - } -} - /// Static information about a cursor. #[derive(Clone, Debug)] pub(crate) struct CursorInformation { @@ -205,5 +198,4 @@ pub(crate) struct CursorInformation { pub(crate) id: i64, pub(crate) batch_size: Option, pub(crate) max_time: Option, - pub(crate) at_cluster_time: Option, } diff --git a/src/operation/distinct/mod.rs b/src/operation/distinct/mod.rs index 2fb2715cd..89e5c17ab 100644 --- a/src/operation/distinct/mod.rs +++ b/src/operation/distinct/mod.rs @@ -1,13 +1,14 @@ #[cfg(test)] mod test; +use serde::Deserialize; + use crate::{ - bson::{doc, Document}, + bson::{doc, Bson, Document}, cmap::{Command, CommandResponse, StreamDescription}, coll::{options::DistinctOptions, Namespace}, error::Result, operation::{append_options, Operation, Retryability}, - results::DistinctResult, selection_criteria::SelectionCriteria, }; @@ -48,7 +49,7 @@ impl Distinct { } impl Operation for Distinct { - type O = DistinctResult; + type O = Vec; const NAME: &'static str = "distinct"; fn build(&mut self, _description: &StreamDescription) -> Result { @@ -74,7 +75,7 @@ impl Operation for Distinct { response: CommandResponse, _description: &StreamDescription, ) -> Result { - response.body::() + response.body::().map(|body| body.values) } fn selection_criteria(&self) -> Option<&SelectionCriteria> { @@ -88,3 +89,8 @@ impl Operation for Distinct { Retryability::Read } } + +#[derive(Debug, Deserialize)] +struct ResponseBody { + values: Vec, +} \ No newline at end of file diff --git a/src/operation/distinct/test.rs b/src/operation/distinct/test.rs index 24d8595e9..e14ecd7e9 100644 --- a/src/operation/distinct/test.rs +++ b/src/operation/distinct/test.rs @@ -6,7 +6,6 @@ use crate::{ coll::{options::DistinctOptions, Namespace}, error::ErrorKind, operation::{test, Distinct, Operation}, - results::DistinctResult, }; #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -99,21 +98,17 @@ async fn handle_success() { let distinct_op = Distinct::empty(); let expected_values = vec![Bson::String("A".to_string()), Bson::String("B".to_string())]; - let expected_result = DistinctResult { - values: expected_values.clone(), - at_cluster_time: None, - }; let response = CommandResponse::with_document(doc! { - "values" : expected_values, + "values" : expected_values.clone(), "ok" : 1 }); - let actual_result = distinct_op + let actual_values = distinct_op .handle_response(response, &Default::default()) .expect("supposed to succeed"); - assert_eq!(actual_result, expected_result); + assert_eq!(actual_values, expected_values); } #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -126,16 +121,13 @@ async fn handle_response_with_empty_values() { "ok" : 1 }); - let expected_result = DistinctResult { - values: Vec::new(), - at_cluster_time: None, - }; + let expected_values = Vec::new(); - let actual_result = distinct_op + let actual_values = distinct_op .handle_response(response, &Default::default()) .expect("supposed to succeed"); - assert_eq!(actual_result, expected_result); + assert_eq!(actual_values, expected_values); } #[cfg_attr(feature = "tokio-runtime", tokio::test)] diff --git a/src/operation/get_more/test.rs b/src/operation/get_more/test.rs index 8dba36fd1..4960a56e7 100644 --- a/src/operation/get_more/test.rs +++ b/src/operation/get_more/test.rs @@ -25,7 +25,6 @@ fn build_test( address, batch_size, max_time, - at_cluster_time: None, }; let mut get_more = GetMore::new(info); @@ -117,7 +116,6 @@ async fn build_batch_size() { id: cursor_id, batch_size: Some((std::i32::MAX as u32) + 1), max_time: None, - at_cluster_time: None, }; let mut op = GetMore::new(info); assert!(op.build(&StreamDescription::new_testing()).is_err()) @@ -137,7 +135,6 @@ async fn op_selection_criteria() { id: 123, batch_size: None, max_time: None, - at_cluster_time: None, }; let get_more = GetMore::new(info); let server_description = ServerDescription { @@ -183,7 +180,6 @@ async fn handle_success() { id: cursor_id, batch_size: None, max_time: None, - at_cluster_time: None, }; let get_more = GetMore::new(info); diff --git a/src/operation/mod.rs b/src/operation/mod.rs index c5480d0fd..9be2fb2ae 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -22,7 +22,7 @@ use std::{collections::VecDeque, fmt::Debug, ops::Deref}; use serde::{Deserialize, Serialize}; use crate::{ - bson::{self, Bson, Document, Timestamp}, + bson::{self, Bson, Document}, cmap::{Command, CommandResponse, StreamDescription}, error::{ BulkWriteError, @@ -34,7 +34,6 @@ use crate::{ WriteFailure, }, options::WriteConcern, - results::OperationResult, selection_criteria::SelectionCriteria, Namespace, }; @@ -61,7 +60,7 @@ pub(crate) use update::Update; /// A trait modeling the behavior of a server side operation. pub(crate) trait Operation { /// The output type of this operation. - type O: OperationResult; + type O; /// The name of the server side command associated with this operation. const NAME: &'static str; @@ -223,7 +222,6 @@ pub(crate) struct CursorInfo { pub(crate) ns: Namespace, #[serde(rename = "firstBatch")] pub(crate) first_batch: VecDeque, - pub(crate) at_cluster_time: Option, } #[derive(Debug, PartialEq)] diff --git a/src/results.rs b/src/results.rs index 4c264cc95..e2d7292d9 100644 --- a/src/results.rs +++ b/src/results.rs @@ -3,26 +3,13 @@ use std::collections::{HashMap, VecDeque}; use crate::{ - bson::{Bson, Document, Timestamp}, + bson::{Bson, Document}, db::options::CreateCollectionOptions, }; use bson::Binary; use serde::{Deserialize, Serialize}; -pub(crate) trait OperationResult { - /// Extracts the "atClusterTime" timestamp, if any. - fn snapshot_timestamp(&self) -> Option<&Timestamp> { - None - } -} - -impl OperationResult for () {} -impl OperationResult for u64 {} -impl OperationResult for Option {} -impl OperationResult for Vec {} -impl OperationResult for Document {} - /// The result of a [`Collection::insert_one`](../struct.Collection.html#method.insert_one) /// operation. #[derive(Debug, Serialize)] @@ -59,8 +46,6 @@ impl InsertManyResult { } } -impl OperationResult for InsertManyResult {} - /// The result of a [`Collection::update_one`](../struct.Collection.html#method.update_one) or /// [`Collection::update_many`](../struct.Collection.html#method.update_many) operation. #[derive(Debug, Serialize)] @@ -79,8 +64,6 @@ pub struct UpdateResult { pub upserted_id: Option, } -impl OperationResult for UpdateResult {} - /// The result of a [`Collection::delete_one`](../struct.Collection.html#method.delete_one) or /// [`Collection::delete_many`](../struct.Collection.html#method.delete_many) operation. #[derive(Debug, Serialize)] @@ -92,30 +75,12 @@ pub struct DeleteResult { pub deleted_count: u64, } -impl OperationResult for DeleteResult {} - #[derive(Debug, Clone)] pub(crate) struct GetMoreResult { pub(crate) batch: VecDeque, pub(crate) exhausted: bool, } -impl OperationResult for GetMoreResult {} - - -#[derive(Debug, Deserialize, PartialEq)] -#[serde(rename_all = "camelCase")] -pub(crate) struct DistinctResult { - pub(crate) values: Vec, - pub(crate) at_cluster_time: Option, -} - -impl OperationResult for DistinctResult { - fn snapshot_timestamp(&self) -> Option<&Timestamp> { - self.at_cluster_time.as_ref() - } -} - /// Describes the type of data store returned when executing /// [`Database::list_collections`](../struct.Database.html#method.list_collections). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] From f03d9364558fffeeb59073834be458936fa19fa0 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 13:22:04 -0400 Subject: [PATCH 15/22] update error message --- src/client/session/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 04316ec86..24f77f86f 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -282,7 +282,7 @@ impl ClientSession { ) -> Result<()> { if self.options.as_ref().and_then(|o| o.snapshot).unwrap_or(false) { return Err(ErrorKind::Transaction { - message: "transactions are not allowed on snapshot sessions".into(), + message: "Transactions are not supported in snapshot sessions".into(), }.into()) } match self.transaction.state { From 9f2e7ccac0e418848d12dfe062f68681da5177e5 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 13:23:24 -0400 Subject: [PATCH 16/22] remove debug prints --- src/client/executor.rs | 16 +--------------- src/cmap/conn/command.rs | 1 - 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 958e37757..155d7d90c 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -356,10 +356,8 @@ impl Client { let start_time = Instant::now(); let cmd_name = cmd.name.clone(); - //println!("===> command:\n\t{:?}", cmd.body); let response_result = match connection.send_command(cmd, request_id).await { Ok(response) => { - //println!("===> response:\n\t{:?}", response.raw_response); if let Some(cluster_time) = response.cluster_time() { self.inner.topology.advance_cluster_time(cluster_time).await; if let Some(ref mut session) = session { @@ -368,7 +366,6 @@ impl Client { } match (response.snapshot_time(), session.as_mut()) { (Some(timestamp), Some(session)) => { - println!("===> updating snapshot timestamp to {:?}", timestamp); session.snapshot_time = Some(*timestamp); } _ => (), @@ -424,18 +421,7 @@ impl Client { }); match op.handle_response(response, connection.stream_description()?) { - Ok(response) => { - /* - match (response.snapshot_timestamp(), session.as_mut()) { - (Some(timestamp), Some(session)) => { - println!("===> updating snapshot timestamp to {:?}", timestamp); - session.snapshot_time = Some(*timestamp); - } - _ => (), - } - */ - Ok(response) - } + Ok(response) => Ok(response), Err(mut err) => { err.add_labels(Some(connection), session, Some(retryability))?; Err(err) diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index a0df934ad..f9c9df82a 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -89,7 +89,6 @@ impl Command { pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> { let mut concern = ReadConcern::snapshot(); concern.at_cluster_time = session.snapshot_time; - println!("===> snapshot read concern: {:?}", concern); self.body.insert("readConcern", bson::to_document(&concern)?); Ok(()) } From ad2f7ec768cf909c37ff5765454a8e09d013bfda Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 14:09:27 -0400 Subject: [PATCH 17/22] use session in runCommand in the test runner --- src/test/spec/unified_runner/operation.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 49a6bdfa6..fb6d13f9e 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -1162,10 +1162,18 @@ impl TestOperation for RunCommand { command.insert("writeConcern", write_concern.clone()); } - let db = test_runner.get_database(id); - let result = db - .run_command(command, self.read_preference.clone()) - .await?; + let db = test_runner.get_database(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + db + .run_command_with_session(command, self.read_preference.clone(), session) + .await? + } + None => db + .run_command(command, self.read_preference.clone()) + .await? + }; let result = to_bson(&result)?; Ok(Some(result.into())) } From 609af01b610898a6b3640767430daa5d1d3ea16f Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 14:43:25 -0400 Subject: [PATCH 18/22] clippy and rustfmt fixes --- src/client/executor.rs | 15 +-- src/client/mod.rs | 3 +- src/client/session/mod.rs | 12 ++- src/cmap/conn/command.rs | 3 +- src/concern/mod.rs | 5 +- src/cursor/common.rs | 4 +- src/operation/distinct/mod.rs | 2 +- src/test/spec/sessions.rs | 2 +- src/test/spec/unified_runner/mod.rs | 4 +- src/test/spec/unified_runner/operation.rs | 119 ++++++++++++++-------- 10 files changed, 110 insertions(+), 59 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 155d7d90c..5eb4e4f3e 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -282,7 +282,11 @@ impl Client { if let Some(txn_number) = txn_number { cmd.set_txn_number(txn_number); } - if session.options().and_then(|opts| opts.snapshot).unwrap_or(false) { + if session + .options() + .and_then(|opts| opts.snapshot) + .unwrap_or(false) + { cmd.set_snapshot_read_concern(session)?; } match session.transaction.state { @@ -364,11 +368,10 @@ impl Client { session.advance_cluster_time(cluster_time) } } - match (response.snapshot_time(), session.as_mut()) { - (Some(timestamp), Some(session)) => { - session.snapshot_time = Some(*timestamp); - } - _ => (), + if let (Some(timestamp), Some(session)) = + (response.snapshot_time(), session.as_mut()) + { + session.snapshot_time = Some(*timestamp); } response.validate().map(|_| response) } diff --git a/src/client/mod.rs b/src/client/mod.rs index 6b26c8c48..a6ff3a5c0 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -188,7 +188,8 @@ impl Client { options: impl Into>, session: &mut ClientSession, ) -> Result> { - self.list_databases_common(filter, options, Some(session)).await + self.list_databases_common(filter, options, Some(session)) + .await } /// Gets the names of the databases present in the cluster the Client is connected to. diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 24f77f86f..3f12e2b07 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -280,10 +280,16 @@ impl ClientSession { &mut self, options: impl Into>, ) -> Result<()> { - if self.options.as_ref().and_then(|o| o.snapshot).unwrap_or(false) { + if self + .options + .as_ref() + .and_then(|o| o.snapshot) + .unwrap_or(false) + { return Err(ErrorKind::Transaction { message: "Transactions are not supported in snapshot sessions".into(), - }.into()) + } + .into()); } match self.transaction.state { TransactionState::Starting | TransactionState::InProgress => { @@ -520,7 +526,7 @@ impl Drop for ClientSession { is_implicit: self.is_implicit, options: self.options.clone(), transaction: self.transaction.clone(), - snapshot_time: self.snapshot_time.clone(), + snapshot_time: self.snapshot_time, }; RUNTIME.execute(async move { let mut session: ClientSession = dropped_session.into(); diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index f9c9df82a..9901d442c 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -89,7 +89,8 @@ impl Command { pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> { let mut concern = ReadConcern::snapshot(); concern.at_cluster_time = session.snapshot_time; - self.body.insert("readConcern", bson::to_document(&concern)?); + self.body + .insert("readConcern", bson::to_document(&concern)?); Ok(()) } } diff --git a/src/concern/mod.rs b/src/concern/mod.rs index 2086ffc21..bc6d6c761 100644 --- a/src/concern/mod.rs +++ b/src/concern/mod.rs @@ -92,7 +92,10 @@ impl ReadConcern { impl From for ReadConcern { fn from(level: ReadConcernLevel) -> Self { - Self { level, at_cluster_time: None } + Self { + level, + at_cluster_time: None, + } } } diff --git a/src/cursor/common.rs b/src/cursor/common.rs index 27106a9dc..d520d2f6b 100644 --- a/src/cursor/common.rs +++ b/src/cursor/common.rs @@ -9,11 +9,11 @@ use derivative::Derivative; use futures_core::{Future, Stream}; use crate::{ - bson::{Document}, + bson::Document, error::{Error, ErrorKind, Result}, operation, options::ServerAddress, - results::{GetMoreResult}, + results::GetMoreResult, Client, Namespace, }; diff --git a/src/operation/distinct/mod.rs b/src/operation/distinct/mod.rs index 89e5c17ab..4bdf51a18 100644 --- a/src/operation/distinct/mod.rs +++ b/src/operation/distinct/mod.rs @@ -93,4 +93,4 @@ impl Operation for Distinct { #[derive(Debug, Deserialize)] struct ResponseBody { values: Vec, -} \ No newline at end of file +} diff --git a/src/test/spec/sessions.rs b/src/test/spec/sessions.rs index d8d60e378..a1a7c7d43 100644 --- a/src/test/spec/sessions.rs +++ b/src/test/spec/sessions.rs @@ -2,7 +2,7 @@ use tokio::sync::RwLockWriteGuard; use crate::test::{run_spec_test, LOCK}; -use super::{run_v2_test, run_unified_format_test}; +use super::{run_unified_format_test, run_v2_test}; #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] #[cfg_attr(feature = "async-std-runtime", async_std::test)] diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index c86abf7ef..cba31ca2b 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -233,7 +233,9 @@ pub async fn run_unified_format_test(test_file: TestFile) { for (actual, expected) in actual_events.iter().zip(expected_events) { assert!( events_match(actual, expected, Some(&test_runner.entities)), - "event mismatch: expected = {:#?}, actual = {:#?}", expected, actual, + "event mismatch: expected = {:#?}, actual = {:#?}", + expected, + actual, ); } } diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index fb6d13f9e..0c1ff4e6c 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -293,9 +293,11 @@ impl TestOperation for DeleteOne { .delete_one_with_session(self.filter.clone(), self.options.clone(), session) .await? } - None => collection - .delete_one(self.filter.clone(), self.options.clone()) - .await? + None => { + collection + .delete_one(self.filter.clone(), self.options.clone()) + .await? + } }; let result = to_bson(&result)?; Ok(Some(result.into())) @@ -327,13 +329,12 @@ impl TestOperation for Find { Some(session_id) => { let session = test_runner.get_mut_session(session_id); let mut cursor = collection - .find_with_session( - self.filter.clone(), - self.options.clone(), - session, - ) + .find_with_session(self.filter.clone(), self.options.clone(), session) .await?; - cursor.stream(session).try_collect::>().await? + cursor + .stream(session) + .try_collect::>() + .await? } None => { let cursor = collection @@ -377,10 +378,12 @@ impl TestOperation for InsertMany { collection .insert_many_with_session(self.documents.clone(), self.options.clone(), session) .await? - }, - None => collection - .insert_many(self.documents.clone(), self.options.clone()) - .await? + } + None => { + collection + .insert_many(self.documents.clone(), self.options.clone()) + .await? + } }; let ids: HashMap = result .inserted_ids @@ -542,16 +545,27 @@ impl TestOperation for Aggregate { let mut cursor = match entity { Entity::Collection(collection) => { collection - .aggregate_with_session(self.pipeline.clone(), self.options.clone(), session) + .aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) .await? } Entity::Database(db) => { - db.aggregate_with_session(self.pipeline.clone(), self.options.clone(), session) - .await? + db.aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) + .await? } other => panic!("Cannot execute aggregate on {:?}", &other), }; - cursor.stream(session).try_collect::>().await? + cursor + .stream(session) + .try_collect::>() + .await? } None => { let cursor = match test_runner.entities.get(id).unwrap() { @@ -566,7 +580,7 @@ impl TestOperation for Aggregate { } other => panic!("Cannot execute aggregate on {:?}", &other), }; - cursor.try_collect::>().await? + cursor.try_collect::>().await? } }; Ok(Some(Bson::from(result).into())) @@ -603,12 +617,19 @@ impl TestOperation for Distinct { Some(session_id) => { let session = test_runner.get_mut_session(session_id); collection - .distinct_with_session(&self.field_name, self.filter.clone(), self.options.clone(), session) + .distinct_with_session( + &self.field_name, + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .distinct(&self.field_name, self.filter.clone(), self.options.clone()) .await? } - None => collection - .distinct(&self.field_name, self.filter.clone(), self.options.clone()) - .await? }; Ok(Some(Bson::Array(result).into())) } @@ -639,12 +660,18 @@ impl TestOperation for CountDocuments { Some(session_id) => { let session = test_runner.get_mut_session(session_id); collection - .count_documents_with_session(self.filter.clone(), self.options.clone(), session) + .count_documents_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .count_documents(self.filter.clone(), self.options.clone()) .await? } - None => collection - .count_documents(self.filter.clone(), self.options.clone()) - .await? }; Ok(Some(Bson::from(result).into())) } @@ -734,9 +761,11 @@ impl TestOperation for ListDatabases { .list_databases_with_session(self.filter.clone(), self.options.clone(), session) .await? } - None => client - .list_databases(self.filter.clone(), self.options.clone()) - .await? + None => { + client + .list_databases(self.filter.clone(), self.options.clone()) + .await? + } }; Ok(Some(bson::to_bson(&result)?.into())) } @@ -795,7 +824,11 @@ impl TestOperation for ListCollections { Some(session_id) => { let session = test_runner.get_mut_session(session_id); let mut cursor = db - .list_collections_with_session(self.filter.clone(), self.options.clone(), session) + .list_collections_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) .await?; cursor.stream(session).try_collect::>().await? } @@ -905,13 +938,15 @@ impl TestOperation for FindOneAndUpdate { ) .await? } - None => collection - .find_one_and_update( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await? + None => { + collection + .find_one_and_update( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } }; let result = to_bson(&result)?; Ok(Some(result.into())) @@ -1166,13 +1201,13 @@ impl TestOperation for RunCommand { let result = match &self.session { Some(session_id) => { let session = test_runner.get_mut_session(session_id); - db - .run_command_with_session(command, self.read_preference.clone(), session) + db.run_command_with_session(command, self.read_preference.clone(), session) + .await? + } + None => { + db.run_command(command, self.read_preference.clone()) .await? } - None => db - .run_command(command, self.read_preference.clone()) - .await? }; let result = to_bson(&result)?; Ok(Some(result.into())) From c038b60d220f93ea45daf47c1e64b01858c6b990 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 15:01:43 -0400 Subject: [PATCH 19/22] remove some spurious deltas --- src/operation/distinct/test.rs | 5 +++-- src/test/spec/unified_runner/mod.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/operation/distinct/test.rs b/src/operation/distinct/test.rs index e14ecd7e9..3d45e97d2 100644 --- a/src/operation/distinct/test.rs +++ b/src/operation/distinct/test.rs @@ -97,7 +97,8 @@ async fn op_selection_criteria() { async fn handle_success() { let distinct_op = Distinct::empty(); - let expected_values = vec![Bson::String("A".to_string()), Bson::String("B".to_string())]; + let expected_values: Vec = + vec![Bson::String("A".to_string()), Bson::String("B".to_string())]; let response = CommandResponse::with_document(doc! { "values" : expected_values.clone(), @@ -121,7 +122,7 @@ async fn handle_response_with_empty_values() { "ok" : 1 }); - let expected_values = Vec::new(); + let expected_values: Vec = Vec::new(); let actual_values = distinct_op .handle_response(response, &Default::default()) diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index cba31ca2b..f45ac3911 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -189,7 +189,7 @@ pub async fn run_unified_format_test(test_file: TestFile) { operation.returns_root_documents(), Some(&test_runner.entities), ), - "result mismatch, expected = {:#?}, actual = {:#?}", + "result mismatch, expected = {:#?} actual = {:#?}", expect_result, result ); From 702a463f1c8a0c39e335337bfdee1acc01698fb5 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Fri, 9 Jul 2021 15:47:35 -0400 Subject: [PATCH 20/22] error on snapshot operations for server versions < 5.0 --- src/client/executor.rs | 14 ++++++++++++++ src/error.rs | 6 ++++++ src/test/spec/unified_runner/mod.rs | 2 +- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 5eb4e4f3e..1c394437c 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -287,6 +287,20 @@ impl Client { .and_then(|opts| opts.snapshot) .unwrap_or(false) { + if connection + .stream_description()? + .max_wire_version + .unwrap_or(0) + < 13 + { + let labels: Option> = None; + return Err(Error::new( + ErrorKind::ServerVersion { + message: "Snapshot reads require MongoDB 5.0 or later".into(), + }, + labels, + )); + } cmd.set_snapshot_read_concern(session)?; } match session.transaction.state { diff --git a/src/error.rs b/src/error.rs index c9463cde2..064bfae38 100644 --- a/src/error.rs +++ b/src/error.rs @@ -251,6 +251,7 @@ impl Error { Some(write_error.message.clone()) } ErrorKind::Transaction { message } => Some(message.clone()), + ErrorKind::ServerVersion { message } => Some(message.clone()), _ => None, } } @@ -411,6 +412,11 @@ pub enum ErrorKind { #[error("{message}")] #[non_exhaustive] Transaction { message: String }, + + /// The server version does not support the operation. + #[error("The server version does not support a database operation: {message}")] + #[non_exhaustive] + ServerVersion { message: String }, } /// An error that occurred due to a database command failing. diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index f45ac3911..02cf2c692 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -189,7 +189,7 @@ pub async fn run_unified_format_test(test_file: TestFile) { operation.returns_root_documents(), Some(&test_runner.entities), ), - "result mismatch, expected = {:#?} actual = {:#?}", + "result mismatch, expected = {:#?} actual = {:#?}", expect_result, result ); From 31030f7611d84d7c98a7f7165606759a0da51c48 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Mon, 12 Jul 2021 13:36:35 -0400 Subject: [PATCH 21/22] rename error --- src/client/executor.rs | 2 +- src/error.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client/executor.rs b/src/client/executor.rs index 1c394437c..a9abc230e 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -295,7 +295,7 @@ impl Client { { let labels: Option> = None; return Err(Error::new( - ErrorKind::ServerVersion { + ErrorKind::IncompatibleServer { message: "Snapshot reads require MongoDB 5.0 or later".into(), }, labels, diff --git a/src/error.rs b/src/error.rs index 064bfae38..b9590bcb3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -251,7 +251,7 @@ impl Error { Some(write_error.message.clone()) } ErrorKind::Transaction { message } => Some(message.clone()), - ErrorKind::ServerVersion { message } => Some(message.clone()), + ErrorKind::IncompatibleServer { message } => Some(message.clone()), _ => None, } } @@ -413,10 +413,10 @@ pub enum ErrorKind { #[non_exhaustive] Transaction { message: String }, - /// The server version does not support the operation. - #[error("The server version does not support a database operation: {message}")] + /// The server does not support the operation. + #[error("The server does not support a database operation: {message}")] #[non_exhaustive] - ServerVersion { message: String }, + IncompatibleServer { message: String }, } /// An error that occurred due to a database command failing. From 02df86681000fc3a42e936f9e5016c2e55b0a535 Mon Sep 17 00:00:00 2001 From: Abraham Egnor Date: Tue, 13 Jul 2021 09:58:49 -0400 Subject: [PATCH 22/22] update comment and serialization --- src/client/mod.rs | 3 ++- src/concern/mod.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index a6ff3a5c0..0ff1458c3 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -181,7 +181,8 @@ impl Client { self.list_databases_common(filter, options, None).await } - /// Gets information about each database present in the cluster the Client is connected to. + /// Gets information about each database present in the cluster the Client is connected to + /// using the provided `ClientSession`. pub async fn list_databases_with_session( &self, filter: impl Into>, diff --git a/src/concern/mod.rs b/src/concern/mod.rs index bc6d6c761..9e386f081 100644 --- a/src/concern/mod.rs +++ b/src/concern/mod.rs @@ -20,6 +20,7 @@ use crate::{ /// /// See the documentation [here](https://docs.mongodb.com/manual/reference/read-concern/) for more /// information about read concerns. +#[skip_serializing_none] #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] #[non_exhaustive] @@ -28,7 +29,6 @@ pub struct ReadConcern { pub level: ReadConcernLevel, /// The snapshot read timestamp. - #[serde(skip_serializing_if = "Option::is_none")] pub(crate) at_cluster_time: Option, }