From 4be63c4e4278721000a6d8da6e2bb2fa3453f325 Mon Sep 17 00:00:00 2001
From: Justin Geibel
Date: Sun, 8 Mar 2020 19:37:43 -0400
Subject: [PATCH] Ensure the update_downloads job doesn't run concurrently

This is an improved implementation of #2157. The previous design relied
on a transaction based lock to manage the lifetime of the lock. The
wrapper transaction caused the `update_downloads` job to interfere with
incoming download requests, and the changes had to be reverted.

This implementation uses a session lock which is automatically released
even if the callback panics.

If multiple instances of the `update_downloads` job are run concurrently
then it is possible to over count downloads, at least temporarily. The
first job selects all matching `version_downloads` and later uses those
values to calculate how many downloads to add to `versions` and
`crates`. If a second job is run, it would select some rows from
`version_downloads` that were already queued for processing by the first
task.

If an over count were to occur, the next time the job is run it should
calculate a negative adjustment and correct the situation. There's no
point in doing extra work and if we eventually need concurrency we
should build that out intentionally. Therefore, this commit runs the
entire job while holding a session level advisory lock obtained from
the database.

If the lock has already been taken the job will fail and will be retried
by swirl. If the duration of this job begins to approach the scheduling
interval, then we will want to increase that interval to avoid
triggering alerts.
---
 src/tasks.rs                    |   5 ++
 src/tasks/update_downloads.rs   |   4 +-
 src/tasks/util.rs               |   1 +
 src/tasks/util/advisory_lock.rs | 109 ++++++++++++++++++++++++++++++++
 4 files changed, 118 insertions(+), 1 deletion(-)
 create mode 100644 src/tasks/util.rs
 create mode 100644 src/tasks/util/advisory_lock.rs

diff --git a/src/tasks.rs b/src/tasks.rs
index ed9e0e91449..b8d2ddb6915 100644
--- a/src/tasks.rs
+++ b/src/tasks.rs
@@ -1,5 +1,10 @@
 pub mod dump_db;
 mod update_downloads;
+mod util;
 
 pub use dump_db::dump_db;
 pub use update_downloads::update_downloads;
+
+pub(self) use self::util::advisory_lock::with_advisory_lock;
+
+const UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY: i64 = 1;
diff --git a/src/tasks/update_downloads.rs b/src/tasks/update_downloads.rs
index 9a275ac8575..65b7bd439d7 100644
--- a/src/tasks/update_downloads.rs
+++ b/src/tasks/update_downloads.rs
@@ -1,3 +1,5 @@
+use super::with_advisory_lock;
+use super::UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY as LOCK_KEY;
 use crate::{
     models::VersionDownload,
     schema::{crates, metadata, version_downloads, versions},
@@ -8,7 +10,7 @@ use swirl::PerformError;
 
 #[swirl::background_job]
 pub fn update_downloads(conn: &PgConnection) -> Result<(), PerformError> {
-    update(&conn)?;
+    with_advisory_lock(&conn, LOCK_KEY, update)?;
     Ok(())
 }
 
diff --git a/src/tasks/util.rs b/src/tasks/util.rs
new file mode 100644
index 00000000000..6e1f221931e
--- /dev/null
+++ b/src/tasks/util.rs
@@ -0,0 +1 @@
+pub(super) mod advisory_lock;
diff --git a/src/tasks/util/advisory_lock.rs b/src/tasks/util/advisory_lock.rs
new file mode 100644
index 00000000000..f3806af6397
--- /dev/null
+++ b/src/tasks/util/advisory_lock.rs
@@ -0,0 +1,109 @@
+use std::error::Error;
+
+use diesel::prelude::*;
+use diesel::sql_types::BigInt;
+use diesel::PgConnection;
+
+sql_function!(fn pg_try_advisory_lock(key: BigInt) -> Bool);
+sql_function!(fn pg_advisory_unlock(key: BigInt) -> Bool);
+
+/// Run the callback if the session advisory lock for the given key can be obtained.
+///
+/// If the lock is not already held, the callback will be called and the lock will be unlocked
+/// after the closure returns. If the lock is already held, the function returns an error without
+/// calling the callback.
+///
+/// # Errors
+///
+/// Returns an error if the lock query fails, if the lock is already held, or if the callback
+/// itself returns an error.
+pub(crate) fn with_advisory_lock<F>(
+    conn: &PgConnection,
+    key: i64,
+    f: F,
+) -> Result<(), Box<dyn Error>>
+where
+    F: FnOnce(&PgConnection) -> QueryResult<()>,
+{
+    if !diesel::select(pg_try_advisory_lock(key)).get_result(conn)? {
+        let string = format!(
+            "A job holding the session advisory lock for key {} is already running",
+            key
+        );
+        return Err(string.into());
+    }
+    // Dropping the guard unlocks; this also happens if the callback panics (unwinding).
+    let _dont_drop_yet = DropGuard { conn, key };
+    f(conn).map_err(Into::into)
+}
+
+/// Releases the session advisory lock for `key` on `conn` when dropped.
+struct DropGuard<'a> {
+    conn: &'a PgConnection,
+    key: i64,
+}
+
+impl<'a> Drop for DropGuard<'a> {
+    // Attempts to unlock the advisory lock. Failures are only printed because `drop` cannot
+    // return an error to the caller.
+    fn drop(&mut self) {
+        match diesel::select(pg_advisory_unlock(self.key)).get_result(self.conn) {
+            Ok(true) => (),
+            Ok(false) => println!(
+                "Error: job advisory lock for key {} was not locked",
+                self.key
+            ),
+            Err(err) => println!("Error unlocking advisory lock (key: {}): {}", self.key, err),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_util::*;
+
+    #[test]
+    fn lock_released_after_callback_returns() {
+        const KEY: i64 = -1;
+        let conn1 = pg_connection();
+        let conn2 = pg_connection();
+
+        let mut callback_run = false;
+        let result = with_advisory_lock(&conn1, KEY, |_| {
+            // Another connection cannot obtain the lock
+            assert!(!diesel::select(pg_try_advisory_lock(KEY)).get_result(&conn2)?);
+            callback_run = true;
+            Ok(())
+        });
+        assert!(result.is_ok());
+        assert!(callback_run);
+
+        // Another connection can now obtain the lock
+        assert_eq!(
+            diesel::select(pg_try_advisory_lock(KEY)).get_result(&conn2),
+            Ok(true)
+        );
+    }
+
+    #[test]
+    fn test_already_locked() {
+        const KEY: i64 = -2;
+        let conn1 = pg_connection();
+        let conn2 = pg_connection();
+
+        // Another connection obtains the lock first
+        assert_eq!(
+            diesel::select(pg_try_advisory_lock(KEY)).get_result(&conn2),
+            Ok(true)
+        );
+
+        let mut callback_run = false;
+        let result = with_advisory_lock(&conn1, KEY, |_| {
+            callback_run = true;
+            Ok(())
+        });
+        assert!(result.is_err());
+        assert!(!callback_run);
+    }
+}