Skip to content

Commit 6c1460f

Browse files
authored
Merge pull request #2259 from Kobzol/test-pr
Partially enable job queue in production
2 parents 5ae6a0c + a491dd3 commit 6c1460f

File tree

4 files changed

+154
-127
lines changed

4 files changed

+154
-127
lines changed

site/src/github.rs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub mod client;
22
pub mod comparison_summary;
33

44
use crate::api::github::Commit;
5-
use crate::job_queue::run_new_queue;
5+
use crate::job_queue::should_use_job_queue;
66
use crate::load::{MissingReason, SiteCtxt, TryCommit};
77
use chrono::{DateTime, Utc};
88
use serde::Deserialize;
@@ -240,18 +240,16 @@ async fn attach_shas_to_try_benchmark_request(
240240
commit: &TryCommit,
241241
commit_date: DateTime<Utc>,
242242
) {
243-
if run_new_queue() {
244-
if let Err(e) = conn
245-
.attach_shas_to_try_benchmark_request(
246-
pr_number,
247-
&commit.sha,
248-
&commit.parent_sha,
249-
commit_date,
250-
)
251-
.await
252-
{
253-
log::error!("Failed to add shas to try commit: {e:?}");
254-
}
243+
if let Err(e) = conn
244+
.attach_shas_to_try_benchmark_request(
245+
pr_number,
246+
&commit.sha,
247+
&commit.parent_sha,
248+
commit_date,
249+
)
250+
.await
251+
{
252+
log::error!("Failed to add shas to try commit: {e:?}");
255253
}
256254
}
257255

@@ -281,22 +279,24 @@ pub async fn enqueue_shas(
281279
};
282280
let conn = ctxt.conn().await;
283281

284-
attach_shas_to_try_benchmark_request(
285-
&*conn,
286-
pr_number,
287-
&try_commit,
288-
commit_response.commit.committer.date,
289-
)
290-
.await;
291-
292-
let queued = conn
293-
.pr_attach_commit(
282+
let queued = if should_use_job_queue(pr_number) {
283+
attach_shas_to_try_benchmark_request(
284+
&*conn,
285+
pr_number,
286+
&try_commit,
287+
commit_response.commit.committer.date,
288+
)
289+
.await;
290+
true
291+
} else {
292+
conn.pr_attach_commit(
294293
pr_number,
295294
&try_commit.sha,
296295
&try_commit.parent_sha,
297296
Some(commit_response.commit.committer.date),
298297
)
299-
.await;
298+
.await
299+
};
300300
if queued {
301301
if !msg.is_empty() {
302302
msg.push('\n');

site/src/job_queue/mod.rs

Lines changed: 88 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,26 @@ use parking_lot::RwLock;
1313
use std::{str::FromStr, sync::Arc};
1414
use tokio::time::{self, Duration, MissedTickBehavior};
1515

16-
pub fn run_new_queue() -> bool {
17-
std::env::var("RUN_CRON")
16+
pub fn is_job_queue_enabled() -> bool {
17+
std::env::var("USE_JOB_QUEUE")
1818
.ok()
1919
.and_then(|x| x.parse().ok())
20-
.unwrap_or(false)
20+
.unwrap_or(true)
21+
}
22+
23+
/// rust-lang/rust PR that will be used for testing the job queue.
24+
const TEST_PR_FOR_JOB_QUEUE: u32 = 147039;
25+
26+
pub fn should_use_job_queue(pr: u32) -> bool {
27+
is_job_queue_enabled() && pr == TEST_PR_FOR_JOB_QUEUE
2128
}
2229

2330
/// Store the latest master commits or do nothing if all of them are
2431
/// already in the database.
2532
/// Returns `true` if at least one benchmark request was inserted.
2633
async fn create_benchmark_request_master_commits(
2734
ctxt: &SiteCtxt,
28-
conn: &dyn database::pool::Connection,
35+
_conn: &dyn database::pool::Connection,
2936
index: &BenchmarkRequestIndex,
3037
) -> anyhow::Result<bool> {
3138
let now = Utc::now();
@@ -40,23 +47,26 @@ async fn create_benchmark_request_master_commits(
4047
// TODO; delete at some point in the future
4148
let cutoff: chrono::DateTime<Utc> = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?;
4249

43-
let mut inserted = false;
50+
let inserted = false;
4451
for master_commit in master_commits {
4552
// We don't want to add masses of obsolete data
4653
if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) {
47-
let pr = master_commit.pr.unwrap_or(0);
48-
let benchmark = BenchmarkRequest::create_master(
49-
&master_commit.sha,
50-
&master_commit.parent_sha,
51-
pr,
52-
master_commit.time,
53-
);
54-
log::info!("Inserting master benchmark request {benchmark:?}");
55-
if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
56-
log::error!("Failed to insert master benchmark request: {error:?}");
57-
} else {
58-
inserted = true;
59-
}
54+
// let pr = master_commit.pr.unwrap_or(0);
55+
// let benchmark = BenchmarkRequest::create_master(
56+
// &master_commit.sha,
57+
// &master_commit.parent_sha,
58+
// pr,
59+
// master_commit.time,
60+
// );
61+
// log::info!("Inserting master benchmark request {benchmark:?}");
62+
63+
// Do not create benchmark requests on production, to allow running in parallel with
64+
// the old system.
65+
// if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
66+
// log::error!("Failed to insert master benchmark request: {error:?}");
67+
// } else {
68+
// inserted = true;
69+
// }
6070
}
6171
}
6272
Ok(inserted)
@@ -66,7 +76,7 @@ async fn create_benchmark_request_master_commits(
6676
/// already in the database
6777
/// Returns `true` if at least one benchmark request was inserted.
6878
async fn create_benchmark_request_releases(
69-
conn: &dyn database::pool::Connection,
79+
_conn: &dyn database::pool::Connection,
7080
index: &BenchmarkRequestIndex,
7181
) -> anyhow::Result<bool> {
7282
let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt")
@@ -82,16 +92,19 @@ async fn create_benchmark_request_releases(
8292
.filter_map(parse_release_string)
8393
.take(20);
8494

85-
let mut inserted = false;
95+
let inserted = false;
8696
for (name, commit_date) in releases {
8797
if commit_date >= cutoff && !index.contains_tag(&name) {
88-
let release_request = BenchmarkRequest::create_release(&name, commit_date);
89-
log::info!("Inserting release benchmark request {release_request:?}");
90-
if let Err(error) = conn.insert_benchmark_request(&release_request).await {
91-
log::error!("Failed to insert release benchmark request: {error}");
92-
} else {
93-
inserted = true;
94-
}
98+
// let release_request = BenchmarkRequest::create_release(&name, commit_date);
99+
// log::info!("Inserting release benchmark request {release_request:?}");
100+
101+
// Do not create benchmark requests on production, to allow running in parallel with
102+
// the old system.
103+
// if let Err(error) = conn.insert_benchmark_request(&release_request).await {
104+
// log::error!("Failed to insert release benchmark request: {error}");
105+
// } else {
106+
// inserted = true;
107+
// }
95108
}
96109
}
97110
Ok(inserted)
@@ -204,20 +217,20 @@ pub async fn build_queue(
204217
/// This is performed atomically, in a transaction.
205218
pub async fn enqueue_benchmark_request(
206219
conn: &mut dyn database::pool::Connection,
207-
benchmark_request: &BenchmarkRequest,
220+
request: &BenchmarkRequest,
208221
) -> anyhow::Result<()> {
209222
let mut tx = conn.transaction().await;
210223

211-
let Some(request_tag) = benchmark_request.tag() else {
212-
panic!("Benchmark request {benchmark_request:?} has no tag");
224+
let Some(request_tag) = request.tag() else {
225+
panic!("Benchmark request {request:?} has no tag");
213226
};
214227

215-
log::info!("Enqueuing jobs for request {benchmark_request:?}");
228+
log::info!("Enqueuing jobs for request {request:?}");
216229

217-
let backends = benchmark_request.backends()?;
218-
let profiles = benchmark_request.profiles()?;
230+
let backends = request.backends()?;
231+
let profiles = request.profiles()?;
219232
// Prevent the error from spamming the logs
220-
let mut has_emitted_parent_sha_error = false;
233+
// let mut has_emitted_parent_sha_error = false;
221234

222235
// Target x benchmark_set x backend x profile -> BenchmarkJob
223236
for target in Target::all() {
@@ -238,32 +251,36 @@ pub async fn enqueue_benchmark_request(
238251
// If the parent job has been deleted from the database
239252
// but was already benchmarked then the collector will ignore
240253
// it as it will see it already has results.
241-
if let Some(parent_sha) = benchmark_request.parent_sha() {
242-
let (is_foreign_key_violation, result) = tx
243-
.conn()
244-
.enqueue_parent_benchmark_job(
245-
parent_sha,
246-
target,
247-
backend,
248-
profile,
249-
benchmark_set as u32,
250-
)
251-
.await;
252-
253-
// At some point in time the parent_sha may not refer
254-
// to a `benchmark_request` and we want to be able to
255-
// see that error.
256-
if let Err(e) = result {
257-
if is_foreign_key_violation && !has_emitted_parent_sha_error {
258-
log::error!("Failed to create job for parent sha {e:?}");
259-
has_emitted_parent_sha_error = true;
260-
} else if has_emitted_parent_sha_error && is_foreign_key_violation {
261-
continue;
262-
} else {
263-
return Err(e);
264-
}
265-
}
266-
}
254+
255+
// Do not enqueue parent jobs to allow parallel execution with the old system
256+
// If the parent artifact wouldn't be benchmarked yet, we would benchmark the
257+
// parent with the new system.
258+
// if let Some(parent_sha) = request.parent_sha() {
259+
// let (is_foreign_key_violation, result) = tx
260+
// .conn()
261+
// .enqueue_parent_benchmark_job(
262+
// parent_sha,
263+
// target,
264+
// backend,
265+
// profile,
266+
// benchmark_set as u32,
267+
// )
268+
// .await;
269+
//
270+
// // At some point in time the parent_sha may not refer
271+
// // to a `benchmark_request` and we want to be able to
272+
// // see that error.
273+
// if let Err(e) = result {
274+
// if is_foreign_key_violation && !has_emitted_parent_sha_error {
275+
// log::error!("Failed to create job for parent sha {e:?}");
276+
// has_emitted_parent_sha_error = true;
277+
// } else if has_emitted_parent_sha_error && is_foreign_key_violation {
278+
// continue;
279+
// } else {
280+
// return Err(e);
281+
// }
282+
// }
283+
// }
267284
}
268285
}
269286
}
@@ -287,12 +304,15 @@ async fn process_benchmark_requests(
287304
) -> anyhow::Result<Vec<BenchmarkRequest>> {
288305
let queue = build_queue(conn).await?;
289306

307+
log::debug!("Current queue: {queue:?}");
308+
290309
let mut completed = vec![];
291310
for request in queue {
292311
match request.status() {
293312
BenchmarkRequestStatus::InProgress => {
294313
let tag = request.tag().expect("In progress request without a tag");
295314
if conn.maybe_mark_benchmark_request_as_completed(tag).await? {
315+
log::info!("Request {tag} marked as completed");
296316
completed.push(request);
297317
continue;
298318
}
@@ -311,8 +331,9 @@ async fn process_benchmark_requests(
311331
Ok(completed)
312332
}
313333

314-
/// For queueing jobs, add the jobs you want to queue to this function
315-
async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
334+
/// Creates new benchmark requests, enqueues jobs for ready benchmark requests and
335+
/// finishes completed benchmark requests.
336+
async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> {
316337
let mut conn = ctxt.conn().await;
317338

318339
let index = ctxt.known_benchmark_requests.load();
@@ -387,7 +408,10 @@ async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
387408
}
388409

389410
/// Entry point for the cron job that manages the benchmark request and job queue.
390-
pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interval: Duration) {
411+
pub async fn create_queue_process(
412+
site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>,
413+
run_interval: Duration,
414+
) {
391415
let mut interval = time::interval(run_interval);
392416
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
393417

@@ -398,7 +422,7 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interv
398422
let guard = ctxt.read();
399423
guard.as_ref().cloned()
400424
} {
401-
match cron_enqueue_jobs(&ctxt_clone).await {
425+
match perform_queue_tick(&ctxt_clone).await {
402426
Ok(_) => log::info!("Cron job finished"),
403427
Err(e) => log::error!("Cron job failed to execute: {e:?}"),
404428
}

site/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::future::FutureExt;
22
use parking_lot::RwLock;
3-
use site::job_queue::{cron_main, run_new_queue};
3+
use site::job_queue::{create_queue_process, is_job_queue_enabled};
44
use site::load;
55
use std::env;
66
use std::sync::Arc;
@@ -59,9 +59,9 @@ async fn main() {
5959

6060
let server = site::server::start(ctxt.clone(), port).fuse();
6161

62-
if run_new_queue() {
62+
if is_job_queue_enabled() {
6363
task::spawn(async move {
64-
cron_main(
64+
create_queue_process(
6565
ctxt.clone(),
6666
Duration::from_secs(queue_update_interval_seconds),
6767
)

0 commit comments

Comments
 (0)