From 86adf528cd89c631c931f44faf3a0e3ccd131a41 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Tue, 27 Feb 2024 17:39:53 +0100 Subject: [PATCH] Remove `BATCH_UPDATE_DOWNLOADS` feature flag We have used this very successfully in production for almost two weeks now. There does not appear to be any reason to switch it back off, so this commit removes the feature flag and permanently enables batch processing. --- src/config/server.rs | 4 - src/tests/util/test_app.rs | 1 - src/worker/jobs/downloads/update_metadata.rs | 347 ++----------------- 3 files changed, 36 insertions(+), 316 deletions(-) diff --git a/src/config/server.rs b/src/config/server.rs index 7966beedec5..b0c010fe026 100644 --- a/src/config/server.rs +++ b/src/config/server.rs @@ -45,9 +45,6 @@ pub struct Server { pub cdn_log_counting_enabled: bool, pub cdn_log_storage: CdnLogStorageConfig, pub cdn_log_queue: CdnLogQueueConfig, - /// If `true`, the `update_downloads` background job will use batch - /// processing with a static SQL query. 
- pub batch_update_downloads: bool, pub session_key: cookie::Key, pub gh_client_id: ClientId, pub gh_client_secret: ClientSecret, @@ -189,7 +186,6 @@ impl Server { cdn_log_counting_enabled: var("CDN_LOG_COUNTING_ENABLED")?.is_some(), cdn_log_storage: CdnLogStorageConfig::from_env()?, cdn_log_queue: CdnLogQueueConfig::from_env()?, - batch_update_downloads: var("BATCH_UPDATE_DOWNLOADS")?.is_some(), base, ip, port, diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs index 3ddebc270cc..71c29f57fd3 100644 --- a/src/tests/util/test_app.rs +++ b/src/tests/util/test_app.rs @@ -431,7 +431,6 @@ fn simple_config() -> config::Server { cdn_log_counting_enabled: false, cdn_log_queue: CdnLogQueueConfig::Mock, cdn_log_storage: CdnLogStorageConfig::memory(), - batch_update_downloads: true, session_key: cookie::Key::derive_from("test this has to be over 32 bytes long".as_bytes()), gh_client_id: ClientId::new(dotenvy::var("GH_CLIENT_ID").unwrap_or_default()), gh_client_secret: ClientSecret::new(dotenvy::var("GH_CLIENT_SECRET").unwrap_or_default()), diff --git a/src/worker/jobs/downloads/update_metadata.rs b/src/worker/jobs/downloads/update_metadata.rs index 83018f8c043..c92d092eda1 100644 --- a/src/worker/jobs/downloads/update_metadata.rs +++ b/src/worker/jobs/downloads/update_metadata.rs @@ -1,5 +1,4 @@ -use crate::models::VersionDownload; -use crate::schema::{crates, metadata, version_downloads, versions}; +use crate::schema::version_downloads; use crate::worker::Environment; use anyhow::anyhow; use crates_io_worker::BackgroundJob; @@ -17,10 +16,8 @@ impl BackgroundJob for UpdateDownloads { type Context = Arc; async fn run(&self, env: Self::Context) -> anyhow::Result<()> { - let batch_update = env.config.batch_update_downloads; - let conn = env.deadpool.get().await?; - conn.interact(move |conn| update(conn, batch_update)) + conn.interact(update) .await .map_err(|err| anyhow!(err.to_string()))??; @@ -28,45 +25,36 @@ impl BackgroundJob for UpdateDownloads { } } -fn 
update(conn: &mut PgConnection, use_batch_update: bool) -> QueryResult<()> { +fn update(conn: &mut PgConnection) -> QueryResult<()> { use diesel::dsl::now; use diesel::select; - if use_batch_update { - info!("Updating versions…"); - - // After 45 minutes, we stop the batch updating process to avoid - // triggering the long-running job alert. 15 minutes later, the job - // will be started again by our cron service anyway. - const TIME_LIMIT: Duration = Duration::from_secs(45 * 60); - - // We update the `downloads` columns in batches to a) avoid the - // back and forth between the application and the database and b) avoid - // holding locks on any of the involved tables for too long. - const BATCH_SIZE: i64 = 5_000; - - let start_time = Instant::now(); - loop { - let update_count = batch_update(BATCH_SIZE, conn)?; - info!("Updated {update_count} versions"); - if update_count < BATCH_SIZE { - break; - } - - if start_time.elapsed() > TIME_LIMIT { - info!("Time limit reached, stopping batch update"); - break; - } + info!("Updating versions…"); + + // After 45 minutes, we stop the batch updating process to avoid + // triggering the long-running job alert. 15 minutes later, the job + // will be started again by our cron service anyway. + const TIME_LIMIT: Duration = Duration::from_secs(45 * 60); + + // We update the `downloads` columns in batches to a) avoid the + // back and forth between the application and the database and b) avoid + // holding locks on any of the involved tables for too long. 
+ const BATCH_SIZE: i64 = 5_000; + + let start_time = Instant::now(); + loop { + let update_count = batch_update(BATCH_SIZE, conn)?; + info!("Updated {update_count} versions"); + if update_count < BATCH_SIZE { + break; + } + + if start_time.elapsed() > TIME_LIMIT { + info!("Time limit reached, stopping batch update"); + break; } - } else { - let rows = version_downloads::table - .filter(version_downloads::processed.eq(false)) - .filter(version_downloads::downloads.ne(version_downloads::counted)) - .load(conn)?; - - info!(rows = rows.len(), "Updating versions"); - collect(conn, &rows)?; } + info!("Finished updating versions"); // Anything older than 24 hours ago will be frozen and will not be queried @@ -177,275 +165,12 @@ fn batch_update(batch_size: i64, conn: &mut PgConnection) -> QueryResult { Ok(result.count) } -fn collect(conn: &mut PgConnection, rows: &[VersionDownload]) -> QueryResult<()> { - use diesel::update; - - for download in rows { - let amt = download.downloads - download.counted; - - conn.transaction::<_, diesel::result::Error, _>(|conn| { - // Update the total number of version downloads - let crate_id: i32 = update(versions::table.find(download.version_id)) - .set(versions::downloads.eq(versions::downloads + amt)) - .returning(versions::crate_id) - .get_result(conn)?; - - // Update the total number of crate downloads - update(crates::table.find(crate_id)) - .set(crates::downloads.eq(crates::downloads + amt)) - .execute(conn)?; - - // Update the global counter of total downloads - update(metadata::table) - .set(metadata::total_downloads.eq(metadata::total_downloads + i64::from(amt))) - .execute(conn)?; - - // Record that these downloads have been propagated to the other tables. This is done - // last, immediately before the transaction is committed, to minimize lock contention - // with counting new downloads. 
- update(version_downloads::table.find(download.id())) - .set(version_downloads::counted.eq(version_downloads::counted + amt)) - .execute(conn)?; - - Ok(()) - })?; - } - - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::email::Emails; - use crate::models::{Crate, NewCrate, NewUser, NewVersion, User, Version}; - use crate::test_util::test_db_connection; - use std::collections::BTreeMap; - - fn user(conn: &mut PgConnection) -> User { - NewUser::new(2, "login", None, None, "access_token") - .create_or_update(None, &Emails::new_in_memory(), conn) - .unwrap() - } - - fn crate_and_version(conn: &mut PgConnection, user_id: i32) -> (Crate, Version) { - let krate = NewCrate { - name: "foo", - ..Default::default() - } - .create(conn, user_id) - .unwrap(); - let version = NewVersion::new( - krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &BTreeMap::new(), - None, - 0, - user_id, - "0000000000000000000000000000000000000000000000000000000000000000".to_string(), - None, - None, - ) - .unwrap(); - let version = version.save(conn, "someone@example.com").unwrap(); - (krate, version) - } - - #[test] - fn increment() { - use diesel::dsl::*; - - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (krate, version) = crate_and_version(conn, user.id); - insert_into(version_downloads::table) - .values(version_downloads::version_id.eq(version.id)) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::date.eq(date(now - 1.day())), - version_downloads::processed.eq(true), - )) - .execute(conn) - .unwrap(); - - super::update(conn, false).unwrap(); - let version_downloads = versions::table - .find(version.id) - .select(versions::downloads) - .first(conn); - assert_eq!(version_downloads, Ok(1)); - let crate_downloads = crates::table - .find(krate.id) - .select(crates::downloads) - .first(conn); - assert_eq!(crate_downloads, Ok(1)); - 
super::update(conn, false).unwrap(); - let version_downloads = versions::table - .find(version.id) - .select(versions::downloads) - .first(conn); - assert_eq!(version_downloads, Ok(1)); - } - - #[test] - fn set_processed_true() { - use diesel::dsl::*; - - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (_, version) = crate_and_version(conn, user.id); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(2), - version_downloads::date.eq(date(now - 2.days())), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - super::update(conn, false).unwrap(); - let processed = version_downloads::table - .filter(version_downloads::version_id.eq(version.id)) - .select(version_downloads::processed) - .first(conn); - assert_eq!(processed, Ok(true)); - } - - #[test] - fn dont_process_recent_row() { - use diesel::dsl::*; - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (_, version) = crate_and_version(conn, user.id); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(2), - version_downloads::date.eq(date(now)), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - super::update(conn, false).unwrap(); - let processed = version_downloads::table - .filter(version_downloads::version_id.eq(version.id)) - .select(version_downloads::processed) - .first(conn); - assert_eq!(processed, Ok(false)); - } - - #[test] - fn increment_a_little() { - use diesel::dsl::*; - use diesel::update; - - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (krate, version) = crate_and_version(conn, user.id); - update(versions::table) - .set(versions::updated_at.eq(now - 2.hours())) - .execute(conn) - .unwrap(); - update(crates::table) - 
.set(crates::updated_at.eq(now - 2.hours())) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(1), - version_downloads::date.eq(date(now)), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::date.eq(date(now - 1.day())), - )) - .execute(conn) - .unwrap(); - - let version_before: Version = versions::table.find(version.id).first(conn).unwrap(); - let krate_before: Crate = Crate::all() - .filter(crates::id.eq(krate.id)) - .first(conn) - .unwrap(); - super::update(conn, false).unwrap(); - let version2: Version = versions::table.find(version.id).first(conn).unwrap(); - assert_eq!(version2.downloads, 2); - assert_eq!(version2.updated_at, version_before.updated_at); - let krate2: Crate = Crate::all() - .filter(crates::id.eq(krate.id)) - .first(conn) - .unwrap(); - assert_eq!(krate2.downloads, 2); - assert_eq!(krate2.updated_at, krate_before.updated_at); - super::update(conn, false).unwrap(); - let version3: Version = versions::table.find(version.id).first(conn).unwrap(); - assert_eq!(version3.downloads, 2); - } - - #[test] - fn set_processed_no_set_updated_at() { - use diesel::dsl::*; - use diesel::update; - - let (_test_db, mut conn) = test_db_connection(); - - // This test is using a transaction to ensure `now` is the same for all - // queries within this test. 
- conn.begin_test_transaction().unwrap(); - - let conn = &mut conn; - - let user = user(conn); - let (_, version) = crate_and_version(conn, user.id); - update(versions::table) - .set(versions::updated_at.eq(now - 2.days())) - .execute(conn) - .unwrap(); - update(crates::table) - .set(crates::updated_at.eq(now - 2.days())) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(2), - version_downloads::date.eq(date(now - 2.days())), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - - super::update(conn, false).unwrap(); - let versions_changed = versions::table - .select(versions::updated_at.ne(now - 2.days())) - .get_result(conn); - let crates_changed = crates::table - .select(crates::updated_at.ne(now - 2.days())) - .get_result(conn); - assert_eq!(versions_changed, Ok(false)); - assert_eq!(crates_changed, Ok(false)); - } -} - -/// This is a copy of the `test` module above, but using -/// `use_batch_update: true` in the `update` calls. 
#[cfg(test)] -mod batch_tests { +mod tests { use super::*; use crate::email::Emails; use crate::models::{Crate, NewCrate, NewUser, NewVersion, User, Version}; + use crate::schema::{crates, versions}; use crate::test_util::test_db_connection; use std::collections::BTreeMap; @@ -498,7 +223,7 @@ mod batch_tests { .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version_downloads = versions::table .find(version.id) .select(versions::downloads) @@ -509,7 +234,7 @@ mod batch_tests { .select(crates::downloads) .first(conn); assert_eq!(crate_downloads, Ok(1)); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version_downloads = versions::table .find(version.id) .select(versions::downloads) @@ -534,7 +259,7 @@ mod batch_tests { )) .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let processed = version_downloads::table .filter(version_downloads::version_id.eq(version.id)) .select(version_downloads::processed) @@ -558,7 +283,7 @@ mod batch_tests { )) .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let processed = version_downloads::table .filter(version_downloads::version_id.eq(version.id)) .select(version_downloads::processed) @@ -605,7 +330,7 @@ mod batch_tests { .filter(crates::id.eq(krate.id)) .first(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version2: Version = versions::table.find(version.id).first(conn).unwrap(); assert_eq!(version2.downloads, 2); assert_eq!(version2.updated_at, version_before.updated_at); @@ -615,7 +340,7 @@ mod batch_tests { .unwrap(); assert_eq!(krate2.downloads, 2); assert_eq!(krate2.updated_at, krate_before.updated_at); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version3: Version = versions::table.find(version.id).first(conn).unwrap(); assert_eq!(version3.downloads, 2); } @@ -654,7 +379,7 @@ mod 
batch_tests { .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let versions_changed = versions::table .select(versions::updated_at.ne(now - 2.days())) .get_result(conn);