diff --git a/src/bin/enqueue-job.rs b/src/bin/enqueue-job.rs
index 42a59d41b28..94db422c14f 100644
--- a/src/bin/enqueue-job.rs
+++ b/src/bin/enqueue-job.rs
@@ -1,6 +1,8 @@
 #![deny(clippy::all)]
 
 use cargo_registry::{db, env, tasks, util::Error};
+use diesel::prelude::*;
+use swirl::schema::background_jobs::dsl::*;
 use swirl::Job;
 
 fn main() -> Result<(), Error> {
@@ -11,7 +13,20 @@ fn main() -> Result<(), Error> {
     println!("Enqueueing background job: {}", job);
 
     match &*job {
-        "update_downloads" => Ok(tasks::update_downloads().enqueue(&conn)?),
+        "update_downloads" => {
+            let count: i64 = background_jobs
+                .filter(job_type.eq("update_downloads"))
+                .count()
+                .get_result(&conn)
+                .unwrap();
+
+            if count > 0 {
+                println!("Did not enqueue update_downloads, existing job already in progress");
+                Ok(())
+            } else {
+                Ok(tasks::update_downloads().enqueue(&conn)?)
+            }
+        }
         "dump_db" => {
             let database_url = args.next().unwrap_or_else(|| env("READ_ONLY_REPLICA_URL"));
             let target_name = args
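A note on the guard above: swirl keeps a job's row in `background_jobs` until the job completes successfully (failed runs stay in the queue for retry), so the count covers queued, retrying, and currently-running jobs alike. The check and the enqueue are separate statements rather than one atomic operation, so two racing invocations could in principle both observe a zero count; for a cron-driven binary that window is negligible. Assuming a configured local database, back-to-back runs would look roughly like this (hypothetical transcript; the output lines are the `println!` calls from the patch):

    $ cargo run --bin enqueue-job update_downloads
    Enqueueing background job: update_downloads
    $ cargo run --bin enqueue-job update_downloads
    Enqueueing background job: update_downloads
    Did not enqueue update_downloads, existing job already in progress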
diff --git a/src/bin/monitor.rs b/src/bin/monitor.rs
index 1e2f550172a..aa55ac233da 100644
--- a/src/bin/monitor.rs
+++ b/src/bin/monitor.rs
@@ -14,20 +14,30 @@ use diesel::prelude::*;
 fn main() -> Result<(), Error> {
     let conn = db::connect_now()?;
 
-    check_stalled_background_jobs(&conn)?;
+    check_failing_background_jobs(&conn)?;
+    check_stalled_update_downloads(&conn)?;
     check_spam_attack(&conn)?;
     Ok(())
 }
 
-fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
+/// Check for old background jobs that are not currently running.
+///
+/// This check includes `skip_locked` in the query and will only trigger on
+/// enqueued jobs that have attempted to run and have failed (and are in the
+/// queue awaiting a retry).
+///
+/// Within the default 15 minute window, a job should have already had several
+/// failed retry attempts.
+fn check_failing_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::schema::background_jobs::dsl::*;
     use diesel::dsl::*;
     use diesel::sql_types::Integer;
 
     const EVENT_KEY: &str = "background_jobs";
 
-    println!("Checking for stalled background jobs");
+    println!("Checking for failed background jobs");
 
+    // Max job execution time in minutes
     let max_job_time = dotenv::var("MAX_JOB_TIME")
         .map(|s| s.parse::<i32>().unwrap())
         .unwrap_or(15);
@@ -59,6 +69,44 @@ fn check_stalled_background_jobs(conn: &PgConnection) -> Result<(), Error> {
     Ok(())
 }
 
+/// Check for an `update_downloads` job that has run longer than expected
+fn check_stalled_update_downloads(conn: &PgConnection) -> Result<(), Error> {
+    use cargo_registry::schema::background_jobs::dsl::*;
+    use chrono::{DateTime, NaiveDateTime, Utc};
+
+    const EVENT_KEY: &str = "update_downloads_stalled";
+
+    println!("Checking for a stalled update_downloads job");
+
+    // Max job execution time in minutes
+    let max_job_time = dotenv::var("MONITOR_MAX_UPDATE_DOWNLOADS_TIME")
+        .map(|s| s.parse::<u32>().unwrap() as i64)
+        .unwrap_or(120);
+
+    let start_time = background_jobs
+        .filter(job_type.eq("update_downloads"))
+        .select(created_at)
+        .first::<NaiveDateTime>(conn);
+
+    if let Ok(start_time) = start_time {
+        let start_time = DateTime::<Utc>::from_utc(start_time, Utc);
+        let minutes = Utc::now().signed_duration_since(start_time).num_minutes();
+
+        if minutes > max_job_time {
+            return log_and_trigger_event(on_call::Event::Trigger {
+                incident_key: Some(EVENT_KEY.into()),
+                description: format!("update_downloads job running for {} minutes", minutes),
+            });
+        }
+    };
+
+    log_and_trigger_event(on_call::Event::Resolve {
+        incident_key: EVENT_KEY.into(),
+        description: Some("No stalled update_downloads job".into()),
+    })
+}
+
+/// Check for known spam patterns
 fn check_spam_attack(conn: &PgConnection) -> Result<(), Error> {
     use cargo_registry::models::krate::canon_crate_name;
     use diesel::dsl::*;
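The doc comment on `check_failing_background_jobs` relies on a detail that the hunk context cuts off: the count is taken with `FOR UPDATE SKIP LOCKED`, so rows currently locked by a swirl runner (jobs that are actually executing) are skipped, and only old rows sitting unlocked in the queue, which at that age means failed attempts awaiting retry, can trigger the alert. A minimal sketch of such a query, assuming the schema imports shown above (this is illustrative, not the unchanged query body from the file; Postgres rejects `FOR UPDATE` combined with `count(*)`, hence selecting a literal per row and counting client-side, which is plausibly what the `diesel::sql_types::Integer` import is for):

    use cargo_registry::schema::background_jobs::dsl::*;
    use diesel::dsl::*;
    use diesel::prelude::*;
    use diesel::sql_types::Integer;

    // Illustrative sketch: count jobs older than the threshold that no
    // runner currently holds a row lock on, i.e. failed jobs awaiting
    // a retry. Locked (running) rows are skipped, not counted.
    fn failing_job_count(conn: &PgConnection, max_job_time: i32) -> QueryResult<usize> {
        let rows: Vec<i32> = background_jobs
            .select(sql::<Integer>("1"))
            .filter(created_at.lt(now - max_job_time.minutes()))
            .for_update()
            .skip_locked()
            .load(conn)?;

        Ok(rows.len())
    }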