From 4c5fa5baa45a39b676b847ade7f0c2e70db9ee5d Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Mon, 9 Mar 2020 18:39:07 -0600 Subject: [PATCH] Create a new DB pool when rebuilding the worker When we fail to *start* (not run) new jobs, which could happen because all our worker threads are saturated by a job that is taking too long, we will rebuild the runner up to 5 times in an attempt to recover before killing the process. Importantly, if this occurred because of a slow job, this should give those slow jobs a chance to finish without blocking the rest of the queue. However, we were sharing a database pool even when we rebuilt the runner. This means that if the slow jobs held a database connection, our retries will just fail immediately, and we kill the process (along with any hopes of the slow job catching up). This creates a new connection pool whenever we restart the runner. --- src/background_jobs.rs | 16 +++++++++++++++- src/bin/background-worker.rs | 32 +++++++++++++++++++------------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 820bd68ce0c..163f81b42af 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -49,9 +49,23 @@ impl Environment { connection_pool: DieselPool, uploader: Uploader, http_client: Client, + ) -> Self { + Self::new_shared( + Arc::new(Mutex::new(index)), + connection_pool, + uploader, + http_client, + ) + } + + pub fn new_shared( + index: Arc<Mutex<Repository>>, + connection_pool: DieselPool, + uploader: Uploader, + http_client: Client, ) -> Self { Self { - index: Arc::new(Mutex::new(index)), + index, connection_pool: AssertUnwindSafe(connection_pool), uploader, http_client: AssertUnwindSafe(http_client), diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index f3645889737..cb2de2302bf 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -16,6 +16,7 @@ use cargo_registry::git::{Repository, RepositoryConfig}; use 
cargo_registry::{background_jobs::*, db}; use diesel::r2d2; use reqwest::blocking::Client; +use std::sync::{Arc, Mutex}; use std::thread::sleep; use std::time::Duration; @@ -24,15 +25,6 @@ fn main() { let config = cargo_registry::Config::default(); - // 2x the thread pool size -- not all our jobs need a DB connection, - // but we want to always be able to run our jobs in parallel, rather - // than adjusting based on how many concurrent jobs need a connection. - // Eventually swirl will do this for us, and this will be the default - // -- we should just let it do a thread pool size of CPU count, and a - // a connection pool size of 2x that when that lands. - let db_config = r2d2::Pool::builder().max_size(4); - let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); - let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT") .unwrap_or_else(|_| "30".into()) .parse() @@ -41,13 +33,27 @@ fn main() { println!("Cloning index"); let repository_config = RepositoryConfig::from_environment(); - let repository = Repository::open(&repository_config).expect("Failed to clone index"); + let repository = Arc::new(Mutex::new( + Repository::open(&repository_config).expect("Failed to clone index"), + )); println!("Index cloned"); - let environment = Environment::new(repository, db_pool.clone(), config.uploader, Client::new()); - let build_runner = || { - swirl::Runner::builder(db_pool.clone(), environment.clone()) + // 2x the thread pool size -- not all our jobs need a DB connection, + // but we want to always be able to run our jobs in parallel, rather + // than adjusting based on how many concurrent jobs need a connection. + // Eventually swirl will do this for us, and this will be the default + // -- we should just let it do a thread pool size of CPU count, and a + // a connection pool size of 2x that when that lands. 
+ let db_config = r2d2::Pool::builder().max_size(4); + let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); + let environment = Environment::new_shared( + repository.clone(), + db_pool.clone(), + config.uploader.clone(), + Client::new(), + ); + swirl::Runner::builder(db_pool, environment) .thread_count(2) .job_start_timeout(Duration::from_secs(job_start_timeout)) .build()