From 818e6401ef251a9a361f265fd5c0a5be6c833157 Mon Sep 17 00:00:00 2001
From: Justin Geibel
Date: Sat, 30 May 2020 12:35:18 -0400
Subject: [PATCH 1/2] Only enqueue 1 update_downloads job at a time

This ensures that if an `update_downloads` job is already running, a
duplicate job will not be enqueued. Currently, when multiple jobs are
running in parallel, they end up doing duplicate work resulting in
temporary overcounts that must be corrected in the next run. The
concurrent tasks also slow down the overall process and can result in
runaway performance problems as further jobs are spawned.

This commit also updates the monitoring to specifically check if the
`update_downloads` job runs for too long (120 minutes by default). The
main check for stalled jobs will not trigger for `update_downloads` as
the row is locked for the duration of the job (and `skip_locked` is
used in that query).
---
 src/bin/enqueue-job.rs | 17 ++++++++++++++++-
 src/bin/monitor.rs     | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)
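Note on the guard added to enqueue-job.rs: it boils down to counting the
existing `update_downloads` rows in swirl's `background_jobs` table before
enqueueing a new one. A rough standalone sketch of that check is below,
assuming diesel 1.x with the `postgres` feature; the `table!` definition and
the helper name are illustrative assumptions, not swirl's actual schema or
code from this patch:

    #[macro_use]
    extern crate diesel;

    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use diesel::result::QueryResult;

    // Minimal stand-in for swirl's background_jobs table; only the columns
    // referenced by this patch are modeled here.
    table! {
        background_jobs (id) {
            id -> Int8,
            job_type -> Text,
            created_at -> Timestamp,
        }
    }

    /// Returns true if an `update_downloads` job is already enqueued
    /// (or currently being run by a worker).
    fn update_downloads_already_queued(conn: &PgConnection) -> QueryResult<bool> {
        use self::background_jobs::dsl::*;

        // Count rows of the given job type; the running job keeps its row
        // in the table until it completes, so it is counted here too.
        let pending: i64 = background_jobs
            .filter(job_type.eq("update_downloads"))
            .count()
            .get_result(conn)?;

        Ok(pending > 0)
    }

The enqueue binary only calls `tasks::update_downloads().enqueue(&conn)`
when this kind of check finds nothing; otherwise it prints a message and
returns Ok(()) so the run still exits successfully.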
diff --git a/src/bin/enqueue-job.rs b/src/bin/enqueue-job.rs
index 42a59d41b28..94db422c14f 100644
--- a/src/bin/enqueue-job.rs
+++ b/src/bin/enqueue-job.rs
@@ -1,6 +1,8 @@
 #![deny(clippy::all)]
 
 use cargo_registry::{db, env, tasks, util::Error};
+use diesel::prelude::*;
+use swirl::schema::background_jobs::dsl::*;
 use swirl::Job;
 
 fn main() -> Result<(), Error> {
@@ -11,7 +13,20 @@ fn main() -> Result<(), Error> {
     println!("Enqueueing background job: {}", job);
 
     match &*job {
-        "update_downloads" => Ok(tasks::update_downloads().enqueue(&conn)?),
+        "update_downloads" => {
+            let count: i64 = background_jobs
+                .filter(job_type.eq("update_downloads"))
+                .count()
+                .get_result(&conn)
+                .unwrap();
+
+            if count > 0 {
+                println!("Did not enqueue update_downloads, existing job already in progress");
+                Ok(())
+            } else {
+                Ok(tasks::update_downloads().enqueue(&conn)?)
+            }
+        }
         "dump_db" => {
             let database_url = args.next().unwrap_or_else(|| env("READ_ONLY_REPLICA_URL"));
             let target_name = args
diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs
index 1e2f550172a..cb419ab5daf 100644
--- a/src/bin/monitor.rs
+++ b/src/bin/monitor.rs
@@ -15,10 +15,12 @@ fn main() -> Result<(), Error> {
     let conn = db::connect_now()?;
 
     check_stalled_background_jobs(&conn)?;
+    check_stalled_update_downloads(&conn)?;
     check_spam_attack(&conn)?;
     Ok(())
 }
 
+/// Check for old background jobs that are not currently running
 fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::schema::background_jobs::dsl::*;
     use diesel::dsl::*;
@@ -59,6 +61,43 @@ fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     Ok(())
 }
 
+/// Check for an `update_downloads` job that has run longer than expected
+fn check_stalled_update_downloads(conn: &PgConnection) -> Result<(), Error> {
+    use cargo_registry::schema::background_jobs::dsl::*;
+    use chrono::{DateTime, NaiveDateTime, Utc};
+
+    const EVENT_KEY: &str = "update_downloads_stalled";
+
+    println!("Checking for stalled background jobs");
+
+    let max_job_time = dotenv::var("MONITOR_MAX_UPDATE_DOWNLOADS_TIME")
+        .map(|s| s.parse::<u32>().unwrap() as i64)
+        .unwrap_or(120);
+
+    let start_time = background_jobs
+        .filter(job_type.eq("update_downloads"))
+        .select(created_at)
+        .first::<NaiveDateTime>(conn);
+
+    if let Ok(start_time) = start_time {
+        let start_time = DateTime::<Utc>::from_utc(start_time, Utc);
+        let minutes = Utc::now().signed_duration_since(start_time).num_minutes();
+
+        if minutes > max_job_time {
+            return log_and_trigger_event(on_call::Event::Trigger {
+                incident_key: Some(EVENT_KEY.into()),
+                description: format!("update_downloads job running for {} minutes", minutes),
+            });
+        }
+    };
+
+    log_and_trigger_event(on_call::Event::Resolve {
+        incident_key: EVENT_KEY.into(),
+        description: Some("No stalled update_downloads job".into()),
+    })
+}
+
+/// Check for known spam patterns
 fn check_spam_attack(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::models::krate::canon_crate_name;
     use diesel::dsl::*;

From a0b3e7b41c97eaf13775efaeb2974dc26e98e3fa Mon Sep 17 00:00:00 2001
From: Justin Geibel
Date: Fri, 19 Jun 2020 15:03:45 -0400
Subject: [PATCH 2/2] Improve comments and nomenclature in monitoring logic

---
 src/bin/monitor.rs | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)
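Note on the renamed check: as the new doc comment explains, the query in
`check_failing_background_jobs` uses `skip_locked`, so a job row that is
locked by a worker which is still running never shows up in it; only jobs
sitting in the queue after a failed attempt (awaiting a retry) can trip the
alert. A rough standalone sketch of a query with that shape follows,
assuming diesel 1.x with the `postgres` and `chrono` features; the `table!`
definition, helper name, and the Rust-side cutoff are illustrative
assumptions, not the actual monitor.rs query:

    #[macro_use]
    extern crate diesel;

    use chrono::{Duration, Utc};
    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use diesel::result::QueryResult;

    // Same illustrative stand-in for swirl's background_jobs table as in
    // the note on the previous patch.
    table! {
        background_jobs (id) {
            id -> Int8,
            job_type -> Text,
            created_at -> Timestamp,
        }
    }

    /// IDs of jobs older than `max_minutes` that no worker currently holds
    /// a lock on, i.e. jobs that have failed and are waiting to be retried.
    fn failing_job_ids(conn: &PgConnection, max_minutes: i64) -> QueryResult<Vec<i64>> {
        use self::background_jobs::dsl::*;

        // Compute the cutoff on the Rust side to keep the SQL simple.
        let cutoff = Utc::now().naive_utc() - Duration::minutes(max_minutes);

        background_jobs
            .select(id)
            .filter(created_at.lt(cutoff))
            // FOR UPDATE SKIP LOCKED: rows locked by a running worker are
            // silently skipped rather than blocking or being reported.
            .for_update()
            .skip_locked()
            .load(conn)
    }

This is also why the separate `check_stalled_update_downloads` check exists:
a healthy but long-running `update_downloads` job keeps its row locked, so
only a direct look at the job's `created_at` age can catch it running past
the 120 minute threshold.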
diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs
index cb419ab5daf..aa55ac233da 100644
--- a/src/bin/monitor.rs
+++ b/src/bin/monitor.rs
@@ -14,22 +14,30 @@ use diesel::prelude::*;
 fn main() -> Result<(), Error> {
     let conn = db::connect_now()?;
 
-    check_stalled_background_jobs(&conn)?;
+    check_failing_background_jobs(&conn)?;
     check_stalled_update_downloads(&conn)?;
     check_spam_attack(&conn)?;
     Ok(())
 }
 
-/// Check for old background jobs that are not currently running
-fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
+/// Check for old background jobs that are not currently running.
+///
+/// This check includes `skip_locked` in the query and will only trigger on
+/// enqueued jobs that have attempted to run and have failed (and are in the
+/// queue awaiting a retry).
+///
+/// Within the default 15 minute time, a job should have already had several
+/// failed retry attempts.
+fn check_failing_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::schema::background_jobs::dsl::*;
     use diesel::dsl::*;
     use diesel::sql_types::Integer;
 
     const EVENT_KEY: &str = "background_jobs";
 
-    println!("Checking for stalled background jobs");
+    println!("Checking for failed background jobs");
 
+    // Max job execution time in minutes
     let max_job_time = dotenv::var("MAX_JOB_TIME")
         .map(|s| s.parse::<i32>().unwrap())
         .unwrap_or(15);
@@ -70,6 +78,7 @@ fn check_stalled_update_downloads(conn: &PgConnection) -> Result<(), Error> {
 
     println!("Checking for stalled background jobs");
 
+    // Max job execution time in minutes
     let max_job_time = dotenv::var("MONITOR_MAX_UPDATE_DOWNLOADS_TIME")
         .map(|s| s.parse::<u32>().unwrap() as i64)
         .unwrap_or(120);