From 8b4e76920073289fb3d105175ebbfa3902a40d32 Mon Sep 17 00:00:00 2001 From: Justin Geibel Date: Thu, 26 Mar 2020 18:51:44 -0400 Subject: [PATCH] Revert "Auto merge of #2269 - rust-lang:sg-update-swirl-2020-03-10, r=jtgeibel" This reverts commit b63607eab318d5ce1531835a39b7f48b4c99519d, reversing changes made to dc18552facdda23742d4c5b9e16a88a6e8c122b8. --- Cargo.lock | 11 +++++------ Cargo.toml | 2 +- src/background_jobs.rs | 23 +++++++++++++++++++++-- src/bin/background-worker.rs | 22 ++++++++++++++++------ src/controllers/krate/publish.rs | 7 +++++-- src/controllers/version/yank.rs | 4 +++- src/db.rs | 19 ++++++++----------- src/git.rs | 3 ++- src/render.rs | 2 +- src/tasks/update_downloads.rs | 4 +++- src/tests/util.rs | 9 +++++---- src/util/errors.rs | 4 ++++ 12 files changed, 74 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6af57378364..bca85c34795 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2447,12 +2447,11 @@ checksum = "032c03039aae92b350aad2e3779c352e104d919cb192ba2fabbd7b831ce4f0f6" [[package]] name = "swirl" version = "0.1.0" -source = "git+https://github.com/sgrif/swirl.git?rev=6ef8c4cd#6ef8c4cda725e62df1ba82075aea96c51a4bdc11" +source = "git+https://github.com/sgrif/swirl.git?rev=de5d8bb#de5d8bbcfbf7878aa2c248c105938567b32ed8a8" dependencies = [ "diesel", "inventory", "serde", - "serde_derive", "serde_json", "swirl_proc_macro", "threadpool", @@ -2461,11 +2460,11 @@ dependencies = [ [[package]] name = "swirl_proc_macro" version = "0.1.0" -source = "git+https://github.com/sgrif/swirl.git?rev=6ef8c4cd#6ef8c4cda725e62df1ba82075aea96c51a4bdc11" +source = "git+https://github.com/sgrif/swirl.git?rev=de5d8bb#de5d8bbcfbf7878aa2c248c105938567b32ed8a8" dependencies = [ - "proc-macro2 1.0.9", - "quote 1.0.3", - "syn 1.0.16", + "proc-macro2 0.4.30", + "quote 0.6.13", + "syn 0.15.44", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3360f0c125e..93857382946 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ dotenv = "0.15" toml = "0.4" diesel = { version = "1.4.0", features = ["postgres", "serde_json", "chrono", "r2d2"] } diesel_full_text_search = "1.0.0" -swirl = { git = "https://github.com/sgrif/swirl.git", rev = "6ef8c4cd" } +swirl = { git = "https://github.com/sgrif/swirl.git", rev = "de5d8bb" } serde_json = "1.0.0" serde = { version = "1.0.0", features = ["derive"] } chrono = { version = "0.4.0", features = ["serde"] } diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 7968d6624a0..163f81b42af 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -24,6 +24,8 @@ impl swirl::db::DieselPool for DieselPool { #[allow(missing_debug_implementations)] pub struct Environment { index: Arc>, + // FIXME: https://github.com/sfackler/r2d2/pull/70 + pub connection_pool: AssertUnwindSafe, pub uploader: Uploader, http_client: AssertUnwindSafe, } @@ -34,6 +36,7 @@ impl Clone for Environment { fn clone(&self) -> Self { Self { index: self.index.clone(), + connection_pool: AssertUnwindSafe(self.connection_pool.0.clone()), uploader: self.uploader.clone(), http_client: AssertUnwindSafe(self.http_client.0.clone()), } @@ -41,22 +44,38 @@ impl Clone for Environment { } impl Environment { - pub fn new(index: Repository, uploader: Uploader, http_client: Client) -> Self { - Self::new_shared(Arc::new(Mutex::new(index)), uploader, http_client) + pub fn new( + index: Repository, + connection_pool: DieselPool, + uploader: Uploader, + http_client: Client, + ) -> Self { + Self::new_shared( + Arc::new(Mutex::new(index)), + connection_pool, + uploader, + http_client, + ) } pub fn new_shared( index: Arc>, + connection_pool: DieselPool, uploader: Uploader, http_client: Client, ) -> Self { Self { index, + connection_pool: AssertUnwindSafe(connection_pool), uploader, http_client: AssertUnwindSafe(http_client), } } + pub fn connection(&self) -> Result, PoolError> { + self.connection_pool.get() + } + pub fn lock_index(&self) -> Result, PerformError> { let repo = self.index.lock().unwrap_or_else(PoisonError::into_inner); repo.reset_head()?; diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index b2bcb39407d..cb2de2302bf 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -24,7 +24,6 @@ fn main() { println!("Booting runner"); let config = cargo_registry::Config::default(); - let db_url = db::connection_url(&config.db_url); let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT") .unwrap_or_else(|_| "30".into()) @@ -40,11 +39,22 @@ fn main() { println!("Index cloned"); let build_runner = || { - let environment = - Environment::new_shared(repository.clone(), config.uploader.clone(), Client::new()); - let db_config = r2d2::Pool::builder().min_idle(Some(0)); - swirl::Runner::builder(environment) - .connection_pool_builder(&db_url, db_config) + // 2x the thread pool size -- not all our jobs need a DB connection, + // but we want to always be able to run our jobs in parallel, rather + // than adjusting based on how many concurrent jobs need a connection. + // Eventually swirl will do this for us, and this will be the default + // -- we should just let it do a thread pool size of CPU count, and a + // a connection pool size of 2x that when that lands. + let db_config = r2d2::Pool::builder().max_size(4); + let db_pool = db::diesel_pool(&config.db_url, config.env, db_config); + let environment = Environment::new_shared( + repository.clone(), + db_pool.clone(), + config.uploader.clone(), + Client::new(), + ); + swirl::Runner::builder(db_pool, environment) + .thread_count(2) .job_start_timeout(Duration::from_secs(job_start_timeout)) .build() }; diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index a8756faa135..93f3e1d5a50 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -183,7 +183,8 @@ pub fn publish(req: &mut dyn Request) -> AppResult { .unwrap_or_else(|| String::from("README.md")), repo, ) - .enqueue(&conn)?; + .enqueue(&conn) + .map_err(|e| AppError::from_std_error(e))?; } let cksum = app @@ -203,7 +204,9 @@ pub fn publish(req: &mut dyn Request) -> AppResult { yanked: Some(false), links, }; - git::add_crate(git_crate).enqueue(&conn)?; + git::add_crate(git_crate) + .enqueue(&conn) + .map_err(|e| AppError::from_std_error(e))?; // The `other` field on `PublishWarnings` was introduced to handle a temporary warning // that is no longer needed. As such, crates.io currently does not return any `other` diff --git a/src/controllers/version/yank.rs b/src/controllers/version/yank.rs index a2be37eeca6..597d8d53415 100644 --- a/src/controllers/version/yank.rs +++ b/src/controllers/version/yank.rs @@ -44,7 +44,9 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> AppResult { insert_version_owner_action(&conn, version.id, user.id, ids.api_token_id(), action)?; - git::yank(krate.name, version, yanked).enqueue(&conn)?; + git::yank(krate.name, version, yanked) + .enqueue(&conn) + .map_err(|e| AppError::from_std_error(e))?; ok_true() } diff --git a/src/db.rs b/src/db.rs index 0d56250d9e6..50954f53bcf 100644 --- a/src/db.rs +++ b/src/db.rs @@ -63,25 +63,22 @@ pub fn connect_now() -> ConnectionResult { PgConnection::establish(&url.to_string()) } -pub fn connection_url(url: &str) -> String { - let mut url = Url::parse(url).expect("Invalid database URL"); - if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") { - url.query_pairs_mut().append_pair("sslmode", "require"); - } - url.into_string() -} - pub fn diesel_pool( url: &str, env: Env, config: r2d2::Builder>, ) -> DieselPool { - let url = connection_url(url); + let mut url = Url::parse(url).expect("Invalid database URL"); + if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") { + url.query_pairs_mut().append_pair("sslmode", "require"); + } + if env == Env::Test { - let conn = PgConnection::establish(&url).expect("failed to establish connection"); + let conn = + PgConnection::establish(&url.into_string()).expect("failed to establish connection"); DieselPool::test_conn(conn) } else { - let manager = ConnectionManager::new(url); + let manager = ConnectionManager::new(url.into_string()); DieselPool::Pool(config.build(manager).unwrap()) } } diff --git a/src/git.rs b/src/git.rs index 0ac802185ee..66a6a51f74b 100644 --- a/src/git.rs +++ b/src/git.rs @@ -289,7 +289,6 @@ pub fn add_crate(env: &Environment, krate: Crate) -> Result<(), PerformError> { /// push the changes. #[swirl::background_job] pub fn yank( - conn: &PgConnection, env: &Environment, krate: String, version: Version, @@ -300,6 +299,8 @@ pub fn yank( let repo = env.lock_index()?; let dst = repo.index_file(&krate); + let conn = env.connection()?; + conn.transaction(|| { let yanked_in_db = versions::table .find(version.id) diff --git a/src/render.rs b/src/render.rs index 5694d993160..9a56fa0f5fa 100644 --- a/src/render.rs +++ b/src/render.rs @@ -222,7 +222,6 @@ pub fn readme_to_html(text: &str, filename: &str, base_url: Option<&str>) -> Str #[swirl::background_job] pub fn render_and_upload_readme( - conn: &PgConnection, env: &Environment, version_id: i32, text: String, @@ -233,6 +232,7 @@ pub fn render_and_upload_readme( use diesel::prelude::*; let rendered = readme_to_html(&text, &file_name, base_url.as_deref()); + let conn = env.connection()?; conn.transaction(|| { Version::record_readme_rendering(version_id, &conn)?; diff --git a/src/tasks/update_downloads.rs b/src/tasks/update_downloads.rs index 9a275ac8575..fc1620d6e36 100644 --- a/src/tasks/update_downloads.rs +++ b/src/tasks/update_downloads.rs @@ -1,4 +1,5 @@ use crate::{ + background_jobs::Environment, models::VersionDownload, schema::{crates, metadata, version_downloads, versions}, }; @@ -7,7 +8,8 @@ use diesel::prelude::*; use swirl::PerformError; #[swirl::background_job] -pub fn update_downloads(conn: &PgConnection) -> Result<(), PerformError> { +pub fn update_downloads(env: &Environment) -> Result<(), PerformError> { + let conn = env.connection()?; update(&conn)?; Ok(()) } diff --git a/src/tests/util.rs b/src/tests/util.rs index 0896e488a13..87759ec09c0 100644 --- a/src/tests/util.rs +++ b/src/tests/util.rs @@ -74,7 +74,7 @@ impl Drop for TestAppInner { // Lazily run any remaining jobs if let Some(runner) = &self.runner { runner.run_all_pending_jobs().expect("Could not run jobs"); - runner.check_for_failed_jobs().expect("Failed jobs remain"); + runner.assert_no_failed_jobs().expect("Failed jobs remain"); } // Manually verify that all jobs have completed successfully @@ -189,7 +189,7 @@ impl TestApp { runner.run_all_pending_jobs().expect("Could not run jobs"); runner - .check_for_failed_jobs() + .assert_no_failed_jobs() .expect("Could not determine if jobs failed"); } @@ -220,6 +220,7 @@ impl TestAppBuilder { let (app, middle) = crate::build_app(self.config, self.proxy); let runner = if self.build_job_runner { + let connection_pool = app.primary_database.clone(); let repository_config = RepositoryConfig { index_location: Url::from_file_path(&git::bare()).unwrap(), credentials: Credentials::Missing, @@ -227,16 +228,16 @@ impl TestAppBuilder { let index = WorkerRepository::open(&repository_config).expect("Could not clone index"); let environment = Environment::new( index, + connection_pool.clone(), app.config.uploader.clone(), app.http_client().clone(), ); Some( - Runner::builder(environment) + Runner::builder(connection_pool, environment) // We only have 1 connection in tests, so trying to run more than // 1 job concurrently will just block .thread_count(1) - .connection_pool(app.primary_database.clone()) .job_start_timeout(Duration::from_secs(5)) .build(), ) diff --git a/src/util/errors.rs b/src/util/errors.rs index 4d2213ffc57..9c7eafc851d 100644 --- a/src/util/errors.rs +++ b/src/util/errors.rs @@ -97,6 +97,10 @@ impl dyn AppError { self.get_type_id() == TypeId::of::() } + pub fn from_std_error(err: Box) -> Box { + Self::try_convert(&*err).unwrap_or_else(|| internal(&err)) + } + fn try_convert(err: &(dyn Error + Send + 'static)) -> Option> { match err.downcast_ref() { Some(DieselError::NotFound) => Some(Box::new(NotFound)),