From 38175fd37fc4ba6d396af79d83fb82c9036a254b Mon Sep 17 00:00:00 2001 From: Pietro Albini Date: Mon, 15 Mar 2021 12:07:14 +0100 Subject: [PATCH 1/4] downloads_counter: batch download count writes to the db --- Cargo.lock | 11 +++ Cargo.toml | 1 + src/app.rs | 5 + src/bin/server.rs | 21 ++++- src/config.rs | 9 ++ src/controllers/version/downloads.rs | 63 ++++--------- src/downloads_counter.rs | 136 +++++++++++++++++++++++++++ src/lib.rs | 1 + src/models/download.rs | 21 +---- src/tests/util/test_app.rs | 1 + 10 files changed, 204 insertions(+), 65 deletions(-) create mode 100644 src/downloads_counter.rs diff --git a/Cargo.lock b/Cargo.lock index a316baa6f60..7b495899d31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,6 +268,7 @@ dependencies = [ "conduit-test", "cookie", "ctrlc", + "dashmap", "derive_deref", "dialoguer", "diesel", @@ -665,6 +666,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "dashmap" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +dependencies = [ + "cfg-if 1.0.0", + "num_cpus", +] + [[package]] name = "debugid" version = "0.7.2" diff --git a/Cargo.toml b/Cargo.toml index 1ac48c0af45..e23c5ac4779 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ conduit-static = "0.9.0-alpha.3" cookie = { version = "0.15", features = ["secure"] } ctrlc = { version = "3.0", features = ["termination"] } +dashmap = { version = "4.0.2", features = ["raw-api"] } derive_deref = "1.1.1" dialoguer = "0.7.1" diesel = { version = "1.4.0", features = ["postgres", "serde_json", "chrono", "r2d2"] } diff --git a/src/app.rs b/src/app.rs index db099ff2b61..61597f7fce5 100644 --- a/src/app.rs +++ b/src/app.rs @@ -3,6 +3,7 @@ use crate::{db, Config, Env}; use std::{sync::Arc, time::Duration}; +use crate::downloads_counter::DownloadsCounter; use crate::github::GitHubClient; use diesel::r2d2; use oauth2::basic::BasicClient; @@ -32,6 +33,9 @@ pub struct App { /// The server configuration pub config: Config, + /// Count downloads and periodically persist them in the database + pub downloads_counter: DownloadsCounter, + /// A configured client for outgoing HTTP requests /// /// In production this shares a single connection pool across requests. In tests @@ -131,6 +135,7 @@ impl App { github_oauth, session_key: config.session_key.clone(), config, + downloads_counter: DownloadsCounter::new(), http_client, } } diff --git a/src/bin/server.rs b/src/bin/server.rs index f5ba462809a..b0ca84484ca 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -49,9 +49,12 @@ fn main() -> Result<(), Box> { let config = cargo_registry::Config::default(); let client = Client::new(); + let app = Arc::new(App::new(config.clone(), Some(client))); - let app = App::new(config.clone(), Some(client)); - let app = cargo_registry::build_handler(Arc::new(app)); + // Start the background thread periodically persisting download counts to the database. + downloads_counter_thread(app.clone()); + + let app = cargo_registry::build_handler(app); // On every server restart, ensure the categories available in the database match // the information in *src/categories.toml*. 
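Editor's note: the hunk that follows adds `downloads_counter_thread`, which divides the configured persist interval by the number of counter shards, so one shard is flushed per tick while every shard still gets flushed roughly once per interval. A quick worked example of that arithmetic, using the 60_000 ms fallback from `config.rs`; the 16-shard figure is an assumption corresponding to dashmap's `num_cpus()*4` sharding on a hypothetical 4-core machine:

fn main() {
    // Assumed values: 60_000 ms is the DOWNLOADS_PERSIST_INTERVAL_MS fallback,
    // 16 shards corresponds to num_cpus() * 4 on a hypothetical 4-core machine.
    let downloads_persist_interval_ms: usize = 60_000;
    let shards_count: usize = 16;

    // persist_next_shard handles one shard per tick, so the background thread
    // sleeps for the full interval divided by the shard count...
    let tick_ms = downloads_persist_interval_ms / shards_count;
    assert_eq!(tick_ms, 3_750);

    // ...and any given shard is still persisted about once a minute.
    assert_eq!(tick_ms * shards_count, 60_000);
}
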
@@ -184,3 +187,17 @@ where }) .unwrap(); } + +fn downloads_counter_thread(app: Arc) { + let interval = Duration::from_millis( + (app.config.downloads_persist_interval_ms / app.downloads_counter.shards_count()) as u64, + ); + + std::thread::spawn(move || loop { + std::thread::sleep(interval); + + if let Err(err) = app.downloads_counter.persist_next_shard(&app) { + println!("downloads_counter error: {}", err); + } + }); +} diff --git a/src/config.rs b/src/config.rs index cdc5753b04f..4e7187b2b37 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,6 +19,7 @@ pub struct Config { pub blocked_traffic: Vec<(String, Vec)>, pub domain_name: String, pub allowed_origins: Vec, + pub downloads_persist_interval_ms: usize, } impl Default for Config { @@ -45,6 +46,7 @@ impl Default for Config { /// - `READ_ONLY_REPLICA_URL`: The URL of an optional postgres read-only replica database. /// - `BLOCKED_TRAFFIC`: A list of headers and environment variables to use for blocking ///. traffic. See the `block_traffic` module for more documentation. + /// - `DOWNLOADS_PERSIST_INTERVAL_MS`: how frequent to persist download counts (in ms). fn default() -> Config { let api_protocol = String::from("https"); let mirror = if dotenv::var("MIRROR").is_ok() { @@ -144,6 +146,13 @@ impl Default for Config { blocked_traffic: blocked_traffic(), domain_name: domain_name(), allowed_origins, + downloads_persist_interval_ms: dotenv::var("DOWNLOADS_PERSIST_INTERVAL_MS") + .map(|interval| { + interval + .parse() + .expect("invalid DOWNLOADS_PERSIST_INTERVAL_MS") + }) + .unwrap_or(60_000), // 1 minute } } } diff --git a/src/controllers/version/downloads.rs b/src/controllers/version/downloads.rs index 24744eddbd0..75c2db80fdd 100644 --- a/src/controllers/version/downloads.rs +++ b/src/controllers/version/downloads.rs @@ -20,7 +20,26 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult { let crate_name = &req.params()["crate_id"]; let version = &req.params()["version"]; - let (crate_name, was_counted) = increment_download_counts(req, recorder, crate_name, version)?; + let (version_id, crate_name): (_, String) = { + use self::versions::dsl::*; + + let conn = recorder.record("get_conn", || req.db_conn())?; + + // Returns the crate name as stored in the database, or an error if we could + // not load the version ID from the database. + recorder.record("get_version", || { + versions + .inner_join(crates::table) + .select((id, crates::name)) + .filter(Crate::with_name(crate_name)) + .filter(num.eq(version)) + .first(&*conn) + })? + }; + + // The increment does not happen instantly, but it's deferred to be executed in a batch + // along with other downloads. See crate::downloads_counter for the implementation. + req.app().downloads_counter.increment(version_id); let redirect_url = req .app() @@ -28,12 +47,6 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult { .uploader .crate_location(&crate_name, version); - // Adding log metadata requires &mut access, so we have to defer this step until - // after the (immutable) query parameters are no longer used. - if !was_counted { - req.log_metadata("uncounted_dl", "true"); - } - if req.wants_json() { #[derive(Serialize)] struct R { @@ -45,42 +58,6 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult { } } -/// Increment the download counts for a given crate version. -/// -/// Returns the crate name as stored in the database, or an error if we could -/// not load the version ID from the database. -/// -/// This ignores any errors that occur updating the download count. 
Failure is -/// expected if the application is in read only mode, or for API-only mirrors. -/// Even if failure occurs for unexpected reasons, we would rather have `cargo -/// build` succeed and not count the download than break people's builds. -fn increment_download_counts( - req: &dyn RequestExt, - recorder: TimingRecorder, - crate_name: &str, - version: &str, -) -> AppResult<(String, bool)> { - use self::versions::dsl::*; - - let conn = recorder.record("get_conn", || req.db_conn())?; - - let (version_id, crate_name) = recorder.record("get_version", || { - versions - .inner_join(crates::table) - .select((id, crates::name)) - .filter(Crate::with_name(crate_name)) - .filter(num.eq(version)) - .first(&*conn) - })?; - - // Wrap in a transaction so we don't poison the outer transaction if this - // fails - let res = recorder.record("update_count", || { - conn.transaction(|| VersionDownload::create_or_increment(version_id, &conn)) - }); - Ok((crate_name, res.is_ok())) -} - /// Handles the `GET /crates/:crate_id/:version/downloads` route. pub fn downloads(req: &mut dyn RequestExt) -> EndpointResult { let (crate_name, semver) = extract_crate_name_and_semver(req)?; diff --git a/src/downloads_counter.rs b/src/downloads_counter.rs new file mode 100644 index 00000000000..f549d803a80 --- /dev/null +++ b/src/downloads_counter.rs @@ -0,0 +1,136 @@ +use crate::App; +use anyhow::Error; +use dashmap::{DashMap, SharedValue}; +use diesel::{pg::upsert::excluded, prelude::*}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; + +/// crates.io receives a lot of download requests, and we can't execute a write query to the +/// database during each connection for performance reasons. To reduce the write load, this struct +/// collects the pending updates from the current process and writes in batch. +/// +/// To avoid locking the whole data structure behind a RwLock, which could potentially delay +/// requests, this uses the dashmap crate. A DashMap has the same public API as an HashMap, but +/// stores the items into `num_cpus()*4` individually locked shards. This approach reduces the +/// likelyhood of a request encountering a locked shard. +/// +/// Persisting the download counts in the database also takes advantage of the inner sharding of +/// DashMaps: to avoid locking all the download requests at the same time each iteration only +/// persists a single shard at the time. +/// +/// The disadvantage of this approach is that download counts are stored in memory until they're +/// persisted, so it's possible to lose some of them if the process exits ungracefully. While +/// that's far from ideal, the advantage of batching database updates far outweights potentially +/// losing some download counts. +#[derive(Debug)] +pub struct DownloadsCounter { + /// Inner storage for the download counts. + inner: DashMap, + /// Index of the next shard that should be persisted by `persist_next_shard`. + shard_idx: AtomicUsize, + /// Number of downloads that are not yet persisted on the database. This is just used as a + /// metric included in log lines, and it's not guaranteed to be accurate. 
+ pending_count: AtomicI64, +} + +impl DownloadsCounter { + pub(crate) fn new() -> Self { + Self { + inner: DashMap::new(), + shard_idx: AtomicUsize::new(0), + pending_count: AtomicI64::new(0), + } + } + + pub(crate) fn increment(&self, version_id: i32) { + self.pending_count.fetch_add(1, Ordering::SeqCst); + + if let Some(counter) = self.inner.get(&version_id) { + // The version is already recorded in the DashMap, so we don't need to lock the whole + // shard in write mode. The shard is instead locked in read mode, which allows an + // unbounded number of readers as long as there are no write locks. + counter.value().fetch_add(1, Ordering::SeqCst); + } else { + // The version is not in the DashMap, so we need to lock the whole shard in write mode + // and insert the version into it. This has worse performance than the above case. + self.inner + .entry(version_id) + .and_modify(|counter| { + // Handle the version being inserted by another thread while we were waiting + // for the write lock on the shard. + counter.fetch_add(1, Ordering::SeqCst); + }) + .or_insert_with(|| AtomicUsize::new(1)); + } + } + + pub fn persist_next_shard(&self, app: &App) -> Result<(), Error> { + let conn = app.primary_database.get()?; + + // Replace the next shard in the ring with an empty HashMap (clearing it), and return the + // previous contents for processing. The fetch_add method wraps around on overflow, so it's + // fine to keep incrementing it without resetting. + let shards = self.inner.shards(); + let idx = self.shard_idx.fetch_add(1, Ordering::SeqCst) % shards.len(); + let shard = std::mem::take(&mut *shards[idx].write()); + + let stats = self.persist_shard(&conn, shard)?; + println!( + "download_counter shard={} counted_versions={} counted_downloads={} pending_downloads={}", + idx, + stats.counted_versions, + stats.counted_downloads, + stats.pending_downloads, + ); + + Ok(()) + } + + fn persist_shard( + &self, + conn: &PgConnection, + shard: HashMap>, + ) -> Result { + use crate::schema::version_downloads::dsl::*; + + let mut counted_downloads = 0; + let mut counted_versions = 0; + let mut to_insert = Vec::new(); + for (key, atomic) in shard.iter() { + let count = atomic.get().load(Ordering::SeqCst); + counted_downloads += count; + counted_versions += 1; + + to_insert.push((version_id.eq(*key), downloads.eq(count as i32))); + } + + if !to_insert.is_empty() { + diesel::insert_into(version_downloads) + .values(&to_insert) + .on_conflict((version_id, date)) + .do_update() + .set(downloads.eq(downloads + excluded(downloads))) + .execute(conn)?; + } + + let old_pending = self + .pending_count + .fetch_sub(counted_downloads as i64, Ordering::SeqCst); + + Ok(PersistStats { + counted_downloads, + counted_versions, + pending_downloads: old_pending - counted_downloads as i64, + }) + } + + pub fn shards_count(&self) -> usize { + self.inner.shards().len() + } +} + +struct PersistStats { + counted_downloads: usize, + counted_versions: usize, + pending_downloads: i64, +} diff --git a/src/lib.rs b/src/lib.rs index 850472e9424..2e0ec09aefe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,7 @@ pub mod background_jobs; pub mod boot; mod config; pub mod db; +mod downloads_counter; pub mod email; pub mod git; pub mod github; diff --git a/src/models/download.rs b/src/models/download.rs index d48ec534e39..4b9ad49e300 100644 --- a/src/models/download.rs +++ b/src/models/download.rs @@ -1,8 +1,6 @@ -use chrono::NaiveDate; -use diesel::prelude::*; - use crate::models::Version; use crate::schema::version_downloads; +use 
chrono::NaiveDate; #[derive(Queryable, Identifiable, Associations, Debug, Clone, Copy)] #[belongs_to(Version)] @@ -14,20 +12,3 @@ pub struct VersionDownload { pub date: NaiveDate, pub processed: bool, } - -impl VersionDownload { - pub fn create_or_increment(version: i32, conn: &PgConnection) -> QueryResult<()> { - use self::version_downloads::dsl::*; - - // We only update the counter for *today* (the default date), - // nothing else. We have lots of other counters, but they're - // all updated later on via the update-downloads script. - diesel::insert_into(version_downloads) - .values(version_id.eq(version)) - .on_conflict((version_id, date)) - .do_update() - .set(downloads.eq(downloads + 1)) - .execute(conn)?; - Ok(()) - } -} diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs index 31cd34e9867..43d6a4a185d 100644 --- a/src/tests/util/test_app.rs +++ b/src/tests/util/test_app.rs @@ -315,6 +315,7 @@ fn simple_config() -> Config { blocked_traffic: Default::default(), domain_name: "crates.io".into(), allowed_origins: Vec::new(), + downloads_persist_interval_ms: 1000, } } From f0da6cd5f92f18edff4a2bffaae619cb072f61c7 Mon Sep 17 00:00:00 2001 From: Pietro Albini Date: Mon, 15 Mar 2021 14:57:15 +0100 Subject: [PATCH 2/4] server: avoid shadowing the app --- src/bin/server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index b0ca84484ca..67b067220ae 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -54,7 +54,7 @@ fn main() -> Result<(), Box> { // Start the background thread periodically persisting download counts to the database. downloads_counter_thread(app.clone()); - let app = cargo_registry::build_handler(app); + let handler = cargo_registry::build_handler(app); // On every server restart, ensure the categories available in the database match // the information in *src/categories.toml*. @@ -103,7 +103,7 @@ fn main() -> Result<(), Box> { .build() .unwrap(); - let handler = Arc::new(conduit_hyper::BlockingHandler::new(app)); + let handler = Arc::new(conduit_hyper::BlockingHandler::new(handler)); let make_service = hyper::service::make_service_fn(move |socket: &hyper::server::conn::AddrStream| { let addr = socket.remote_addr(); @@ -134,7 +134,7 @@ fn main() -> Result<(), Box> { println!("Booting with a civet based server"); let mut cfg = civet::Config::new(); cfg.port(port).threads(threads).keep_alive(true); - Civet(CivetServer::start(cfg, app).unwrap()) + Civet(CivetServer::start(cfg, handler).unwrap()) }; println!("listening on port {}", port); From a13b94e95692cf4fd9dc88946bcd95895ad2f3b8 Mon Sep 17 00:00:00 2001 From: Pietro Albini Date: Mon, 15 Mar 2021 15:04:09 +0100 Subject: [PATCH 3/4] downloads_counter: persist all counters on shutdown --- src/bin/server.rs | 7 ++++++- src/downloads_counter.rs | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index 67b067220ae..5d5d2102f7c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -54,7 +54,7 @@ fn main() -> Result<(), Box> { // Start the background thread periodically persisting download counts to the database. downloads_counter_thread(app.clone()); - let handler = cargo_registry::build_handler(app); + let handler = cargo_registry::build_handler(app.clone()); // On every server restart, ensure the categories available in the database match // the information in *src/categories.toml*. 
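Editor's note: the hunks below (patch 3) flush every remaining shard during graceful shutdown by looping over the shards and reusing `persist_shard`, whose batched upsert replaced the per-request `downloads = downloads + 1` update removed from `VersionDownload` above. A minimal sketch of those batching semantics, assuming a plain `HashMap` stands in for one drained shard and that the SQL in the comment only approximates what the diesel builder generates:

use std::collections::HashMap;

fn main() {
    // One drained shard: version id -> downloads accumulated in memory since
    // the last flush (a plain HashMap stands in for the DashMap shard here).
    let mut shard: HashMap<i32, i64> = HashMap::new();
    for _ in 0..25 {
        *shard.entry(42).or_insert(0) += 1; // 25 downloads of version 42
    }
    assert_eq!(shard[&42], 25);

    // Instead of 25 single-row updates, persist_shard issues one statement for
    // the whole shard, roughly:
    //   INSERT INTO version_downloads (version_id, downloads) VALUES (42, 25)
    //   ON CONFLICT (version_id, date) DO UPDATE
    //     SET downloads = version_downloads.downloads + excluded.downloads;
    // (`date` is left to the column default, i.e. today's row.)
}
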
@@ -167,6 +167,11 @@ fn main() -> Result<(), Box> { } } + println!("Persisting remaining downloads counters"); + if let Err(err) = app.downloads_counter.persist_all_shards(&app) { + println!("downloads_counter error: {}", err); + } + println!("Server has gracefully shutdown!"); Ok(()) } diff --git a/src/downloads_counter.rs b/src/downloads_counter.rs index f549d803a80..cfd389efeda 100644 --- a/src/downloads_counter.rs +++ b/src/downloads_counter.rs @@ -64,6 +64,31 @@ impl DownloadsCounter { } } + pub fn persist_all_shards(&self, app: &App) -> Result<(), Error> { + let conn = app.primary_database.get()?; + + let mut counted_downloads = 0; + let mut counted_versions = 0; + let mut pending_downloads = 0; + for shard in self.inner.shards() { + let shard = std::mem::take(&mut *shard.write()); + let stats = self.persist_shard(&conn, shard)?; + + counted_downloads += stats.counted_downloads; + counted_versions += stats.counted_versions; + pending_downloads = stats.pending_downloads; + } + + println!( + "download_counter all_shards counted_versions={} counted_downloads={} pending_downloads={}", + counted_versions, + counted_downloads, + pending_downloads, + ); + + Ok(()) + } + pub fn persist_next_shard(&self, app: &App) -> Result<(), Error> { let conn = app.primary_database.get()?; From f646e45cc58437c6b29f738c8798b13580cb60fe Mon Sep 17 00:00:00 2001 From: Pietro Albini Date: Mon, 15 Mar 2021 21:00:47 +0100 Subject: [PATCH 4/4] tests: update download test to work with downloads_counter --- src/tests/krate/downloads.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/tests/krate/downloads.rs b/src/tests/krate/downloads.rs index fce16e42a38..0a326baa6f4 100644 --- a/src/tests/krate/downloads.rs +++ b/src/tests/krate/downloads.rs @@ -42,11 +42,24 @@ fn download() { // TODO: test the with_json code path }; + let persist_downloads_count = || { + app.as_inner() + .downloads_counter + .persist_all_shards(app.as_inner()) + .expect("failed to persist downloads count"); + }; + download("foo_download/1.0.0"); + // No downloads are counted until the counters are persisted + assert_dl_count("foo_download/1.0.0", None, 0); + assert_dl_count("foo_download", None, 0); + persist_downloads_count(); + // Now that the counters are persisted the download counts show up. assert_dl_count("foo_download/1.0.0", None, 1); assert_dl_count("foo_download", None, 1); download("FOO_DOWNLOAD/1.0.0"); + persist_downloads_count(); assert_dl_count("FOO_DOWNLOAD/1.0.0", None, 2); assert_dl_count("FOO_DOWNLOAD", None, 2);
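
Editor's note: taken together, the series moves download counting from one database write per request to an in-memory, sharded counter that a background thread drains one shard at a time (and drains completely on shutdown). Below is a compact, std-only sketch of that pattern; it is an illustration under stated assumptions, not the crates.io implementation, which relies on dashmap's raw shard API, atomic counters, and a diesel upsert instead of the mutexes, modulo-based shard choice, and println! used here:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Sharded in-memory counter: increments lock only one shard, and a
/// background loop drains a single shard per tick.
struct ShardedCounter {
    shards: Vec<Mutex<HashMap<i32, u64>>>,
}

impl ShardedCounter {
    fn new(shards: usize) -> Self {
        Self {
            shards: (0..shards).map(|_| Mutex::new(HashMap::new())).collect(),
        }
    }

    /// Called on the download hot path; only the owning shard is locked.
    fn increment(&self, version_id: i32) {
        let idx = version_id as usize % self.shards.len();
        *self.shards[idx].lock().unwrap().entry(version_id).or_insert(0) += 1;
    }

    /// Swap one shard's map for an empty one and return the drained counts,
    /// mirroring the std::mem::take() calls in the patch.
    fn drain_shard(&self, idx: usize) -> HashMap<i32, u64> {
        std::mem::take(&mut *self.shards[idx % self.shards.len()].lock().unwrap())
    }
}

fn main() {
    let counter = Arc::new(ShardedCounter::new(4));

    // Request handlers would call increment() instead of writing to the db.
    for version_id in [1, 2, 2, 3] {
        counter.increment(version_id);
    }

    // The background thread persists one shard per tick; this sketch only
    // prints what would be upserted into version_downloads.
    let background = Arc::clone(&counter);
    let handle = thread::spawn(move || {
        for idx in 0..4 {
            thread::sleep(Duration::from_millis(10));
            let drained = background.drain_shard(idx);
            if !drained.is_empty() {
                println!("would upsert into version_downloads: {:?}", drained);
            }
        }
    });
    handle.join().unwrap();
}

The trade-off, called out in the patch's own doc comment, is that counts held only in memory can be lost if the process exits without running the shutdown flush added in patch 3.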