Skip to content

Commit f2ee2be

Browse files
authored
worker: Remove spawn_blocking() usage (#9785)
1 parent 9739800 commit f2ee2be

File tree

3 files changed

+37
-42
lines changed

3 files changed

+37
-42
lines changed

crates/crates_io_worker/src/runner.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@ use crate::job_registry::JobRegistry;
33
use crate::worker::Worker;
44
use crate::{storage, BackgroundJob};
55
use anyhow::anyhow;
6-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
76
use diesel_async::pooled_connection::deadpool::Pool;
87
use diesel_async::AsyncPgConnection;
98
use futures_util::future::join_all;
109
use std::collections::HashMap;
1110
use std::sync::Arc;
1211
use std::time::Duration;
13-
use tokio::task::{spawn_blocking, JoinHandle};
12+
use tokio::task::JoinHandle;
1413
use tracing::{info, info_span, warn, Instrument};
1514

1615
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);
@@ -97,19 +96,14 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
9796
/// This function is intended for use in tests and will return an error if
9897
/// any jobs have failed.
9998
pub async fn check_for_failed_jobs(&self) -> anyhow::Result<()> {
100-
let conn = self.connection_pool.get().await?;
101-
spawn_blocking(move || {
102-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
103-
104-
let failed_jobs = storage::failed_job_count(conn)?;
105-
if failed_jobs == 0 {
106-
Ok(())
107-
} else {
108-
Err(anyhow!("{failed_jobs} jobs failed"))
109-
}
110-
})
111-
.await
112-
.map_err(|err| anyhow!(err.to_string()))?
99+
let mut conn = self.connection_pool.get().await?;
100+
101+
let failed_jobs = storage::failed_job_count(&mut conn).await?;
102+
if failed_jobs == 0 {
103+
Ok(())
104+
} else {
105+
Err(anyhow!("{failed_jobs} jobs failed"))
106+
}
113107
}
114108
}
115109

crates/crates_io_worker/src/storage.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::schema::background_jobs;
2-
use diesel::connection::LoadConnection;
32
use diesel::dsl::now;
43
use diesel::pg::Pg;
54
use diesel::prelude::*;
65
use diesel::sql_types::{Bool, Integer, Interval};
76
use diesel::{delete, update};
7+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
88

99
#[derive(Queryable, Selectable, Identifiable, Debug, Clone)]
1010
pub(super) struct BackgroundJob {
@@ -26,8 +26,8 @@ fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType
2626

2727
/// Finds the next job that is unlocked, and ready to be retried. If a row is
2828
/// found, it will be locked.
29-
pub(super) fn find_next_unlocked_job(
30-
conn: &mut impl LoadConnection<Backend = Pg>,
29+
pub(super) async fn find_next_unlocked_job(
30+
conn: &mut AsyncPgConnection,
3131
job_types: &[String],
3232
) -> QueryResult<BackgroundJob> {
3333
background_jobs::table
@@ -38,34 +38,39 @@ pub(super) fn find_next_unlocked_job(
3838
.for_update()
3939
.skip_locked()
4040
.first::<BackgroundJob>(conn)
41+
.await
4142
}
4243

4344
/// The number of jobs that have failed at least once
44-
pub(super) fn failed_job_count(conn: &mut impl LoadConnection<Backend = Pg>) -> QueryResult<i64> {
45+
pub(super) async fn failed_job_count(conn: &mut AsyncPgConnection) -> QueryResult<i64> {
4546
background_jobs::table
4647
.count()
4748
.filter(background_jobs::retries.gt(0))
4849
.get_result(conn)
50+
.await
4951
}
5052

5153
/// Deletes a job that has successfully completed running
52-
pub(super) fn delete_successful_job(
53-
conn: &mut impl LoadConnection<Backend = Pg>,
54+
pub(super) async fn delete_successful_job(
55+
conn: &mut AsyncPgConnection,
5456
job_id: i64,
5557
) -> QueryResult<()> {
56-
delete(background_jobs::table.find(job_id)).execute(conn)?;
58+
delete(background_jobs::table.find(job_id))
59+
.execute(conn)
60+
.await?;
5761
Ok(())
5862
}
5963

6064
/// Marks that we just tried and failed to run a job.
6165
///
6266
/// Ignores any database errors that may have occurred. If the DB has gone away,
6367
/// we assume that just trying again with a new connection will succeed.
64-
pub(super) fn update_failed_job(conn: &mut impl LoadConnection<Backend = Pg>, job_id: i64) {
68+
pub(super) async fn update_failed_job(conn: &mut AsyncPgConnection, job_id: i64) {
6569
let _ = update(background_jobs::table.find(job_id))
6670
.set((
6771
background_jobs::retries.eq(background_jobs::retries + 1),
6872
background_jobs::last_retry.eq(now),
6973
))
70-
.execute(conn);
74+
.execute(conn)
75+
.await;
7176
}

crates/crates_io_worker/src/worker.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@ use crate::storage;
33
use crate::util::{try_to_extract_panic_info, with_sentry_transaction};
44
use anyhow::anyhow;
55
use diesel::prelude::*;
6-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
76
use diesel_async::pooled_connection::deadpool::Pool;
8-
use diesel_async::AsyncPgConnection;
7+
use diesel_async::scoped_futures::ScopedFutureExt;
8+
use diesel_async::{AsyncConnection, AsyncPgConnection};
99
use futures_util::FutureExt;
1010
use sentry_core::{Hub, SentryFutureExt};
1111
use std::panic::AssertUnwindSafe;
1212
use std::sync::Arc;
1313
use std::time::Duration;
14-
use tokio::runtime::Handle;
15-
use tokio::task::spawn_blocking;
1614
use tokio::time::sleep;
1715
use tracing::{debug, error, info_span, warn};
1816

@@ -58,15 +56,15 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
5856
async fn run_next_job(&self) -> anyhow::Result<Option<i64>> {
5957
let context = self.context.clone();
6058
let job_registry = self.job_registry.clone();
61-
let conn = self.connection_pool.get().await?;
59+
let mut conn = self.connection_pool.get().await?;
6260

63-
spawn_blocking(move || {
64-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
65-
66-
let job_types = job_registry.job_types();
67-
conn.transaction(|conn| {
61+
let job_types = job_registry.job_types();
62+
conn.transaction(|conn| {
63+
async move {
6864
debug!("Looking for next background worker job…");
69-
let Some(job) = storage::find_next_unlocked_job(conn, &job_types).optional()?
65+
let Some(job) = storage::find_next_unlocked_job(conn, &job_types)
66+
.await
67+
.optional()?
7068
else {
7169
return Ok(None);
7270
};
@@ -90,23 +88,21 @@ impl<Context: Clone + Send + Sync + 'static> Worker<Context> {
9088
.and_then(std::convert::identity)
9189
});
9290

93-
let result = Handle::current().block_on(future.bind_hub(Hub::current()));
94-
95-
match result {
91+
match future.bind_hub(Hub::current()).await {
9692
Ok(_) => {
9793
debug!("Deleting successful job…");
98-
storage::delete_successful_job(conn, job_id)?
94+
storage::delete_successful_job(conn, job_id).await?
9995
}
10096
Err(error) => {
10197
warn!("Failed to run job: {error}");
102-
storage::update_failed_job(conn, job_id);
98+
storage::update_failed_job(conn, job_id).await;
10399
}
104100
}
105101

106102
Ok(Some(job_id))
107-
})
103+
}
104+
.scope_boxed()
108105
})
109106
.await
110-
.map_err(|err| anyhow!(err.to_string()))?
111107
}
112108
}

0 commit comments

Comments
 (0)