From e49396dafed236645479d1d2c0d6bab67d525c99 Mon Sep 17 00:00:00 2001
From: Pietro Albini
Date: Wed, 22 Jan 2020 11:05:02 +0100
Subject: [PATCH 01/38] storage: move db::file::Blob to storage::Blob

---
 src/db/file.rs     | 11 +++--------
 src/lib.rs         |  1 +
 src/storage/mod.rs |  8 ++++++++
 src/web/file.rs    |  2 +-
 4 files changed, 13 insertions(+), 9 deletions(-)
 create mode 100644 src/storage/mod.rs

diff --git a/src/db/file.rs b/src/db/file.rs
index 56fbb4171..ca912075c 100644
--- a/src/db/file.rs
+++ b/src/db/file.rs
@@ -18,6 +18,8 @@ use std::fs;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 
+pub(crate) use crate::storage::Blob;
+
 const MAX_CONCURRENT_UPLOADS: usize = 50;
 
 pub(super) static S3_BUCKET_NAME: &str = "rust-docs-rs";
@@ -57,14 +59,7 @@ fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
     Ok(files)
 }
 
-pub struct Blob {
-    pub path: String,
-    pub mime: String,
-    pub date_updated: time::Timespec,
-    pub content: Vec<u8>,
-}
-
-pub fn get_path(conn: &Connection, path: &str) -> Option<Blob> {
+pub(crate) fn get_path(conn: &Connection, path: &str) -> Option<Blob> {
     if let Some(client) = s3_client() {
         let res = client
             .get_object(GetObjectRequest {
diff --git a/src/lib.rs b/src/lib.rs
index 354065ccc..86b30f2d7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,6 +10,7 @@ pub use self::web::Server;
 pub mod db;
 mod docbuilder;
 mod error;
+mod storage;
 #[cfg(test)]
 mod test;
 pub mod utils;
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
new file mode 100644
index 000000000..d77aaf226
--- /dev/null
+++ b/src/storage/mod.rs
@@ -0,0 +1,8 @@
+use time::Timespec;
+
+pub(crate) struct Blob {
+    pub(crate) path: String,
+    pub(crate) mime: String,
+    pub(crate) date_updated: Timespec,
+    pub(crate) content: Vec<u8>,
+}
diff --git a/src/web/file.rs b/src/web/file.rs
index 1cd33df91..a1b5251e9 100644
--- a/src/web/file.rs
+++ b/src/web/file.rs
@@ -6,7 +6,7 @@ use iron::status;
 use iron::{Handler, IronError, IronResult, Request, Response};
 use postgres::Connection;
 
-pub struct File(pub db::file::Blob);
+pub(crate) struct File(pub(crate) db::file::Blob);
 
 impl File {
     /// Gets file from database

From 69b36c43b5dfb5c096578f0610606fa9069d65f2 Mon Sep 17 00:00:00 2001
From: Pietro Albini
Date: Thu, 23 Jan 2020 10:26:35 +0100
Subject: [PATCH 02/38] wip

---
 src/storage/database.rs | 86 +++++++++++++++++++++++++++++++++++++++++
 src/storage/mod.rs      | 21 ++++++++++
 src/storage/s3.rs       | 77 ++++++++++++++++++++++++++++++++++++
 3 files changed, 184 insertions(+)
 create mode 100644 src/storage/database.rs
 create mode 100644 src/storage/s3.rs

diff --git a/src/storage/database.rs b/src/storage/database.rs
new file mode 100644
index 000000000..d2ca88948
--- /dev/null
+++ b/src/storage/database.rs
@@ -0,0 +1,86 @@
+use super::Blob;
+use failure::{Error, Fail};
+use postgres::Connection;
+use time::Timespec;
+
+#[derive(Debug, Fail)]
+#[fail(display = "the path is not present in the database")]
+struct PathNotFoundError;
+
+pub(crate) struct DatabaseBackend<'a> {
+    conn: &'a Connection,
+}
+
+impl<'a> DatabaseBackend<'a> {
+    pub(super) fn new(conn: &'a Connection) -> Self {
+        Self { conn }
+    }
+
+    pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
+        let rows = self.conn.query(
+            "SELECT path, mime, date_updated, content FROM files WHERE path = $1;",
+            &[&path],
+        )?;
+
+        if rows.is_empty() {
+            Err(PathNotFoundError.into())
+        } else {
+            let row = rows.get(0);
+            Ok(Blob {
+                path: row.get("path"),
+                mime: row.get("mime"),
+                date_updated: row.get("date_updated"),
+                content: row.get("content"),
+            })
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_path_get() {
+        crate::test::wrapper(|env| {
+            let conn = env.db().conn();
+            let backend = DatabaseBackend::new(&conn);
+
+            // Add a test file to the database
+            conn.execute(
+                "INSERT INTO files (path, mime, date_updated, content) VALUES ($1, $2, $3, $4);",
+                &[
+                    &"dir/foo.txt",
+                    &"text/plain",
+                    &Timespec::new(42, 0),
+                    &"Hello world!".as_bytes(),
+                ],
+            )?;
+
+            // Test that the proper file was returned
+            assert_eq!(
+                Blob {
+                    path: "dir/foo.txt".into(),
+                    mime: "text/plain".into(),
+                    date_updated: Timespec::new(42, 0),
+                    content: "Hello world!".bytes().collect(),
+                },
+                backend.get("dir/foo.txt")?
+            );
+
+            // Test that other files are not returned
+            assert!(backend
+                .get("dir/bar.txt")
+                .unwrap_err()
+                .downcast_ref::<PathNotFoundError>()
+                .is_some());
+            assert!(backend
+                .get("foo.txt")
+                .unwrap_err()
+                .downcast_ref::<PathNotFoundError>()
+                .is_some());
+
+            Ok(())
+        });
+    }
+}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index d77aaf226..5bb300e38 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,8 +1,29 @@
+mod database;
+mod s3;
+
+use self::database::DatabaseBackend;
+use self::s3::S3Backend;
+use failure::Error;
 use time::Timespec;
 
+#[derive(Debug, PartialEq, Eq)]
 pub(crate) struct Blob {
     pub(crate) path: String,
     pub(crate) mime: String,
     pub(crate) date_updated: Timespec,
     pub(crate) content: Vec<u8>,
 }
+
+pub(crate) enum Storage<'a> {
+    Database(DatabaseBackend<'a>),
+    S3(S3Backend<'a>),
+}
+
+impl Storage<'_> {
+    pub(crate) fn get(&self, path: &str) -> Result<Blob, Error> {
+        match self {
+            Self::Database(db) => db.get(path),
+            Self::S3(s3) => s3.get(path),
+        }
+    }
+}
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
new file mode 100644
index 000000000..fb0de3800
--- /dev/null
+++ b/src/storage/s3.rs
@@ -0,0 +1,77 @@
+use super::Blob;
+use failure::Error;
+use rusoto_s3::{GetObjectRequest, S3Client, S3};
+use std::convert::TryInto;
+use std::io::Read;
+use time::Timespec;
+
+pub(crate) struct S3Backend<'a> {
+    client: &'a S3Client,
+    bucket: String,
+}
+
+impl<'a> S3Backend<'a> {
+    pub(super) fn new(client: &'a S3Client, bucket: &str) -> Self {
+        Self {
+            client,
+            bucket: bucket.into(),
+        }
+    }
+
+    pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
+        let res = self
+            .client
+            .get_object(GetObjectRequest {
+                bucket: self.bucket.clone(),
+                key: path.into(),
+                ..Default::default()
+            })
+            .sync()?;
+
+        let mut b = res.body.unwrap().into_blocking_read();
+        let mut content = Vec::with_capacity(
+            res.content_length
+                .and_then(|l| l.try_into().ok())
+                .unwrap_or(0),
+        );
+        b.read_to_end(&mut content).unwrap();
+
+        let date_updated = parse_timespec(&res.last_modified.unwrap())?;
+
+        Ok(Blob {
+            path: path.into(),
+            mime: res.content_type.unwrap(),
+            date_updated,
+            content,
+        })
+    }
+}
+
+fn parse_timespec(raw: &str) -> Result<Timespec, Error> {
+    Ok(time::strptime(raw, "%a, %d %b %Y %H:%M:%S %Z")?.to_timespec())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_timespec() {
+        crate::test::wrapper(|_| {
+            // Test valid conversions
+            assert_eq!(
+                parse_timespec("Thu, 1 Jan 1970 00:00:00 GMT")?,
+                Timespec::new(0, 0)
+            );
+            assert_eq!(
+                parse_timespec("Mon, 16 Apr 2018 04:33:50 GMT")?,
+                Timespec::new(1523853230, 0)
+            );
+
+            // Test invalid conversion
+            assert!(parse_timespec("foo").is_err());
+
+            Ok(())
+        })
+    }
+}
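The Storage enum above lets callers fetch a file without knowing which backend holds it. A minimal sketch of how a caller would use it, assuming a postgres::Connection named conn is in scope (the helper name print_file is hypothetical, not part of the patch):

    use crate::storage::{DatabaseBackend, Storage};

    // Wrap the database backend in the Storage enum and fetch one path.
    fn print_file(conn: &postgres::Connection) -> Result<(), failure::Error> {
        let storage = Storage::Database(DatabaseBackend::new(conn));
        let blob = storage.get("crate/0.1.0/index.html")?;
        println!("{} ({} bytes, {})", blob.path, blob.content.len(), blob.mime);
        Ok(())
    }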
From fc944a183af38ef5a73f67edc51f41fb30b90019 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 14:40:58 -0400
Subject: [PATCH 03/38] Add some impls

---
 src/storage/mod.rs | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 5bb300e38..0288611a6 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -27,3 +27,15 @@ impl Storage<'_> {
         }
     }
 }
+
+impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {
+    fn from(db: DatabaseBackend<'a>) -> Self {
+        Self::Database(db)
+    }
+}
+
+impl<'a> From<S3Backend<'a>> for Storage<'a> {
+    fn from(db: S3Backend<'a>) -> Self {
+        Self::S3(db)
+    }
+}

From b868fc53f54a91c86de3a0179e371d68fff37645 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 14:41:05 -0400
Subject: [PATCH 04/38] Remove unnecessary copy

---
 src/storage/s3.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index fb0de3800..cd0471c94 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -7,14 +7,14 @@ use time::Timespec;
 
 pub(crate) struct S3Backend<'a> {
     client: &'a S3Client,
-    bucket: String,
+    bucket: &'a str,
 }
 
 impl<'a> S3Backend<'a> {
-    pub(super) fn new(client: &'a S3Client, bucket: &str) -> Self {
+    pub(super) fn new(client: &'a S3Client, bucket: &'a str) -> Self {
         Self {
             client,
-            bucket: bucket.into(),
+            bucket,
         }
     }
 

From 15ec1c69478ca7f58c8c41f4acb2a6c0bc0e1b00 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 14:50:31 -0400
Subject: [PATCH 05/38] Use new Storage backend

---
 src/db/file.rs          | 61 ++++++-----------------------------------
 src/storage/database.rs |  3 +-
 src/storage/mod.rs      |  4 +--
 src/storage/s3.rs       |  2 +-
 4 files changed, 12 insertions(+), 58 deletions(-)

diff --git a/src/db/file.rs b/src/db/file.rs
index ca912075c..f696b0e0c 100644
--- a/src/db/file.rs
+++ b/src/db/file.rs
@@ -60,60 +60,15 @@ fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
 }
 
 pub(crate) fn get_path(conn: &Connection, path: &str) -> Option<Blob> {
-    if let Some(client) = s3_client() {
-        let res = client
-            .get_object(GetObjectRequest {
-                bucket: S3_BUCKET_NAME.into(),
-                key: path.into(),
-                ..Default::default()
-            })
-            .sync();
-
-        let res = match res {
-            Ok(r) => r,
-            Err(_) => {
-                return None;
-            }
-        };
-
-        let mut b = res.body.unwrap().into_blocking_read();
-        let mut content = Vec::new();
-        b.read_to_end(&mut content).unwrap();
-
-        let last_modified = res.last_modified.unwrap();
-        let last_modified = time::strptime(&last_modified, "%a, %d %b %Y %H:%M:%S %Z")
-            .unwrap_or_else(|e| panic!("failed to parse {:?} as timespec: {:?}", last_modified, e))
-            .to_timespec();
-
-        Some(Blob {
-            path: path.into(),
-            mime: res.content_type.unwrap(),
-            date_updated: last_modified,
-            content,
-        })
+    use crate::storage::{DatabaseBackend, S3Backend, Storage};
+    let client;
+    let backend = if let Some(c) = s3_client() {
+        client = c;
+        Storage::from(S3Backend::new(&client, S3_BUCKET_NAME))
     } else {
-        let rows = conn
-            .query(
-                "SELECT path, mime, date_updated, content
-                 FROM files
-                 WHERE path = $1",
-                &[&path],
-            )
-            .unwrap();
-
-        if rows.is_empty() {
-            None
-        } else {
-            let row = rows.get(0);
-
-            Some(Blob {
-                path: row.get(0),
-                mime: row.get(1),
-                date_updated: row.get(2),
-                content: row.get(3),
-            })
-        }
-    }
+        DatabaseBackend::new(conn).into()
+    };
+    backend.get(path).ok()
 }
 
 pub(super) fn s3_client() -> Option<S3Client> {
diff --git a/src/storage/database.rs b/src/storage/database.rs
index d2ca88948..a6fec5494 100644
--- a/src/storage/database.rs
+++ b/src/storage/database.rs
@@ -1,7 +1,6 @@
 use super::Blob;
 use failure::{Error, Fail};
 use postgres::Connection;
-use time::Timespec;
 
 #[derive(Debug, Fail)]
 #[fail(display = "the path is not present in the database")]
@@ -12,7 +11,7 @@ pub(crate) struct DatabaseBackend<'a> {
 }
 
 impl<'a> DatabaseBackend<'a> {
-    pub(super) fn new(conn: &'a Connection) -> Self {
+    pub(crate) fn new(conn: &'a Connection) -> Self {
         Self { conn }
     }
 
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 0288611a6..bba476064 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,8 +1,8 @@
 mod database;
 mod s3;
 
-use self::database::DatabaseBackend;
-use self::s3::S3Backend;
+pub(crate) use self::database::DatabaseBackend;
+pub(crate) use self::s3::S3Backend;
 use failure::Error;
 use time::Timespec;
 
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index cd0471c94..a93d6bf50 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -11,7 +11,7 @@ pub(crate) struct S3Backend<'a> {
 }
 
 impl<'a> S3Backend<'a> {
-    pub(super) fn new(client: &'a S3Client, bucket: &'a str) -> Self {
+    pub(crate) fn new(client: &'a S3Client, bucket: &'a str) -> Self {
         Self {
             client,
             bucket,
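The From impls from patch 03 are what let get_path above write `DatabaseBackend::new(conn).into()` on one arm and `Storage::from(...)` on the other. A small sketch of the same selection logic in isolation (assuming the types above; the function name pick_backend is illustrative):

    use crate::storage::{DatabaseBackend, S3Backend, Storage};

    fn pick_backend<'a>(
        conn: &'a postgres::Connection,
        s3: Option<&'a rusoto_s3::S3Client>,
    ) -> Storage<'a> {
        match s3 {
            // `S3Backend::new(..).into()` and `Storage::from(..)` are equivalent.
            Some(client) => S3Backend::new(client, "rust-docs-rs").into(),
            None => DatabaseBackend::new(conn).into(),
        }
    }

Note the separate `let client;` binding in get_path itself: the borrowed S3Client has to outlive the Storage value, so it is hoisted out of the if expression.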
From 75e30f7401108b3babd25a8fd715b086fe69bcc9 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 17:21:46 -0400
Subject: [PATCH 06/38] [BROKEN] try to add rusoto_mock

---
 Cargo.lock              | 15 ++++++++++++++
 Cargo.toml              |  1 +
 src/storage/database.rs |  1 +
 src/storage/mod.rs      |  2 +-
 src/storage/s3.rs       | 44 +++++++++++++++++++++++++++++++++++++++++
 src/test/fakes.rs       | 36 +++++++++++++++++++++++++++++++++
 src/test/mod.rs         |  5 +++++
 7 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 174df43eb..d520d9da3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -385,6 +385,7 @@ dependencies = [
 "router 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "rusoto_credential 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)",
+"rusoto_mock 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "rusoto_s3 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)",
 "rustwide 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2279,6 +2280,19 @@ dependencies = [
 "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "rusoto_mock"
+version = "0.40.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
+ "http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.32 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "rusoto_s3"
 version = "0.40.0"
@@ -3716,6 +3730,7 @@ dependencies = [
 "checksum router 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9b1797ff166029cb632237bb5542696e54961b4cf75a324c6f05c9cf0584e4e"
 "checksum rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dd1a1069ba04874a485528d1602fab4569f2434a5547614428e2cc22b91bfb71"
 "checksum rusoto_credential 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b0d6cc3a602f01b9c5a04c8ed4ee281b789c5b2692d93202367c9b99ebc022ed"
+"checksum rusoto_mock 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6b22b45fb22064594c5f48284ba361efa90fc711662dd9744bb388927b67c82a"
 "checksum rusoto_s3 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4da6eac54781d2aac517a99f1d85d0d6a78674543f8d122d884628c1ff21b495"
 "checksum rust-argon2 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017"
 "checksum rustc-demangle 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783"
diff --git a/Cargo.toml b/Cargo.toml
index b94314e71..d32da6b16 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,6 +64,7 @@ once_cell = "1.2.0"
 kuchiki = "0.8"
 criterion = "0.3"
 rand = "0.7.3"
+rusoto_mock = "0.40"
 
 [[bench]]
 name = "html5ever"
diff --git a/src/storage/database.rs b/src/storage/database.rs
index a6fec5494..2249621d7 100644
--- a/src/storage/database.rs
+++ b/src/storage/database.rs
@@ -37,6 +37,7 @@ impl<'a> DatabaseBackend<'a> {
 
 #[cfg(test)]
 mod tests {
+    use time::Timespec;
     use super::*;
 
     #[test]
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index bba476064..20e981d2f 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -6,7 +6,7 @@ pub(crate) use self::s3::S3Backend;
 use failure::Error;
 use time::Timespec;
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Hash)]
 pub(crate) struct Blob {
     pub(crate) path: String,
     pub(crate) mime: String,
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index a93d6bf50..a2dc102f6 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -74,4 +74,48 @@ mod tests {
             Ok(())
         })
     }
+
+    #[test]
+    fn test_path_get() {
+        crate::test::wrapper(|env| {
+            let blob = Blob {
+                path: "dir/foo.txt".into(),
+                mime: "text/plain".into(),
+                date_updated: Timespec::new(42, 0),
+                content: "Hello world!".into(),
+            };
+
+            // Add a test file to the database
+            let s3 = env.s3_upload(blob);
+
+            let backend = S3Backend::new(&s3.client, "");
+
+            // Test that the proper file was returned
+            assert_eq!(
+                Blob {
+                    path: "dir/foo.txt".into(),
+                    mime: "text/plain".into(),
+                    date_updated: Timespec::new(42, 0),
+                    content: "Hello world!".bytes().collect(),
+                },
+                backend.get("dir/foo.txt")?
+            );
+
+            /*
+            // Test that other files are not returned
+            assert!(backend
+                .get("dir/bar.txt")
+                .unwrap_err()
+                .downcast_ref::<PathNotFoundError>()
+                .is_some());
+            assert!(backend
+                .get("foo.txt")
+                .unwrap_err()
+                .downcast_ref::<PathNotFoundError>()
+                .is_some());
+            */
+
+            Ok(())
+        });
+    }
 }
diff --git a/src/test/fakes.rs b/src/test/fakes.rs
index a56a4afe6..c4f80f035 100644
--- a/src/test/fakes.rs
+++ b/src/test/fakes.rs
@@ -201,3 +201,39 @@ impl<'a> FakeRelease<'a> {
         Ok(release_id)
     }
 }
+
+use crate::storage::Blob;
+use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher};
+use rusoto_s3::S3Client;
+enum State {
+    NoRequestMade { path: String },
+    RequestMade { found: bool },
+}
+pub(crate) struct FakeUpload { state: State, pub(crate) client: S3Client }
+
+impl FakeUpload {
+    pub(crate) fn new(blob: Blob) -> Self {
+        let state = State::NoRequestMade { path: blob.path };
+        // TODO: different behavior with different filenames
+        // TODO: is this even possible with `MockRequestDispatcher`?
+        let dispatcher = MockRequestDispatcher::default()
+            .with_body(&String::from_utf8(blob.content).unwrap())
+            .with_header("Content-Type", &blob.mime)
+            //.with_header("Last-Modified", blob.date_updated.to_string())
+            .with_request_checker(move |req| {
+                match &state {
+                    State::NoRequestMade { path } => {
+                        state = State::RequestMade { found: req.path == *path };
+                    }
+                    _ => {}
+                }
+            });
+
+        let client = S3Client::new_with(
+            dispatcher,
+            MockCredentialsProvider,
+            Default::default()
+        );
+        Self { client, state }
+    }
+}
\ No newline at end of file
diff --git a/src/test/mod.rs b/src/test/mod.rs
index 0be67c5f9..3e6cb85db 100644
--- a/src/test/mod.rs
+++ b/src/test/mod.rs
@@ -1,5 +1,6 @@
 mod fakes;
 
+use crate::storage::Blob;
 use crate::web::Server;
 use failure::Error;
 use log::error;
@@ -115,6 +116,10 @@ impl TestEnvironment {
     pub(crate) fn frontend(&self) -> &TestFrontend {
         self.frontend.get_or_init(|| TestFrontend::new(self.db()))
     }
+
+    pub(crate) fn s3_upload(&mut self, blob: Blob) -> fakes::FakeUpload {
+        fakes::FakeUpload::new(blob)
+    }
 }
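rusoto_mock works by swapping the HTTP dispatcher underneath a real S3Client, so the code under test stays unchanged. A trimmed sketch of the pattern FakeUpload is reaching for, using only the calls that appear in the patch (the canned body and header values here are illustrative):

    use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher};
    use rusoto_s3::S3Client;

    fn mock_client() -> S3Client {
        // Every request gets this canned response instead of hitting AWS.
        let dispatcher = MockRequestDispatcher::default()
            .with_body("Hello world!")
            .with_header("Content-Type", "text/plain");
        S3Client::new_with(dispatcher, MockCredentialsProvider, Default::default())
    }

The commit is marked [BROKEN] because the request-checker closure tries to mutate the captured state, which the Fn-style checker does not allow; the next commits replace the stateful checker with a simple assertion.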
From 7727a98596b5f355d0a53ec365577733bc24bd25 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 20:50:43 -0400
Subject: [PATCH 07/38] Get basic tests working

---
 src/storage/mod.rs |  4 +++-
 src/storage/s3.rs  | 21 +++++++++------------
 src/test/fakes.rs  | 31 +++++++++++++------------------
 src/test/mod.rs    |  4 ++--
 4 files changed, 27 insertions(+), 33 deletions(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 20e981d2f..cb99d7460 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -3,10 +3,12 @@ mod s3;
 
 pub(crate) use self::database::DatabaseBackend;
 pub(crate) use self::s3::S3Backend;
+#[cfg(test)]
+pub(crate) use self::s3::TIME_FMT;
 use failure::Error;
 use time::Timespec;
 
-#[derive(Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub(crate) struct Blob {
     pub(crate) path: String,
     pub(crate) mime: String,
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index a2dc102f6..93fbd3f4f 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -47,8 +47,13 @@ impl<'a> S3Backend<'a> {
     }
 }
 
+#[cfg(not(test))]
+const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z";
+#[cfg(test)]
+pub(crate) const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z";
+
 fn parse_timespec(raw: &str) -> Result<Timespec, Error> {
-    Ok(time::strptime(raw, "%a, %d %b %Y %H:%M:%S %Z")?.to_timespec())
+    Ok(time::strptime(raw, TIME_FMT)?.to_timespec())
 }
 
 #[cfg(test)]
@@ -86,20 +91,12 @@ mod tests {
             };
 
             // Add a test file to the database
-            let s3 = env.s3_upload(blob);
+            let s3 = env.s3_upload(blob.clone(), "");
 
-            let backend = S3Backend::new(&s3.client, "");
+            let backend = S3Backend::new(&s3.client, &s3.bucket);
 
             // Test that the proper file was returned
-            assert_eq!(
-                Blob {
-                    path: "dir/foo.txt".into(),
-                    mime: "text/plain".into(),
-                    date_updated: Timespec::new(42, 0),
-                    content: "Hello world!".bytes().collect(),
-                },
-                backend.get("dir/foo.txt")?
-            );
+            assert_eq!(blob, backend.get("dir/foo.txt")?);
 
             /*
             // Test that other files are not returned
diff --git a/src/test/fakes.rs b/src/test/fakes.rs
index c4f80f035..ad9841e2e 100644
--- a/src/test/fakes.rs
+++ b/src/test/fakes.rs
@@ -202,31 +202,26 @@ impl<'a> FakeRelease<'a> {
     }
 }
 
-use crate::storage::Blob;
-use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher};
+use crate::storage::{Blob, TIME_FMT};
 use rusoto_s3::S3Client;
-enum State {
-    NoRequestMade { path: String },
-    RequestMade { found: bool },
+pub(crate) struct FakeUpload {
+    pub(crate) client: S3Client,
+    pub(crate) bucket: &'static str,
 }
-pub(crate) struct FakeUpload { state: State, pub(crate) client: S3Client }
 
 impl FakeUpload {
-    pub(crate) fn new(blob: Blob) -> Self {
-        let state = State::NoRequestMade { path: blob.path };
-        // TODO: different behavior with different filenames
-        // TODO: is this even possible with `MockRequestDispatcher`?
+    pub(crate) fn new(blob: Blob, bucket: &'static str) -> Self {
+        use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher};
+
+        let path = blob.path;
+        let datetime = time::at_utc(blob.date_updated);
+        let datestring = time::strftime(TIME_FMT, &datetime).unwrap();
         let dispatcher = MockRequestDispatcher::default()
             .with_body(&String::from_utf8(blob.content).unwrap())
            .with_header("Content-Type", &blob.mime)
-            //.with_header("Last-Modified", blob.date_updated.to_string())
+            .with_header("Last-Modified", &datestring)
             .with_request_checker(move |req| {
-                match &state {
-                    State::NoRequestMade { path } => {
-                        state = State::RequestMade { found: req.path == *path };
-                    }
-                    _ => {}
-                }
+                assert_eq!(req.path, format!("/{}/{}", bucket, path));
             });
 
         let client = S3Client::new_with(
             dispatcher,
             MockCredentialsProvider,
             Default::default()
         );
-        Self { client, state }
+        Self { client, bucket }
     }
 }
\ No newline at end of file
diff --git a/src/test/mod.rs b/src/test/mod.rs
index 3e6cb85db..3274cd6ba 100644
--- a/src/test/mod.rs
+++ b/src/test/mod.rs
@@ -117,8 +117,8 @@ impl TestEnvironment {
         self.frontend.get_or_init(|| TestFrontend::new(self.db()))
     }
 
-    pub(crate) fn s3_upload(&mut self, blob: Blob) -> fakes::FakeUpload {
-        fakes::FakeUpload::new(blob)
+    pub(crate) fn s3_upload(&self, blob: Blob, bucket: &'static str) -> fakes::FakeUpload {
+        fakes::FakeUpload::new(blob, bucket)
     }
 }

From b980ebbf3b17b51ced166e19fcdabd2e0d2495b9 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 21:50:45 -0400
Subject: [PATCH 08/38] Add 404 test

---
 src/storage/s3.rs | 31 ++++++++++++++++++-------------
 src/test/fakes.rs | 15 ++++++++++++++-
 src/test/mod.rs   | 17 +++++++++++++++--
 3 files changed, 47 insertions(+), 16 deletions(-)

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 93fbd3f4f..9a324fc86 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -58,8 +58,23 @@ fn parse_timespec(raw: &str) -> Result<Timespec, Error> {
 
 #[cfg(test)]
 mod tests {
+    use crate::test::TestEnvironment;
     use super::*;
 
+    fn assert_s3_404(env: &TestEnvironment, path: &'static str) {
+        use rusoto_core::RusotoError;
+        use rusoto_s3::GetObjectError;
+
+        let s3 = env.s3().not_found(path);
+        let backend = S3Backend::new(&s3.client, s3.bucket);
+        let err = backend.get(path).unwrap_err();
+        let status = match err.downcast_ref::<RusotoError<GetObjectError>>().expect("wanted GetObject") {
+            RusotoError::Unknown(http) => http.status,
+            _ => panic!("wrong error"),
+        };
+        assert_eq!(status, 404);
+    }
+
     #[test]
     fn test_parse_timespec() {
@@ -106,16 +121,8 @@ mod tests {
             };
 
             // Add a test file to the database
-            let s3 = env.s3_upload(blob.clone(), "");
+            let s3 = env.s3().upload(blob.clone());
 
             let backend = S3Backend::new(&s3.client, &s3.bucket);
 
             // Test that the proper file was returned
             assert_eq!(blob, backend.get("dir/foo.txt")?);
 
-            /*
             // Test that other files are not returned
-            assert!(backend
-                .get("dir/bar.txt")
-                .unwrap_err()
-                .downcast_ref::<PathNotFoundError>()
-                .is_some());
-            assert!(backend
-                .get("foo.txt")
-                .unwrap_err()
-                .downcast_ref::<PathNotFoundError>()
-                .is_some());
-            */
+            assert_s3_404(&env, "dir/bar.txt");
+            assert_s3_404(&env, "foo.txt");
 
             Ok(())
         });
diff --git a/src/test/fakes.rs b/src/test/fakes.rs
index ad9841e2e..32afaa4f1 100644
--- a/src/test/fakes.rs
+++ b/src/test/fakes.rs
@@ -204,6 +204,8 @@ impl<'a> FakeRelease<'a> {
 use crate::storage::{Blob, TIME_FMT};
 use rusoto_s3::S3Client;
 
+use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher};
+
 pub(crate) struct FakeUpload {
     pub(crate) client: S3Client,
     pub(crate) bucket: &'static str,
@@ -211,8 +213,6 @@ pub(crate) struct FakeUpload {
 
 impl FakeUpload {
     pub(crate) fn new(blob: Blob, bucket: &'static str) -> Self {
-        use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher};
-
         let path = blob.path;
         let datetime = time::at_utc(blob.date_updated);
         let datestring = time::strftime(TIME_FMT, &datetime).unwrap();
@@ -231,4 +231,16 @@ impl FakeUpload {
         );
         Self { client, bucket }
     }
+    pub(crate) fn not_found(path: &'static str, bucket: &'static str) -> Self {
+        let dispatcher = MockRequestDispatcher::with_status(404)
+            .with_request_checker(move |req| {
+                assert_eq!(req.path, format!("/{}/{}", bucket, path));
+            });
+        let client = S3Client::new_with(
+            dispatcher,
+            MockCredentialsProvider,
+            Default::default()
+        );
+        Self { client, bucket }
+    }
 }
\ No newline at end of file
diff --git a/src/test/mod.rs b/src/test/mod.rs
index 3274cd6ba..1794b4f81 100644
--- a/src/test/mod.rs
+++ b/src/test/mod.rs
@@ -90,6 +90,7 @@ pub(crate) fn assert_redirect(
 pub(crate) struct TestEnvironment {
     db: OnceCell<TestDatabase>,
     frontend: OnceCell<TestFrontend>,
+    s3: OnceCell<TestS3>,
 }
 
 impl TestEnvironment {
@@ -99,6 +100,7 @@ impl TestEnvironment {
         Self {
             db: OnceCell::new(),
             frontend: OnceCell::new(),
+            s3: OnceCell::new(),
         }
     }
 
@@ -117,8 +119,8 @@ impl TestEnvironment {
         self.frontend.get_or_init(|| TestFrontend::new(self.db()))
     }
 
-    pub(crate) fn s3_upload(&self, blob: Blob, bucket: &'static str) -> fakes::FakeUpload {
-        fakes::FakeUpload::new(blob, bucket)
+    pub(crate) fn s3(&self) -> &TestS3 {
+        self.s3.get_or_init(|| TestS3 { bucket: "" })
     }
 }
 
@@ -191,3 +193,14 @@ impl TestFrontend {
         self.build_request(Method::GET, url)
     }
 }
+
+pub(crate) struct TestS3 { bucket: &'static str }
+
+impl TestS3 {
+    pub(crate) fn upload(&self, blob: Blob) -> fakes::FakeUpload {
+        fakes::FakeUpload::new(blob, self.bucket)
+    }
+    pub(crate) fn not_found(&self, path: &'static str) -> fakes::FakeUpload {
+        fakes::FakeUpload::not_found(path, &self.bucket)
+    }
+}
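Because the backend surfaces failure::Error, the 404 test has to downcast back to the concrete rusoto error to inspect the HTTP status. The same pattern in isolation (assuming err came from S3Backend::get; the helper name is_404 is illustrative):

    use rusoto_core::RusotoError;
    use rusoto_s3::GetObjectError;

    fn is_404(err: &failure::Error) -> bool {
        // downcast_ref returns None if the boxed error is a different type.
        match err.downcast_ref::<RusotoError<GetObjectError>>() {
            Some(RusotoError::Unknown(http)) => http.status == 404,
            _ => false,
        }
    }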
From 2cc30a6af3a56276a65b46e57d70c0b0334d61f2 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Sun, 15 Mar 2020 21:55:44 -0400
Subject: [PATCH 09/38] Refactor out shared code

---
 src/test/fakes.rs | 30 ++++++++++++------------------
 1 file changed, 12 insertions(+), 18 deletions(-)

diff --git a/src/test/fakes.rs b/src/test/fakes.rs
index 32afaa4f1..28a2d951e 100644
--- a/src/test/fakes.rs
+++ b/src/test/fakes.rs
@@ -213,30 +213,24 @@ pub(crate) struct FakeUpload {
 
 impl FakeUpload {
     pub(crate) fn new(blob: Blob, bucket: &'static str) -> Self {
-
-        let path = blob.path;
         let datetime = time::at_utc(blob.date_updated);
         let datestring = time::strftime(TIME_FMT, &datetime).unwrap();
         let dispatcher = MockRequestDispatcher::default()
             .with_body(&String::from_utf8(blob.content).unwrap())
             .with_header("Content-Type", &blob.mime)
-            .with_header("Last-Modified", &datestring)
-            .with_request_checker(move |req| {
-                assert_eq!(req.path, format!("/{}/{}", bucket, path));
-            });
-
-        let client = S3Client::new_with(
-            dispatcher,
-            MockCredentialsProvider,
-            Default::default()
-        );
-        Self { client, bucket }
+            .with_header("Last-Modified", &datestring);
+        Self::with_dispatcher(blob.path, bucket, dispatcher)
     }
+
     pub(crate) fn not_found(path: &'static str, bucket: &'static str) -> Self {
-        let dispatcher = MockRequestDispatcher::with_status(404)
-            .with_request_checker(move |req| {
-                assert_eq!(req.path, format!("/{}/{}", bucket, path));
-            });
+        Self::with_dispatcher(path, bucket, MockRequestDispatcher::with_status(404))
+    }
+
+    fn with_dispatcher<S>(path: S, bucket: &'static str, dispatcher: MockRequestDispatcher) -> Self
+    where S: AsRef<str> + std::fmt::Display + Send + Sync + 'static {
+        let dispatcher = dispatcher.with_request_checker(move |req| {
+            assert_eq!(req.path, format!("/{}/{}", bucket, path));
+        });
         let client = S3Client::new_with(
             dispatcher,
             MockCredentialsProvider,
             Default::default()
         );
         Self { client, bucket }
     }
-}
\ No newline at end of file
+}
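The refactor works because both an owned String (blob.path) and a &'static str ("foo.txt") can satisfy a single generic bound set, as long as the value can also be moved into the 'static request-checker closure. The shape of that helper, reduced to its essentials (names here are illustrative):

    // Accept anything string-like that can also live inside a 'static closure.
    fn describe<S>(path: S) -> String
    where
        S: AsRef<str> + std::fmt::Display + Send + Sync + 'static,
    {
        format!("checking /{}", path)
    }

    // Both call sites from the patch compile against the same bound:
    //     describe(String::from("dir/foo.txt"));
    //     describe("foo.txt");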
From 8c44199d3b17db341d2f2941647ca6b915e76497 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Mon, 16 Mar 2020 20:25:16 -0400
Subject: [PATCH 10/38] Split uploading into different functions

---
 src/db/file.rs          | 238 ++--------------------------------------
 src/storage/database.rs |  18 ++-
 src/storage/mod.rs      | 186 +++++++++++++++++++++++++++++++
 src/storage/s3.rs       |  37 ++++++-
 4 files changed, 260 insertions(+), 219 deletions(-)

diff --git a/src/db/file.rs b/src/db/file.rs
index f696b0e0c..c7c4ac9db 100644
--- a/src/db/file.rs
+++ b/src/db/file.rs
@@ -12,53 +12,15 @@
 use rusoto_core::region::Region;
 use rusoto_credential::DefaultCredentialsProvider;
 use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
 use rustc_serialize::json::{Json, ToJson};
-use std::cmp;
-use std::ffi::OsStr;
-use std::fs;
-use std::io::Read;
-use std::path::{Path, PathBuf};
+use error::Result;
+use rusoto_s3::{S3, PutObjectRequest, S3Client};
+use rusoto_core::region::Region;
+use rusoto_credential::DefaultCredentialsProvider;
 
 pub(crate) use crate::storage::Blob;
 
-const MAX_CONCURRENT_UPLOADS: usize = 50;
-
 pub(super) static S3_BUCKET_NAME: &str = "rust-docs-rs";
 
-fn get_file_list_from_dir<P: AsRef<Path>>(path: P, files: &mut Vec<PathBuf>) -> Result<()> {
-    let path = path.as_ref();
-
-    for file in path.read_dir()? {
-        let file = file?;
-
-        if file.file_type()?.is_file() {
-            files.push(file.path());
-        } else if file.file_type()?.is_dir() {
-            get_file_list_from_dir(file.path(), files)?;
-        }
-    }
-
-    Ok(())
-}
-
-fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
-    let path = path.as_ref();
-    let mut files = Vec::new();
-
-    if !path.exists() {
-        return Err(err_msg("File not found"));
-    } else if path.is_file() {
-        files.push(PathBuf::from(path.file_name().unwrap()));
-    } else if path.is_dir() {
-        get_file_list_from_dir(path, &mut files)?;
-        for file_path in &mut files {
-            // We want the paths in this list to not be {path}/bar.txt but just bar.txt
-            *file_path = PathBuf::from(file_path.strip_prefix(path).unwrap());
-        }
-    }
-
-    Ok(files)
-}
-
 pub(crate) fn get_path(conn: &Connection, path: &str) -> Option<Blob> {
     use crate::storage::{DatabaseBackend, S3Backend, Storage};
     let client;
@@ -108,150 +70,30 @@ pub(super) fn s3_client() -> Option<S3Client> {
 ///
 /// Note that this function is used for uploading both sources
 /// and files generated by rustdoc.
-pub fn add_path_into_database<P: AsRef<Path>>(
-    conn: &Connection,
-    prefix: &str,
-    path: P,
-) -> Result<Json> {
-    use futures::future::Future;
-    use std::collections::HashMap;
-
-    let trans = conn.transaction()?;
-    let mut file_paths_and_mimes: HashMap<PathBuf, String> = HashMap::new();
-
-    let mut rt = ::tokio::runtime::Runtime::new().unwrap();
-
-    let mut to_upload = get_file_list(&path)?;
-    let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
-    let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();
-    let mut attempts = 0;
-
-    while !to_upload.is_empty() || !currently_uploading.is_empty() {
-        let mut futures = Vec::new();
-        let client = s3_client();
-
-        for file_path in &currently_uploading {
-            let path = Path::new(path.as_ref()).join(&file_path);
-            // Some files have insufficient permissions (like .lock file created by cargo in
-            // documentation directory). We are skipping this files.
-            let mut file = match fs::File::open(path) {
-                Ok(f) => f,
-                Err(_) => continue,
-            };
-            let mut content: Vec<u8> = Vec::new();
-            file.read_to_end(&mut content)?;
-
-            let bucket_path = Path::new(prefix).join(&file_path);
-
-            #[cfg(windows)] // On windows, we need to normalize \\ to / so the route logic works
-            let bucket_path = path_slash::PathBufExt::to_slash(&bucket_path).unwrap();
-            #[cfg(not(windows))]
-            let bucket_path = bucket_path.into_os_string().into_string().unwrap();
-
-            let mime = detect_mime(&file_path)?;
-
-            if let Some(client) = &client {
-                futures.push(
-                    client
-                        .put_object(PutObjectRequest {
-                            bucket: S3_BUCKET_NAME.into(),
-                            key: bucket_path.clone(),
-                            body: Some(content.into()),
-                            content_type: Some(mime.to_owned()),
-                            ..Default::default()
-                        })
-                        .inspect(|_| {
-                            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
-                        }),
-                );
-            } else {
-                // If AWS credentials are configured, don't insert/update the database
-                // check if file already exists in database
-                let rows = conn.query(
-                    "SELECT COUNT(*) FROM files WHERE path = $1",
-                    &[&bucket_path],
-                )?;
-
-                if rows.get(0).get::<usize, i64>(0) == 0 {
-                    trans.query(
-                        "INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
-                        &[&bucket_path, &mime, &content],
-                    )?;
-                } else {
-                    trans.query(
-                        "UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
-                         WHERE path = $1",
-                        &[&bucket_path, &mime, &content],
-                    )?;
-                }
-            }
-
-            file_paths_and_mimes.insert(file_path.clone(), mime.to_owned());
-        }
-
-        if !futures.is_empty() {
-            attempts += 1;
-
-            match rt.block_on(::futures::future::join_all(futures)) {
-                Ok(_) => {
-                    // this batch was successful, start another batch if there are still more files
-                    batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
-                    currently_uploading = to_upload.drain(..batch_size).collect();
-                    attempts = 0;
-                }
-                Err(err) => {
-                    error!("failed to upload to s3: {:?}", err);
-                    // if any futures error, leave `currently_uploading` in tact so that we can retry the batch
-                    if attempts > 2 {
-                        panic!("failed to upload 3 times, exiting");
-                    }
-                }
-            }
-        } else {
-            batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
-            currently_uploading = to_upload.drain(..batch_size).collect();
-        }
-    }
-
-    trans.commit()?;
-
-    let file_list_with_mimes: Vec<(String, PathBuf)> = file_paths_and_mimes
-        .into_iter()
-        .map(|(file_path, mime)| (mime, file_path))
-        .collect();
-    file_list_to_json(file_list_with_mimes)
+pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
+                                              prefix: &str,
+                                              path: P)
+                                              -> Result<Json> {
+    use crate::storage::{Storage, DatabaseBackend, S3Backend};
+    let client;
+    let backend = if let Some(c) = s3_client() {
+        client = c;
+        Storage::from(S3Backend::new(&client, S3_BUCKET_NAME))
+    } else {
+        DatabaseBackend::new(conn).into()
+    };
+    let file_list = backend.store_all(conn, prefix, path.as_ref())?;
+    file_list_to_json(file_list.into_iter().collect())
 }
 
-fn detect_mime(file_path: &Path) -> Result<&'static str> {
-    let mime = mime_guess::from_path(file_path)
-        .first_raw()
-        .map(|m| m)
-        .unwrap_or("text/plain");
-    Ok(match mime {
-        "text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => {
-            match file_path.extension().and_then(OsStr::to_str) {
-                Some("md") => "text/markdown",
-                Some("rs") => "text/rust",
-                Some("markdown") => "text/markdown",
-                Some("css") => "text/css",
-                Some("toml") => "text/toml",
-                Some("js") => "application/javascript",
-                Some("json") => "application/json",
-                _ => mime,
-            }
-        }
-        "image/svg" => "image/svg+xml",
-        _ => mime,
-    })
-}
+fn file_list_to_json(file_list: Vec<(PathBuf, String)>) -> Result<Json> {
 
-fn file_list_to_json(file_list: Vec<(String, PathBuf)>) -> Result<Json> {
     let mut file_list_json: Vec<Json> = Vec::new();
 
     for file in file_list {
         let mut v: Vec<String> = Vec::new();
-        v.push(file.0.clone());
-        v.push(file.1.into_os_string().into_string().unwrap());
+        v.push(file.1);
+        v.push(file.0.into_os_string().into_string().unwrap());
         file_list_json.push(v.to_json());
     }
 
@@ -309,41 +151,3 @@ pub fn move_to_s3(conn: &Connection, n: usize) -> Result<usize> {
 
     Ok(count)
 }
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use std::env;
-
-    #[test]
-    fn test_get_file_list() {
-        let _ = env_logger::try_init();
-
-        let files = get_file_list(env::current_dir().unwrap());
-        assert!(files.is_ok());
-        assert!(files.unwrap().len() > 0);
-
-        let files = get_file_list(env::current_dir().unwrap().join("Cargo.toml")).unwrap();
-        assert_eq!(files[0], std::path::Path::new("Cargo.toml"));
-    }
-    #[test]
-    fn test_mime_types() {
-        check_mime(".gitignore", "text/plain");
-        check_mime("hello.toml", "text/toml");
-        check_mime("hello.css", "text/css");
-        check_mime("hello.js", "application/javascript");
-        check_mime("hello.html", "text/html");
-        check_mime("hello.hello.md", "text/markdown");
-        check_mime("hello.markdown", "text/markdown");
-        check_mime("hello.json", "application/json");
-        check_mime("hello.txt", "text/plain");
-        check_mime("file.rs", "text/rust");
-        check_mime("important.svg", "image/svg+xml");
-    }
-
-    fn check_mime(path: &str, expected_mime: &str) {
-        let detected_mime = detect_mime(Path::new(&path));
-        let detected_mime = detected_mime.expect("no mime was given");
-        assert_eq!(detected_mime, expected_mime);
-    }
-}
diff --git a/src/storage/database.rs b/src/storage/database.rs
index 2249621d7..d929fd280 100644
--- a/src/storage/database.rs
+++ b/src/storage/database.rs
@@ -1,6 +1,6 @@
 use super::Blob;
 use failure::{Error, Fail};
-use postgres::Connection;
+use postgres::{Connection, transaction::Transaction};
 
 #[derive(Debug, Fail)]
 #[fail(display = "the path is not present in the database")]
@@ -33,6 +33,22 @@ impl<'a> DatabaseBackend<'a> {
             })
         }
     }
+    pub(super) fn store_batch(&self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
+        for blob in batch {
+            // check if file already exists in database
+            let rows = self.conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&blob.path])?;
+
+            if rows.get(0).get::<usize, i64>(0) == 0 {
+                trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
+                            &[&blob.path, &blob.mime, &blob.content])?;
+            } else {
+                trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
+                             WHERE path = $1",
+                            &[&blob.path, &blob.mime, &blob.content])?;
+            }
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index cb99d7460..1ca81f560 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -8,6 +8,16 @@ pub(crate) use self::s3::TIME_FMT;
 use failure::Error;
 use time::Timespec;
 
+use std::collections::HashMap;
+use std::path::{PathBuf, Path};
+use postgres::{Connection, transaction::Transaction};
+use std::fs;
+use std::io::Read;
+use failure::err_msg;
+use std::ffi::OsStr;
+#[cfg(not(windows))]
+use magic::{Cookie, flags};
+
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub(crate) struct Blob {
     pub(crate) path: String,
@@ -16,6 +26,44 @@ pub(crate) struct Blob {
     pub(crate) content: Vec<u8>,
 }
 
+fn get_file_list_from_dir<P: AsRef<Path>>(path: P,
+                                          files: &mut Vec<PathBuf>)
+                                          -> Result<(), Error> {
+    let path = path.as_ref();
+
+    for file in path.read_dir()? {
+        let file = file?;
+
+        if file.file_type()?.is_file() {
+            files.push(file.path());
+        } else if file.file_type()?.is_dir() {
+            get_file_list_from_dir(file.path(), files)?;
+        }
+    }
+
+    Ok(())
+}
+
+
+pub fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {
+    let path = path.as_ref();
+    let mut files = Vec::new();
+
+    if !path.exists() {
+        return Err(err_msg("File not found"));
+    } else if path.is_file() {
+        files.push(PathBuf::from(path.file_name().unwrap()));
+    } else if path.is_dir() {
+        get_file_list_from_dir(path, &mut files)?;
+        for file_path in &mut files {
+            // We want the paths in this list to not be {path}/bar.txt but just bar.txt
+            *file_path = PathBuf::from(file_path.strip_prefix(path).unwrap());
+        }
+    }
+
+    Ok(files)
+}
+
 pub(crate) enum Storage<'a> {
     Database(DatabaseBackend<'a>),
     S3(S3Backend<'a>),
@@ -28,6 +76,100 @@ impl Storage<'_> {
             Self::S3(s3) => s3.get(path),
         }
     }
+
+    fn store_batch(&self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
+        match self {
+            Self::Database(db) => db.store_batch(batch, trans),
+            Self::S3(s3) => s3.store_batch(batch),
+        }
+    }
+
+    pub(crate) fn store_all(&self, conn: &Connection, prefix: &str, root_dir: &Path) -> Result<HashMap<PathBuf, String>, Error> {
+        const MAX_CONCURRENT_UPLOADS: usize = 1000;
+
+        let trans = conn.transaction()?;
+        #[cfg(not(windows))]
+        let mime_data = load_mime_data()?;
+        let mut file_paths_and_mimes = HashMap::new();
+
+        get_file_list(root_dir)?.into_iter()
+            .filter_map(|file_path| {
+                // Some files have insufficient permissions
+                // (like .lock file created by cargo in documentation directory).
+                // Skip these files.
+                fs::File::open(root_dir.join(&file_path))
+                    .ok().map(|file| (file_path, file))
+            }).map(|(file_path, mut file)| {
+                let mut content: Vec<u8> = Vec::new();
+                file.read_to_end(&mut content)?;
+
+                let bucket_path = Path::new(prefix).join(&file_path);
+
+                #[cfg(windows)] // On windows, we need to normalize \\ to / so the route logic works
+                let bucket_path = path_slash::PathBufExt::to_slash(&bucket_path).unwrap();
+                #[cfg(not(windows))]
+                let bucket_path = bucket_path.into_os_string().into_string().unwrap();
+
+                #[cfg(windows)]
+                let mime = detect_mime(&content, &file_path)?;
+                #[cfg(not(windows))]
+                let mime = detect_mime(&content, &file_path, &mime_data)?;
+
+                file_paths_and_mimes.insert(file_path, mime.clone());
+                Ok(Blob {
+                    path: bucket_path,
+                    mime,
+                    content,
+                    date_updated: Timespec::new(0, 0),
+                })
+            })
+            .collect::<Result<Vec<_>, Error>>()?
+            .chunks(MAX_CONCURRENT_UPLOADS)
+            .map(|batch| self.store_batch(batch, &trans))
+            // exhaust the iterator
+            .for_each(|_| {});
+
+        trans.commit()?;
+        Ok(file_paths_and_mimes)
+    }
+}
+
+#[cfg(not(windows))]
+fn load_mime_data() -> Result<Cookie, Error> {
+    let cookie = Cookie::open(flags::MIME_TYPE)?;
+    cookie.load::<&str>(&[])?;
+    Ok(cookie)
+}
+
+#[cfg(not(windows))]
+fn detect_mime(content: &Vec<u8>, file_path: &Path, cookie: &Cookie) -> Result<String, Error> {
+    let mime = cookie.buffer(&content)?;
+    correct_mime(&mime, &file_path)
+}
+
+#[cfg(windows)]
+fn detect_mime(_content: &Vec<u8>, file_path: &Path) -> Result<String, Error> {
+    let mime = mime_guess::from_path(file_path).first_raw().map(|m| m).unwrap_or("text/plain");
+    correct_mime(&mime, &file_path)
+}
+
+fn correct_mime(mime: &str, file_path: &Path) -> Result<String, Error> {
+    Ok(match mime {
+        "text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => {
+            match file_path.extension().and_then(OsStr::to_str) {
+                Some("md") => "text/markdown",
+                Some("rs") => "text/rust",
+                Some("markdown") => "text/markdown",
+                Some("css") => "text/css",
+                Some("toml") => "text/toml",
+                Some("js") => "application/javascript",
+                Some("json") => "application/json",
+                _ => mime
+            }
+        },
+        "image/svg" => "image/svg+xml",
+        _ => mime
+    }.to_owned())
 }
 
 impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {
@@ -41,3 +183,47 @@ impl<'a> From<S3Backend<'a>> for Storage<'a> {
         Self::S3(db)
     }
 }
+
+#[cfg(test)]
+mod test {
+    extern crate env_logger;
+    use std::env;
+    use super::*;
+
+    #[test]
+    fn test_get_file_list() {
+        let _ = env_logger::try_init();
+
+        let files = get_file_list(env::current_dir().unwrap());
+        assert!(files.is_ok());
+        assert!(files.unwrap().len() > 0);
+
+        let files = get_file_list(env::current_dir().unwrap().join("Cargo.toml")).unwrap();
+        assert_eq!(files[0], std::path::Path::new("Cargo.toml"));
+    }
+    #[test]
+    fn test_mime_types() {
+        check_mime("/ignored", ".gitignore", "text/plain");
+        check_mime("[package]", "hello.toml","text/toml");
+        check_mime(".ok { color:red; }", "hello.css","text/css");
+        check_mime("var x = 1", "hello.js","application/javascript");
+        check_mime("<html>", "hello.html","text/html");
+        check_mime("## HELLO", "hello.hello.md","text/markdown");
+        check_mime("## WORLD", "hello.markdown","text/markdown");
+        check_mime("{}", "hello.json","application/json");
+        check_mime("hello world", "hello.txt","text/plain");
+        check_mime("//! Simple module to ...", "file.rs", "text/rust");
+        check_mime("<svg></svg>", "important.svg", "image/svg+xml");
+    }
+
+    fn check_mime(content: &str, path: &str, expected_mime: &str) {
+        #[cfg(not(windows))]
+        let mime_data = load_mime_data().unwrap();
+        #[cfg(windows)]
+        let detected_mime = detect_mime(&content.as_bytes().to_vec(), Path::new(&path));
+        #[cfg(not(windows))]
+        let detected_mime = detect_mime(&content.as_bytes().to_vec(), Path::new(&path), &mime_data);
+        let detected_mime = detected_mime.expect("no mime was given");
+        assert_eq!(detected_mime, expected_mime);
+    }
+}
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 9a324fc86..fccf486f7 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -1,6 +1,7 @@
 use super::Blob;
 use failure::Error;
-use rusoto_s3::{GetObjectRequest, S3Client, S3};
+use futures::Future;
+use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
 use std::convert::TryInto;
 use std::io::Read;
 use time::Timespec;
@@ -46,6 +47,40 @@ impl<'a> S3Backend<'a> {
             content,
         })
     }
+
+    pub(super) fn store_batch(&self, batch: &[Blob]) -> Result<(), Error> {
+        let mut rt = tokio::runtime::Runtime::new().unwrap();
+        let mut attempts = 0;
+
+        loop {
+            let mut futures = Vec::new();
+            for blob in batch {
+                futures.push(self.client.put_object(PutObjectRequest {
+                    bucket: self.bucket.to_string(),
+                    key: blob.path.clone(),
+                    body: Some(blob.content.clone().into()),
+                    content_type: Some(blob.mime.clone()),
+                    ..Default::default()
+                }).inspect(|_| {
+                    crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
+                }));
+            }
+            attempts += 1;
+
+            match rt.block_on(::futures::future::join_all(futures)) {
+                // this batch was successful, start another batch if there are still more files
+                Ok(_) => break,
+                Err(err) => {
+                    error!("failed to upload to s3: {:?}", err);
+                    // if a futures error occurs, retry the batch
+                    if attempts > 2 {
+                        panic!("failed to upload 3 times, exiting");
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
 }
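store_all reads every file into a Blob first, then hands the Blobs to whichever backend in fixed-size batches. The control flow, reduced to a standalone sketch (the batch size and item type are placeholders, not the real values):

    use failure::Error;

    fn store_all_sketch(items: Vec<Result<u32, Error>>) -> Result<(), Error> {
        const MAX_CONCURRENT_UPLOADS: usize = 1000;
        // Fail fast if preparing any input failed, as the real code does...
        let blobs = items.into_iter().collect::<Result<Vec<_>, Error>>()?;
        // ...then process the successful ones one batch at a time.
        for batch in blobs.chunks(MAX_CONCURRENT_UPLOADS) {
            println!("storing batch of {}", batch.len());
        }
        Ok(())
    }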
From 42d82c1484b4082ea387b2ef13feaaf6df70fa16 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Mon, 16 Mar 2020 21:35:54 -0400
Subject: [PATCH 11/38] Add tests for `store_all`

---
 src/storage/mod.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 1ca81f560..5e8050966 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -84,6 +84,12 @@ impl Storage<'_> {
         }
     }
 
+    // Store all files in `root_dir` into the backend under `prefix`.
+    //
+    // If the environmenet is configured with S3 credentials, this will upload to S3;
+    // otherwise, this will store files in the database.
+    //
+    // This returns a HashMap<filename, mime type>.
     pub(crate) fn store_all(&self, conn: &Connection, prefix: &str, root_dir: &Path) -> Result<HashMap<PathBuf, String>, Error> {
         const MAX_CONCURRENT_UPLOADS: usize = 1000;
 
@@ -188,8 +194,47 @@ impl<'a> From<S3Backend<'a>> for Storage<'a> {
 mod test {
     extern crate env_logger;
     use std::env;
+    use crate::test::wrapper;
     use super::*;
 
+    #[test]
+    fn test_uploads() {
+        use std::fs;
+        let dir = tempdir::TempDir::new("docs.rs-upload-test").unwrap();
+        let files = ["Cargo.toml", "src/main.rs"];
+        for &file in &files {
+            let path = dir.path().join(file);
+            if let Some(parent) = path.parent() {
+                fs::create_dir_all(parent).unwrap();
+            }
+            fs::write(path, "data").expect("failed to write to file");
+        }
+        wrapper(|env| {
+            let db = env.db();
+            let conn = db.conn();
+            let backend = Storage::Database(DatabaseBackend::new(&conn));
+            let stored_files = backend.store_all(&conn, "rustdoc", dir.path()).unwrap();
+            assert_eq!(stored_files.len(), files.len());
+            for name in &files {
+                let name = Path::new(name);
+                assert!(stored_files.contains_key(name));
+            }
+            assert_eq!(stored_files.get(Path::new("Cargo.toml")).unwrap(), "text/toml");
+            assert_eq!(stored_files.get(Path::new("src/main.rs")).unwrap(), "text/rust");
+
+            let file = backend.get("rustdoc/Cargo.toml").unwrap();
+            assert_eq!(file.content, b"data");
+            assert_eq!(file.mime, "text/toml");
+            assert_eq!(file.path, "rustdoc/Cargo.toml");
+
+            let file = backend.get("rustdoc/src/main.rs").unwrap();
+            assert_eq!(file.content, b"data");
+            assert_eq!(file.mime, "text/rust");
+            assert_eq!(file.path, "rustdoc/src/main.rs");
+            Ok(())
+        })
+    }
+
     #[test]
     fn test_get_file_list() {
         let _ = env_logger::try_init();

From 8bbf8d33f584788f29c04ffc1f48e720bab5a6c0 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Tue, 17 Mar 2020 09:37:45 -0400
Subject: [PATCH 12/38] Clean up store_all a little

---
 src/storage/mod.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 5e8050966..6724b027a 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -126,14 +126,13 @@ impl Storage<'_> {
                     path: bucket_path,
                     mime,
                     content,
+                    // this field is ignored by the backend
                     date_updated: Timespec::new(0, 0),
                 })
             })
             .collect::<Result<Vec<_>, Error>>()?
             .chunks(MAX_CONCURRENT_UPLOADS)
-            .map(|batch| self.store_batch(batch, &trans))
-            // exhaust the iterator
-            .for_each(|_| {});
+            .try_for_each(|batch| self.store_batch(batch, &trans))?;
 
         trans.commit()?;
         Ok(file_paths_and_mimes)

From 8d328fa6c3ec3442dd94f8cf49d7048a09f77d59 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Fri, 10 Apr 2020 19:46:34 -0400
Subject: [PATCH 13/38] Fix typo

---
 src/storage/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6724b027a..9e95bbe89 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -86,7 +86,7 @@ impl Storage<'_> {
 
     // Store all files in `root_dir` into the backend under `prefix`.
     //
-    // If the environmenet is configured with S3 credentials, this will upload to S3;
+    // If the environment is configured with S3 credentials, this will upload to S3;
     // otherwise, this will store files in the database.
     //
    // This returns a HashMap<filename, mime type>.
    pub(crate) fn store_all(&self, conn: &Connection, prefix: &str, root_dir: &Path) -> Result<HashMap<PathBuf, String>, Error> {
From c4f3ecf15614a6a2718c95078531088b546e67e3 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Fri, 10 Apr 2020 20:01:22 -0400
Subject: [PATCH 14/38] Fix rebase conflicts

---
 Cargo.lock         |  8 ++---
 src/db/file.rs     | 10 ++----
 src/storage/mod.rs | 77 +++++++++++++++++++---------------------------
 src/storage/s3.rs  |  1 +
 4 files changed, 33 insertions(+), 63 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d520d9da3..a231125ac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2285,12 +2285,12 @@ name = "rusoto_mock"
 version = "0.40.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
- "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
 "http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
 "rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde_json 1.0.32 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
diff --git a/src/db/file.rs b/src/db/file.rs
index c7c4ac9db..2ea20da60 100644
--- a/src/db/file.rs
+++ b/src/db/file.rs
@@ -4,18 +4,14 @@
 //! They are using so many inodes and it is better to store them in database instead of
 //! filesystem. This module is adding files into database and retrieving them.
 
+use std::path::{Path, PathBuf};
 use crate::error::Result;
-use failure::err_msg;
-use log::{error, warn};
+use log::warn;
 use postgres::Connection;
 use rusoto_core::region::Region;
 use rusoto_credential::DefaultCredentialsProvider;
-use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
+use rusoto_s3::{PutObjectRequest, S3Client, S3};
 use rustc_serialize::json::{Json, ToJson};
-use error::Result;
-use rusoto_s3::{S3, PutObjectRequest, S3Client};
-use rusoto_core::region::Region;
-use rusoto_credential::DefaultCredentialsProvider;
 
 pub(crate) use crate::storage::Blob;
 
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 9e95bbe89..c4938b8d8 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -15,8 +15,6 @@ use std::fs;
 use std::io::Read;
 use failure::err_msg;
 use std::ffi::OsStr;
-#[cfg(not(windows))]
-use magic::{Cookie, flags};
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub(crate) struct Blob {
@@ -92,8 +90,6 @@ impl Storage<'_> {
         const MAX_CONCURRENT_UPLOADS: usize = 1000;
 
         let trans = conn.transaction()?;
-        #[cfg(not(windows))]
-        let mime_data = load_mime_data()?;
         let mut file_paths_and_mimes = HashMap::new();
 
         get_file_list(root_dir)?.into_iter()
@@ -116,15 +112,12 @@ impl Storage<'_> {
                 #[cfg(not(windows))]
                 let bucket_path = bucket_path.into_os_string().into_string().unwrap();
 
-                #[cfg(windows)]
-                let mime = detect_mime(&content, &file_path)?;
-                #[cfg(not(windows))]
-                let mime = detect_mime(&content, &file_path, &mime_data)?;
+                let mime = detect_mime(&file_path)?;
 
-                file_paths_and_mimes.insert(file_path, mime.clone());
+                file_paths_and_mimes.insert(file_path, mime.to_string());
                 Ok(Blob {
                     path: bucket_path,
-                    mime,
+                    mime: mime.to_string(),
                     content,
                     // this field is ignored by the backend
                     date_updated: Timespec::new(0, 0),
@@ -139,26 +132,11 @@ impl Storage<'_> {
     }
 }
 
-#[cfg(not(windows))]
-fn load_mime_data() -> Result<Cookie, Error> {
-    let cookie = Cookie::open(flags::MIME_TYPE)?;
-    cookie.load::<&str>(&[])?;
-    Ok(cookie)
-}
-
-#[cfg(not(windows))]
-fn detect_mime(content: &Vec<u8>, file_path: &Path, cookie: &Cookie) -> Result<String, Error> {
-    let mime = cookie.buffer(&content)?;
-    correct_mime(&mime, &file_path)
-}
-
-#[cfg(windows)]
-fn detect_mime(_content: &Vec<u8>, file_path: &Path) -> Result<String, Error> {
-    let mime = mime_guess::from_path(file_path).first_raw().map(|m| m).unwrap_or("text/plain");
-    correct_mime(&mime, &file_path)
-}
-
-fn correct_mime(mime: &str, file_path: &Path) -> Result<String, Error> {
+fn detect_mime(file_path: &Path) -> Result<&'static str, Error> {
+    let mime = mime_guess::from_path(file_path)
+        .first_raw()
+        .map(|m| m)
+        .unwrap_or("text/plain");
     Ok(match mime {
         "text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => {
             match file_path.extension().and_then(OsStr::to_str) {
@@ -169,12 +147,12 @@ fn correct_mime(mime: &str, file_path: &Path) -> Result<String, Error> {
                 Some("toml") => "text/toml",
                 Some("js") => "application/javascript",
                 Some("json") => "application/json",
-                _ => mime
+                _ => mime,
             }
-        },
+        }
         "image/svg" => "image/svg+xml",
-        _ => mime
-    }.to_owned())
+        _ => mime,
+    })
 }
 
 impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {
@@ -247,26 +225,21 @@ mod test {
     }
     #[test]
     fn test_mime_types() {
-        check_mime("/ignored", ".gitignore", "text/plain");
-        check_mime("[package]", "hello.toml","text/toml");
-        check_mime(".ok { color:red; }", "hello.css","text/css");
-        check_mime("var x = 1", "hello.js","application/javascript");
-        check_mime("<html>", "hello.html","text/html");
-        check_mime("## HELLO", "hello.hello.md","text/markdown");
-        check_mime("## WORLD", "hello.markdown","text/markdown");
-        check_mime("{}", "hello.json","application/json");
-        check_mime("hello world", "hello.txt","text/plain");
-        check_mime("//! Simple module to ...", "file.rs", "text/rust");
-        check_mime("<svg></svg>", "important.svg", "image/svg+xml");
+        check_mime(".gitignore", "text/plain");
+        check_mime("hello.toml", "text/toml");
+        check_mime("hello.css", "text/css");
+        check_mime("hello.js", "application/javascript");
+        check_mime("hello.html", "text/html");
+        check_mime("hello.hello.md", "text/markdown");
+        check_mime("hello.markdown", "text/markdown");
+        check_mime("hello.json", "application/json");
+        check_mime("hello.txt", "text/plain");
+        check_mime("file.rs", "text/rust");
+        check_mime("important.svg", "image/svg+xml");
     }
 
-    fn check_mime(content: &str, path: &str, expected_mime: &str) {
-        #[cfg(not(windows))]
-        let mime_data = load_mime_data().unwrap();
-        #[cfg(windows)]
-        let detected_mime = detect_mime(&content.as_bytes().to_vec(), Path::new(&path));
-        #[cfg(not(windows))]
-        let detected_mime = detect_mime(&content.as_bytes().to_vec(), Path::new(&path), &mime_data);
+    fn check_mime(path: &str, expected_mime: &str) {
+        let detected_mime = detect_mime(Path::new(&path));
         let detected_mime = detected_mime.expect("no mime was given");
         assert_eq!(detected_mime, expected_mime);
     }
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index fccf486f7..c859bdc19 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -5,6 +5,7 @@ use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
 use std::convert::TryInto;
 use std::io::Read;
 use time::Timespec;
+use log::error;
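After this patch, mime detection is purely extension-based again: mime_guess proposes a type and the match in detect_mime overrides the edge cases. A usage sketch of the underlying call (mime_guess API as used in the patch; the wrapper name guess is illustrative):

    fn guess(path: &str) -> &'static str {
        // first_raw() returns the best guess as a &'static str, if any.
        mime_guess::from_path(path).first_raw().unwrap_or("text/plain")
    }

    // guess("hello.html") == "text/html"; guess(".gitignore") == "text/plain"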
-pub fn add_path_into_database>(conn: &Connection, - prefix: &str, - path: P) - -> Result { - use crate::storage::{Storage, DatabaseBackend, S3Backend}; +pub fn add_path_into_database>( + conn: &Connection, + prefix: &str, + path: P, +) -> Result { + use crate::storage::{DatabaseBackend, S3Backend, Storage}; let client; let backend = if let Some(c) = s3_client() { client = c; @@ -83,7 +84,6 @@ pub fn add_path_into_database>(conn: &Connection, } fn file_list_to_json(file_list: Vec<(PathBuf, String)>) -> Result { - let mut file_list_json: Vec = Vec::new(); for file in file_list { diff --git a/src/storage/database.rs b/src/storage/database.rs index d929fd280..52629eb3c 100644 --- a/src/storage/database.rs +++ b/src/storage/database.rs @@ -1,6 +1,6 @@ use super::Blob; use failure::{Error, Fail}; -use postgres::{Connection, transaction::Transaction}; +use postgres::{transaction::Transaction, Connection}; #[derive(Debug, Fail)] #[fail(display = "the path is not present in the database")] @@ -36,15 +36,21 @@ impl<'a> DatabaseBackend<'a> { pub(super) fn store_batch(&self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> { for blob in batch { // check if file already exists in database - let rows = self.conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&blob.path])?; + let rows = self + .conn + .query("SELECT COUNT(*) FROM files WHERE path = $1", &[&blob.path])?; if rows.get(0).get::(0) == 0 { - trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)", - &[&blob.path, &blob.mime, &blob.content])?; + trans.query( + "INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)", + &[&blob.path, &blob.mime, &blob.content], + )?; } else { - trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \ + trans.query( + "UPDATE files SET mime = $2, content = $3, date_updated = NOW() \ WHERE path = $1", - &[&blob.path, &blob.mime, &blob.content])?; + &[&blob.path, &blob.mime, &blob.content], + )?; } } Ok(()) @@ -53,8 +59,8 @@ impl<'a> DatabaseBackend<'a> { #[cfg(test)] mod tests { - use time::Timespec; use super::*; + use time::Timespec; #[test] fn test_path_get() { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c4938b8d8..6ffd140ae 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,13 +8,13 @@ pub(crate) use self::s3::TIME_FMT; use failure::Error; use time::Timespec; +use failure::err_msg; +use postgres::{transaction::Transaction, Connection}; use std::collections::HashMap; -use std::path::{PathBuf, Path}; -use postgres::{Connection, transaction::Transaction}; +use std::ffi::OsStr; use std::fs; use std::io::Read; -use failure::err_msg; -use std::ffi::OsStr; +use std::path::{Path, PathBuf}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub(crate) struct Blob { @@ -24,9 +24,7 @@ pub(crate) struct Blob { pub(crate) content: Vec, } -fn get_file_list_from_dir>(path: P, - files: &mut Vec) - -> Result<(), Error> { +fn get_file_list_from_dir>(path: P, files: &mut Vec) -> Result<(), Error> { let path = path.as_ref(); for file in path.read_dir()? { @@ -42,7 +40,6 @@ fn get_file_list_from_dir>(path: P, Ok(()) } - pub fn get_file_list>(path: P) -> Result, Error> { let path = path.as_ref(); let mut files = Vec::new(); @@ -88,44 +85,52 @@ impl Storage<'_> { // otherwise, this will store files in the database. // // This returns a HashMap. 
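     // The map goes from each stored file's path (relative to `root_dir`) to
     // the mime type that was detected for it.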
- pub(crate) fn store_all(&self, conn: &Connection, prefix: &str, root_dir: &Path) -> Result, Error> { + pub(crate) fn store_all( + &self, + conn: &Connection, + prefix: &str, + root_dir: &Path, + ) -> Result, Error> { const MAX_CONCURRENT_UPLOADS: usize = 1000; let trans = conn.transaction()?; let mut file_paths_and_mimes = HashMap::new(); - get_file_list(root_dir)?.into_iter() - .filter_map(|file_path| { - // Some files have insufficient permissions - // (like .lock file created by cargo in documentation directory). - // Skip these files. - fs::File::open(root_dir.join(&file_path)) - .ok().map(|file| (file_path, file)) - }).map(|(file_path, mut file)| { - let mut content: Vec = Vec::new(); - file.read_to_end(&mut content)?; - - let bucket_path = Path::new(prefix).join(&file_path); - - #[cfg(windows)] // On windows, we need to normalize \\ to / so the route logic works - let bucket_path = path_slash::PathBufExt::to_slash(&bucket_path).unwrap(); - #[cfg(not(windows))] - let bucket_path = bucket_path.into_os_string().into_string().unwrap(); - - let mime = detect_mime(&file_path)?; - - file_paths_and_mimes.insert(file_path, mime.to_string()); - Ok(Blob { - path: bucket_path, - mime: mime.to_string(), - content, - // this field is ignored by the backend - date_updated: Timespec::new(0, 0), + get_file_list(root_dir)? + .into_iter() + .filter_map(|file_path| { + // Some files have insufficient permissions + // (like .lock file created by cargo in documentation directory). + // Skip these files. + fs::File::open(root_dir.join(&file_path)) + .ok() + .map(|file| (file_path, file)) }) - }) - .collect::, Error>>()? - .chunks(MAX_CONCURRENT_UPLOADS) - .try_for_each(|batch| self.store_batch(batch, &trans))?; + .map(|(file_path, mut file)| { + let mut content: Vec = Vec::new(); + file.read_to_end(&mut content)?; + + let bucket_path = Path::new(prefix).join(&file_path); + + #[cfg(windows)] // On windows, we need to normalize \\ to / so the route logic works + let bucket_path = path_slash::PathBufExt::to_slash(&bucket_path).unwrap(); + #[cfg(not(windows))] + let bucket_path = bucket_path.into_os_string().into_string().unwrap(); + + let mime = detect_mime(&file_path)?; + + file_paths_and_mimes.insert(file_path, mime.to_string()); + Ok(Blob { + path: bucket_path, + mime: mime.to_string(), + content, + // this field is ignored by the backend + date_updated: Timespec::new(0, 0), + }) + }) + .collect::, Error>>()? 
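+            // upload in fixed-size chunks so that at most MAX_CONCURRENT_UPLOADS
+            // requests are in flight at a time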
+ .chunks(MAX_CONCURRENT_UPLOADS) + .try_for_each(|batch| self.store_batch(batch, &trans))?; trans.commit()?; Ok(file_paths_and_mimes) @@ -170,9 +175,9 @@ impl<'a> From> for Storage<'a> { #[cfg(test)] mod test { extern crate env_logger; - use std::env; - use crate::test::wrapper; use super::*; + use crate::test::wrapper; + use std::env; #[test] fn test_uploads() { @@ -196,8 +201,14 @@ mod test { let name = Path::new(name); assert!(stored_files.contains_key(name)); } - assert_eq!(stored_files.get(Path::new("Cargo.toml")).unwrap(), "text/toml"); - assert_eq!(stored_files.get(Path::new("src/main.rs")).unwrap(), "text/rust"); + assert_eq!( + stored_files.get(Path::new("Cargo.toml")).unwrap(), + "text/toml" + ); + assert_eq!( + stored_files.get(Path::new("src/main.rs")).unwrap(), + "text/rust" + ); let file = backend.get("rustdoc/Cargo.toml").unwrap(); assert_eq!(file.content, b"data"); diff --git a/src/storage/s3.rs b/src/storage/s3.rs index c859bdc19..097cee283 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -14,10 +14,7 @@ pub(crate) struct S3Backend<'a> { impl<'a> S3Backend<'a> { pub(crate) fn new(client: &'a S3Client, bucket: &'a str) -> Self { - Self { - client, - bucket, - } + Self { client, bucket } } pub(super) fn get(&self, path: &str) -> Result { @@ -55,15 +52,19 @@ impl<'a> S3Backend<'a> { loop { let mut futures = Vec::new(); for blob in batch { - futures.push(self.client.put_object(PutObjectRequest { - bucket: self.bucket.to_string(), - key: blob.path.clone(), - body: Some(blob.content.clone().into()), - content_type: Some(blob.mime.clone()), - ..Default::default() - }).inspect(|_| { - crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1); - })); + futures.push( + self.client + .put_object(PutObjectRequest { + bucket: self.bucket.to_string(), + key: blob.path.clone(), + body: Some(blob.content.clone().into()), + content_type: Some(blob.mime.clone()), + ..Default::default() + }) + .inspect(|_| { + crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1); + }), + ); } attempts += 1; @@ -94,8 +95,8 @@ fn parse_timespec(raw: &str) -> Result { #[cfg(test)] mod tests { - use crate::test::TestEnvironment; use super::*; + use crate::test::TestEnvironment; fn assert_s3_404(env: &TestEnvironment, path: &'static str) { use rusoto_core::RusotoError; @@ -104,7 +105,10 @@ mod tests { let s3 = env.s3().not_found(path); let backend = S3Backend::new(&s3.client, s3.bucket); let err = backend.get(path).unwrap_err(); - let status = match err.downcast_ref::>().expect("wanted GetObject") { + let status = match err + .downcast_ref::>() + .expect("wanted GetObject") + { RusotoError::Unknown(http) => http.status, _ => panic!("wrong error"), }; diff --git a/src/test/fakes.rs b/src/test/fakes.rs index 28a2d951e..d7726ffdb 100644 --- a/src/test/fakes.rs +++ b/src/test/fakes.rs @@ -203,8 +203,8 @@ impl<'a> FakeRelease<'a> { } use crate::storage::{Blob, TIME_FMT}; -use rusoto_s3::S3Client; use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher}; +use rusoto_s3::S3Client; pub(crate) struct FakeUpload { pub(crate) client: S3Client, @@ -227,15 +227,13 @@ impl FakeUpload { } fn with_dispatcher(path: S, bucket: &'static str, dispatcher: MockRequestDispatcher) -> Self - where S: AsRef + std::fmt::Display + Send + Sync + 'static { + where + S: AsRef + std::fmt::Display + Send + Sync + 'static, + { let dispatcher = dispatcher.with_request_checker(move |req| { assert_eq!(req.path, format!("/{}/{}", bucket, path)); }); - let client = S3Client::new_with( - dispatcher, - MockCredentialsProvider, - 
Default::default() - ); + let client = S3Client::new_with(dispatcher, MockCredentialsProvider, Default::default()); Self { client, bucket } } } diff --git a/src/test/mod.rs b/src/test/mod.rs index 1794b4f81..0da4d022a 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -120,7 +120,9 @@ impl TestEnvironment { } pub(crate) fn s3(&self) -> &TestS3 { - self.s3.get_or_init(|| TestS3 { bucket: "" }) + self.s3.get_or_init(|| TestS3 { + bucket: "", + }) } } @@ -194,7 +196,9 @@ impl TestFrontend { } } -pub(crate) struct TestS3 { bucket: &'static str } +pub(crate) struct TestS3 { + bucket: &'static str, +} impl TestS3 { pub(crate) fn upload(&self, blob: Blob) -> fakes::FakeUpload { From 81b7325411a59a93c57b4cb1b85cecbaa4023ec1 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Fri, 10 Apr 2020 22:35:43 -0400 Subject: [PATCH 16/38] Encapsulate the s3_client --- src/bin/cratesfyi.rs | 3 +- src/db/delete_crate.rs | 2 +- src/db/file.rs | 107 ++--------------------------------------- src/db/mod.rs | 2 +- src/lib.rs | 2 +- src/storage/mod.rs | 12 ++++- src/storage/s3.rs | 92 +++++++++++++++++++++++++++++++++-- 7 files changed, 106 insertions(+), 114 deletions(-) diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 66a44446c..6462b931e 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -268,7 +268,8 @@ pub fn main() { let mut count = 1; let mut total = 0; while count != 0 { - count = db::move_to_s3(&conn, 5_000).expect("Failed to upload batch to S3"); + count = cratesfyi::storage::move_to_s3(&conn, 5_000) + .expect("Failed to upload batch to S3"); total += count; eprintln!( "moved {} rows to s3 in this batch, total moved so far: {}", diff --git a/src/db/delete_crate.rs b/src/db/delete_crate.rs index 70990ec95..47d8b7aa0 100644 --- a/src/db/delete_crate.rs +++ b/src/db/delete_crate.rs @@ -1,4 +1,4 @@ -use super::file::{s3_client, S3_BUCKET_NAME}; +use crate::storage::s3::{s3_client, S3_BUCKET_NAME}; use failure::{Error, Fail}; use postgres::Connection; use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3}; diff --git a/src/db/file.rs b/src/db/file.rs index 5b907c636..61e8aeb84 100644 --- a/src/db/file.rs +++ b/src/db/file.rs @@ -5,56 +5,16 @@ //! filesystem. This module is adding files into database and retrieving them. use crate::error::Result; -use log::warn; +use crate::storage::Storage; use postgres::Connection; -use rusoto_core::region::Region; -use rusoto_credential::DefaultCredentialsProvider; -use rusoto_s3::{PutObjectRequest, S3Client, S3}; + use rustc_serialize::json::{Json, ToJson}; use std::path::{Path, PathBuf}; pub(crate) use crate::storage::Blob; -pub(super) static S3_BUCKET_NAME: &str = "rust-docs-rs"; - pub(crate) fn get_path(conn: &Connection, path: &str) -> Option { - use crate::storage::{DatabaseBackend, S3Backend, Storage}; - let client; - let backend = if let Some(c) = s3_client() { - client = c; - Storage::from(S3Backend::new(&client, S3_BUCKET_NAME)) - } else { - DatabaseBackend::new(conn).into() - }; - backend.get(path).ok() -} - -pub(super) fn s3_client() -> Option { - // If AWS keys aren't configured, then presume we should use the DB exclusively - // for file storage. 
- if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() { - return None; - } - - let creds = match DefaultCredentialsProvider::new() { - Ok(creds) => creds, - Err(err) => { - warn!("failed to retrieve AWS credentials: {}", err); - return None; - } - }; - - Some(S3Client::new_with( - rusoto_core::request::HttpClient::new().unwrap(), - creds, - std::env::var("S3_ENDPOINT") - .ok() - .map(|e| Region::Custom { - name: std::env::var("S3_REGION").unwrap_or_else(|_| "us-west-1".to_owned()), - endpoint: e, - }) - .unwrap_or(Region::UsWest1), - )) + Storage::new(conn).get(path).ok() } /// Store all files in a directory and return [[mimetype, filename]] as Json @@ -71,14 +31,7 @@ pub fn add_path_into_database>( prefix: &str, path: P, ) -> Result { - use crate::storage::{DatabaseBackend, S3Backend, Storage}; - let client; - let backend = if let Some(c) = s3_client() { - client = c; - Storage::from(S3Backend::new(&client, S3_BUCKET_NAME)) - } else { - DatabaseBackend::new(conn).into() - }; + let backend = Storage::new(conn); let file_list = backend.store_all(conn, prefix, path.as_ref())?; file_list_to_json(file_list.into_iter().collect()) } @@ -95,55 +48,3 @@ fn file_list_to_json(file_list: Vec<(PathBuf, String)>) -> Result { Ok(file_list_json.to_json()) } - -pub fn move_to_s3(conn: &Connection, n: usize) -> Result { - let trans = conn.transaction()?; - let client = s3_client().expect("configured s3"); - - let rows = trans.query( - &format!( - "SELECT path, mime, content FROM files WHERE content != E'in-s3' LIMIT {}", - n - ), - &[], - )?; - let count = rows.len(); - - let mut rt = ::tokio::runtime::Runtime::new().unwrap(); - let mut futures = Vec::new(); - for row in &rows { - let path: String = row.get(0); - let mime: String = row.get(1); - let content: Vec = row.get(2); - let path_1 = path.clone(); - futures.push( - client - .put_object(PutObjectRequest { - bucket: S3_BUCKET_NAME.into(), - key: path.clone(), - body: Some(content.into()), - content_type: Some(mime), - ..Default::default() - }) - .map(move |_| path_1) - .map_err(move |e| panic!("failed to upload to {}: {:?}", path, e)), - ); - } - - use ::futures::future::Future; - match rt.block_on(::futures::future::join_all(futures)) { - Ok(paths) => { - let statement = trans.prepare("DELETE FROM files WHERE path = $1").unwrap(); - for path in paths { - statement.execute(&[&path]).unwrap(); - } - } - Err(e) => { - panic!("results err: {:?}", e); - } - } - - trans.commit()?; - - Ok(count) -} diff --git a/src/db/mod.rs b/src/db/mod.rs index 503d22cf0..9d0e07a50 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -4,7 +4,7 @@ pub(crate) use self::add_package::add_build_into_database; pub(crate) use self::add_package::add_package_into_database; pub(crate) use self::add_package::CratesIoData; pub use self::delete_crate::delete_crate; -pub use self::file::{add_path_into_database, move_to_s3}; +pub use self::file::add_path_into_database; pub use self::migrate::migrate; use failure::Fail; diff --git a/src/lib.rs b/src/lib.rs index 86b30f2d7..c751db934 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ pub use self::web::Server; pub mod db; mod docbuilder; mod error; -mod storage; +pub mod storage; #[cfg(test)] mod test; pub mod utils; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6ffd140ae..e9c63fe24 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,11 +1,12 @@ mod database; -mod s3; +pub(crate) mod s3; pub(crate) use self::database::DatabaseBackend; pub(crate) use self::s3::S3Backend; 
#[cfg(test)] pub(crate) use self::s3::TIME_FMT; use failure::Error; +pub use s3::move_to_s3; use time::Timespec; use failure::err_msg; @@ -64,7 +65,14 @@ pub(crate) enum Storage<'a> { S3(S3Backend<'a>), } -impl Storage<'_> { +impl<'a> Storage<'a> { + pub(crate) fn new(conn: &'a Connection) -> Self { + if let Some(c) = s3::s3_client() { + Storage::from(S3Backend::new(c, s3::S3_BUCKET_NAME)) + } else { + DatabaseBackend::new(conn).into() + } + } pub(crate) fn get(&self, path: &str) -> Result { match self { Self::Database(db) => db.get(path), diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 097cee283..89452a833 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -1,19 +1,22 @@ use super::Blob; use failure::Error; use futures::Future; +use postgres::Connection; +use rusoto_core::region::Region; +use rusoto_credential::DefaultCredentialsProvider; use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; use std::convert::TryInto; use std::io::Read; use time::Timespec; -use log::error; +use log::{error, warn}; pub(crate) struct S3Backend<'a> { - client: &'a S3Client, + client: S3Client, bucket: &'a str, } impl<'a> S3Backend<'a> { - pub(crate) fn new(client: &'a S3Client, bucket: &'a str) -> Self { + pub(crate) fn new(client: S3Client, bucket: &'a str) -> Self { Self { client, bucket } } @@ -103,7 +106,7 @@ mod tests { use rusoto_s3::GetObjectError; let s3 = env.s3().not_found(path); - let backend = S3Backend::new(&s3.client, s3.bucket); + let backend = S3Backend::new(s3.client, s3.bucket); let err = backend.get(path).unwrap_err(); let status = match err .downcast_ref::>() @@ -148,7 +151,7 @@ mod tests { // Add a test file to the database let s3 = env.s3().upload(blob.clone()); - let backend = S3Backend::new(&s3.client, &s3.bucket); + let backend = S3Backend::new(s3.client, &s3.bucket); // Test that the proper file was returned assert_eq!(blob, backend.get("dir/foo.txt")?); @@ -161,3 +164,82 @@ mod tests { }); } } + +pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; + +pub(crate) fn s3_client() -> Option { + // If AWS keys aren't configured, then presume we should use the DB exclusively + // for file storage. 
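+    // (Setting FORCE_S3 opts back in to the S3 backend even when
+    // AWS_ACCESS_KEY_ID is unset.)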
+ if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() { + return None; + } + let creds = match DefaultCredentialsProvider::new() { + Ok(creds) => creds, + Err(err) => { + warn!("failed to retrieve AWS credentials: {}", err); + return None; + } + }; + Some(S3Client::new_with( + rusoto_core::request::HttpClient::new().unwrap(), + creds, + std::env::var("S3_ENDPOINT") + .ok() + .map(|e| Region::Custom { + name: std::env::var("S3_REGION").unwrap_or_else(|_| "us-west-1".to_owned()), + endpoint: e, + }) + .unwrap_or(Region::UsWest1), + )) +} + +pub fn move_to_s3(conn: &Connection, n: usize) -> Result { + let trans = conn.transaction()?; + let client = s3_client().expect("configured s3"); + + let rows = trans.query( + &format!( + "SELECT path, mime, content FROM files WHERE content != E'in-s3' LIMIT {}", + n + ), + &[], + )?; + let count = rows.len(); + + let mut rt = ::tokio::runtime::Runtime::new().unwrap(); + let mut futures = Vec::new(); + for row in &rows { + let path: String = row.get(0); + let mime: String = row.get(1); + let content: Vec = row.get(2); + let path_1 = path.clone(); + futures.push( + client + .put_object(PutObjectRequest { + bucket: S3_BUCKET_NAME.into(), + key: path.clone(), + body: Some(content.into()), + content_type: Some(mime), + ..Default::default() + }) + .map(move |_| path_1) + .map_err(move |e| panic!("failed to upload to {}: {:?}", path, e)), + ); + } + + match rt.block_on(::futures::future::join_all(futures)) { + Ok(paths) => { + let statement = trans.prepare("DELETE FROM files WHERE path = $1").unwrap(); + for path in paths { + statement.execute(&[&path]).unwrap(); + } + } + Err(e) => { + panic!("results err: {:?}", e); + } + } + + trans.commit()?; + + Ok(count) +} From abbb3e093c92b656f7caf98adb7eefb542addf90 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Mon, 13 Apr 2020 15:35:32 -0400 Subject: [PATCH 17/38] Remove code duplication The only difference was between `pub(crate)` and private, so probably not worth it. 
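
For context, the test-side user of the constant at this point is the fake S3
dispatcher in src/test/fakes.rs, which formats `Last-Modified` headers with
it, roughly:

    use crate::storage::{Blob, TIME_FMT};
    let datestring = time::strftime(TIME_FMT, &datetime).unwrap();

Since `pub(crate)` is still crate-internal, the broader visibility costs
nothing.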
--- src/storage/s3.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 89452a833..6794a9466 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -87,9 +87,7 @@ impl<'a> S3Backend<'a> { } } -#[cfg(not(test))] -const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z"; -#[cfg(test)] +// public for testing pub(crate) const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z"; fn parse_timespec(raw: &str) -> Result { From c06e21c5c6f437264e3f720a183811642a7f0457 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Mon, 13 Apr 2020 15:38:32 -0400 Subject: [PATCH 18/38] Pre-allocate vectors of known size Co-Authored-By: Chase Wilson --- src/db/file.rs | 2 +- src/storage/s3.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db/file.rs b/src/db/file.rs index 61e8aeb84..52f7b712f 100644 --- a/src/db/file.rs +++ b/src/db/file.rs @@ -40,7 +40,7 @@ fn file_list_to_json(file_list: Vec<(PathBuf, String)>) -> Result { let mut file_list_json: Vec = Vec::new(); for file in file_list { - let mut v: Vec = Vec::new(); + let mut v: Vec = Vec::with_capacity(2); v.push(file.1); v.push(file.0.into_os_string().into_string().unwrap()); file_list_json.push(v.to_json()); diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 6794a9466..c08e87422 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -53,7 +53,7 @@ impl<'a> S3Backend<'a> { let mut attempts = 0; loop { - let mut futures = Vec::new(); + let mut futures = Vec::with_capacity(batch.len()); for blob in batch { futures.push( self.client @@ -205,7 +205,7 @@ pub fn move_to_s3(conn: &Connection, n: usize) -> Result { let count = rows.len(); let mut rt = ::tokio::runtime::Runtime::new().unwrap(); - let mut futures = Vec::new(); + let mut futures = Vec::with_capacity(rows.len()); for row in &rows { let path: String = row.get(0); let mime: String = row.get(1); From 1341332295835b227ca097986a86a09001a9247d Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Mon, 13 Apr 2020 15:46:40 -0400 Subject: [PATCH 19/38] Use 2018 idioms Co-Authored-By: Chase Wilson --- src/storage/s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index c08e87422..5fc8b4025 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -204,7 +204,7 @@ pub fn move_to_s3(conn: &Connection, n: usize) -> Result { )?; let count = rows.len(); - let mut rt = ::tokio::runtime::Runtime::new().unwrap(); + let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut futures = Vec::with_capacity(rows.len()); for row in &rows { let path: String = row.get(0); From cd99af0f45f9de9c6ba7b13aed64773cb6cd68c6 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Wed, 15 Apr 2020 10:24:12 -0400 Subject: [PATCH 20/38] Remove move-to-s3 command This was only used for a one-time migration and is no longer necessary. 
--- src/bin/cratesfyi.rs | 14 ------------ src/storage/mod.rs | 1 - src/storage/s3.rs | 52 -------------------------------------------- 3 files changed, 67 deletions(-) diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 6462b931e..33600bce0 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -83,7 +83,6 @@ pub fn main() { .subcommand(SubCommand::with_name("database") .about("Database operations") .setting(AppSettings::ArgRequiredElseHelp) - .subcommand(SubCommand::with_name("move-to-s3")) .subcommand(SubCommand::with_name("migrate") .about("Run database migrations") .arg(Arg::with_name("VERSION"))) @@ -263,19 +262,6 @@ pub fn main() { } else if matches.subcommand_matches("update-search-index").is_some() { let conn = db::connect_db().unwrap(); db::update_search_index(&conn).expect("Failed to update search index"); - } else if matches.subcommand_matches("move-to-s3").is_some() { - let conn = db::connect_db().unwrap(); - let mut count = 1; - let mut total = 0; - while count != 0 { - count = cratesfyi::storage::move_to_s3(&conn, 5_000) - .expect("Failed to upload batch to S3"); - total += count; - eprintln!( - "moved {} rows to s3 in this batch, total moved so far: {}", - count, total - ); - } } else if let Some(matches) = matches.subcommand_matches("delete-crate") { let name = matches.value_of("CRATE_NAME").expect("missing crate name"); let conn = db::connect_db().expect("failed to connect to the database"); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index e9c63fe24..bb65ffbc9 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -6,7 +6,6 @@ pub(crate) use self::s3::S3Backend; #[cfg(test)] pub(crate) use self::s3::TIME_FMT; use failure::Error; -pub use s3::move_to_s3; use time::Timespec; use failure::err_msg; diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 5fc8b4025..08e4edf39 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -1,7 +1,6 @@ use super::Blob; use failure::Error; use futures::Future; -use postgres::Connection; use rusoto_core::region::Region; use rusoto_credential::DefaultCredentialsProvider; use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; @@ -190,54 +189,3 @@ pub(crate) fn s3_client() -> Option { .unwrap_or(Region::UsWest1), )) } - -pub fn move_to_s3(conn: &Connection, n: usize) -> Result { - let trans = conn.transaction()?; - let client = s3_client().expect("configured s3"); - - let rows = trans.query( - &format!( - "SELECT path, mime, content FROM files WHERE content != E'in-s3' LIMIT {}", - n - ), - &[], - )?; - let count = rows.len(); - - let mut rt = tokio::runtime::Runtime::new().unwrap(); - let mut futures = Vec::with_capacity(rows.len()); - for row in &rows { - let path: String = row.get(0); - let mime: String = row.get(1); - let content: Vec = row.get(2); - let path_1 = path.clone(); - futures.push( - client - .put_object(PutObjectRequest { - bucket: S3_BUCKET_NAME.into(), - key: path.clone(), - body: Some(content.into()), - content_type: Some(mime), - ..Default::default() - }) - .map(move |_| path_1) - .map_err(move |e| panic!("failed to upload to {}: {:?}", path, e)), - ); - } - - match rt.block_on(::futures::future::join_all(futures)) { - Ok(paths) => { - let statement = trans.prepare("DELETE FROM files WHERE path = $1").unwrap(); - for path in paths { - statement.execute(&[&path]).unwrap(); - } - } - Err(e) => { - panic!("results err: {:?}", e); - } - } - - trans.commit()?; - - Ok(count) -} From 4549ed89b5c4e7579ea038457edf2243bb7367f2 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Wed, 
15 Apr 2020 10:34:19 -0400 Subject: [PATCH 21/38] Use `ON CONFLICT` instead of rewriting it manually --- src/storage/database.rs | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/src/storage/database.rs b/src/storage/database.rs index 52629eb3c..07ad4aefd 100644 --- a/src/storage/database.rs +++ b/src/storage/database.rs @@ -35,23 +35,13 @@ impl<'a> DatabaseBackend<'a> { } pub(super) fn store_batch(&self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> { for blob in batch { - // check if file already exists in database - let rows = self - .conn - .query("SELECT COUNT(*) FROM files WHERE path = $1", &[&blob.path])?; - - if rows.get(0).get::(0) == 0 { - trans.query( - "INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)", - &[&blob.path, &blob.mime, &blob.content], - )?; - } else { - trans.query( - "UPDATE files SET mime = $2, content = $3, date_updated = NOW() \ - WHERE path = $1", - &[&blob.path, &blob.mime, &blob.content], - )?; - } + trans.query( + "INSERT INTO files (path, mime, content) + VALUES ($1, $2, $3) + ON CONFLICT (path) DO UPDATE + SET mime = EXCLUDED.mime, content = EXCLUDED.content", + &[&blob.path, &blob.mime, &blob.content], + )?; } Ok(()) } From 9213286e2b29c6f149e885852be606425629152e Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Wed, 15 Apr 2020 10:58:18 -0400 Subject: [PATCH 22/38] haha rustfmt go brrr --- src/storage/s3.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 08e4edf39..2f85f9340 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -1,13 +1,13 @@ use super::Blob; use failure::Error; use futures::Future; +use log::{error, warn}; use rusoto_core::region::Region; use rusoto_credential::DefaultCredentialsProvider; use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; use std::convert::TryInto; use std::io::Read; use time::Timespec; -use log::{error, warn}; pub(crate) struct S3Backend<'a> { client: S3Client, From c6b79f3adfeb3cb791c4b967af18970d4d13c5c2 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Mon, 20 Apr 2020 16:15:37 -0400 Subject: [PATCH 23/38] Only create runtime once, instead of for each batch --- src/db/file.rs | 2 +- src/storage/mod.rs | 4 ++-- src/storage/s3.rs | 9 +++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/db/file.rs b/src/db/file.rs index 52f7b712f..d82e32900 100644 --- a/src/db/file.rs +++ b/src/db/file.rs @@ -31,7 +31,7 @@ pub fn add_path_into_database>( prefix: &str, path: P, ) -> Result { - let backend = Storage::new(conn); + let mut backend = Storage::new(conn); let file_list = backend.store_all(conn, prefix, path.as_ref())?; file_list_to_json(file_list.into_iter().collect()) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index bb65ffbc9..c0379b269 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -79,7 +79,7 @@ impl<'a> Storage<'a> { } } - fn store_batch(&self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> { + fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> { match self { Self::Database(db) => db.store_batch(batch, trans), Self::S3(s3) => s3.store_batch(batch), @@ -93,7 +93,7 @@ impl<'a> Storage<'a> { // // This returns a HashMap. 
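     // A sketch of the call shape (the receiver is now mutable; the prefix
     // and directory are hypothetical):
     //
     //     let mut storage = Storage::new(&conn);
     //     let mimes = storage.store_all(&conn, "rustdoc", Path::new("target/doc"))?;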
pub(crate) fn store_all(
-        &self,
+        &mut self,
         conn: &Connection,
         prefix: &str,
         root_dir: &Path,
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 2f85f9340..c214f8e3c 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -8,15 +8,17 @@ use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
 use std::convert::TryInto;
 use std::io::Read;
 use time::Timespec;
+use tokio::runtime::Runtime;
 
 pub(crate) struct S3Backend<'a> {
     client: S3Client,
     bucket: &'a str,
+    runtime: Runtime,
 }
 
 impl<'a> S3Backend<'a> {
     pub(crate) fn new(client: S3Client, bucket: &'a str) -> Self {
-        Self { client, bucket }
+        Self { client, bucket, runtime: Runtime::new().unwrap() }
     }
 
     pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
@@ -47,8 +49,7 @@ impl<'a> S3Backend<'a> {
         })
     }
 
-    pub(super) fn store_batch(&self, batch: &[Blob]) -> Result<(), Error> {
-        let mut rt = tokio::runtime::Runtime::new().unwrap();
+    pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
         let mut attempts = 0;
 
         loop {
@@ -70,7 +71,7 @@ impl<'a> S3Backend<'a> {
             }
             attempts += 1;
 
-            match rt.block_on(::futures::future::join_all(futures)) {
+            match self.runtime.block_on(::futures::future::join_all(futures)) {
                 // this batch was successful, start another batch if there are still more files
                 Ok(_) => break,
                 Err(err) => {

From a12aef49f3f376bbd1408674f7bb30fe58286e7d Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Mon, 20 Apr 2020 17:11:05 -0400
Subject: [PATCH 24/38] Don't poll each future on each wakeup

`join_all` polls every future when any wakes up
`FuturesUnordered` has fancier tracking of which one triggered the wakeup
so it only polls what it needs to
---
 src/storage/s3.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index c214f8e3c..8818c57ae 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -50,10 +50,12 @@ impl<'a> S3Backend<'a> {
     }
 
     pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
+        use futures::stream::FuturesUnordered;
+        use futures::stream::Stream;
         let mut attempts = 0;
 
         loop {
-            let mut futures = Vec::with_capacity(batch.len());
+            let mut futures = FuturesUnordered::new();
             for blob in batch {
                 futures.push(
                     self.client
@@ -71,7 +73,7 @@ impl<'a> S3Backend<'a> {
             }
             attempts += 1;
 
-            match self.runtime.block_on(::futures::future::join_all(futures)) {
+            match self.runtime.block_on(futures.collect()) {
                 // this batch was successful, start another batch if there are still more files
                 Ok(_) => break,
                 Err(err) => {

From 30ff8adcc6862c868daa09eef21378d552fd7391 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Mon, 20 Apr 2020 17:16:22 -0400
Subject: [PATCH 25/38] Make rustfmt happy

---
 src/storage/s3.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 8818c57ae..772a9dbe9 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -18,7 +18,11 @@ pub(crate) struct S3Backend<'a> {
 
 impl<'a> S3Backend<'a> {
     pub(crate) fn new(client: S3Client, bucket: &'a str) -> Self {
-        Self { client, bucket, runtime: Runtime::new().unwrap() }
+        Self {
+            client,
+            bucket,
+            runtime: Runtime::new().unwrap(),
+        }
     }
 
     pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {

From e89dab8128f4fa9ea9c7bce1be5725e1c3dba9d1 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Mon, 20 Apr 2020 17:17:59 -0400
Subject: [PATCH 26/38] Don't hold all PutObjectOutputs in memory

---
 src/storage/s3.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 772a9dbe9..d46deac6d 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -77,7 +77,7 @@ impl<'a> S3Backend<'a> { } attempts += 1; - match self.runtime.block_on(futures.collect()) { + match self.runtime.block_on(futures.map(drop).collect()) { // this batch was successful, start another batch if there are still more files Ok(_) => break, Err(err) => { From 725190023e0efc0587996b4884108a6d56496321 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Mon, 20 Apr 2020 17:19:50 -0400 Subject: [PATCH 27/38] Fix test failure --- src/storage/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c0379b269..42a8d0a14 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -201,7 +201,7 @@ mod test { wrapper(|env| { let db = env.db(); let conn = db.conn(); - let backend = Storage::Database(DatabaseBackend::new(&conn)); + let mut backend = Storage::Database(DatabaseBackend::new(&conn)); let stored_files = backend.store_all(&conn, "rustdoc", dir.path()).unwrap(); assert_eq!(stored_files.len(), files.len()); for name in &files { From 478dcb9588a952d3151bda23e8700729633ec48f Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Sat, 25 Apr 2020 02:17:55 -0400 Subject: [PATCH 28/38] Use min.io to test S3 uploads --- .github/workflows/ci.yml | 4 +- src/storage/mod.rs | 2 - src/storage/s3.rs | 123 +++++++++++++++++++++++++++++++-------- src/test/fakes.rs | 36 ------------ src/test/mod.rs | 19 +----- 5 files changed, 103 insertions(+), 81 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 85d6e8fc6..9172ac149 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,12 +49,12 @@ jobs: restore-key: | ${{ runner.os }}-cargo-build-target-${{ env.CURRENT_RUSTC_VERSION }}- - - name: Launch the postgres image + - name: Launch the postgres and min.io images run: | cp .env.sample .env . 
.env mkdir -p ${CRATESFYI_PREFIX}/public-html - docker-compose up -d db + docker-compose up -d db s3 # Give the database enough time to start up sleep 5 # Make sure the database is actually working diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 42a8d0a14..e0b4fbd6a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -3,8 +3,6 @@ pub(crate) mod s3; pub(crate) use self::database::DatabaseBackend; pub(crate) use self::s3::S3Backend; -#[cfg(test)] -pub(crate) use self::s3::TIME_FMT; use failure::Error; use time::Timespec; diff --git a/src/storage/s3.rs b/src/storage/s3.rs index d46deac6d..3d174bdb4 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -101,25 +101,101 @@ fn parse_timespec(raw: &str) -> Result { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; - use crate::test::TestEnvironment; - - fn assert_s3_404(env: &TestEnvironment, path: &'static str) { - use rusoto_core::RusotoError; - use rusoto_s3::GetObjectError; - - let s3 = env.s3().not_found(path); - let backend = S3Backend::new(s3.client, s3.bucket); - let err = backend.get(path).unwrap_err(); - let status = match err - .downcast_ref::>() - .expect("wanted GetObject") - { - RusotoError::Unknown(http) => http.status, - _ => panic!("wrong error"), - }; - assert_eq!(status, 404); + + pub(crate) struct TestS3(S3Backend<'static>); + + use crate::storage::s3::S3Backend; + use rusoto_core::RusotoResult; + use rusoto_s3::{ + CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, + PutObjectError, PutObjectOutput, PutObjectRequest, S3, + }; + + impl TestS3 { + pub(crate) fn new() -> Self { + // A random bucket name is generated and used for the current connection. + // This allows each test to create a fresh bucket to test with. + let bucket = format!("docs-rs-test-bucket-{}", rand::random::()); + let client = crate::storage::s3::s3_client().unwrap(); + client + .create_bucket(CreateBucketRequest { + bucket: bucket.clone(), + ..Default::default() + }) + .sync() + .expect("failed to create test bucket"); + let bucket = Box::leak(bucket.into_boxed_str()); + TestS3(S3Backend::new(client, bucket)) + } + pub(crate) fn upload(&self, blob: Blob) -> RusotoResult { + self.0 + .client + .put_object(PutObjectRequest { + bucket: self.0.bucket.to_owned(), + body: Some(blob.content.into()), + content_type: Some(blob.mime), + key: blob.path, + ..PutObjectRequest::default() + }) + .sync() + } + fn assert_404(&self, path: &'static str) { + use rusoto_core::RusotoError; + use rusoto_s3::GetObjectError; + + let err = self.0.get(path).unwrap_err(); + match err + .downcast_ref::>() + .expect("wanted GetObject") + { + RusotoError::Unknown(http) => assert_eq!(http.status, 404), + RusotoError::Service(GetObjectError::NoSuchKey(_)) => {} + x => panic!("wrong error: {:?}", x), + }; + } + fn assert_blob(&self, blob: &Blob, path: &str) { + let actual = self.0.get(path).unwrap(); + assert_eq!(blob.path, actual.path); + assert_eq!(blob.content, actual.content); + assert_eq!(blob.mime, actual.mime); + // NOTE: this does _not_ compare the upload time since min.io doesn't allow this to be configured + } + } + + impl Drop for TestS3 { + fn drop(&mut self) { + let objects = self + .0 + .client + .list_objects(ListObjectsRequest { + bucket: self.0.bucket.to_owned(), + ..Default::default() + }) + .sync() + .unwrap(); + assert!(!objects.is_truncated.unwrap_or(false)); + for path in objects.contents.unwrap() { + self.0 + .client + .delete_object(DeleteObjectRequest { + bucket: self.0.bucket.to_owned(), + key: 
path.key.unwrap(), + ..Default::default() + }) + .sync() + .unwrap(); + } + let delete_req = DeleteBucketRequest { + bucket: self.0.bucket.to_owned(), + }; + self.0 + .client + .delete_bucket(delete_req) + .sync() + .expect("failed to delete test bucket"); + } } #[test] @@ -153,16 +229,15 @@ mod tests { }; // Add a test file to the database - let s3 = env.s3().upload(blob.clone()); - - let backend = S3Backend::new(s3.client, &s3.bucket); + let s3 = env.s3(); + s3.upload(blob.clone()).unwrap(); // Test that the proper file was returned - assert_eq!(blob, backend.get("dir/foo.txt")?); + s3.assert_blob(&blob, "dir/foo.txt"); // Test that other files are not returned - assert_s3_404(&env, "dir/bar.txt"); - assert_s3_404(&env, "foo.txt"); + s3.assert_404("dir/bar.txt"); + s3.assert_404("foo.txt"); Ok(()) }); diff --git a/src/test/fakes.rs b/src/test/fakes.rs index d7726ffdb..a56a4afe6 100644 --- a/src/test/fakes.rs +++ b/src/test/fakes.rs @@ -201,39 +201,3 @@ impl<'a> FakeRelease<'a> { Ok(release_id) } } - -use crate::storage::{Blob, TIME_FMT}; -use rusoto_mock::{MockCredentialsProvider, MockRequestDispatcher}; -use rusoto_s3::S3Client; - -pub(crate) struct FakeUpload { - pub(crate) client: S3Client, - pub(crate) bucket: &'static str, -} - -impl FakeUpload { - pub(crate) fn new(blob: Blob, bucket: &'static str) -> Self { - let datetime = time::at_utc(blob.date_updated); - let datestring = time::strftime(TIME_FMT, &datetime).unwrap(); - let dispatcher = MockRequestDispatcher::default() - .with_body(&String::from_utf8(blob.content).unwrap()) - .with_header("Content-Type", &blob.mime) - .with_header("Last-Modified", &datestring); - Self::with_dispatcher(blob.path, bucket, dispatcher) - } - - pub(crate) fn not_found(path: &'static str, bucket: &'static str) -> Self { - Self::with_dispatcher(path, bucket, MockRequestDispatcher::with_status(404)) - } - - fn with_dispatcher(path: S, bucket: &'static str, dispatcher: MockRequestDispatcher) -> Self - where - S: AsRef + std::fmt::Display + Send + Sync + 'static, - { - let dispatcher = dispatcher.with_request_checker(move |req| { - assert_eq!(req.path, format!("/{}/{}", bucket, path)); - }); - let client = S3Client::new_with(dispatcher, MockCredentialsProvider, Default::default()); - Self { client, bucket } - } -} diff --git a/src/test/mod.rs b/src/test/mod.rs index 0da4d022a..956bd1ad0 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,6 +1,6 @@ mod fakes; -use crate::storage::Blob; +use crate::storage::s3::tests::TestS3; use crate::web::Server; use failure::Error; use log::error; @@ -120,9 +120,7 @@ impl TestEnvironment { } pub(crate) fn s3(&self) -> &TestS3 { - self.s3.get_or_init(|| TestS3 { - bucket: "", - }) + self.s3.get_or_init(TestS3::new) } } @@ -195,16 +193,3 @@ impl TestFrontend { self.build_request(Method::GET, url) } } - -pub(crate) struct TestS3 { - bucket: &'static str, -} - -impl TestS3 { - pub(crate) fn upload(&self, blob: Blob) -> fakes::FakeUpload { - fakes::FakeUpload::new(blob, self.bucket) - } - pub(crate) fn not_found(&self, path: &'static str) -> fakes::FakeUpload { - fakes::FakeUpload::not_found(path, &self.bucket) - } -} From df9de24c28b98a4436006c28498f03c0557ae615 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Sat, 25 Apr 2020 18:36:08 -0400 Subject: [PATCH 29/38] [BROKEN] start adding more tests --- src/storage/s3.rs | 94 ++++++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 37 deletions(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 3d174bdb4..81d58fc9c 100644 --- 
a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -103,8 +103,7 @@ fn parse_timespec(raw: &str) -> Result { #[cfg(test)] pub(crate) mod tests { use super::*; - - pub(crate) struct TestS3(S3Backend<'static>); + use crate::test::*; use crate::storage::s3::S3Backend; use rusoto_core::RusotoResult; @@ -113,12 +112,16 @@ pub(crate) mod tests { PutObjectError, PutObjectOutput, PutObjectRequest, S3, }; + use std::cell::RefCell; + + pub(crate) struct TestS3(RefCell>); + impl TestS3 { pub(crate) fn new() -> Self { // A random bucket name is generated and used for the current connection. // This allows each test to create a fresh bucket to test with. let bucket = format!("docs-rs-test-bucket-{}", rand::random::()); - let client = crate::storage::s3::s3_client().unwrap(); + let client = s3_client().unwrap(); client .create_bucket(CreateBucketRequest { bucket: bucket.clone(), @@ -129,6 +132,8 @@ pub(crate) mod tests { let bucket = Box::leak(bucket.into_boxed_str()); TestS3(S3Backend::new(client, bucket)) } + /// NOTE: this mocks the upload instead of using store_one + /// TODO: I think `store_one` would be better pub(crate) fn upload(&self, blob: Blob) -> RusotoResult { self.0 .client @@ -166,26 +171,19 @@ pub(crate) mod tests { impl Drop for TestS3 { fn drop(&mut self) { - let objects = self - .0 - .client - .list_objects(ListObjectsRequest { - bucket: self.0.bucket.to_owned(), - ..Default::default() - }) - .sync() - .unwrap(); + let list_req = ListObjectsRequest { + bucket: self.0.bucket.to_owned(), + ..Default::default() + }; + let objects = self.0.client.list_objects(list_req).sync().unwrap(); assert!(!objects.is_truncated.unwrap_or(false)); for path in objects.contents.unwrap() { - self.0 - .client - .delete_object(DeleteObjectRequest { - bucket: self.0.bucket.to_owned(), - key: path.key.unwrap(), - ..Default::default() - }) - .sync() - .unwrap(); + let delete_req = DeleteObjectRequest { + bucket: self.0.bucket.to_owned(), + key: path.key.unwrap(), + ..Default::default() + }; + self.0.client.delete_object(delete_req).sync().unwrap(); } let delete_req = DeleteBucketRequest { bucket: self.0.bucket.to_owned(), @@ -200,27 +198,23 @@ pub(crate) mod tests { #[test] fn test_parse_timespec() { - crate::test::wrapper(|_| { - // Test valid conversions - assert_eq!( - parse_timespec("Thu, 1 Jan 1970 00:00:00 GMT")?, - Timespec::new(0, 0) - ); - assert_eq!( - parse_timespec("Mon, 16 Apr 2018 04:33:50 GMT")?, - Timespec::new(1523853230, 0) - ); - - // Test invalid conversion - assert!(parse_timespec("foo").is_err()); + // Test valid conversions + assert_eq!( + parse_timespec("Thu, 1 Jan 1970 00:00:00 GMT").unwrap(), + Timespec::new(0, 0) + ); + assert_eq!( + parse_timespec("Mon, 16 Apr 2018 04:33:50 GMT").unwrap(), + Timespec::new(1523853230, 0) + ); - Ok(()) - }) + // Test invalid conversion + assert!(parse_timespec("foo").is_err()); } #[test] fn test_path_get() { - crate::test::wrapper(|env| { + wrapper(|env| { let blob = Blob { path: "dir/foo.txt".into(), mime: "text/plain".into(), @@ -242,6 +236,32 @@ pub(crate) mod tests { Ok(()) }); } + + #[test] + fn test_store() { + wrapper(|env| { + let s3 = env.s3(); + let names = [ + "a", + "b", + "a_very_long_file_name_that_has_an.extension", + "parent/child", + "h/i/g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s", + "trailing/slash/", + ]; + let blobs: Vec<_> = names.iter().map(|&path| Blob { + path: path.into(), + mime: "text/plain".into(), + date_updated: Timespec::new(42, 0), + content: "Hello world!".into(), + }).collect(); + s3.0.store_batch(&blobs).unwrap(); + 
for blob in &blobs { + s3.assert_blob(blob, &blob.path); + } + Ok(()) + }) + } } pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; From cb9f5febb25c680046d07c900e899f4ee8297422 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Sat, 25 Apr 2020 20:20:49 -0400 Subject: [PATCH 30/38] Add more tests --- .env.sample | 3 +++ src/storage/s3.rs | 46 +++++++++++++++++++--------------------------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/.env.sample b/.env.sample index 45f166bbe..7cf054776 100644 --- a/.env.sample +++ b/.env.sample @@ -3,3 +3,6 @@ CRATESFYI_GITHUB_ACCESSTOKEN= CRATESFYI_PREFIX=ignored/cratesfyi-prefix CRATESFYI_DATABASE_URL=postgresql://cratesfyi:password@localhost RUST_LOG=cratesfyi,rustwide=info +AWS_ACCESS_KEY_ID=cratesfyi +AWS_SECRET_ACCESS_KEY=secret_key +S3_ENDPOINT=http://localhost:9000 diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 81d58fc9c..18c7bed7f 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -113,6 +113,7 @@ pub(crate) mod tests { }; use std::cell::RefCell; + use std::slice; pub(crate) struct TestS3(RefCell>); @@ -130,27 +131,16 @@ pub(crate) mod tests { .sync() .expect("failed to create test bucket"); let bucket = Box::leak(bucket.into_boxed_str()); - TestS3(S3Backend::new(client, bucket)) + TestS3(RefCell::new(S3Backend::new(client, bucket))) } - /// NOTE: this mocks the upload instead of using store_one - /// TODO: I think `store_one` would be better - pub(crate) fn upload(&self, blob: Blob) -> RusotoResult { - self.0 - .client - .put_object(PutObjectRequest { - bucket: self.0.bucket.to_owned(), - body: Some(blob.content.into()), - content_type: Some(blob.mime), - key: blob.path, - ..PutObjectRequest::default() - }) - .sync() + pub(crate) fn upload(&self, blobs: &[Blob]) -> Result<(), Error> { + self.0.borrow_mut().store_batch(blobs) } fn assert_404(&self, path: &'static str) { use rusoto_core::RusotoError; use rusoto_s3::GetObjectError; - let err = self.0.get(path).unwrap_err(); + let err = self.0.borrow().get(path).unwrap_err(); match err .downcast_ref::>() .expect("wanted GetObject") @@ -161,7 +151,7 @@ pub(crate) mod tests { }; } fn assert_blob(&self, blob: &Blob, path: &str) { - let actual = self.0.get(path).unwrap(); + let actual = self.0.borrow().get(path).unwrap(); assert_eq!(blob.path, actual.path); assert_eq!(blob.content, actual.content); assert_eq!(blob.mime, actual.mime); @@ -171,25 +161,25 @@ pub(crate) mod tests { impl Drop for TestS3 { fn drop(&mut self) { + let inner = self.0.borrow(); let list_req = ListObjectsRequest { - bucket: self.0.bucket.to_owned(), + bucket: inner.bucket.to_owned(), ..Default::default() }; - let objects = self.0.client.list_objects(list_req).sync().unwrap(); + let objects = inner.client.list_objects(list_req).sync().unwrap(); assert!(!objects.is_truncated.unwrap_or(false)); for path in objects.contents.unwrap() { let delete_req = DeleteObjectRequest { - bucket: self.0.bucket.to_owned(), + bucket: inner.bucket.to_owned(), key: path.key.unwrap(), ..Default::default() }; - self.0.client.delete_object(delete_req).sync().unwrap(); + inner.client.delete_object(delete_req).sync().unwrap(); } let delete_req = DeleteBucketRequest { - bucket: self.0.bucket.to_owned(), + bucket: inner.bucket.to_owned(), }; - self.0 - .client + inner.client .delete_bucket(delete_req) .sync() .expect("failed to delete test bucket"); @@ -213,7 +203,7 @@ pub(crate) mod tests { } #[test] - fn test_path_get() { + fn test_get() { wrapper(|env| { let blob = Blob { path: "dir/foo.txt".into(), @@ -224,7 +214,7 
@@ pub(crate) mod tests { // Add a test file to the database let s3 = env.s3(); - s3.upload(blob.clone()).unwrap(); + s3.upload(slice::from_ref(&blob)).unwrap(); // Test that the proper file was returned s3.assert_blob(&blob, "dir/foo.txt"); @@ -247,7 +237,6 @@ pub(crate) mod tests { "a_very_long_file_name_that_has_an.extension", "parent/child", "h/i/g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s", - "trailing/slash/", ]; let blobs: Vec<_> = names.iter().map(|&path| Blob { path: path.into(), @@ -255,13 +244,16 @@ pub(crate) mod tests { date_updated: Timespec::new(42, 0), content: "Hello world!".into(), }).collect(); - s3.0.store_batch(&blobs).unwrap(); + s3.upload(&blobs).unwrap(); for blob in &blobs { s3.assert_blob(blob, &blob.path); } Ok(()) }) } + // NOTE: trying to upload a file ending with `/` will behave differently in test and prod. + // NOTE: On s3, it will succeed and create a file called `/`. + // NOTE: On min.io, it will fail with 'Object name contains unsupported characters.' } pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; From 4c4c623847fadc19ab4fea8194967ade93f8ec52 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Sat, 25 Apr 2020 20:42:50 -0400 Subject: [PATCH 31/38] rustfmt go brrr --- src/storage/s3.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 18c7bed7f..2b3df49a5 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -179,7 +179,8 @@ pub(crate) mod tests { let delete_req = DeleteBucketRequest { bucket: inner.bucket.to_owned(), }; - inner.client + inner + .client .delete_bucket(delete_req) .sync() .expect("failed to delete test bucket"); @@ -238,12 +239,15 @@ pub(crate) mod tests { "parent/child", "h/i/g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s", ]; - let blobs: Vec<_> = names.iter().map(|&path| Blob { - path: path.into(), - mime: "text/plain".into(), - date_updated: Timespec::new(42, 0), - content: "Hello world!".into(), - }).collect(); + let blobs: Vec<_> = names + .iter() + .map(|&path| Blob { + path: path.into(), + mime: "text/plain".into(), + date_updated: Timespec::new(42, 0), + content: "Hello world!".into(), + }) + .collect(); s3.upload(&blobs).unwrap(); for blob in &blobs { s3.assert_blob(blob, &blob.path); From 2194aa89ff3179bbecb345b6d9e19ae282bd336f Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Sat, 25 Apr 2020 21:31:43 -0400 Subject: [PATCH 32/38] Cleanup - remove unused imports - remove unused rusoto_mock dependency --- Cargo.lock | 15 --------------- Cargo.toml | 1 - src/storage/s3.rs | 6 +++--- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a231125ac..174df43eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,7 +385,6 @@ dependencies = [ "router 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_credential 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rusoto_mock 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_s3 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", "rustwide 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2280,19 +2279,6 @@ dependencies = [ "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rusoto_mock" -version = "0.40.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "rusoto_s3" version = "0.40.0" @@ -3730,7 +3716,6 @@ dependencies = [ "checksum router 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9b1797ff166029cb632237bb5542696e54961b4cf75a324c6f05c9cf0584e4e" "checksum rusoto_core 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dd1a1069ba04874a485528d1602fab4569f2434a5547614428e2cc22b91bfb71" "checksum rusoto_credential 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b0d6cc3a602f01b9c5a04c8ed4ee281b789c5b2692d93202367c9b99ebc022ed" -"checksum rusoto_mock 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6b22b45fb22064594c5f48284ba361efa90fc711662dd9744bb388927b67c82a" "checksum rusoto_s3 0.40.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4da6eac54781d2aac517a99f1d85d0d6a78674543f8d122d884628c1ff21b495" "checksum rust-argon2 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2bc8af4bda8e1ff4932523b94d3dd20ee30a87232323eda55903ffd71d2fb017" "checksum rustc-demangle 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" diff --git a/Cargo.toml b/Cargo.toml index d32da6b16..b94314e71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,6 @@ once_cell = "1.2.0" kuchiki = "0.8" criterion = "0.3" rand = "0.7.3" -rusoto_mock = "0.40" [[bench]] name = "html5ever" diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 2b3df49a5..91d1b8620 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -106,10 +106,8 @@ pub(crate) mod tests { use crate::test::*; use crate::storage::s3::S3Backend; - use rusoto_core::RusotoResult; use rusoto_s3::{ - CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, - PutObjectError, PutObjectOutput, PutObjectRequest, S3, + CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, S3, }; use std::cell::RefCell; @@ -161,6 +159,8 @@ pub(crate) mod tests { impl Drop for TestS3 { fn drop(&mut self) { + // delete the bucket when the test ends + // this has to delete all the objects in the bucket first or min.io will give an error let inner = self.0.borrow(); let list_req = ListObjectsRequest { bucket: inner.bucket.to_owned(), From 2911c7a3d2862e018a4869ad5ba563c1e3e0e2b3 Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Sat, 25 Apr 2020 22:02:00 -0400 Subject: [PATCH 33/38] Add instructions to README for running s3 tests --- README.md | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 2a4acb1f8..9d06c516b 100644 --- a/README.md +++ b/README.md @@ -86,16 +86,22 @@ cargo test ``` Some tests require access to the database. To run them, set the -`CRATESFYI_DATABASE_URL` to the url of a PostgreSQL database. If you are using -the `docker-compose` environment to run tests against, see the -[Docker-Compose][docker-compose-section] section for the default PostgreSQL URL. 
-You don't have to run the migrations on it or ensure it's empty, as all the -tests use temporary tables to prevent conflicts with each other or existing -data. See the [wiki page on developing outside docker-compose][wiki-no-compose] +`CRATESFYI_DATABASE_URL` in `.env` to the url of a PostgreSQL database, +and set the `AWS_ACCESS_KEY_ID`, `S3_ENDPOINT`, and `AWS_SECRET_ACCESS_KEY` variables. +We have some reasonable default parameters in `.env.sample`. + +For example, if you are using the `docker-compose` environment to run tests against, you can launch only the database and s3 server like so: + +```console +docker-compose up -d db s3 +``` + +If you don't want to use docker-compose, see the +[wiki page on developing outside docker-compose][wiki-no-compose] for more information on how to setup this environment. +Note that either way, you will need docker installed for sandboxing with Rustwide. [wiki-no-compose]: https://github.com/rust-lang/docs.rs/wiki/Developing-without-docker-compose -[docker-compose-section]: #Docker-Compose ### Docker-Compose From 826844509c20bb2f92aaeba585d0a203b0e320dd Mon Sep 17 00:00:00 2001 From: Joshua Nelson Date: Tue, 28 Apr 2020 13:52:17 -0400 Subject: [PATCH 34/38] Move some code around --- src/storage/s3.rs | 56 +++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 91d1b8620..ae9861758 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -10,6 +10,8 @@ use std::io::Read; use time::Timespec; use tokio::runtime::Runtime; +pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; + pub(crate) struct S3Backend<'a> { client: S3Client, bucket: &'a str, @@ -100,6 +102,32 @@ fn parse_timespec(raw: &str) -> Result { Ok(time::strptime(raw, TIME_FMT)?.to_timespec()) } +pub(crate) fn s3_client() -> Option { + // If AWS keys aren't configured, then presume we should use the DB exclusively + // for file storage. + if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() { + return None; + } + let creds = match DefaultCredentialsProvider::new() { + Ok(creds) => creds, + Err(err) => { + warn!("failed to retrieve AWS credentials: {}", err); + return None; + } + }; + Some(S3Client::new_with( + rusoto_core::request::HttpClient::new().unwrap(), + creds, + std::env::var("S3_ENDPOINT") + .ok() + .map(|e| Region::Custom { + name: std::env::var("S3_REGION").unwrap_or_else(|_| "us-west-1".to_owned()), + endpoint: e, + }) + .unwrap_or(Region::UsWest1), + )) +} + #[cfg(test)] pub(crate) mod tests { use super::*; @@ -259,31 +287,3 @@ pub(crate) mod tests { // NOTE: On s3, it will succeed and create a file called `/`. // NOTE: On min.io, it will fail with 'Object name contains unsupported characters.' } - -pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs"; - -pub(crate) fn s3_client() -> Option { - // If AWS keys aren't configured, then presume we should use the DB exclusively - // for file storage. 
From ec9280e5c6063c08cc4c0471793e8c3b55907283 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Tue, 28 Apr 2020 15:08:50 -0400
Subject: [PATCH 35/38] Don't read all files into memory at once

---
 src/storage/mod.rs | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index e0b4fbd6a..6809ce9e6 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -101,7 +101,7 @@ impl<'a> Storage<'a> {
         let trans = conn.transaction()?;
         let mut file_paths_and_mimes = HashMap::new();
 
-        get_file_list(root_dir)?
+        let mut blobs = get_file_list(root_dir)?
             .into_iter()
             .filter_map(|file_path| {
                 // Some files have insufficient permissions
@@ -111,7 +111,7 @@ impl<'a> Storage<'a> {
                     .ok()
                     .map(|file| (file_path, file))
             })
-            .map(|(file_path, mut file)| {
+            .map(|(file_path, mut file)| -> Result<_, Error> {
                 let mut content: Vec<u8> = Vec::new();
                 file.read_to_end(&mut content)?;
 
@@ -132,10 +132,17 @@ impl<'a> Storage<'a> {
                     // this field is ignored by the backend
                     date_updated: Timespec::new(0, 0),
                 })
-            })
-            .collect::<Result<Vec<_>, Error>>()?
-            .chunks(MAX_CONCURRENT_UPLOADS)
-            .try_for_each(|batch| self.store_batch(batch, &trans))?;
+            });
+        loop {
+            let batch: Vec<_> = blobs
+                .by_ref()
+                .take(MAX_CONCURRENT_UPLOADS)
+                .collect::<Result<_, Error>>()?;
+            if batch.is_empty() {
+                break;
+            }
+            self.store_batch(&batch, &trans)?;
+        }
 
         trans.commit()?;
         Ok(file_paths_and_mimes)
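
The key idea in the patch above is to drain a fallible iterator in fixed-size batches with `by_ref().take(n)` instead of collecting every file into memory first. A self-contained sketch of that pattern, with plain integers standing in for blobs (illustration only, not part of the patch series; `process_in_batches` is a hypothetical name):

```rust
// Drain a fallible iterator in fixed-size batches: only one batch of items
// is ever held in memory, and the first error aborts the whole run.
fn process_in_batches<I>(items: I, batch_size: usize) -> Result<(), String>
where
    I: IntoIterator<Item = Result<u32, String>>,
{
    let mut items = items.into_iter();
    loop {
        // `by_ref` lets `take` borrow the iterator instead of consuming it,
        // so the next loop iteration resumes where this batch stopped.
        let batch: Vec<u32> = items.by_ref().take(batch_size).collect::<Result<_, _>>()?;
        if batch.is_empty() {
            break;
        }
        println!("storing batch of {} items", batch.len());
    }
    Ok(())
}

fn main() -> Result<(), String> {
    // 5 items with a batch size of 2 yields batches of 2, 2 and 1.
    process_in_batches((0..5).map(Ok), 2)
}
```
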
From 034b8caa03d8e3c239d3b7472fdf88a4e8d2a13d Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Tue, 28 Apr 2020 15:20:52 -0400
Subject: [PATCH 36/38] Add test for uploading more than a single batch of files

---
 src/storage/mod.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++--
 src/storage/s3.rs  |  8 +++-----
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6809ce9e6..830fb2430 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -14,6 +14,8 @@ use std::fs;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 
+const MAX_CONCURRENT_UPLOADS: usize = 1000;
+
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub(crate) struct Blob {
     pub(crate) path: String,
@@ -96,8 +98,6 @@ impl<'a> Storage<'a> {
         prefix: &str,
         root_dir: &Path,
     ) -> Result<HashMap<PathBuf, String>, Error> {
-        const MAX_CONCURRENT_UPLOADS: usize = 1000;
-
         let trans = conn.transaction()?;
         let mut file_paths_and_mimes = HashMap::new();
 
@@ -191,6 +191,40 @@ mod test {
     use crate::test::wrapper;
     use std::env;
 
+    pub(crate) fn assert_blob_eq(blob: &Blob, actual: &Blob) {
+        assert_eq!(blob.path, actual.path);
+        assert_eq!(blob.content, actual.content);
+        assert_eq!(blob.mime, actual.mime);
+        // NOTE: this does _not_ compare the upload time since min.io doesn't allow this to be configured
+    }
+
+    pub(crate) fn test_roundtrip(blobs: &[Blob]) {
+        let dir = tempdir::TempDir::new("docs.rs-upload-test").unwrap();
+        for blob in blobs {
+            let path = dir.path().join(&blob.path);
+            if let Some(parent) = path.parent() {
+                fs::create_dir_all(parent).unwrap();
+            }
+            fs::write(path, &blob.content).expect("failed to write to file");
+        }
+        wrapper(|env| {
+            let db = env.db();
+            let conn = db.conn();
+            let mut backend = Storage::Database(DatabaseBackend::new(&conn));
+            let stored_files = backend.store_all(&conn, "", dir.path()).unwrap();
+            assert_eq!(stored_files.len(), blobs.len());
+            for blob in blobs {
+                let name = Path::new(&blob.path);
+                assert!(stored_files.contains_key(name));
+
+                let actual = backend.get(&blob.path).unwrap();
+                assert_blob_eq(blob, &actual);
+            }
+
+            Ok(())
+        });
+    }
+
     #[test]
     fn test_uploads() {
         use std::fs;
@@ -235,6 +269,19 @@ mod test {
         })
     }
 
+    #[test]
+    fn test_batched_uploads() {
+        let uploads: Vec<_> = (0..=MAX_CONCURRENT_UPLOADS + 1)
+            .map(|i| Blob {
+                mime: "text/rust".into(),
+                content: "fn main() {}".into(),
+                path: format!("{}.rs", i),
+                date_updated: Timespec::new(42, 0),
+            })
+            .collect();
+        test_roundtrip(&uploads);
+    }
+
     #[test]
     fn test_get_file_list() {
         let _ = env_logger::try_init();
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index ae9861758..b4c9906a4 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -131,6 +131,7 @@ pub(crate) fn s3_client() -> Option<S3Client> {
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
+    use crate::storage::test::*;
     use crate::test::*;
 
     use crate::storage::s3::S3Backend;
@@ -176,12 +177,9 @@ pub(crate) mod tests {
         };
     }
-    fn assert_blob(&self, blob: &Blob, path: &str) {
+    pub(crate) fn assert_blob(&self, blob: &Blob, path: &str) {
         let actual = self.0.borrow().get(path).unwrap();
-        assert_eq!(blob.path, actual.path);
-        assert_eq!(blob.content, actual.content);
-        assert_eq!(blob.mime, actual.mime);
-        // NOTE: this does _not_ compare the upload time since min.io doesn't allow this to be configured
+        assert_blob_eq(blob, &actual);
     }
 }
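
A quick way to convince yourself that `test_batched_uploads` really crosses a batch boundary: the inclusive range `0..=MAX_CONCURRENT_UPLOADS + 1` yields `MAX_CONCURRENT_UPLOADS + 2` blobs, which splits into one full batch plus a batch of two. A standalone check of that arithmetic (illustration only, not part of the patch series):

```rust
fn main() {
    const MAX_CONCURRENT_UPLOADS: usize = 1000;
    // `0..=n + 1` is inclusive, so it yields n + 2 items (0 through n + 1).
    let uploads: Vec<usize> = (0..=MAX_CONCURRENT_UPLOADS + 1).collect();
    assert_eq!(uploads.len(), MAX_CONCURRENT_UPLOADS + 2);
    // Batching 1002 items 1000 at a time gives ceil(1002 / 1000) = 2 batches.
    let batches = (uploads.len() + MAX_CONCURRENT_UPLOADS - 1) / MAX_CONCURRENT_UPLOADS;
    assert_eq!(batches, 2);
}
```
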
From e0104d0ed276aa16484964835f8650bbcb3a9016 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Wed, 29 Apr 2020 09:41:36 -0400
Subject: [PATCH 37/38] Move TestS3 into a submodule

---
 src/storage/s3.rs         | 90 +++------------------------------------
 src/storage/s3/test_s3.rs | 77 +++++++++++++++++++++++++++++++++
 src/test/mod.rs           |  2 +-
 3 files changed, 85 insertions(+), 84 deletions(-)
 create mode 100644 src/storage/s3/test_s3.rs

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index b4c9906a4..89df3f693 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -10,6 +10,11 @@ use std::io::Read;
 use time::Timespec;
 use tokio::runtime::Runtime;
 
+#[cfg(test)]
+mod test_s3;
+#[cfg(test)]
+pub(crate) use test_s3::TestS3;
+
 pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs";
 
 pub(crate) struct S3Backend<'a> {
@@ -95,10 +100,8 @@ impl<'a> S3Backend<'a> {
     }
 }
 
-// public for testing
-pub(crate) const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z";
-
 fn parse_timespec(raw: &str) -> Result<Timespec, Error> {
+    const TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S %Z";
     Ok(time::strptime(raw, TIME_FMT)?.to_timespec())
 }
 
@@ -129,90 +132,11 @@ pub(crate) fn s3_client() -> Option<S3Client> {
 }
 
 #[cfg(test)]
-pub(crate) mod tests {
+pub(crate) mod test {
     use super::*;
-    use crate::storage::test::*;
     use crate::test::*;
-
-    use crate::storage::s3::S3Backend;
-    use rusoto_s3::{
-        CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, S3,
-    };
-
-    use std::cell::RefCell;
     use std::slice;
 
-    pub(crate) struct TestS3(RefCell<S3Backend<'static>>);
-
-    impl TestS3 {
-        pub(crate) fn new() -> Self {
-            // A random bucket name is generated and used for the current connection.
-            // This allows each test to create a fresh bucket to test with.
-            let bucket = format!("docs-rs-test-bucket-{}", rand::random::<u64>());
-            let client = s3_client().unwrap();
-            client
-                .create_bucket(CreateBucketRequest {
-                    bucket: bucket.clone(),
-                    ..Default::default()
-                })
-                .sync()
-                .expect("failed to create test bucket");
-            let bucket = Box::leak(bucket.into_boxed_str());
-            TestS3(RefCell::new(S3Backend::new(client, bucket)))
-        }
-        pub(crate) fn upload(&self, blobs: &[Blob]) -> Result<(), Error> {
-            self.0.borrow_mut().store_batch(blobs)
-        }
-        fn assert_404(&self, path: &'static str) {
-            use rusoto_core::RusotoError;
-            use rusoto_s3::GetObjectError;
-
-            let err = self.0.borrow().get(path).unwrap_err();
-            match err
-                .downcast_ref::<RusotoError<GetObjectError>>()
-                .expect("wanted GetObject")
-            {
-                RusotoError::Unknown(http) => assert_eq!(http.status, 404),
-                RusotoError::Service(GetObjectError::NoSuchKey(_)) => {}
-                x => panic!("wrong error: {:?}", x),
-            };
-        }
-        pub(crate) fn assert_blob(&self, blob: &Blob, path: &str) {
-            let actual = self.0.borrow().get(path).unwrap();
-            assert_blob_eq(blob, &actual);
-        }
-    }
-
-    impl Drop for TestS3 {
-        fn drop(&mut self) {
-            // delete the bucket when the test ends
-            // this has to delete all the objects in the bucket first or min.io will give an error
-            let inner = self.0.borrow();
-            let list_req = ListObjectsRequest {
-                bucket: inner.bucket.to_owned(),
-                ..Default::default()
-            };
-            let objects = inner.client.list_objects(list_req).sync().unwrap();
-            assert!(!objects.is_truncated.unwrap_or(false));
-            for path in objects.contents.unwrap() {
-                let delete_req = DeleteObjectRequest {
-                    bucket: inner.bucket.to_owned(),
-                    key: path.key.unwrap(),
-                    ..Default::default()
-                };
-                inner.client.delete_object(delete_req).sync().unwrap();
-            }
-            let delete_req = DeleteBucketRequest {
-                bucket: inner.bucket.to_owned(),
-            };
-            inner
-                .client
-                .delete_bucket(delete_req)
-                .sync()
-                .expect("failed to delete test bucket");
-        }
-    }
-
     #[test]
     fn test_parse_timespec() {
         // Test valid conversions
diff --git a/src/storage/s3/test_s3.rs b/src/storage/s3/test_s3.rs
new file mode 100644
index 000000000..937f20127
--- /dev/null
+++ b/src/storage/s3/test_s3.rs
@@ -0,0 +1,77 @@
+use super::*;
+use crate::storage::test::assert_blob_eq;
+use rusoto_s3::{
+    CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, ListObjectsRequest, S3,
+};
+use std::cell::RefCell;
+
+pub(crate) struct TestS3(RefCell<S3Backend<'static>>);
+
+impl TestS3 {
+    pub(crate) fn new() -> Self {
+        // A random bucket name is generated and used for the current connection.
+        // This allows each test to create a fresh bucket to test with.
+        let bucket = format!("docs-rs-test-bucket-{}", rand::random::<u64>());
+        let client = s3_client().unwrap();
+        client
+            .create_bucket(CreateBucketRequest {
+                bucket: bucket.clone(),
+                ..Default::default()
+            })
+            .sync()
+            .expect("failed to create test bucket");
+        let bucket = Box::leak(bucket.into_boxed_str());
+        TestS3(RefCell::new(S3Backend::new(client, bucket)))
+    }
+    pub(crate) fn upload(&self, blobs: &[Blob]) -> Result<(), Error> {
+        self.0.borrow_mut().store_batch(blobs)
+    }
+    pub(crate) fn assert_404(&self, path: &'static str) {
+        use rusoto_core::RusotoError;
+        use rusoto_s3::GetObjectError;
+
+        let err = self.0.borrow().get(path).unwrap_err();
+        match err
+            .downcast_ref::<RusotoError<GetObjectError>>()
+            .expect("wanted GetObject")
+        {
+            RusotoError::Unknown(http) => assert_eq!(http.status, 404),
+            RusotoError::Service(GetObjectError::NoSuchKey(_)) => {}
+            x => panic!("wrong error: {:?}", x),
+        };
+    }
+    pub(crate) fn assert_blob(&self, blob: &Blob, path: &str) {
+        let actual = self.0.borrow().get(path).unwrap();
+        assert_blob_eq(blob, &actual);
+    }
+}
+
+impl Drop for TestS3 {
+    fn drop(&mut self) {
+        // delete the bucket when the test ends
+        // this has to delete all the objects in the bucket first or min.io will give an error
+        let inner = self.0.borrow();
+        let list_req = ListObjectsRequest {
+            bucket: inner.bucket.to_owned(),
+            ..Default::default()
+        };
+        let objects = inner.client.list_objects(list_req).sync().unwrap();
+        assert!(!objects.is_truncated.unwrap_or(false));
+        for path in objects.contents.unwrap() {
+            let delete_req = DeleteObjectRequest {
+                bucket: inner.bucket.to_owned(),
+                key: path.key.unwrap(),
+                ..Default::default()
+            };
+            inner.client.delete_object(delete_req).sync().unwrap();
+        }
+        let delete_req = DeleteBucketRequest {
+            bucket: inner.bucket.to_owned(),
+        };
+        inner
+            .client
+            .delete_bucket(delete_req)
+            .sync()
+            .expect("failed to delete test bucket");
+    }
+}
diff --git a/src/test/mod.rs b/src/test/mod.rs
index 956bd1ad0..2d7d3f0bd 100644
--- a/src/test/mod.rs
+++ b/src/test/mod.rs
@@ -1,6 +1,6 @@
 mod fakes;
 
-use crate::storage::s3::tests::TestS3;
+use crate::storage::s3::TestS3;
 use crate::web::Server;
 use failure::Error;
 use log::error;
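
One detail worth noting in the extracted `TestS3::new` is the `Box::leak` call: `S3Backend<'static>` borrows its bucket name, so the randomly generated `String` is deliberately leaked to obtain a `&'static str`. A minimal sketch of that trick in isolation (illustration only, not part of the patch series; `leaked_bucket_name` is a hypothetical helper):

```rust
// Leak a heap-allocated String to get a &'static str. The allocation is
// never freed, which is acceptable for a handful of buckets in a
// short-lived test process.
fn leaked_bucket_name(id: u64) -> &'static str {
    let name = format!("docs-rs-test-bucket-{}", id);
    Box::leak(name.into_boxed_str())
}

fn main() {
    let bucket: &'static str = leaked_bucket_name(42);
    assert_eq!(bucket, "docs-rs-test-bucket-42");
}
```
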
From 5d982d7c7f231fb2559def05c617ca34eb471189 Mon Sep 17 00:00:00 2001
From: Joshua Nelson
Date: Fri, 1 May 2020 12:07:55 -0400
Subject: [PATCH 38/38] test_s3 -> test

---
 src/storage/s3.rs                      | 6 +++---
 src/storage/s3/{test_s3.rs => test.rs} | 0
 2 files changed, 3 insertions(+), 3 deletions(-)
 rename src/storage/s3/{test_s3.rs => test.rs} (100%)

diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 89df3f693..6cf3fbf95 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -11,9 +11,9 @@ use time::Timespec;
 use tokio::runtime::Runtime;
 
 #[cfg(test)]
-mod test_s3;
+mod test;
 #[cfg(test)]
-pub(crate) use test_s3::TestS3;
+pub(crate) use test::TestS3;
 
 pub(crate) static S3_BUCKET_NAME: &str = "rust-docs-rs";
 
@@ -132,7 +132,7 @@ pub(crate) fn s3_client() -> Option<S3Client> {
 }
 
 #[cfg(test)]
-pub(crate) mod test {
+pub(crate) mod tests {
     use super::*;
     use crate::test::*;
     use std::slice;
diff --git a/src/storage/s3/test_s3.rs b/src/storage/s3/test.rs
similarity index 100%
rename from src/storage/s3/test_s3.rs
rename to src/storage/s3/test.rs
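
Finally, the `Drop` impl that travels with `TestS3` through these last patches is a small RAII pattern worth calling out: because teardown lives in `drop`, the bucket is removed even when an assertion earlier in the test panics and unwinds. A minimal sketch of the same guard pattern (illustration only, not part of the patch series; `BucketGuard` is a hypothetical type):

```rust
// A guard that runs external-resource cleanup when it goes out of scope,
// including during unwinding after a failed assertion.
struct BucketGuard {
    name: String,
}

impl Drop for BucketGuard {
    fn drop(&mut self) {
        // TestS3 deletes every object first and then the bucket itself,
        // because min.io refuses to delete a non-empty bucket.
        println!("cleaning up bucket {}", self.name);
    }
}

fn main() {
    let _guard = BucketGuard {
        name: "docs-rs-test-bucket-42".to_owned(),
    };
    println!("test body runs here");
    // `_guard` is dropped at the end of scope, so the cleanup always runs.
}
```
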