diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index c99dd6321..7c0184d40 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -217,6 +217,16 @@ impl RawCommandResponse { }) } + /// Used to handle decoding responses where the server may return invalid UTF-8 in error + /// messages. + pub(crate) fn body_utf8_lossy<'a, T: Deserialize<'a>>(&'a self) -> Result { + bson::from_slice_utf8_lossy(self.raw.as_bytes()).map_err(|e| { + Error::from(ErrorKind::InvalidResponse { + message: format!("{}", e), + }) + }) + } + pub(crate) fn raw_body(&self) -> &RawDocument { &self.raw } diff --git a/src/operation/insert/mod.rs b/src/operation/insert/mod.rs index b7a99358f..0cf129b28 100644 --- a/src/operation/insert/mod.rs +++ b/src/operation/insert/mod.rs @@ -135,7 +135,7 @@ impl<'a, T: Serialize> Operation for Insert<'a, T> { raw_response: RawCommandResponse, _description: &StreamDescription, ) -> Result { - let response: WriteResponseBody = raw_response.body()?; + let response: WriteResponseBody = raw_response.body_utf8_lossy()?; let mut map = HashMap::new(); if self.is_ordered() { diff --git a/src/operation/update/mod.rs b/src/operation/update/mod.rs index f5e129193..fe40ef285 100644 --- a/src/operation/update/mod.rs +++ b/src/operation/update/mod.rs @@ -118,7 +118,7 @@ impl Operation for Update { raw_response: RawCommandResponse, _description: &StreamDescription, ) -> Result { - let response: WriteResponseBody = raw_response.body()?; + let response: WriteResponseBody = raw_response.body_utf8_lossy()?; response.validate().map_err(convert_bulk_errors)?; let modified_count = response.n_modified; diff --git a/src/test/coll.rs b/src/test/coll.rs index 068931984..eafd08770 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -19,6 +19,7 @@ use crate::{ FindOneOptions, FindOptions, Hint, + IndexOptions, InsertManyOptions, ReadConcern, ReadPreference, @@ -35,6 +36,7 @@ use crate::{ LOCK, }, Collection, + IndexModel, }; #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -1098,3 +1100,104 @@ async fn cursor_batch_size() { let docs: Vec<_> = cursor.stream(&mut session).try_collect().await.unwrap(); assert_eq!(docs.len(), 10); } + +/// Test that the driver gracefully handles cases where the server returns invalid UTF-8 in error +/// messages. See SERVER-24007 and related tickets for details. +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn invalid_utf8_response() { + let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await; + + let client = TestClient::new().await; + let coll = client + .init_db_and_coll("invalid_uft8_handling", "invalid_uft8_handling") + .await; + + let index_model = IndexModel::builder() + .keys(doc! {"name": 1}) + .options(IndexOptions::builder().unique(true).build()) + .build(); + coll.create_index(index_model, None) + .await + .expect("creating an index should succeed"); + + // a document containing a long string with multi-byte unicode characters. taken from a user + // repro in RUBY-2560. + let long_unicode_str_doc = doc! {"name": "(╯°□°)╯︵ ┻━┻(╯°□°)╯︵ ┻━┻(╯°□°)╯︵ ┻━┻(╯°□°)╯︵ ┻━┻(╯°□°)╯︵ ┻━┻(╯°□°)╯︵ ┻━┻"}; + coll.insert_one(&long_unicode_str_doc, None) + .await + .expect("first insert of document should succeed"); + + // test triggering an invalid error message via an insert_one. + let insert_err = coll + .insert_one(&long_unicode_str_doc, None) + .await + .expect_err("second insert of document should fail") + .kind; + assert_duplicate_key_error_with_utf8_replacement(&insert_err); + + // test triggering an invalid error message via an insert_many. + let insert_err = coll + .insert_many([&long_unicode_str_doc], None) + .await + .expect_err("second insert of document should fail") + .kind; + assert_duplicate_key_error_with_utf8_replacement(&insert_err); + + // test triggering an invalid error message via an update_one. + coll.insert_one(doc! {"x": 1}, None) + .await + .expect("inserting new document should succeed"); + + let update_err = coll + .update_one(doc! {"x": 1}, doc! {"$set": &long_unicode_str_doc}, None) + .await + .expect_err("update setting duplicate key should fail") + .kind; + assert_duplicate_key_error_with_utf8_replacement(&update_err); + + // test triggering an invalid error message via an update_many. + let update_err = coll + .update_many(doc! {"x": 1}, doc! {"$set": &long_unicode_str_doc}, None) + .await + .expect_err("update setting duplicate key should fail") + .kind; + assert_duplicate_key_error_with_utf8_replacement(&update_err); + + // test triggering an invalid error message via a replace_one. + let replace_err = coll + .replace_one(doc! {"x": 1}, &long_unicode_str_doc, None) + .await + .expect_err("replacement with duplicate key should fail") + .kind; + assert_duplicate_key_error_with_utf8_replacement(&replace_err); +} + +/// Check that we successfully decoded a duplicate key error and that the error message contains the +/// unicode replacement character, meaning we gracefully handled the invalid UTF-8. +fn assert_duplicate_key_error_with_utf8_replacement(error: &ErrorKind) { + match error { + ErrorKind::Write(ref failure) => match failure { + WriteFailure::WriteError(err) => { + assert_eq!(err.code, 11000); + assert!(err.message.contains('�')); + } + e => panic!("expected WriteFailure::WriteError, got {:?} instead", e), + }, + ErrorKind::BulkWrite(ref failure) => match &failure.write_errors { + Some(write_errors) => { + assert_eq!(write_errors.len(), 1); + assert_eq!(write_errors[0].code, 11000); + assert!(write_errors[0].message.contains('�')); + } + None => panic!( + "expected BulkWriteFailure containing write errors, got {:?} instead", + failure + ), + }, + e => panic!( + "expected ErrorKind::Write or ErrorKind::BulkWrite, got {:?} instead", + e + ), + } +}