diff --git a/crates/crates_io_worker/src/runner.rs b/crates/crates_io_worker/src/runner.rs index 56c0754aa8..95bbcbdfbd 100644 --- a/crates/crates_io_worker/src/runner.rs +++ b/crates/crates_io_worker/src/runner.rs @@ -3,14 +3,13 @@ use crate::job_registry::JobRegistry; use crate::worker::Worker; use crate::{storage, BackgroundJob}; use anyhow::anyhow; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::AsyncPgConnection; use futures_util::future::join_all; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tokio::task::{spawn_blocking, JoinHandle}; +use tokio::task::JoinHandle; use tracing::{info, info_span, warn, Instrument}; const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1); @@ -97,19 +96,14 @@ impl Runner { /// This function is intended for use in tests and will return an error if /// any jobs have failed. pub async fn check_for_failed_jobs(&self) -> anyhow::Result<()> { - let conn = self.connection_pool.get().await?; - spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - - let failed_jobs = storage::failed_job_count(conn)?; - if failed_jobs == 0 { - Ok(()) - } else { - Err(anyhow!("{failed_jobs} jobs failed")) - } - }) - .await - .map_err(|err| anyhow!(err.to_string()))? + let mut conn = self.connection_pool.get().await?; + + let failed_jobs = storage::failed_job_count(&mut conn).await?; + if failed_jobs == 0 { + Ok(()) + } else { + Err(anyhow!("{failed_jobs} jobs failed")) + } } } diff --git a/crates/crates_io_worker/src/storage.rs b/crates/crates_io_worker/src/storage.rs index b55762215a..2e927dfed8 100644 --- a/crates/crates_io_worker/src/storage.rs +++ b/crates/crates_io_worker/src/storage.rs @@ -1,10 +1,10 @@ use crate::schema::background_jobs; -use diesel::connection::LoadConnection; use diesel::dsl::now; use diesel::pg::Pg; use diesel::prelude::*; use diesel::sql_types::{Bool, Integer, Interval}; use diesel::{delete, update}; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; #[derive(Queryable, Selectable, Identifiable, Debug, Clone)] pub(super) struct BackgroundJob { @@ -26,8 +26,8 @@ fn retriable() -> Box, +pub(super) async fn find_next_unlocked_job( + conn: &mut AsyncPgConnection, job_types: &[String], ) -> QueryResult { background_jobs::table @@ -38,22 +38,26 @@ pub(super) fn find_next_unlocked_job( .for_update() .skip_locked() .first::(conn) + .await } /// The number of jobs that have failed at least once -pub(super) fn failed_job_count(conn: &mut impl LoadConnection) -> QueryResult { +pub(super) async fn failed_job_count(conn: &mut AsyncPgConnection) -> QueryResult { background_jobs::table .count() .filter(background_jobs::retries.gt(0)) .get_result(conn) + .await } /// Deletes a job that has successfully completed running -pub(super) fn delete_successful_job( - conn: &mut impl LoadConnection, +pub(super) async fn delete_successful_job( + conn: &mut AsyncPgConnection, job_id: i64, ) -> QueryResult<()> { - delete(background_jobs::table.find(job_id)).execute(conn)?; + delete(background_jobs::table.find(job_id)) + .execute(conn) + .await?; Ok(()) } @@ -61,11 +65,12 @@ pub(super) fn delete_successful_job( /// /// Ignores any database errors that may have occurred. If the DB has gone away, /// we assume that just trying again with a new connection will succeed. -pub(super) fn update_failed_job(conn: &mut impl LoadConnection, job_id: i64) { +pub(super) async fn update_failed_job(conn: &mut AsyncPgConnection, job_id: i64) { let _ = update(background_jobs::table.find(job_id)) .set(( background_jobs::retries.eq(background_jobs::retries + 1), background_jobs::last_retry.eq(now), )) - .execute(conn); + .execute(conn) + .await; } diff --git a/crates/crates_io_worker/src/worker.rs b/crates/crates_io_worker/src/worker.rs index ea4ae064ee..100d9a9ea1 100644 --- a/crates/crates_io_worker/src/worker.rs +++ b/crates/crates_io_worker/src/worker.rs @@ -3,16 +3,14 @@ use crate::storage; use crate::util::{try_to_extract_panic_info, with_sentry_transaction}; use anyhow::anyhow; use diesel::prelude::*; -use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::pooled_connection::deadpool::Pool; -use diesel_async::AsyncPgConnection; +use diesel_async::scoped_futures::ScopedFutureExt; +use diesel_async::{AsyncConnection, AsyncPgConnection}; use futures_util::FutureExt; use sentry_core::{Hub, SentryFutureExt}; use std::panic::AssertUnwindSafe; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Handle; -use tokio::task::spawn_blocking; use tokio::time::sleep; use tracing::{debug, error, info_span, warn}; @@ -58,15 +56,15 @@ impl Worker { async fn run_next_job(&self) -> anyhow::Result> { let context = self.context.clone(); let job_registry = self.job_registry.clone(); - let conn = self.connection_pool.get().await?; + let mut conn = self.connection_pool.get().await?; - spawn_blocking(move || { - let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into(); - - let job_types = job_registry.job_types(); - conn.transaction(|conn| { + let job_types = job_registry.job_types(); + conn.transaction(|conn| { + async move { debug!("Looking for next background worker job…"); - let Some(job) = storage::find_next_unlocked_job(conn, &job_types).optional()? + let Some(job) = storage::find_next_unlocked_job(conn, &job_types) + .await + .optional()? else { return Ok(None); }; @@ -90,23 +88,21 @@ impl Worker { .and_then(std::convert::identity) }); - let result = Handle::current().block_on(future.bind_hub(Hub::current())); - - match result { + match future.bind_hub(Hub::current()).await { Ok(_) => { debug!("Deleting successful job…"); - storage::delete_successful_job(conn, job_id)? + storage::delete_successful_job(conn, job_id).await? } Err(error) => { warn!("Failed to run job: {error}"); - storage::update_failed_job(conn, job_id); + storage::update_failed_job(conn, job_id).await; } } Ok(Some(job_id)) - }) + } + .scope_boxed() }) .await - .map_err(|err| anyhow!(err.to_string()))? } }