diff --git a/src/client/executor.rs b/src/client/executor.rs index 5ec9645aa..a9abc230e 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -282,6 +282,27 @@ impl Client { if let Some(txn_number) = txn_number { cmd.set_txn_number(txn_number); } + if session + .options() + .and_then(|opts| opts.snapshot) + .unwrap_or(false) + { + if connection + .stream_description()? + .max_wire_version + .unwrap_or(0) + < 13 + { + let labels: Option> = None; + return Err(Error::new( + ErrorKind::IncompatibleServer { + message: "Snapshot reads require MongoDB 5.0 or later".into(), + }, + labels, + )); + } + cmd.set_snapshot_read_concern(session)?; + } match session.transaction.state { TransactionState::Starting => { cmd.set_start_transaction(); @@ -361,6 +382,11 @@ impl Client { session.advance_cluster_time(cluster_time) } } + if let (Some(timestamp), Some(session)) = + (response.snapshot_time(), session.as_mut()) + { + session.snapshot_time = Some(*timestamp); + } response.validate().map(|_| response) } err => err, diff --git a/src/client/mod.rs b/src/client/mod.rs index f5575b6ed..0ff1458c3 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -158,20 +158,41 @@ impl Client { Database::new(self.clone(), name, Some(options)) } - /// Gets information about each database present in the cluster the Client is connected to. - pub async fn list_databases( + async fn list_databases_common( &self, filter: impl Into>, options: impl Into>, + session: Option<&mut ClientSession>, ) -> Result> { let op = ListDatabases::new(filter.into(), false, options.into()); - self.execute_operation(op, None).await.and_then(|dbs| { + self.execute_operation(op, session).await.and_then(|dbs| { dbs.into_iter() .map(|db_spec| bson::from_document(db_spec).map_err(crate::error::Error::from)) .collect() }) } + /// Gets information about each database present in the cluster the Client is connected to. 
+ pub async fn list_databases( + &self, + filter: impl Into>, + options: impl Into>, + ) -> Result> { + self.list_databases_common(filter, options, None).await + } + + /// Gets information about each database present in the cluster the Client is connected to + /// using the provided `ClientSession`. + pub async fn list_databases_with_session( + &self, + filter: impl Into>, + options: impl Into>, + session: &mut ClientSession, + ) -> Result> { + self.list_databases_common(filter, options, Some(session)) + .await + } + /// Gets the names of the databases present in the cluster the Client is connected to. pub async fn list_database_names( &self, diff --git a/src/client/options/mod.rs b/src/client/options/mod.rs index 5ad92f518..4a592a834 100644 --- a/src/client/options/mod.rs +++ b/src/client/options/mod.rs @@ -2307,6 +2307,11 @@ pub struct SessionOptions { /// on the [`Database`](../struct.Database.html) or [`Collection`](../struct.Collection.html) /// associated with the operations within the transaction. pub default_transaction_options: Option, + + /// If true, all read operations performed using this client session will share the same + /// snapshot. Defaults to false. + // TODO RUST-18 enforce snapshot exclusivity with causalConsistency. + pub snapshot: Option, } /// Contains the options that can be used for a transaction. 
diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 619d1f823..3f12e2b07 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -12,7 +12,7 @@ use lazy_static::lazy_static; use uuid::Uuid; use crate::{ - bson::{doc, spec::BinarySubtype, Binary, Bson, Document}, + bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp}, error::{ErrorKind, Result}, operation::{AbortTransaction, CommitTransaction, Operation}, options::{SessionOptions, TransactionOptions}, @@ -106,6 +106,7 @@ pub struct ClientSession { is_implicit: bool, options: Option, pub(crate) transaction: Transaction, + pub(crate) snapshot_time: Option, } #[derive(Clone, Debug)] @@ -173,6 +174,7 @@ impl ClientSession { is_implicit, options, transaction: Default::default(), + snapshot_time: None, } } @@ -278,6 +280,17 @@ impl ClientSession { &mut self, options: impl Into>, ) -> Result<()> { + if self + .options + .as_ref() + .and_then(|o| o.snapshot) + .unwrap_or(false) + { + return Err(ErrorKind::Transaction { + message: "Transactions are not supported in snapshot sessions".into(), + } + .into()); + } match self.transaction.state { TransactionState::Starting | TransactionState::InProgress => { return Err(ErrorKind::Transaction { @@ -486,6 +499,7 @@ struct DroppedClientSession { is_implicit: bool, options: Option, transaction: Transaction, + snapshot_time: Option, } impl From for ClientSession { @@ -497,6 +511,7 @@ impl From for ClientSession { is_implicit: dropped_session.is_implicit, options: dropped_session.options, transaction: dropped_session.transaction, + snapshot_time: dropped_session.snapshot_time, } } } @@ -511,6 +526,7 @@ impl Drop for ClientSession { is_implicit: self.is_implicit, options: self.options.clone(), transaction: self.transaction.clone(), + snapshot_time: self.snapshot_time, }; RUNTIME.execute(async move { let mut session: ClientSession = dropped_session.into(); diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index 
badf7991d..9901d442c 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -2,9 +2,10 @@ use serde::{de::DeserializeOwned, Deserialize}; use super::wire::Message; use crate::{ - bson::{Bson, Document}, + bson::{Bson, Document, Timestamp}, bson_util, client::{options::ServerApi, ClusterTime}, + concern::ReadConcern, error::{CommandError, Error, ErrorKind, Result}, options::ServerAddress, selection_criteria::ReadPreference, @@ -84,6 +85,14 @@ impl Command { } Ok(()) } + + pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> { + let mut concern = ReadConcern::snapshot(); + concern.at_cluster_time = session.snapshot_time; + self.body + .insert("readConcern", bson::to_document(&concern)?); + Ok(()) + } } #[derive(Debug, Clone)] @@ -91,6 +100,7 @@ pub(crate) struct CommandResponse { source: ServerAddress, pub(crate) raw_response: Document, cluster_time: Option, + snapshot_time: Option, } impl CommandResponse { @@ -100,6 +110,7 @@ impl CommandResponse { source, raw_response: doc, cluster_time: None, + snapshot_time: None, } } @@ -120,11 +131,21 @@ impl CommandResponse { let cluster_time = raw_response .get("$clusterTime") .and_then(|subdoc| bson::from_bson(subdoc.clone()).ok()); + let snapshot_time = raw_response + .get("atClusterTime") + .or_else(|| { + raw_response + .get("cursor") + .and_then(|b| b.as_document()) + .and_then(|subdoc| subdoc.get("atClusterTime")) + }) + .and_then(|subdoc| bson::from_bson(subdoc.clone()).ok()); Ok(Self { source, raw_response, cluster_time, + snapshot_time, }) } @@ -170,6 +191,11 @@ impl CommandResponse { self.cluster_time.as_ref() } + /// Gets the snapshot time from the response, if any. + pub(crate) fn snapshot_time(&self) -> Option<&Timestamp> { + self.snapshot_time.as_ref() + } + /// The address of the server that sent this response. 
pub(crate) fn source_address(&self) -> &ServerAddress { &self.source diff --git a/src/concern/mod.rs b/src/concern/mod.rs index 48ebd120b..9e386f081 100644 --- a/src/concern/mod.rs +++ b/src/concern/mod.rs @@ -10,7 +10,7 @@ use serde_with::skip_serializing_none; use typed_builder::TypedBuilder; use crate::{ - bson::{doc, serde_helpers}, + bson::{doc, serde_helpers, Timestamp}, bson_util, error::{ErrorKind, Result}, }; @@ -20,11 +20,16 @@ use crate::{ /// /// See the documentation [here](https://docs.mongodb.com/manual/reference/read-concern/) for more /// information about read concerns. +#[skip_serializing_none] #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] #[non_exhaustive] pub struct ReadConcern { /// The level of the read concern. pub level: ReadConcernLevel, + + /// The snapshot read timestamp. + pub(crate) at_cluster_time: Option, } impl ReadConcern { @@ -87,7 +92,10 @@ impl ReadConcern { impl From for ReadConcern { fn from(level: ReadConcernLevel) -> Self { - Self { level } + Self { + level, + at_cluster_time: None, + } } } diff --git a/src/cursor/common.rs b/src/cursor/common.rs index b862fb71f..d520d2f6b 100644 --- a/src/cursor/common.rs +++ b/src/cursor/common.rs @@ -11,6 +11,7 @@ use futures_core::{Future, Stream}; use crate::{ bson::Document, error::{Error, ErrorKind, Result}, + operation, options::ServerAddress, results::GetMoreResult, Client, @@ -152,22 +153,20 @@ pub(crate) struct CursorSpecification { impl CursorSpecification { pub(crate) fn new( - ns: Namespace, + info: operation::CursorInfo, address: ServerAddress, - id: i64, batch_size: impl Into>, max_time: impl Into>, - initial_buffer: VecDeque, ) -> Self { Self { info: CursorInformation { - ns, - id, + ns: info.ns, + id: info.id, address, batch_size: batch_size.into(), max_time: max_time.into(), }, - initial_buffer, + initial_buffer: info.first_batch, } } diff --git a/src/error.rs b/src/error.rs index c9463cde2..b9590bcb3 100644 --- 
a/src/error.rs +++ b/src/error.rs @@ -251,6 +251,7 @@ impl Error { Some(write_error.message.clone()) } ErrorKind::Transaction { message } => Some(message.clone()), + ErrorKind::IncompatibleServer { message } => Some(message.clone()), _ => None, } } @@ -411,6 +412,11 @@ pub enum ErrorKind { #[error("{message}")] #[non_exhaustive] Transaction { message: String }, + + /// The server does not support the operation. + #[error("The server does not support a database operation: {message}")] + #[non_exhaustive] + IncompatibleServer { message: String }, } /// An error that occurred due to a database command failing. diff --git a/src/operation/aggregate/mod.rs b/src/operation/aggregate/mod.rs index c7dd7efba..1ac9265a5 100644 --- a/src/operation/aggregate/mod.rs +++ b/src/operation/aggregate/mod.rs @@ -78,12 +78,10 @@ impl Operation for Aggregate { let body: CursorBody = response.body()?; Ok(CursorSpecification::new( - body.cursor.ns, + body.cursor, source_address, - body.cursor.id, self.options.as_ref().and_then(|opts| opts.batch_size), self.options.as_ref().and_then(|opts| opts.max_await_time), - body.cursor.first_batch, )) } diff --git a/src/operation/find/mod.rs b/src/operation/find/mod.rs index 3d7828e39..b94593e0a 100644 --- a/src/operation/find/mod.rs +++ b/src/operation/find/mod.rs @@ -101,15 +101,14 @@ impl Operation for Find { _description: &StreamDescription, ) -> Result { let source_address = response.source_address().clone(); - let body: CursorBody = response.body()?; + let mut body: CursorBody = response.body()?; + body.cursor.ns = self.ns.clone(); Ok(CursorSpecification::new( - self.ns.clone(), + body.cursor, source_address, - body.cursor.id, self.options.as_ref().and_then(|opts| opts.batch_size), self.options.as_ref().and_then(|opts| opts.max_await_time), - body.cursor.first_batch, )) } diff --git a/src/operation/list_collections/mod.rs b/src/operation/list_collections/mod.rs index db1ce3c6f..1b4b5d3bb 100644 --- a/src/operation/list_collections/mod.rs +++ 
b/src/operation/list_collections/mod.rs @@ -72,12 +72,10 @@ impl Operation for ListCollections { let body: CursorBody = response.body()?; Ok(CursorSpecification::new( - body.cursor.ns, + body.cursor, source_address, - body.cursor.id, self.options.as_ref().and_then(|opts| opts.batch_size), None, - body.cursor.first_batch, )) } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 13a8f66eb..9be2fb2ae 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -216,11 +216,12 @@ struct CursorBody { } #[derive(Debug, Deserialize)] -struct CursorInfo { - id: i64, - ns: Namespace, +#[serde(rename_all = "camelCase")] +pub(crate) struct CursorInfo { + pub(crate) id: i64, + pub(crate) ns: Namespace, #[serde(rename = "firstBatch")] - first_batch: VecDeque, + pub(crate) first_batch: VecDeque, } #[derive(Debug, PartialEq)] diff --git a/src/test/spec/json/sessions/README.rst b/src/test/spec/json/sessions/README.rst index 3ed7eea96..d88b5c7ba 100644 --- a/src/test/spec/json/sessions/README.rst +++ b/src/test/spec/json/sessions/README.rst @@ -9,10 +9,11 @@ Driver Session Tests Introduction ============ -The YAML and JSON files in this directory are platform-independent tests that -drivers can use to prove their conformance to the Driver Sessions Spec. They are +The YAML and JSON files in the ``legacy`` and ``unified`` sub-directories are platform-independent tests +that drivers can use to prove their conformance to the Driver Sessions Spec. They are designed with the intention of sharing most test-runner code with the -Transactions spec tests. +`Transactions Spec tests <../../transactions/tests/README.rst#test-format>`_.. Tests in the +``unified`` directory are written using the `Unified Test Format <../../unified-test-format/unified-test-format.rst>`_. Several prose tests, which are not easily expressed in YAML, are also presented in the Driver Sessions Spec. 
Those tests will need to be manually implemented @@ -78,7 +79,26 @@ the given session is *not* marked dirty:: arguments: session: session0 +Snapshot session tests +====================== +Snapshot session tests require a server of version 5.0 or higher and +a replica set or a sharded cluster deployment. +The default snapshot history window on the server is 5 minutes. Running the test in debug mode, or in any other slow configuration +may lead to `SnapshotTooOld` errors. Drivers can work around this issue by increasing the server's `minSnapshotHistoryWindowInSeconds` parameter, for example: + +.. code:: python + + client.admin.command('setParameter', 1, minSnapshotHistoryWindowInSeconds=60) + +Prose tests +``````````` +- Setting both ``snapshot`` and ``causalConsistency`` is not allowed + + * ``client.startSession(snapshot = true, causalConsistency = true)`` + * Assert that an error was raised by the driver + Changelog ========= :2019-05-15: Initial version. +:2021-06-15: Added snapshot-session tests. Introduced legacy and unified folders. 
diff --git a/src/test/spec/json/sessions/dirty-session-errors.json b/src/test/spec/json/sessions/legacy/dirty-session-errors.json similarity index 100% rename from src/test/spec/json/sessions/dirty-session-errors.json rename to src/test/spec/json/sessions/legacy/dirty-session-errors.json diff --git a/src/test/spec/json/sessions/dirty-session-errors.yml b/src/test/spec/json/sessions/legacy/dirty-session-errors.yml similarity index 100% rename from src/test/spec/json/sessions/dirty-session-errors.yml rename to src/test/spec/json/sessions/legacy/dirty-session-errors.yml diff --git a/src/test/spec/json/sessions/server-support.json b/src/test/spec/json/sessions/legacy/server-support.json similarity index 100% rename from src/test/spec/json/sessions/server-support.json rename to src/test/spec/json/sessions/legacy/server-support.json diff --git a/src/test/spec/json/sessions/server-support.yml b/src/test/spec/json/sessions/legacy/server-support.yml similarity index 100% rename from src/test/spec/json/sessions/server-support.yml rename to src/test/spec/json/sessions/legacy/server-support.yml diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json new file mode 100644 index 000000000..129aa8d74 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.json @@ -0,0 +1,113 @@ +{ + "description": "snapshot-sessions-not-supported-client-error", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "3.6", + "maxServerVersion": "4.4.99" + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": 
"collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Client error on find with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + }, + { + "description": "Client error on aggregate with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [] + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + }, + { + "description": "Client error on distinct with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectError": { + "isClientError": true, + "errorContains": "Snapshot reads require MongoDB 5.0 or later" + } + } + ], + "expectEvents": [] + } + ] +} diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml new file mode 100644 index 000000000..b57344ce9 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-client-error.yml @@ -0,0 +1,69 @@ +description: snapshot-sessions-not-supported-client-error + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "3.6" + maxServerVersion: "4.4.99" + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent, commandFailedEvent ] + - 
database: + id: &database0Name database0 + client: *client0 + databaseName: *database0Name + - collection: + id: &collection0Name collection0 + database: *database0Name + collectionName: *collection0Name + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + +tests: +- description: Client error on find with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: {} + expectError: + isClientError: true + errorContains: Snapshot reads require MongoDB 5.0 or later + expectEvents: [] + +- description: Client error on aggregate with snapshot + operations: + - name: aggregate + object: collection0 + arguments: + session: session0 + pipeline: [] + expectError: + isClientError: true + errorContains: Snapshot reads require MongoDB 5.0 or later + expectEvents: [] + +- description: Client error on distinct with snapshot + operations: + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectError: + isClientError: true + errorContains: Snapshot reads require MongoDB 5.0 or later + expectEvents: [] diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json new file mode 100644 index 000000000..79213f314 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.json @@ -0,0 +1,187 @@ +{ + "description": "snapshot-sessions-not-supported-server-error", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": 
"client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Server returns an error on find with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "find" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on aggregate with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "session": "session0", + "pipeline": [] + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "aggregate" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on distinct with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", 
+ "events": [ + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "distinct" + } + } + ] + } + ] + } + ] +} diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml new file mode 100644 index 000000000..4953dbcbe --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-not-supported-server-error.yml @@ -0,0 +1,102 @@ +description: snapshot-sessions-not-supported-server-error + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "5.0" + topologies: [ single ] + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent, commandFailedEvent ] + - database: + id: &database0Name database0 + client: *client0 + databaseName: *database0Name + - collection: + id: &collection0Name collection0 + database: *database0Name + collectionName: *collection0Name + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + +tests: +- description: Server returns an error on find with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: {} + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: find + +- description: Server returns an error on aggregate with snapshot + operations: + - name: aggregate + object: collection0 + arguments: + session: session0 + pipeline: [] + expectError: + isError: true + isClientError: false + 
expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: aggregate + +- description: Server returns an error on distinct with snapshot + operations: + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: distinct diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json new file mode 100644 index 000000000..1021b7f26 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.json @@ -0,0 +1,493 @@ +{ + "description": "snapshot-sessions-unsupported-ops", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "replicaset", + "sharded-replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent", + "commandFailedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0" + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Server returns an error on insertOne with snapshot", + "runOnRequirements": [ + { + "topologies": [ + 
"replicaset" + ] + } + ], + "operations": [ + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "session": "session0", + "document": { + "_id": 22, + "x": 22 + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on insertMany with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "insertMany", + "object": "collection0", + "arguments": { + "session": "session0", + "documents": [ + { + "_id": 22, + "x": 22 + }, + { + "_id": 33, + "x": 33 + } + ] + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "insert" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on deleteOne with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "deleteOne", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": {} + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "delete": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "delete" + } + } + ] 
+ } + ] + }, + { + "description": "Server returns an error on updateOne with snapshot", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "update": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "update" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on findOneAndUpdate with snapshot", + "operations": [ + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "findAndModify": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "findAndModify" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on listDatabases with snapshot", + "operations": [ + { + "name": "listDatabases", + "object": "client0", + "arguments": { + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listDatabases": 1, + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": 
"listDatabases" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on listCollections with snapshot", + "operations": [ + { + "name": "listCollections", + "object": "database0", + "arguments": { + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listCollections": 1, + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listCollections" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on listIndexes with snapshot", + "operations": [ + { + "name": "listIndexes", + "object": "collection0", + "arguments": { + "session": "session0" + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listIndexes": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listIndexes" + } + } + ] + } + ] + }, + { + "description": "Server returns an error on runCommand with snapshot", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "session": "session0", + "commandName": "listCollections", + "command": { + "listCollections": 1 + } + }, + "expectError": { + "isError": true, + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "listCollections": 1, + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "listCollections" + } + } + ] + } + ] + } + ] +} diff --git 
a/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml new file mode 100644 index 000000000..1d5dce893 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions-unsupported-ops.yml @@ -0,0 +1,258 @@ +description: snapshot-sessions-unsupported-ops + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "5.0" + topologies: [replicaset, sharded-replicaset] + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent, commandFailedEvent ] + - database: + id: &database0Name database0 + client: *client0 + databaseName: *database0Name + - collection: + id: &collection0Name collection0 + database: *database0Name + collectionName: *collection0Name + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + +tests: +- description: Server returns an error on insertOne with snapshot + # Skip on sharded clusters due to SERVER-58176. + runOnRequirements: + - topologies: [replicaset] + operations: + - name: insertOne + object: collection0 + arguments: + session: session0 + document: + _id: 22 + x: 22 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + insert: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: insert + +- description: Server returns an error on insertMany with snapshot + # Skip on sharded clusters due to SERVER-58176. 
+ runOnRequirements: + - topologies: [replicaset] + operations: + - name: insertMany + object: collection0 + arguments: + session: session0 + documents: + - _id: 22 + x: 22 + - _id: 33 + x: 33 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + insert: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: insert + +- description: Server returns an error on deleteOne with snapshot + # Skip on sharded clusters due to SERVER-58176. + runOnRequirements: + - topologies: [replicaset] + operations: + - name: deleteOne + object: collection0 + arguments: + session: session0 + filter: {} + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + delete: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: delete + +- description: Server returns an error on updateOne with snapshot + # Skip on sharded clusters due to SERVER-58176. 
+ runOnRequirements: + - topologies: [replicaset] + operations: + - name: updateOne + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + update: { $inc: { x: 1 } } + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + update: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: update + +- description: Server returns an error on findOneAndUpdate with snapshot + operations: + - name: findOneAndUpdate + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + update: { $inc: { x: 1 } } + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + findAndModify: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: findAndModify + +- description: Server returns an error on listDatabases with snapshot + operations: + - name: listDatabases + object: client0 + arguments: + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listDatabases: 1 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listDatabases + +- description: Server returns an error on listCollections with snapshot + operations: + - name: listCollections + object: database0 + arguments: + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listCollections: 1 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listCollections + +- description: Server returns an error on listIndexes with snapshot + operations: + - name: listIndexes + object: collection0 + 
arguments: + session: session0 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listIndexes: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listIndexes + +- description: Server returns an error on runCommand with snapshot + operations: + - name: runCommand + object: database0 + arguments: + session: session0 + commandName: listCollections + command: + listCollections: 1 + expectError: + isError: true + isClientError: false + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + listCollections: 1 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandFailedEvent: + commandName: listCollections diff --git a/src/test/spec/json/sessions/unified/snapshot-sessions.json b/src/test/spec/json/sessions/unified/snapshot-sessions.json new file mode 100644 index 000000000..75b577b03 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions.json @@ -0,0 +1,993 @@ +{ + "description": "snapshot-sessions", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "5.0", + "topologies": [ + "replicaset", + "sharded-replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "ignoreCommandMonitoringEvents": [ + "findAndModify", + "insert", + "update" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "database0" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "collection0", + "collectionOptions": { + "writeConcern": { + "w": "majority" + } + } + } + }, + { + "session": { + "id": "session0", + "client": "client0", + "sessionOptions": { + "snapshot": true + } + } + }, + { + "session": { + "id": "session1", + "client": "client0", + "sessionOptions": { + "snapshot": 
true + } + } + } + ], + "initialData": [ + { + "collectionName": "collection0", + "databaseName": "database0", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 11 + } + ] + } + ], + "tests": [ + { + "description": "Find operation with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session1", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 13 + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 13 + } + ] + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session1", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": 
"snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Distinct operation with snapshot", + "operations": [ + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 2, + "x": 12 + } + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session1" + }, + "expectResult": [ + 11, + 12 + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 2 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 2, + "x": 13 + } + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {} + }, + "expectResult": [ + 11, + 13 + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectResult": [ + 11 + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session1" + }, + "expectResult": [ + 11, + 12 + ] + } + ], + "expectEvents": [ + { + "client": 
"client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Aggregate operation with snapshot", + "operations": [ + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session1" + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 13 + } + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ] + 
}, + "expectResult": [ + { + "_id": 1, + "x": 13 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session1" + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "countDocuments operation with snapshot", + "operations": [ + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + }, + "expectResult": 2 + }, + { + "name": "countDocuments", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + }, + "expectResult": 2 + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + 
"atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "Mixed operation with snapshot", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "session": "session0", + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "findOneAndUpdate", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + }, + "returnDocument": "After" + }, + "expectResult": { + "_id": 1, + "x": 12 + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + } + }, + "expectResult": [ + { + "_id": 1, + "x": 12 + } + ] + }, + { + "name": "aggregate", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$match": { + "_id": 1 + } + } + ], + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + }, + { + "name": "distinct", + "object": "collection0", + "arguments": { + "fieldName": "x", + "filter": {}, + "session": "session0" + }, + "expectResult": [ + 11 + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "distinct": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } 
+ } + } + } + } + ] + } + ] + }, + { + "description": "Write commands with snapshot session do not affect snapshot reads", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + } + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 22, + "x": 33 + } + } + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "session": "session0" + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + } + ] + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": true + } + } + } + } + } + ] + } + ] + }, + { + "description": "First snapshot read does not send atClusterTime", + "operations": [ + { + "name": "find", + "object": "collection0", + "arguments": { + "filter": {}, + "session": "session0" + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "find": "collection0", + "readConcern": { + "level": "snapshot", + "atClusterTime": { + "$$exists": false + } + } + }, + "commandName": "find", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "StartTransaction fails in snapshot session", + "operations": [ + { + "name": "startTransaction", + "object": "session0", + "expectError": { + "isError": true, + "isClientError": true, + "errorContains": "Transactions are not supported in snapshot sessions" + } + } + ] + } + ] +} diff --git 
a/src/test/spec/json/sessions/unified/snapshot-sessions.yml b/src/test/spec/json/sessions/unified/snapshot-sessions.yml new file mode 100644 index 000000000..2f5fc2312 --- /dev/null +++ b/src/test/spec/json/sessions/unified/snapshot-sessions.yml @@ -0,0 +1,482 @@ +description: snapshot-sessions + +schemaVersion: "1.0" + +runOnRequirements: + - minServerVersion: "5.0" + topologies: [replicaset, sharded-replicaset] + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent] + ignoreCommandMonitoringEvents: [ findAndModify, insert, update ] + - database: + id: &database0 database0 + client: *client0 + databaseName: &database0Name database0 + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: &collection0Name collection0 + collectionOptions: + writeConcern: { w: majority } + - session: + id: session0 + client: client0 + sessionOptions: + snapshot: true + - session: + id: session1 + client: client0 + sessionOptions: + snapshot: true + +initialData: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + - { _id: 2, x: 11 } + +tests: +- description: Find operation with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + expectResult: + - {_id: 1, x: 11} + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: find + object: collection0 + arguments: + session: session1 + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 12 } + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 13 } + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 13 } + - name: find + object: collection0 + arguments: + session: session0 + 
filter: { _id: 1 } + expectResult: + - {_id: 1, x: 11} + - name: find + object: collection0 + arguments: + session: session1 + filter: { _id: 1 } + expectResult: + - {_id: 1, x: 12} + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Distinct operation with snapshot + operations: + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectResult: + - 11 + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 2 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 2, x: 12 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session1 + expectResult: [11, 12] + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 2 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 2, x: 13 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + expectResult: [ 11, 13 ] + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectResult: [ 11 ] + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session1 + expectResult: [ 11, 12 ] + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + 
"$$exists": false + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Aggregate operation with snapshot + operations: + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + session: session0 + expectResult: + - { _id: 1, x: 11 } + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session1 + expectResult: + - {_id: 1, x: 12} + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 13 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + expectResult: + - { _id: 1, x: 13 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session0 + expectResult: + - { _id: 1, x: 11 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": { _id: 1 } + session: session1 + expectResult: + - { _id: 1, x: 12 } + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 
+ readConcern: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: countDocuments operation with snapshot + operations: + - name: countDocuments + object: collection0 + arguments: + filter: {} + session: session0 + expectResult: 2 + - name: countDocuments + object: collection0 + arguments: + filter: {} + session: session0 + expectResult: 2 + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Mixed operation with snapshot + operations: + - name: find + object: collection0 + arguments: + session: session0 + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 11 } + - name: findOneAndUpdate + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + returnDocument: After + expectResult: { _id: 1, x: 12 } + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + expectResult: + - { _id: 1, x: 12 } + - name: aggregate + object: collection0 + arguments: + pipeline: + - "$match": + _id: 1 + session: session0 + expectResult: + - { _id: 1, x: 11 } + - name: distinct + object: collection0 + arguments: + fieldName: x + filter: {} + session: session0 + expectResult: [ 11 ] + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + "$$exists": false + - commandStartedEvent: + command: + aggregate: collection0 + readConcern: + level: snapshot + 
atClusterTime: + "$$exists": true + - commandStartedEvent: + command: + distinct: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: Write commands with snapshot session do not affect snapshot reads + operations: + - name: find + object: collection0 + arguments: + filter: {} + session: session0 + - name: insertOne + object: collection0 + arguments: + document: + _id: 22 + x: 33 + - name: updateOne + object: collection0 + arguments: + filter: { _id: 1 } + update: { $inc: { x: 1 } } + - name: find + object: collection0 + arguments: + filter: { _id: 1 } + session: session0 + expectResult: + - {_id: 1, x: 11} + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": true + +- description: First snapshot read does not send atClusterTime + operations: + - name: find + object: collection0 + arguments: + filter: {} + session: session0 + expectEvents: + - client: client0 + events: + - commandStartedEvent: + command: + find: collection0 + readConcern: + level: snapshot + atClusterTime: + "$$exists": false + commandName: find + databaseName: database0 + +- description: StartTransaction fails in snapshot session + operations: + - name: startTransaction + object: session0 + expectError: + isError: true + isClientError: true + errorContains: Transactions are not supported in snapshot sessions diff --git a/src/test/spec/sessions.rs b/src/test/spec/sessions.rs index 1a8ad3219..a1a7c7d43 100644 --- a/src/test/spec/sessions.rs +++ b/src/test/spec/sessions.rs @@ -2,11 +2,18 @@ use tokio::sync::RwLockWriteGuard; use crate::test::{run_spec_test, LOCK}; -use super::run_v2_test; +use super::{run_unified_format_test, run_v2_test}; #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] 
#[cfg_attr(feature = "async-std-runtime", async_std::test)] -async fn run() { +async fn run_unified() { let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await; - run_spec_test(&["sessions"], run_v2_test).await; + run_spec_test(&["sessions", "unified"], run_unified_format_test).await; +} + +#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn run_legacy() { + let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await; + run_spec_test(&["sessions", "legacy"], run_v2_test).await; } diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index 4c3277624..02cf2c692 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -231,7 +231,12 @@ pub async fn run_unified_format_test(test_file: TestFile) { assert_eq!(actual_events.len(), expected_events.len()); for (actual, expected) in actual_events.iter().zip(expected_events) { - assert!(events_match(actual, expected, Some(&test_runner.entities))); + assert!( + events_match(actual, expected, Some(&test_runner.entities)), + "event mismatch: expected = {:#?}, actual = {:#?}", + expected, + actual, + ); } } } diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index ae667d4ba..0c1ff4e6c 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -273,6 +273,7 @@ impl TestOperation for DeleteMany { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct DeleteOne { filter: Document, + session: Option, #[serde(flatten)] options: DeleteOptions, } @@ -284,10 +285,20 @@ impl TestOperation for DeleteOne { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .delete_one(self.filter.clone(), self.options.clone()) - .await?; + let collection = 
test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .delete_one_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => { + collection + .delete_one(self.filter.clone(), self.options.clone()) + .await? + } + }; let result = to_bson(&result)?; Ok(Some(result.into())) } @@ -301,6 +312,7 @@ impl TestOperation for DeleteOne { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct Find { filter: Option, + session: Option, #[serde(flatten)] options: FindOptions, } @@ -312,11 +324,25 @@ impl TestOperation for Find { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let cursor = collection - .find(self.filter.clone(), self.options.clone()) - .await?; - let result = cursor.try_collect::>().await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + let mut cursor = collection + .find_with_session(self.filter.clone(), self.options.clone(), session) + .await?; + cursor + .stream(session) + .try_collect::>() + .await? + } + None => { + let cursor = collection + .find(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? 
+ } + }; Ok(Some(Bson::from(result).into())) } @@ -333,6 +359,7 @@ impl TestOperation for Find { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct InsertMany { documents: Vec, + session: Option, #[serde(flatten)] options: InsertManyOptions, } @@ -344,10 +371,20 @@ impl TestOperation for InsertMany { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .insert_many(self.documents.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .insert_many_with_session(self.documents.clone(), self.options.clone(), session) + .await? + } + None => { + collection + .insert_many(self.documents.clone(), self.options.clone()) + .await? + } + }; let ids: HashMap = result .inserted_ids .into_iter() @@ -489,6 +526,7 @@ impl TestOperation for UpdateOne { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct Aggregate { pipeline: Vec, + session: Option, #[serde(flatten)] options: AggregateOptions, } @@ -500,19 +538,51 @@ impl TestOperation for Aggregate { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let cursor = match test_runner.entities.get(id).unwrap() { - Entity::Collection(collection) => { - collection - .aggregate(self.pipeline.clone(), self.options.clone()) + let result = match &self.session { + Some(session_id) => { + let entity = test_runner.entities.get(id).unwrap().clone(); + let session = test_runner.get_mut_session(session_id); + let mut cursor = match entity { + Entity::Collection(collection) => { + collection + .aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) + .await? + } + Entity::Database(db) => { + db.aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) + .await? 
+ } + other => panic!("Cannot execute aggregate on {:?}", &other), + }; + cursor + .stream(session) + .try_collect::>() .await? } - Entity::Database(db) => { - db.aggregate(self.pipeline.clone(), self.options.clone()) - .await? + None => { + let cursor = match test_runner.entities.get(id).unwrap() { + Entity::Collection(collection) => { + collection + .aggregate(self.pipeline.clone(), self.options.clone()) + .await? + } + Entity::Database(db) => { + db.aggregate(self.pipeline.clone(), self.options.clone()) + .await? + } + other => panic!("Cannot execute aggregate on {:?}", &other), + }; + cursor.try_collect::>().await? } - other => panic!("Cannot execute aggregate on {:?}", &other), }; - let result = cursor.try_collect::>().await?; Ok(Some(Bson::from(result).into())) } @@ -530,6 +600,7 @@ impl TestOperation for Aggregate { pub(super) struct Distinct { field_name: String, filter: Option, + session: Option, #[serde(flatten)] options: DistinctOptions, } @@ -541,10 +612,25 @@ impl TestOperation for Distinct { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .distinct(&self.field_name, self.filter.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .distinct_with_session( + &self.field_name, + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .distinct(&self.field_name, self.filter.clone(), self.options.clone()) + .await? 
+ } + }; Ok(Some(Bson::Array(result).into())) } @@ -557,6 +643,7 @@ impl TestOperation for Distinct { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct CountDocuments { filter: Document, + session: Option, #[serde(flatten)] options: CountOptions, } @@ -568,10 +655,24 @@ impl TestOperation for CountDocuments { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .count_documents(self.filter.clone(), self.options.clone()) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .count_documents_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .count_documents(self.filter.clone(), self.options.clone()) + .await? + } + }; Ok(Some(Bson::from(result).into())) } @@ -640,6 +741,7 @@ impl TestOperation for FindOne { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct ListDatabases { filter: Option, + session: Option, #[serde(flatten)] options: ListDatabasesOptions, } @@ -651,10 +753,20 @@ impl TestOperation for ListDatabases { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let client = test_runner.get_client(id); - let result = client - .list_databases(self.filter.clone(), self.options.clone()) - .await?; + let client = test_runner.get_client(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + client + .list_databases_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => { + client + .list_databases(self.filter.clone(), self.options.clone()) + .await? 
+ } + }; Ok(Some(bson::to_bson(&result)?.into())) } @@ -695,6 +807,7 @@ impl TestOperation for ListDatabaseNames { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct ListCollections { filter: Option, + session: Option, #[serde(flatten)] options: ListCollectionsOptions, } @@ -706,11 +819,26 @@ impl TestOperation for ListCollections { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let db = test_runner.get_database(id); - let cursor = db - .list_collections(self.filter.clone(), self.options.clone()) - .await?; - let result = cursor.try_collect::>().await?; + let db = test_runner.get_database(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + let mut cursor = db + .list_collections_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await?; + cursor.stream(session).try_collect::>().await? + } + None => { + let cursor = db + .list_collections(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; Ok(Some(bson::to_bson(&result)?.into())) } @@ -785,6 +913,7 @@ impl TestOperation for ReplaceOne { pub(super) struct FindOneAndUpdate { filter: Document, update: UpdateModifications, + session: Option, #[serde(flatten)] options: FindOneAndUpdateOptions, } @@ -796,14 +925,29 @@ impl TestOperation for FindOneAndUpdate { id: &str, test_runner: &mut TestRunner, ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .find_one_and_update( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await?; + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .find_one_and_update_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + session, + ) + .await? 
+ } + None => { + collection + .find_one_and_update( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } + }; let result = to_bson(&result)?; Ok(Some(result.into())) } @@ -1053,10 +1197,18 @@ impl TestOperation for RunCommand { command.insert("writeConcern", write_concern.clone()); } - let db = test_runner.get_database(id); - let result = db - .run_command(command, self.read_preference.clone()) - .await?; + let db = test_runner.get_database(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + db.run_command_with_session(command, self.read_preference.clone(), session) + .await? + } + None => { + db.run_command(command, self.read_preference.clone()) + .await? + } + }; let result = to_bson(&result)?; Ok(Some(result.into())) }