diff --git a/src/config/server.rs b/src/config/server.rs index 7966beedec5..b0c010fe026 100644 --- a/src/config/server.rs +++ b/src/config/server.rs @@ -45,9 +45,6 @@ pub struct Server { pub cdn_log_counting_enabled: bool, pub cdn_log_storage: CdnLogStorageConfig, pub cdn_log_queue: CdnLogQueueConfig, - /// If `true`, the `update_downloads` background job will use batch - /// processing with a static SQL query. - pub batch_update_downloads: bool, pub session_key: cookie::Key, pub gh_client_id: ClientId, pub gh_client_secret: ClientSecret, @@ -189,7 +186,6 @@ impl Server { cdn_log_counting_enabled: var("CDN_LOG_COUNTING_ENABLED")?.is_some(), cdn_log_storage: CdnLogStorageConfig::from_env()?, cdn_log_queue: CdnLogQueueConfig::from_env()?, - batch_update_downloads: var("BATCH_UPDATE_DOWNLOADS")?.is_some(), base, ip, port, diff --git a/src/tests/util/test_app.rs b/src/tests/util/test_app.rs index 3ddebc270cc..71c29f57fd3 100644 --- a/src/tests/util/test_app.rs +++ b/src/tests/util/test_app.rs @@ -431,7 +431,6 @@ fn simple_config() -> config::Server { cdn_log_counting_enabled: false, cdn_log_queue: CdnLogQueueConfig::Mock, cdn_log_storage: CdnLogStorageConfig::memory(), - batch_update_downloads: true, session_key: cookie::Key::derive_from("test this has to be over 32 bytes long".as_bytes()), gh_client_id: ClientId::new(dotenvy::var("GH_CLIENT_ID").unwrap_or_default()), gh_client_secret: ClientSecret::new(dotenvy::var("GH_CLIENT_SECRET").unwrap_or_default()), diff --git a/src/worker/jobs/downloads/update_metadata.rs b/src/worker/jobs/downloads/update_metadata.rs index 83018f8c043..c92d092eda1 100644 --- a/src/worker/jobs/downloads/update_metadata.rs +++ b/src/worker/jobs/downloads/update_metadata.rs @@ -1,5 +1,4 @@ -use crate::models::VersionDownload; -use crate::schema::{crates, metadata, version_downloads, versions}; +use crate::schema::version_downloads; use crate::worker::Environment; use anyhow::anyhow; use crates_io_worker::BackgroundJob; @@ -17,10 +16,8 @@ impl BackgroundJob for UpdateDownloads { type Context = Arc; async fn run(&self, env: Self::Context) -> anyhow::Result<()> { - let batch_update = env.config.batch_update_downloads; - let conn = env.deadpool.get().await?; - conn.interact(move |conn| update(conn, batch_update)) + conn.interact(update) .await .map_err(|err| anyhow!(err.to_string()))??; @@ -28,45 +25,36 @@ impl BackgroundJob for UpdateDownloads { } } -fn update(conn: &mut PgConnection, use_batch_update: bool) -> QueryResult<()> { +fn update(conn: &mut PgConnection) -> QueryResult<()> { use diesel::dsl::now; use diesel::select; - if use_batch_update { - info!("Updating versions…"); - - // After 45 minutes, we stop the batch updating process to avoid - // triggering the long-running job alert. 15 minutes later, the job - // will be started again by our cron service anyway. - const TIME_LIMIT: Duration = Duration::from_secs(45 * 60); - - // We update the `downloads` columns in batches to a) avoid the - // back and forth between the application and the database and b) avoid - // holding locks on any of the involved tables for too long. - const BATCH_SIZE: i64 = 5_000; - - let start_time = Instant::now(); - loop { - let update_count = batch_update(BATCH_SIZE, conn)?; - info!("Updated {update_count} versions"); - if update_count < BATCH_SIZE { - break; - } - - if start_time.elapsed() > TIME_LIMIT { - info!("Time limit reached, stopping batch update"); - break; - } + info!("Updating versions…"); + + // After 45 minutes, we stop the batch updating process to avoid + // triggering the long-running job alert. 15 minutes later, the job + // will be started again by our cron service anyway. + const TIME_LIMIT: Duration = Duration::from_secs(45 * 60); + + // We update the `downloads` columns in batches to a) avoid the + // back and forth between the application and the database and b) avoid + // holding locks on any of the involved tables for too long. + const BATCH_SIZE: i64 = 5_000; + + let start_time = Instant::now(); + loop { + let update_count = batch_update(BATCH_SIZE, conn)?; + info!("Updated {update_count} versions"); + if update_count < BATCH_SIZE { + break; + } + + if start_time.elapsed() > TIME_LIMIT { + info!("Time limit reached, stopping batch update"); + break; } - } else { - let rows = version_downloads::table - .filter(version_downloads::processed.eq(false)) - .filter(version_downloads::downloads.ne(version_downloads::counted)) - .load(conn)?; - - info!(rows = rows.len(), "Updating versions"); - collect(conn, &rows)?; } + info!("Finished updating versions"); // Anything older than 24 hours ago will be frozen and will not be queried @@ -177,275 +165,12 @@ fn batch_update(batch_size: i64, conn: &mut PgConnection) -> QueryResult { Ok(result.count) } -fn collect(conn: &mut PgConnection, rows: &[VersionDownload]) -> QueryResult<()> { - use diesel::update; - - for download in rows { - let amt = download.downloads - download.counted; - - conn.transaction::<_, diesel::result::Error, _>(|conn| { - // Update the total number of version downloads - let crate_id: i32 = update(versions::table.find(download.version_id)) - .set(versions::downloads.eq(versions::downloads + amt)) - .returning(versions::crate_id) - .get_result(conn)?; - - // Update the total number of crate downloads - update(crates::table.find(crate_id)) - .set(crates::downloads.eq(crates::downloads + amt)) - .execute(conn)?; - - // Update the global counter of total downloads - update(metadata::table) - .set(metadata::total_downloads.eq(metadata::total_downloads + i64::from(amt))) - .execute(conn)?; - - // Record that these downloads have been propagated to the other tables. This is done - // last, immediately before the transaction is committed, to minimize lock contention - // with counting new downloads. - update(version_downloads::table.find(download.id())) - .set(version_downloads::counted.eq(version_downloads::counted + amt)) - .execute(conn)?; - - Ok(()) - })?; - } - - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::email::Emails; - use crate::models::{Crate, NewCrate, NewUser, NewVersion, User, Version}; - use crate::test_util::test_db_connection; - use std::collections::BTreeMap; - - fn user(conn: &mut PgConnection) -> User { - NewUser::new(2, "login", None, None, "access_token") - .create_or_update(None, &Emails::new_in_memory(), conn) - .unwrap() - } - - fn crate_and_version(conn: &mut PgConnection, user_id: i32) -> (Crate, Version) { - let krate = NewCrate { - name: "foo", - ..Default::default() - } - .create(conn, user_id) - .unwrap(); - let version = NewVersion::new( - krate.id, - &semver::Version::parse("1.0.0").unwrap(), - &BTreeMap::new(), - None, - 0, - user_id, - "0000000000000000000000000000000000000000000000000000000000000000".to_string(), - None, - None, - ) - .unwrap(); - let version = version.save(conn, "someone@example.com").unwrap(); - (krate, version) - } - - #[test] - fn increment() { - use diesel::dsl::*; - - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (krate, version) = crate_and_version(conn, user.id); - insert_into(version_downloads::table) - .values(version_downloads::version_id.eq(version.id)) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::date.eq(date(now - 1.day())), - version_downloads::processed.eq(true), - )) - .execute(conn) - .unwrap(); - - super::update(conn, false).unwrap(); - let version_downloads = versions::table - .find(version.id) - .select(versions::downloads) - .first(conn); - assert_eq!(version_downloads, Ok(1)); - let crate_downloads = crates::table - .find(krate.id) - .select(crates::downloads) - .first(conn); - assert_eq!(crate_downloads, Ok(1)); - super::update(conn, false).unwrap(); - let version_downloads = versions::table - .find(version.id) - .select(versions::downloads) - .first(conn); - assert_eq!(version_downloads, Ok(1)); - } - - #[test] - fn set_processed_true() { - use diesel::dsl::*; - - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (_, version) = crate_and_version(conn, user.id); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(2), - version_downloads::date.eq(date(now - 2.days())), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - super::update(conn, false).unwrap(); - let processed = version_downloads::table - .filter(version_downloads::version_id.eq(version.id)) - .select(version_downloads::processed) - .first(conn); - assert_eq!(processed, Ok(true)); - } - - #[test] - fn dont_process_recent_row() { - use diesel::dsl::*; - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (_, version) = crate_and_version(conn, user.id); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(2), - version_downloads::date.eq(date(now)), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - super::update(conn, false).unwrap(); - let processed = version_downloads::table - .filter(version_downloads::version_id.eq(version.id)) - .select(version_downloads::processed) - .first(conn); - assert_eq!(processed, Ok(false)); - } - - #[test] - fn increment_a_little() { - use diesel::dsl::*; - use diesel::update; - - let (_test_db, conn) = &mut test_db_connection(); - let user = user(conn); - let (krate, version) = crate_and_version(conn, user.id); - update(versions::table) - .set(versions::updated_at.eq(now - 2.hours())) - .execute(conn) - .unwrap(); - update(crates::table) - .set(crates::updated_at.eq(now - 2.hours())) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(1), - version_downloads::date.eq(date(now)), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::date.eq(date(now - 1.day())), - )) - .execute(conn) - .unwrap(); - - let version_before: Version = versions::table.find(version.id).first(conn).unwrap(); - let krate_before: Crate = Crate::all() - .filter(crates::id.eq(krate.id)) - .first(conn) - .unwrap(); - super::update(conn, false).unwrap(); - let version2: Version = versions::table.find(version.id).first(conn).unwrap(); - assert_eq!(version2.downloads, 2); - assert_eq!(version2.updated_at, version_before.updated_at); - let krate2: Crate = Crate::all() - .filter(crates::id.eq(krate.id)) - .first(conn) - .unwrap(); - assert_eq!(krate2.downloads, 2); - assert_eq!(krate2.updated_at, krate_before.updated_at); - super::update(conn, false).unwrap(); - let version3: Version = versions::table.find(version.id).first(conn).unwrap(); - assert_eq!(version3.downloads, 2); - } - - #[test] - fn set_processed_no_set_updated_at() { - use diesel::dsl::*; - use diesel::update; - - let (_test_db, mut conn) = test_db_connection(); - - // This test is using a transaction to ensure `now` is the same for all - // queries within this test. - conn.begin_test_transaction().unwrap(); - - let conn = &mut conn; - - let user = user(conn); - let (_, version) = crate_and_version(conn, user.id); - update(versions::table) - .set(versions::updated_at.eq(now - 2.days())) - .execute(conn) - .unwrap(); - update(crates::table) - .set(crates::updated_at.eq(now - 2.days())) - .execute(conn) - .unwrap(); - insert_into(version_downloads::table) - .values(( - version_downloads::version_id.eq(version.id), - version_downloads::downloads.eq(2), - version_downloads::counted.eq(2), - version_downloads::date.eq(date(now - 2.days())), - version_downloads::processed.eq(false), - )) - .execute(conn) - .unwrap(); - - super::update(conn, false).unwrap(); - let versions_changed = versions::table - .select(versions::updated_at.ne(now - 2.days())) - .get_result(conn); - let crates_changed = crates::table - .select(crates::updated_at.ne(now - 2.days())) - .get_result(conn); - assert_eq!(versions_changed, Ok(false)); - assert_eq!(crates_changed, Ok(false)); - } -} - -/// This is a copy of the `test` module above, but using -/// `use_batch_update: true` in the `update` calls. #[cfg(test)] -mod batch_tests { +mod tests { use super::*; use crate::email::Emails; use crate::models::{Crate, NewCrate, NewUser, NewVersion, User, Version}; + use crate::schema::{crates, versions}; use crate::test_util::test_db_connection; use std::collections::BTreeMap; @@ -498,7 +223,7 @@ mod batch_tests { .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version_downloads = versions::table .find(version.id) .select(versions::downloads) @@ -509,7 +234,7 @@ mod batch_tests { .select(crates::downloads) .first(conn); assert_eq!(crate_downloads, Ok(1)); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version_downloads = versions::table .find(version.id) .select(versions::downloads) @@ -534,7 +259,7 @@ mod batch_tests { )) .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let processed = version_downloads::table .filter(version_downloads::version_id.eq(version.id)) .select(version_downloads::processed) @@ -558,7 +283,7 @@ mod batch_tests { )) .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let processed = version_downloads::table .filter(version_downloads::version_id.eq(version.id)) .select(version_downloads::processed) @@ -605,7 +330,7 @@ mod batch_tests { .filter(crates::id.eq(krate.id)) .first(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version2: Version = versions::table.find(version.id).first(conn).unwrap(); assert_eq!(version2.downloads, 2); assert_eq!(version2.updated_at, version_before.updated_at); @@ -615,7 +340,7 @@ mod batch_tests { .unwrap(); assert_eq!(krate2.downloads, 2); assert_eq!(krate2.updated_at, krate_before.updated_at); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let version3: Version = versions::table.find(version.id).first(conn).unwrap(); assert_eq!(version3.downloads, 2); } @@ -654,7 +379,7 @@ mod batch_tests { .execute(conn) .unwrap(); - super::update(conn, true).unwrap(); + super::update(conn).unwrap(); let versions_changed = versions::table .select(versions::updated_at.ne(now - 2.days())) .get_result(conn);