diff --git a/src/tasks.rs b/src/tasks.rs index ed9e0e91449..0f33ca99a81 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -3,3 +3,8 @@ mod update_downloads; pub use dump_db::dump_db; pub use update_downloads::update_downloads; + +use diesel::sql_types::BigInt; +sql_function!(fn pg_try_advisory_xact_lock(key: BigInt) -> Bool); + +const UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY: i64 = 1; diff --git a/src/tasks/update_downloads.rs b/src/tasks/update_downloads.rs index b59de4b032b..2024aae207a 100644 --- a/src/tasks/update_downloads.rs +++ b/src/tasks/update_downloads.rs @@ -1,3 +1,5 @@ +use super::pg_try_advisory_xact_lock; +use super::UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY as LOCK_KEY; use crate::{ background_jobs::Environment, models::VersionDownload, @@ -9,9 +11,17 @@ use swirl::PerformError; #[swirl::background_job] pub fn update_downloads(env: &Environment) -> Result<(), PerformError> { + use diesel::select; + let conn = env.connection()?; - update(&conn)?; - Ok(()) + conn.transaction::<_, PerformError, _>(|| { + // If this job runs concurrently with itself, it could result in a overcount + if !select(pg_try_advisory_xact_lock(LOCK_KEY)).get_result(&*conn)? { + return Err("The advisory lock for update_downloads is already taken".into()); + } + + update(&conn).map_err(Into::into) + }) } fn update(conn: &PgConnection) -> QueryResult<()> {