From 42a56676db4facaf88cbc2785d6b13f9ec8157df Mon Sep 17 00:00:00 2001 From: Yoh Deadfall Date: Thu, 3 Jul 2025 00:37:47 +0300 Subject: [PATCH 1/5] Skip running a job if the crate/version deleted --- src/worker/jobs/readmes.rs | 34 ++++++++++++++----- src/worker/jobs/send_publish_notifications.rs | 13 +++++-- src/worker/jobs/update_default_version.rs | 27 ++++++++++----- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/worker/jobs/readmes.rs b/src/worker/jobs/readmes.rs index b3cdfcdf9c5..86c2fbce112 100644 --- a/src/worker/jobs/readmes.rs +++ b/src/worker/jobs/readmes.rs @@ -9,7 +9,7 @@ use diesel_async::AsyncConnection; use diesel_async::scoped_futures::ScopedFutureExt; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; #[derive(Clone, Serialize, Deserialize)] pub struct RenderAndUploadReadme { @@ -70,13 +70,31 @@ impl BackgroundJob for RenderAndUploadReadme { let mut conn = env.deadpool.get().await?; conn.transaction(|conn| { async move { - Version::record_readme_rendering(job.version_id, conn).await?; - let (crate_name, vers): (String, String) = versions::table - .find(job.version_id) - .inner_join(crates::table) - .select((crates::name, versions::num)) - .first(conn) - .await?; + let (crate_name, vers): (String, String) = + match Version::record_readme_rendering(job.version_id, conn) + .await + .and( + versions::table + .find(job.version_id) + .inner_join(crates::table) + .select((crates::name, versions::num)) + .first(conn) + .await, + ) { + Ok(r) => r, + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::ForeignKeyViolation, + .., + )) + | Err(diesel::result::Error::NotFound) => { + warn!( + "Skipping rendering README for version {}: no version found", + job.version_id + ); + return Ok(()); + } + Err(err) => return Err(err.into()), + }; tracing::Span::current().record("krate.name", tracing::field::display(&crate_name)); diff --git a/src/worker/jobs/send_publish_notifications.rs b/src/worker/jobs/send_publish_notifications.rs index 6c4210da9e6..b945456e7dd 100644 --- a/src/worker/jobs/send_publish_notifications.rs +++ b/src/worker/jobs/send_publish_notifications.rs @@ -39,7 +39,12 @@ impl BackgroundJob for SendPublishNotificationsJob { let mut conn = ctx.deadpool.get().await?; // Get crate name, version and other publish details - let publish_details = PublishDetails::for_version(version_id, &mut conn).await?; + let Some(publish_details) = PublishDetails::for_version(version_id, &mut conn).await? + else { + warn!("Skipping publish notifications for {version_id}: no version found"); + + return Ok(()); + }; let publish_time = publish_details .publish_time @@ -168,7 +173,10 @@ struct PublishDetails { } impl PublishDetails { - async fn for_version(version_id: i32, conn: &mut AsyncPgConnection) -> QueryResult { + async fn for_version( + version_id: i32, + conn: &mut AsyncPgConnection, + ) -> QueryResult> { versions::table .find(version_id) .inner_join(crates::table) @@ -176,5 +184,6 @@ impl PublishDetails { .select(PublishDetails::as_select()) .first(conn) .await + .optional() } } diff --git a/src/worker/jobs/update_default_version.rs b/src/worker/jobs/update_default_version.rs index 2f620ccec61..c54e3aa5d5f 100644 --- a/src/worker/jobs/update_default_version.rs +++ b/src/worker/jobs/update_default_version.rs @@ -7,7 +7,7 @@ use diesel::prelude::*; use diesel_async::RunQueryDsl; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tracing::info; +use tracing::{info, warn}; #[derive(Serialize, Deserialize)] pub struct UpdateDefaultVersion { @@ -32,16 +32,25 @@ impl BackgroundJob for UpdateDefaultVersion { info!("Updating default version for crate {crate_id}"); let mut conn = ctx.deadpool.get().await?; - update_default_version(crate_id, &mut conn).await?; - - // Get the crate name for OG image generation - let crate_name: String = crates::table - .filter(crates::id.eq(crate_id)) - .select(crates::name) - .first(&mut conn) - .await?; + let crate_name = update_default_version(crate_id, &mut conn).await.and( + // Get the crate name for OG image generation + crates::table + .filter(crates::id.eq(crate_id)) + .select(crates::name) + .first::(&mut conn) + .await, + ); + if let Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::ForeignKeyViolation, + .., + )) = crate_name + { + warn!("Skipping update default version for crate for {crate_id}: no crate found"); + return Ok(()); + } // Generate OG image after updating default version + let crate_name = crate_name?; info!("Enqueueing OG image generation for crate {crate_name}"); GenerateOgImage::new(crate_name).enqueue(&mut conn).await?; From adcc17f76774e32a7875d29b4021ee915aea60c6 Mon Sep 17 00:00:00 2001 From: Yoh Deadfall Date: Mon, 7 Jul 2025 01:22:08 +0300 Subject: [PATCH 2/5] Regression tests --- src/tests/worker/mod.rs | 3 +++ src/tests/worker/readmes.rs | 17 +++++++++++++++++ src/tests/worker/send_publish_notifications.rs | 16 ++++++++++++++++ src/tests/worker/update_default_version.rs | 16 ++++++++++++++++ 4 files changed, 52 insertions(+) create mode 100644 src/tests/worker/readmes.rs create mode 100644 src/tests/worker/send_publish_notifications.rs create mode 100644 src/tests/worker/update_default_version.rs diff --git a/src/tests/worker/mod.rs b/src/tests/worker/mod.rs index 07eb5cb02f4..f6f21e41382 100644 --- a/src/tests/worker/mod.rs +++ b/src/tests/worker/mod.rs @@ -1,3 +1,6 @@ mod git; +mod readmes; mod rss; +mod send_publish_notifications; mod sync_admins; +mod update_default_version; diff --git a/src/tests/worker/readmes.rs b/src/tests/worker/readmes.rs new file mode 100644 index 00000000000..81d091ebbbf --- /dev/null +++ b/src/tests/worker/readmes.rs @@ -0,0 +1,17 @@ +use crate::tests::util::TestApp; +use crate::worker::jobs; +use crates_io_worker::BackgroundJob; + +#[tokio::test(flavor = "multi_thread")] +async fn skips_when_crate_deleted() -> anyhow::Result<()> { + let (app, _) = TestApp::full().empty().await; + let mut conn = app.db_conn().await; + + let job = + jobs::RenderAndUploadReadme::new(-1, "deleted".to_string(), ".".to_string(), None, None); + + job.enqueue(&mut conn).await?; + app.run_pending_background_jobs().await; + + Ok(()) +} diff --git a/src/tests/worker/send_publish_notifications.rs b/src/tests/worker/send_publish_notifications.rs new file mode 100644 index 00000000000..98dd4faef17 --- /dev/null +++ b/src/tests/worker/send_publish_notifications.rs @@ -0,0 +1,16 @@ +use crate::tests::util::TestApp; +use crate::worker::jobs; +use crates_io_worker::BackgroundJob; + +#[tokio::test(flavor = "multi_thread")] +async fn skips_when_crate_deleted() -> anyhow::Result<()> { + let (app, _) = TestApp::full().empty().await; + let mut conn = app.db_conn().await; + + let job = jobs::SendPublishNotificationsJob::new(-1); + + job.enqueue(&mut conn).await?; + app.run_pending_background_jobs().await; + + Ok(()) +} diff --git a/src/tests/worker/update_default_version.rs b/src/tests/worker/update_default_version.rs new file mode 100644 index 00000000000..ca38a6fee90 --- /dev/null +++ b/src/tests/worker/update_default_version.rs @@ -0,0 +1,16 @@ +use crate::tests::util::TestApp; +use crate::worker::jobs; +use crates_io_worker::BackgroundJob; + +#[tokio::test(flavor = "multi_thread")] +async fn skips_when_crate_deleted() -> anyhow::Result<()> { + let (app, _) = TestApp::full().empty().await; + let mut conn = app.db_conn().await; + + let job = jobs::UpdateDefaultVersion::new(-1); + + job.enqueue(&mut conn).await?; + app.run_pending_background_jobs().await; + + Ok(()) +} From 30446fc40970f409e7f29ef4d4cb365b555cbe69 Mon Sep 17 00:00:00 2001 From: Yoh Deadfall Date: Tue, 8 Jul 2025 22:06:05 +0300 Subject: [PATCH 3/5] Fixed error handling --- src/worker/jobs/update_default_version.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/worker/jobs/update_default_version.rs b/src/worker/jobs/update_default_version.rs index c54e3aa5d5f..e2174152ecd 100644 --- a/src/worker/jobs/update_default_version.rs +++ b/src/worker/jobs/update_default_version.rs @@ -40,11 +40,14 @@ impl BackgroundJob for UpdateDefaultVersion { .first::(&mut conn) .await, ); - if let Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::ForeignKeyViolation, - .., - )) = crate_name - { + + if matches!( + crate_name, + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::ForeignKeyViolation, + .., + )) | Err(diesel::result::Error::NotFound) + ) { warn!("Skipping update default version for crate for {crate_id}: no crate found"); return Ok(()); } From a4dfd4736b6e408e2b179a3c5d8ea7a95f7a9b58 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Sun, 13 Jul 2025 17:10:11 +0200 Subject: [PATCH 4/5] jobs/update_default_version: Simplify error handling --- src/worker/jobs/update_default_version.rs | 51 +++++++++++++---------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/src/worker/jobs/update_default_version.rs b/src/worker/jobs/update_default_version.rs index e2174152ecd..48c98b1762a 100644 --- a/src/worker/jobs/update_default_version.rs +++ b/src/worker/jobs/update_default_version.rs @@ -32,30 +32,37 @@ impl BackgroundJob for UpdateDefaultVersion { info!("Updating default version for crate {crate_id}"); let mut conn = ctx.deadpool.get().await?; - let crate_name = update_default_version(crate_id, &mut conn).await.and( - // Get the crate name for OG image generation - crates::table - .filter(crates::id.eq(crate_id)) - .select(crates::name) - .first::(&mut conn) - .await, - ); - - if matches!( - crate_name, - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::ForeignKeyViolation, - .., - )) | Err(diesel::result::Error::NotFound) - ) { - warn!("Skipping update default version for crate for {crate_id}: no crate found"); - return Ok(()); + + match update_default_version(crate_id, &mut conn).await { + Ok(_) => { + info!("Successfully updated default version for crate {crate_id}"); + } + Err(diesel::result::Error::NotFound) => { + warn!("Skipping default version update for crate {crate_id}: crate not found"); + return Ok(()); + } + Err(err) => { + warn!("Failed to update default version for crate {crate_id}: {err}"); + return Err(err.into()); + } } - // Generate OG image after updating default version - let crate_name = crate_name?; - info!("Enqueueing OG image generation for crate {crate_name}"); - GenerateOgImage::new(crate_name).enqueue(&mut conn).await?; + // Get the crate name for OG image generation + let crate_name = crates::table + .filter(crates::id.eq(crate_id)) + .select(crates::name) + .first::(&mut conn) + .await + .optional()?; + + if let Some(crate_name) = crate_name { + // Generate OG image after updating default version + info!("Enqueueing OG image generation for crate {crate_name}"); + GenerateOgImage::new(crate_name).enqueue(&mut conn).await?; + } else { + warn!("No crate found for ID {crate_id}, skipping OG image generation"); + return Ok(()); + } Ok(()) } From 2cafbef12d75a7bb0736eb340fb76603bcd704d2 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Sun, 13 Jul 2025 17:18:22 +0200 Subject: [PATCH 5/5] jobs/readmes: Simplify error handling --- src/worker/jobs/readmes.rs | 60 ++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/src/worker/jobs/readmes.rs b/src/worker/jobs/readmes.rs index 86c2fbce112..28351047c26 100644 --- a/src/worker/jobs/readmes.rs +++ b/src/worker/jobs/readmes.rs @@ -5,6 +5,8 @@ use crate::tasks::spawn_blocking; use crate::worker::Environment; use crates_io_markdown::text_to_html; use crates_io_worker::BackgroundJob; +use diesel::result::DatabaseErrorKind; +use diesel::result::Error::DatabaseError; use diesel_async::AsyncConnection; use diesel_async::scoped_futures::ScopedFutureExt; use serde::{Deserialize, Serialize}; @@ -70,31 +72,39 @@ impl BackgroundJob for RenderAndUploadReadme { let mut conn = env.deadpool.get().await?; conn.transaction(|conn| { async move { - let (crate_name, vers): (String, String) = - match Version::record_readme_rendering(job.version_id, conn) - .await - .and( - versions::table - .find(job.version_id) - .inner_join(crates::table) - .select((crates::name, versions::num)) - .first(conn) - .await, - ) { - Ok(r) => r, - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::ForeignKeyViolation, - .., - )) - | Err(diesel::result::Error::NotFound) => { - warn!( - "Skipping rendering README for version {}: no version found", - job.version_id - ); - return Ok(()); - } - Err(err) => return Err(err.into()), - }; + match Version::record_readme_rendering(job.version_id, conn).await { + Ok(_) => {} + Err(DatabaseError(DatabaseErrorKind::ForeignKeyViolation, ..)) => { + warn!( + "Skipping README rendering recording for version {}: version not found", + job.version_id + ); + return Ok(()); + } + Err(err) => { + warn!( + "Failed to record README rendering for version {}: {err}", + job.version_id, + ); + return Err(err.into()); + } + } + + let result = versions::table + .find(job.version_id) + .inner_join(crates::table) + .select((crates::name, versions::num)) + .first::<(String, String)>(conn) + .await + .optional()?; + + let Some((crate_name, vers)) = result else { + warn!( + "Skipping README rendering for version {}: version not found", + job.version_id + ); + return Ok(()); + }; tracing::Span::current().record("krate.name", tracing::field::display(&crate_name));