diff --git a/Cargo.lock b/Cargo.lock index 3f3ee3e305d..815e6672bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2356,11 +2356,12 @@ checksum = "032c03039aae92b350aad2e3779c352e104d919cb192ba2fabbd7b831ce4f0f6" [[package]] name = "swirl" version = "0.1.0" -source = "git+https://github.com/sgrif/swirl.git?rev=de5d8bb#de5d8bbcfbf7878aa2c248c105938567b32ed8a8" +source = "git+https://github.com/sgrif/swirl.git?rev=e87cf37#e87cf3772cd99d8edb478121c44972ea613c932f" dependencies = [ "diesel", "inventory", "serde", + "serde_derive", "serde_json", "swirl_proc_macro", "threadpool", @@ -2369,11 +2370,11 @@ dependencies = [ [[package]] name = "swirl_proc_macro" version = "0.1.0" -source = "git+https://github.com/sgrif/swirl.git?rev=de5d8bb#de5d8bbcfbf7878aa2c248c105938567b32ed8a8" +source = "git+https://github.com/sgrif/swirl.git?rev=e87cf37#e87cf3772cd99d8edb478121c44972ea613c932f" dependencies = [ - "proc-macro2 0.4.30", - "quote 0.6.13", - "syn 0.15.44", + "proc-macro2 1.0.9", + "quote 1.0.3", + "syn 1.0.16", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f88e7f63eb6..e9c8dd6196a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ dotenv = "0.15" toml = "0.5" diesel = { version = "1.4.0", features = ["postgres", "serde_json", "chrono", "r2d2"] } diesel_full_text_search = "1.0.0" -swirl = { git = "https://github.com/sgrif/swirl.git", rev = "de5d8bb" } +swirl = { git = "https://github.com/sgrif/swirl.git", rev = "e87cf37" } serde_json = "1.0.0" serde = { version = "1.0.0", features = ["derive"] } chrono = { version = "0.4.0", features = ["serde"] } diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 163f81b42af..7968d6624a0 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -24,8 +24,6 @@ impl swirl::db::DieselPool for DieselPool { #[allow(missing_debug_implementations)] pub struct Environment { index: Arc>, - // FIXME: https://github.com/sfackler/r2d2/pull/70 - pub connection_pool: AssertUnwindSafe, pub uploader: Uploader, http_client: AssertUnwindSafe, } @@ -36,7 +34,6 @@ impl Clone for Environment { fn clone(&self) -> Self { Self { index: self.index.clone(), - connection_pool: AssertUnwindSafe(self.connection_pool.0.clone()), uploader: self.uploader.clone(), http_client: AssertUnwindSafe(self.http_client.0.clone()), } @@ -44,38 +41,22 @@ impl Clone for Environment { } impl Environment { - pub fn new( - index: Repository, - connection_pool: DieselPool, - uploader: Uploader, - http_client: Client, - ) -> Self { - Self::new_shared( - Arc::new(Mutex::new(index)), - connection_pool, - uploader, - http_client, - ) + pub fn new(index: Repository, uploader: Uploader, http_client: Client) -> Self { + Self::new_shared(Arc::new(Mutex::new(index)), uploader, http_client) } pub fn new_shared( index: Arc>, - connection_pool: DieselPool, uploader: Uploader, http_client: Client, ) -> Self { Self { index, - connection_pool: AssertUnwindSafe(connection_pool), uploader, http_client: AssertUnwindSafe(http_client), } } - pub fn connection(&self) -> Result, PoolError> { - self.connection_pool.get() - } - pub fn lock_index(&self) -> Result, PerformError> { let repo = self.index.lock().unwrap_or_else(PoisonError::into_inner); repo.reset_head()?; diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index cb2de2302bf..b2bcb39407d 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -24,6 +24,7 @@ fn main() { println!("Booting runner"); let config = cargo_registry::Config::default(); + let db_url = db::connection_url(&config.db_url); let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT") .unwrap_or_else(|_| "30".into()) @@ -39,22 +40,11 @@ fn main() { println!("Index cloned"); let build_runner = || { - // 2x the thread pool size -- not all our jobs need a DB connection, - // but we want to always be able to run our jobs in parallel, rather - // than adjusting based on how many concurrent jobs need a connection. - // Eventually swirl will do this for us, and this will be the default - // -- we should just let it do a thread pool size of CPU count, and a - // a connection pool size of 2x that when that lands. - let db_config = r2d2::Pool::builder().max_size(4); - let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); - let environment = Environment::new_shared( - repository.clone(), - db_pool.clone(), - config.uploader.clone(), - Client::new(), - ); - swirl::Runner::builder(db_pool, environment) - .thread_count(2) + let environment = + Environment::new_shared(repository.clone(), config.uploader.clone(), Client::new()); + let db_config = r2d2::Pool::builder().min_idle(Some(0)); + swirl::Runner::builder(environment) + .connection_pool_builder(&db_url, db_config) .job_start_timeout(Duration::from_secs(job_start_timeout)) .build() }; diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index beffa88baa7..94bdc7860a8 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -183,8 +183,7 @@ pub fn publish(req: &mut dyn RequestExt) -> EndpointResult { .unwrap_or_else(|| String::from("README.md")), repo, ) - .enqueue(&conn) - .map_err(|e| AppError::from_std_error(e))?; + .enqueue(&conn)?; } let cksum = app @@ -204,9 +203,7 @@ pub fn publish(req: &mut dyn RequestExt) -> EndpointResult { yanked: Some(false), links, }; - git::add_crate(git_crate) - .enqueue(&conn) - .map_err(|e| AppError::from_std_error(e))?; + git::add_crate(git_crate).enqueue(&conn)?; // The `other` field on `PublishWarnings` was introduced to handle a temporary warning // that is no longer needed. As such, crates.io currently does not return any `other` diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index 77791bc297e..912656a0c86 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -44,9 +44,7 @@ fn modify_yank(req: &mut dyn RequestExt, yanked: bool) -> EndpointResult { insert_version_owner_action(&conn, version.id, user.id, ids.api_token_id(), action)?; - git::yank(krate.name, version, yanked) - .enqueue(&conn) - .map_err(|e| AppError::from_std_error(e))?; + git::yank(krate.name, version, yanked).enqueue(&conn)?; ok_true() } diff --git a/src/db.rs b/src/db.rs index 0345e196659..73436dcf75c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -63,22 +63,25 @@ pub fn connect_now() -> ConnectionResult { PgConnection::establish(&url.to_string()) } -pub fn diesel_pool( - url: &str, - env: Env, - config: r2d2::Builder>, -) -> DieselPool { +pub fn connection_url(url: &str) -> String { let mut url = Url::parse(url).expect("Invalid database URL"); if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") { url.query_pairs_mut().append_pair("sslmode", "require"); } + url.into_string() +} +pub fn diesel_pool( + url: &str, + env: Env, + config: r2d2::Builder>, +) -> DieselPool { + let url = connection_url(url); if env == Env::Test { - let conn = - PgConnection::establish(&url.into_string()).expect("failed to establish connection"); + let conn = PgConnection::establish(&url).expect("failed to establish connection"); DieselPool::test_conn(conn) } else { - let manager = ConnectionManager::new(url.into_string()); + let manager = ConnectionManager::new(url); DieselPool::Pool(config.build(manager).unwrap()) } } diff --git a/src/git.rs b/src/git.rs index 66a6a51f74b..0ac802185ee 100644 --- a/src/git.rs +++ b/src/git.rs @@ -289,6 +289,7 @@ pub fn add_crate(env: &Environment, krate: Crate) -> Result<(), PerformError> { /// push the changes. #[swirl::background_job] pub fn yank( + conn: &PgConnection, env: &Environment, krate: String, version: Version, @@ -299,8 +300,6 @@ pub fn yank( let repo = env.lock_index()?; let dst = repo.index_file(&krate); - let conn = env.connection()?; - conn.transaction(|| { let yanked_in_db = versions::table .find(version.id) diff --git a/src/render.rs b/src/render.rs index 9a56fa0f5fa..5694d993160 100644 --- a/src/render.rs +++ b/src/render.rs @@ -222,6 +222,7 @@ pub fn readme_to_html(text: &str, filename: &str, base_url: Option<&str>) -> Str #[swirl::background_job] pub fn render_and_upload_readme( + conn: &PgConnection, env: &Environment, version_id: i32, text: String, @@ -232,7 +233,6 @@ pub fn render_and_upload_readme( use diesel::prelude::*; let rendered = readme_to_html(&text, &file_name, base_url.as_deref()); - let conn = env.connection()?; conn.transaction(|| { Version::record_readme_rendering(version_id, &conn)?; diff --git a/src/tasks/update_downloads.rs b/src/tasks/update_downloads.rs index fc1620d6e36..9a275ac8575 100644 --- a/src/tasks/update_downloads.rs +++ b/src/tasks/update_downloads.rs @@ -1,5 +1,4 @@ use crate::{ - background_jobs::Environment, models::VersionDownload, schema::{crates, metadata, version_downloads, versions}, }; @@ -8,8 +7,7 @@ use diesel::prelude::*; use swirl::PerformError; #[swirl::background_job] -pub fn update_downloads(env: &Environment) -> Result<(), PerformError> { - let conn = env.connection()?; +pub fn update_downloads(conn: &PgConnection) -> Result<(), PerformError> { update(&conn)?; Ok(()) } diff --git a/src/tests/util.rs b/src/tests/util.rs index b7acc08f723..e33d8e25b0e 100644 --- a/src/tests/util.rs +++ b/src/tests/util.rs @@ -77,7 +77,7 @@ impl Drop for TestAppInner { // Lazily run any remaining jobs if let Some(runner) = &self.runner { runner.run_all_pending_jobs().expect("Could not run jobs"); - runner.assert_no_failed_jobs().expect("Failed jobs remain"); + runner.check_for_failed_jobs().expect("Failed jobs remain"); } // Manually verify that all jobs have completed successfully @@ -192,7 +192,7 @@ impl TestApp { runner.run_all_pending_jobs().expect("Could not run jobs"); runner - .assert_no_failed_jobs() + .check_for_failed_jobs() .expect("Could not determine if jobs failed"); } @@ -223,7 +223,6 @@ impl TestAppBuilder { let (app, middle) = crate::build_app(self.config, self.proxy); let runner = if self.build_job_runner { - let connection_pool = app.primary_database.clone(); let repository_config = RepositoryConfig { index_location: Url::from_file_path(&git::bare()).unwrap(), credentials: Credentials::Missing, @@ -231,16 +230,16 @@ impl TestAppBuilder { let index = WorkerRepository::open(&repository_config).expect("Could not clone index"); let environment = Environment::new( index, - connection_pool.clone(), app.config.uploader.clone(), app.http_client().clone(), ); Some( - Runner::builder(connection_pool, environment) + Runner::builder(environment) // We only have 1 connection in tests, so trying to run more than // 1 job concurrently will just block .thread_count(1) + .connection_pool(app.primary_database.clone()) .job_start_timeout(Duration::from_secs(5)) .build(), ) diff --git a/src/util/errors.rs b/src/util/errors.rs index c2e087fa12e..f20564ace31 100644 --- a/src/util/errors.rs +++ b/src/util/errors.rs @@ -97,10 +97,6 @@ impl dyn AppError { self.get_type_id() == TypeId::of::() } - pub fn from_std_error(err: Box) -> Box { - Self::try_convert(&*err).unwrap_or_else(|| internal(&err)) - } - fn try_convert(err: &(dyn Error + Send + 'static)) -> Option> { match err.downcast_ref() { Some(DieselError::NotFound) => Some(Box::new(NotFound)),