From 0b03ae6c0676702946969bf6d310c2fe3999cb03 Mon Sep 17 00:00:00 2001 From: Justin Geibel Date: Sat, 25 Jan 2020 00:12:26 -0500 Subject: [PATCH] Ensure the update_downloads job doesn't run concurrently If multiple instances of this job are run concurrently then it is possible to overcount downloads, at least temporarily. The job first selects all matching `version_downloads` and later uses those values to calculate how many downloads to add to `versions` and `crates`. If a second job is run, it would select some rows from `version_downloads` that were already queued for processing by the first task. If an overcount were to occur, the next time the job is run it should calculate a negative adjustment and correct the situation. There's no point in doing extra work and if we eventually need concurrency we should build that out intentionally. Therefore, this commit wraps the entire job in a transaction and obtains a transaction-level advisory lock from the database. If the lock has already been taken the job will fail and will be retried by swirl. If the duration of this job begins to approach the scheduling interval, then we will want to increase that interval to avoid triggering alerts. 
--- src/tasks.rs | 5 +++++ src/tasks/update_downloads.rs | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/tasks.rs b/src/tasks.rs index ed9e0e91449..0f33ca99a81 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -3,3 +3,8 @@ mod update_downloads; pub use dump_db::dump_db; pub use update_downloads::update_downloads; + +use diesel::sql_types::BigInt; +sql_function!(fn pg_try_advisory_xact_lock(key: BigInt) -> Bool); + +const UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY: i64 = 1; diff --git a/src/tasks/update_downloads.rs b/src/tasks/update_downloads.rs index b59de4b032b..2024aae207a 100644 --- a/src/tasks/update_downloads.rs +++ b/src/tasks/update_downloads.rs @@ -1,3 +1,5 @@ +use super::pg_try_advisory_xact_lock; +use super::UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY as LOCK_KEY; use crate::{ background_jobs::Environment, models::VersionDownload, @@ -9,9 +11,17 @@ use swirl::PerformError; #[swirl::background_job] pub fn update_downloads(env: &Environment) -> Result<(), PerformError> { + use diesel::select; + let conn = env.connection()?; - update(&conn)?; - Ok(()) + conn.transaction::<_, PerformError, _>(|| { + // If this job runs concurrently with itself, it could result in an overcount + if !select(pg_try_advisory_xact_lock(LOCK_KEY)).get_result(&*conn)? { + return Err("The advisory lock for update_downloads is already taken".into()); + } + + update(&conn).map_err(Into::into) + }) } fn update(conn: &PgConnection) -> QueryResult<()> {