diff --git a/Cargo.lock b/Cargo.lock index 763813077a5..e63781c3803 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -207,9 +207,9 @@ dependencies = [ "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "swirl 0.1.0 (git+https://github.com/sgrif/swirl.git?rev=95d3a35bc39a7274335cad6d7cab64acd5eb3904)", "tar 0.4.21 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2091,6 +2091,17 @@ name = "strsim" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "swirl" +version = "0.1.0" +source = "git+https://github.com/sgrif/swirl.git?rev=95d3a35bc39a7274335cad6d7cab64acd5eb3904#95d3a35bc39a7274335cad6d7cab64acd5eb3904" +dependencies = [ + "diesel 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "syn" version = "0.11.11" @@ -2903,6 +2914,7 @@ dependencies = [ "checksum string_cache_codegen 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1eea1eee654ef80933142157fdad9dd8bc43cf7c74e999e369263496f04ff4da" "checksum string_cache_shared 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b1884d1bc09741d466d9b14e6d37ac89d6909cbcac41dd9ae982d4d063bbedfc" "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" +"checksum swirl 0.1.0 (git+https://github.com/sgrif/swirl.git?rev=95d3a35bc39a7274335cad6d7cab64acd5eb3904)" = "" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum syn 0.14.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c67da57e61ebc7b7b6fff56bb34440ca3a83db037320b0507af4c10368deda7d" "checksum syn 0.15.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1825685f977249735d510a242a6727b46efe914bb67e38d30c071b1b72b1d5c2" diff --git a/Cargo.toml b/Cargo.toml index 58ef697079f..e96d76a9d0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ dotenv = "0.11.0" toml = "0.4" diesel = { version = "1.4.0", features = ["postgres", "serde_json", "chrono", "r2d2"] } diesel_full_text_search = "1.0.0" +swirl = { git = "https://github.com/sgrif/swirl.git", rev = "95d3a35bc39a7274335cad6d7cab64acd5eb3904" } serde_json = "1.0.0" serde_derive = "1.0.0" serde = "1.0.0" @@ -62,7 +63,6 @@ tempdir = "0.3.7" parking_lot = "0.7.1" jemallocator = { version = "0.1.8", features = ['unprefixed_malloc_on_supported_platforms', 'profiling'] } jemalloc-ctl = "0.2.0" -threadpool = "1.7" lettre = {git = "https://github.com/lettre/lettre", version = "0.9"} lettre_email = {git = "https://github.com/lettre/lettre", version = "0.9"} diff --git a/src/background/job.rs b/src/background/job.rs deleted file mode 100644 index b6ea016f715..00000000000 --- a/src/background/job.rs +++ /dev/null @@ -1,26 +0,0 @@ -use diesel::PgConnection; -use serde::{de::DeserializeOwned, Serialize}; - -use super::storage; -use crate::util::CargoResult; - -/// A background job, meant to be run asynchronously. -pub trait Job: Serialize + DeserializeOwned { - /// The environment this job is run with. This is a struct you define, - /// which should encapsulate things like database connection pools, any - /// configuration, and any other static data or shared resources. - type Environment; - - /// The key to use for storing this job, and looking it up later. - /// - /// Typically this is the name of your struct in `snake_case` - const JOB_TYPE: &'static str; - - /// Enqueue this job to be run at some point in the future. - fn enqueue(self, conn: &PgConnection) -> CargoResult<()> { - storage::enqueue_job(conn, self) - } - - /// The logic involved in actually performing this job. - fn perform(self, env: &Self::Environment) -> CargoResult<()>; -} diff --git a/src/background/mod.rs b/src/background/mod.rs deleted file mode 100644 index 70b8d6dce79..00000000000 --- a/src/background/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod job; -mod registry; -mod runner; -mod storage; - -pub use self::job::*; -pub use self::registry::Registry; -pub use self::runner::*; diff --git a/src/background/registry.rs b/src/background/registry.rs deleted file mode 100644 index 2e672ec0ded..00000000000 --- a/src/background/registry.rs +++ /dev/null @@ -1,46 +0,0 @@ -#![allow(clippy::new_without_default)] // https://github.com/rust-lang/rust-clippy/issues/3632 - -use serde_json; -use std::collections::HashMap; -use std::panic::RefUnwindSafe; - -use super::Job; -use crate::util::CargoResult; - -#[doc(hidden)] -pub type PerformFn = - Box CargoResult<()> + RefUnwindSafe + Send + Sync>; - -#[derive(Default)] -#[allow(missing_debug_implementations)] // Can't derive debug -/// A registry of background jobs, used to map job types to concrege perform -/// functions at runtime. -pub struct Registry { - job_types: HashMap<&'static str, PerformFn>, -} - -impl Registry { - /// Create a new, empty registry - pub fn new() -> Self { - Registry { - job_types: Default::default(), - } - } - - /// Get the perform function for a given job type - pub fn get(&self, job_type: &str) -> Option<&PerformFn> { - self.job_types.get(job_type) - } - - /// Register a new background job. This will override any existing - /// registries with the same `JOB_TYPE`, if one exists. - pub fn register>(&mut self) { - self.job_types.insert( - T::JOB_TYPE, - Box::new(|data, env| { - let data = serde_json::from_value(data)?; - T::perform(data, env) - }), - ); - } -} diff --git a/src/background/runner.rs b/src/background/runner.rs deleted file mode 100644 index 22df2104846..00000000000 --- a/src/background/runner.rs +++ /dev/null @@ -1,313 +0,0 @@ -#![allow(dead_code)] -use diesel::prelude::*; -use std::any::Any; -use std::panic::{catch_unwind, PanicInfo, RefUnwindSafe, UnwindSafe}; -use std::sync::Arc; -use threadpool::ThreadPool; - -use super::{storage, Job, Registry}; -use crate::db::{DieselPool, DieselPooledConn}; -use crate::util::errors::*; - -#[allow(missing_debug_implementations)] -pub struct Builder { - connection_pool: DieselPool, - environment: Env, - registry: Registry, - thread_count: Option, -} - -impl Builder { - pub fn register>(mut self) -> Self { - self.registry.register::(); - self - } - - pub fn thread_count(mut self, thread_count: usize) -> Self { - self.thread_count = Some(thread_count); - self - } - - pub fn build(self) -> Runner { - Runner { - connection_pool: self.connection_pool, - thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)), - environment: Arc::new(self.environment), - registry: Arc::new(self.registry), - } - } -} - -#[allow(missing_debug_implementations)] -pub struct Runner { - connection_pool: DieselPool, - thread_pool: ThreadPool, - environment: Arc, - registry: Arc>, -} - -impl Runner { - pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder { - Builder { - connection_pool, - environment, - registry: Registry::new(), - thread_count: None, - } - } - - pub fn run_all_pending_jobs(&self) -> CargoResult<()> { - if let Some(conn) = self.try_connection() { - let available_job_count = storage::available_job_count(&conn)?; - for _ in 0..available_job_count { - self.run_single_job() - } - } - Ok(()) - } - - fn run_single_job(&self) { - let environment = Arc::clone(&self.environment); - let registry = Arc::clone(&self.registry); - self.get_single_job(move |job| { - let perform_fn = registry - .get(&job.job_type) - .ok_or_else(|| internal(&format_args!("Unknown job type {}", job.job_type)))?; - perform_fn(job.data, &environment) - }) - } - - fn get_single_job(&self, f: F) - where - F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static, - { - // The connection may not be `Send` so we need to clone the pool instead - let pool = self.connection_pool.clone(); - self.thread_pool.execute(move || { - let conn = pool.get().expect("Could not acquire connection"); - conn.transaction::<_, Box, _>(|| { - let job = storage::find_next_unlocked_job(&conn).optional()?; - let job = match job { - Some(j) => j, - None => return Ok(()), - }; - let job_id = job.id; - - let result = catch_unwind(|| f(job)) - .map_err(|e| try_to_extract_panic_info(&e)) - .and_then(|r| r); - - match result { - Ok(_) => storage::delete_successful_job(&conn, job_id)?, - Err(e) => { - eprintln!("Job {} failed to run: {}", job_id, e); - storage::update_failed_job(&conn, job_id); - } - } - Ok(()) - }) - .expect("Could not retrieve or update job") - }) - } - - fn connection(&self) -> CargoResult { - self.connection_pool.get().map_err(Into::into) - } - - fn try_connection(&self) -> Option { - self.connection_pool.try_get() - } - - pub fn assert_no_failed_jobs(&self) -> CargoResult<()> { - self.wait_for_jobs(); - let failed_jobs = storage::failed_job_count(&*self.connection()?)?; - assert_eq!(0, failed_jobs); - Ok(()) - } - - fn wait_for_jobs(&self) { - self.thread_pool.join(); - assert_eq!(0, self.thread_pool.panic_count()); - } -} - -/// Try to figure out what's in the box, and print it if we can. -/// -/// The actual error type we will get from `panic::catch_unwind` is really poorly documented. -/// However, the `panic::set_hook` functions deal with a `PanicInfo` type, and its payload is -/// documented as "commonly but not always `&'static str` or `String`". So we can try all of those, -/// and give up if we didn't get one of those three types. -fn try_to_extract_panic_info(info: &(dyn Any + Send + 'static)) -> Box { - if let Some(x) = info.downcast_ref::() { - internal(&format_args!("job panicked: {}", x)) - } else if let Some(x) = info.downcast_ref::<&'static str>() { - internal(&format_args!("job panicked: {}", x)) - } else if let Some(x) = info.downcast_ref::() { - internal(&format_args!("job panicked: {}", x)) - } else { - internal("job panicked") - } -} - -#[cfg(test)] -mod tests { - use diesel::prelude::*; - use diesel::r2d2; - - use super::*; - use crate::schema::background_jobs::dsl::*; - use std::panic::AssertUnwindSafe; - use std::sync::{Arc, Barrier, Mutex, MutexGuard}; - - #[test] - fn jobs_are_locked_when_fetched() { - let _guard = TestGuard::lock(); - - let runner = runner(); - let first_job_id = create_dummy_job(&runner).id; - let second_job_id = create_dummy_job(&runner).id; - let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); - let fetch_barrier2 = fetch_barrier.clone(); - let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); - let return_barrier2 = return_barrier.clone(); - - runner.get_single_job(move |job| { - fetch_barrier.0.wait(); // Tell thread 2 it can lock its job - assert_eq!(first_job_id, job.id); - return_barrier.0.wait(); // Wait for thread 2 to lock its job - Ok(()) - }); - - fetch_barrier2.0.wait(); // Wait until thread 1 locks its job - runner.get_single_job(move |job| { - assert_eq!(second_job_id, job.id); - return_barrier2.0.wait(); // Tell thread 1 it can unlock its job - Ok(()) - }); - - runner.wait_for_jobs(); - } - - #[test] - fn jobs_are_deleted_when_successfully_run() { - let _guard = TestGuard::lock(); - - let runner = runner(); - create_dummy_job(&runner); - - runner.get_single_job(|_| Ok(())); - runner.wait_for_jobs(); - - let remaining_jobs = background_jobs - .count() - .get_result(&*runner.connection().unwrap()); - assert_eq!(Ok(0), remaining_jobs); - } - - #[test] - fn failed_jobs_do_not_release_lock_before_updating_retry_time() { - let _guard = TestGuard::lock(); - - let runner = runner(); - create_dummy_job(&runner); - let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); - let barrier2 = barrier.clone(); - - runner.get_single_job(move |_| { - barrier.0.wait(); - // error so the job goes back into the queue - Err(human("nope")) - }); - - let conn = runner.connection().unwrap(); - // Wait for the first thread to acquire the lock - barrier2.0.wait(); - // We are intentionally not using `get_single_job` here. - // `SKIP LOCKED` is intentionally omitted here, so we block until - // the lock on the first job is released. - // If there is any point where the row is unlocked, but the retry - // count is not updated, we will get a row here. - let available_jobs = background_jobs - .select(id) - .filter(retries.eq(0)) - .for_update() - .load::(&*conn) - .unwrap(); - assert_eq!(0, available_jobs.len()); - - // Sanity check to make sure the job actually is there - let total_jobs_including_failed = background_jobs - .select(id) - .for_update() - .load::(&*conn) - .unwrap(); - assert_eq!(1, total_jobs_including_failed.len()); - - runner.wait_for_jobs(); - } - - #[test] - fn panicking_in_jobs_updates_retry_counter() { - let _guard = TestGuard::lock(); - let runner = runner(); - let job_id = create_dummy_job(&runner).id; - - runner.get_single_job(|_| panic!()); - runner.wait_for_jobs(); - - let tries = background_jobs - .find(job_id) - .select(retries) - .for_update() - .first::(&*runner.connection().unwrap()) - .unwrap(); - assert_eq!(1, tries); - } - - lazy_static! { - // Since these tests deal with behavior concerning multiple connections - // running concurrently, they have to run outside of a transaction. - // Therefore we can't run more than one at a time. - // - // Rather than forcing the whole suite to be run with `--test-threads 1`, - // we just lock these tests instead. - static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); - } - - struct TestGuard<'a>(MutexGuard<'a, ()>); - - impl<'a> TestGuard<'a> { - fn lock() -> Self { - TestGuard(TEST_MUTEX.lock().unwrap()) - } - } - - impl<'a> Drop for TestGuard<'a> { - fn drop(&mut self) { - ::diesel::sql_query("TRUNCATE TABLE background_jobs") - .execute(&*runner().connection().unwrap()) - .unwrap(); - } - } - - fn runner() -> Runner<()> { - use dotenv; - - let database_url = - dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests"); - let manager = r2d2::ConnectionManager::new(database_url); - let pool = r2d2::Pool::builder().max_size(2).build(manager).unwrap(); - - Runner::builder(DieselPool::Pool(pool), ()) - .thread_count(2) - .build() - } - - fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob { - ::diesel::insert_into(background_jobs) - .values((job_type.eq("Foo"), data.eq(json!(null)))) - .returning((id, job_type, data)) - .get_result(&*runner.connection().unwrap()) - .unwrap() - } -} diff --git a/src/background/storage.rs b/src/background/storage.rs deleted file mode 100644 index 4e289b0fc49..00000000000 --- a/src/background/storage.rs +++ /dev/null @@ -1,88 +0,0 @@ -use diesel::dsl::now; -use diesel::pg::Pg; -use diesel::prelude::*; -use diesel::sql_types::{Bool, Integer, Interval}; -use diesel::{delete, insert_into, update}; -use serde_json; - -use super::Job; -use crate::schema::background_jobs; -use crate::util::CargoResult; - -#[derive(Queryable, Identifiable, Debug, Clone)] -pub struct BackgroundJob { - pub id: i64, - pub job_type: String, - pub data: serde_json::Value, -} - -/// Enqueues a job to be run as soon as possible. -pub fn enqueue_job(conn: &PgConnection, job: T) -> CargoResult<()> { - use crate::schema::background_jobs::dsl::*; - - let job_data = serde_json::to_value(job)?; - insert_into(background_jobs) - .values((job_type.eq(T::JOB_TYPE), data.eq(job_data))) - .execute(conn)?; - Ok(()) -} - -fn retriable() -> Box> { - use crate::schema::background_jobs::dsl::*; - use diesel::dsl::*; - - sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer); - - Box::new(last_retry.lt(now - 1.minute().into_sql::() * power(2, retries))) -} - -/// Finds the next job that is unlocked, and ready to be retried. If a row is -/// found, it will be locked. -pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult { - use crate::schema::background_jobs::dsl::*; - - background_jobs - .select((id, job_type, data)) - .filter(retriable()) - .order(id) - .for_update() - .skip_locked() - .first::(conn) -} - -/// The number of jobs that have failed at least once -pub fn failed_job_count(conn: &PgConnection) -> QueryResult { - use crate::schema::background_jobs::dsl::*; - - background_jobs - .count() - .filter(retries.gt(0)) - .get_result(conn) -} - -/// The number of jobs available to be run -pub fn available_job_count(conn: &PgConnection) -> QueryResult { - use crate::schema::background_jobs::dsl::*; - - background_jobs.count().filter(retriable()).get_result(conn) -} - -/// Deletes a job that has successfully completed running -pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> { - use crate::schema::background_jobs::dsl::*; - - delete(background_jobs.find(job_id)).execute(conn)?; - Ok(()) -} - -/// Marks that we just tried and failed to run a job. -/// -/// Ignores any database errors that may have occurred. If the DB has gone away, -/// we assume that just trying again with a new connection will succeed. -pub fn update_failed_job(conn: &PgConnection, job_id: i64) { - use crate::schema::background_jobs::dsl::*; - - let _ = update(background_jobs.find(job_id)) - .set((retries.eq(retries + 1), last_retry.eq(now))) - .execute(conn); -} diff --git a/src/background_jobs.rs b/src/background_jobs.rs index cd71f1c8a9f..ec3c8b57d0b 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -1,12 +1,21 @@ use std::panic::AssertUnwindSafe; use std::sync::{Mutex, MutexGuard}; +use swirl::{Builder, Runner}; -use crate::background::{Builder, Runner}; use crate::db::{DieselPool, DieselPooledConn}; use crate::git::{AddCrate, Repository, Yank}; -use crate::util::CargoResult; +use crate::util::errors::{CargoErrToStdErr, CargoResult}; -pub fn job_runner(config: Builder) -> Runner { +impl<'a> swirl::DieselPool<'a> for DieselPool { + type Connection = DieselPooledConn<'a>; + type Error = CargoErrToStdErr; + + fn get(&'a self) -> Result { + self.get().map_err(CargoErrToStdErr) + } +} + +pub fn job_runner(config: Builder) -> Runner { config.register::().register::().build() } diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs index 07f4f1d8412..00910ce4626 100644 --- a/src/bin/background-worker.rs +++ b/src/bin/background-worker.rs @@ -10,7 +10,7 @@ #![deny(warnings)] use cargo_registry::git::Repository; -use cargo_registry::{background, background_jobs::*, db}; +use cargo_registry::{background_jobs::*, db}; use diesel::r2d2; use std::env; use std::thread::sleep; @@ -38,7 +38,7 @@ fn main() { let environment = Environment::new(repository, credentials, db_pool.clone()); - let builder = background::Runner::builder(db_pool, environment).thread_count(1); + let builder = swirl::Runner::builder(db_pool, environment).thread_count(1); let runner = job_runner(builder); println!("Runner booted, running jobs"); diff --git a/src/db.rs b/src/db.rs index b090be41a4d..249d2aa9f48 100644 --- a/src/db.rs +++ b/src/db.rs @@ -27,13 +27,6 @@ impl DieselPool { } } - pub fn try_get(&self) -> Option { - match self { - DieselPool::Pool(pool) => pool.try_get().map(DieselPooledConn::Pool), - DieselPool::Test(conn) => conn.try_lock().map(DieselPooledConn::Test), - } - } - pub fn state(&self) -> r2d2::State { match self { DieselPool::Pool(pool) => pool.state(), diff --git a/src/git.rs b/src/git.rs index c0db6cd612f..00b598a07b3 100644 --- a/src/git.rs +++ b/src/git.rs @@ -5,14 +5,14 @@ use std::collections::HashMap; use std::fs::{self, OpenOptions}; use std::io::prelude::*; use std::path::{Path, PathBuf}; +use swirl::{errors::PerformError, Job}; use tempdir::TempDir; use url::Url; -use crate::background::Job; use crate::background_jobs::Environment; use crate::models::{DependencyKind, Version}; use crate::schema::versions; -use crate::util::{internal, CargoResult}; +use crate::util::errors::{internal, std_error_no_send, CargoResult}; #[derive(Serialize, Deserialize, Debug)] pub struct Crate { @@ -82,7 +82,7 @@ impl Repository { msg: &str, modified_file: &Path, credentials: Option<(&str, &str)>, - ) -> CargoResult<()> { + ) -> Result<(), PerformError> { // git add $file let mut index = self.repository.index()?; index.add_path(modified_file)?; @@ -110,7 +110,7 @@ impl Repository { callbacks.push_update_reference(|refname, status| { assert_eq!(refname, "refs/heads/master"); if let Some(s) = status { - ref_status = Err(internal(&format_args!("failed to push a ref: {}", s))) + ref_status = Err(format!("failed to push a ref: {}", s).into()) } Ok(()) }); @@ -140,8 +140,8 @@ impl Job for AddCrate { type Environment = Environment; const JOB_TYPE: &'static str = "add_crate"; - fn perform(self, env: &Self::Environment) -> CargoResult<()> { - let repo = env.lock_index()?; + fn perform(self, env: &Self::Environment) -> Result<(), PerformError> { + let repo = env.lock_index().map_err(std_error_no_send)?; let dst = repo.index_file(&self.krate.name); // Add the crate to its relevant file @@ -159,7 +159,7 @@ impl Job for AddCrate { } pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> { - AddCrate { krate }.enqueue(conn).map_err(Into::into) + AddCrate { krate }.enqueue(conn).map_err(|e| internal(&e)) } #[derive(Serialize, Deserialize)] @@ -173,11 +173,11 @@ impl Job for Yank { type Environment = Environment; const JOB_TYPE: &'static str = "yank"; - fn perform(self, env: &Self::Environment) -> CargoResult<()> { - let repo = env.lock_index()?; + fn perform(self, env: &Self::Environment) -> Result<(), PerformError> { + let repo = env.lock_index().map_err(std_error_no_send)?; let dst = repo.index_file(&self.krate); - let conn = env.connection()?; + let conn = env.connection().map_err(std_error_no_send)?; conn.transaction(|| { let yanked_in_db = versions::table @@ -197,14 +197,14 @@ impl Job for Yank { .lines() .map(|line| { let mut git_crate = serde_json::from_str::(line) - .map_err(|_| internal(&format_args!("couldn't decode: `{}`", line)))?; + .map_err(|_| format!("couldn't decode: `{}`", line))?; if git_crate.name != self.krate || git_crate.vers != version { return Ok(line.to_string()); } git_crate.yanked = Some(self.yanked); Ok(serde_json::to_string(&git_crate)?) }) - .collect::>>(); + .collect::, PerformError>>(); let new = new?.join("\n") + "\n"; fs::write(&dst, new.as_bytes())?; @@ -239,5 +239,5 @@ pub fn yank(conn: &PgConnection, krate: String, version: Version, yanked: bool) yanked, } .enqueue(conn) - .map_err(Into::into) + .map_err(|e| internal(&e)) } diff --git a/src/lib.rs b/src/lib.rs index c597c25fe14..3c0eafc7fa5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,10 +19,6 @@ extern crate serde_derive; #[macro_use] extern crate serde_json; -#[cfg(test)] -#[macro_use] -extern crate lazy_static; - pub use crate::{app::App, config::Config, uploaders::Uploader}; use std::sync::Arc; @@ -33,7 +29,6 @@ use jemallocator::Jemalloc; static ALLOC: Jemalloc = Jemalloc; mod app; -pub mod background; pub mod background_jobs; pub mod boot; mod config; diff --git a/src/middleware/run_pending_background_jobs.rs b/src/middleware/run_pending_background_jobs.rs index 2b831c1f94e..ee120503db6 100644 --- a/src/middleware/run_pending_background_jobs.rs +++ b/src/middleware/run_pending_background_jobs.rs @@ -1,6 +1,7 @@ +use swirl::Runner; + use super::app::RequestApp; use super::prelude::*; -use crate::background::Runner; use crate::background_jobs::*; use crate::git::Repository; diff --git a/src/util/errors.rs b/src/util/errors.rs index c03f70988ae..85455639bd0 100644 --- a/src/util/errors.rs +++ b/src/util/errors.rs @@ -310,26 +310,33 @@ pub fn bad_request(error: &S) -> Box { Box::new(BadRequest(error.to_string())) } -pub fn std_error(e: Box) -> Box { - #[derive(Debug)] - struct E(Box); - impl Error for E { - fn description(&self) -> &str { - self.0.description() - } +#[derive(Debug)] +pub struct CargoErrToStdErr(pub Box); + +impl Error for CargoErrToStdErr { + fn description(&self) -> &str { + self.0.description() } - impl fmt::Display for E { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0)?; +} - let mut err = &*self.0; - while let Some(cause) = err.cause() { - err = cause; - write!(f, "\nCaused by: {}", err)?; - } +impl fmt::Display for CargoErrToStdErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0)?; - Ok(()) + let mut err = &*self.0; + while let Some(cause) = err.cause() { + err = cause; + write!(f, "\nCaused by: {}", err)?; } + + Ok(()) } - Box::new(E(e)) +} + +pub fn std_error(e: Box) -> Box { + Box::new(CargoErrToStdErr(e)) +} + +pub fn std_error_no_send(e: Box) -> Box { + Box::new(CargoErrToStdErr(e)) }