From 6fb501199f1f4ff39b850dba3029d63dbf1cba29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Thu, 25 Sep 2025 18:31:08 +0200 Subject: [PATCH 1/4] Separate try benchmark requests for the old and new system and test the new system on a specific PR --- site/src/github.rs | 48 +++++++++--------- site/src/job_queue/mod.rs | 7 +++ site/src/request_handlers/github.rs | 75 +++++++++++++++-------------- 3 files changed, 70 insertions(+), 60 deletions(-) diff --git a/site/src/github.rs b/site/src/github.rs index b17eebf52..2916b5a74 100644 --- a/site/src/github.rs +++ b/site/src/github.rs @@ -2,7 +2,7 @@ pub mod client; pub mod comparison_summary; use crate::api::github::Commit; -use crate::job_queue::run_new_queue; +use crate::job_queue::{run_new_queue, should_use_new_system}; use crate::load::{MissingReason, SiteCtxt, TryCommit}; use chrono::{DateTime, Utc}; use serde::Deserialize; @@ -240,18 +240,16 @@ async fn attach_shas_to_try_benchmark_request( commit: &TryCommit, commit_date: DateTime, ) { - if run_new_queue() { - if let Err(e) = conn - .attach_shas_to_try_benchmark_request( - pr_number, - &commit.sha, - &commit.parent_sha, - commit_date, - ) - .await - { - log::error!("Failed to add shas to try commit: {e:?}"); - } + if let Err(e) = conn + .attach_shas_to_try_benchmark_request( + pr_number, + &commit.sha, + &commit.parent_sha, + commit_date, + ) + .await + { + log::error!("Failed to add shas to try commit: {e:?}"); } } @@ -281,22 +279,24 @@ pub async fn enqueue_shas( }; let conn = ctxt.conn().await; - attach_shas_to_try_benchmark_request( - &*conn, - pr_number, - &try_commit, - commit_response.commit.committer.date, - ) - .await; - - let queued = conn - .pr_attach_commit( + let queued = if should_use_new_system(pr_number) { + attach_shas_to_try_benchmark_request( + &*conn, + pr_number, + &try_commit, + commit_response.commit.committer.date, + ) + .await; + true + } else { + conn.pr_attach_commit( pr_number, &try_commit.sha, &try_commit.parent_sha, Some(commit_response.commit.committer.date), ) - .await; + .await + }; if queued { if !msg.is_empty() { msg.push('\n'); diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index c63bfbe79..eab007e70 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -20,6 +20,13 @@ pub fn run_new_queue() -> bool { .unwrap_or(false) } +/// rust-lang/rust PR that will be used for testing the new system. +const TEST_PR_FOR_NEW_SYSTEM: u32 = 147039; + +pub fn should_use_new_system(pr: u32) -> bool { + run_new_queue() && pr == TEST_PR_FOR_NEW_SYSTEM +} + /// Store the latest master commits or do nothing if all of them are /// already in the database. /// Returns `true` if at least one benchmark request was inserted. diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs index 9fc313a13..d9b87818f 100644 --- a/site/src/request_handlers/github.rs +++ b/site/src/request_handlers/github.rs @@ -3,7 +3,7 @@ use crate::github::{ client, enqueue_shas, parse_homu_comment, rollup_pr_number, unroll_rollup, COMMENT_MARK_TEMPORARY, RUST_REPO_GITHUB_API_URL, }; -use crate::job_queue::run_new_queue; +use crate::job_queue::{run_new_queue, should_use_new_system}; use crate::load::SiteCtxt; use database::BenchmarkRequest; @@ -84,13 +84,10 @@ async fn record_try_benchmark_request_without_artifacts( pr: u32, backends: &str, ) { - // We only want to run this if the new system is running - if run_new_queue() { - let try_request = BenchmarkRequest::create_try_without_artifacts(pr, backends, ""); - log::info!("Inserting try benchmark request {try_request:?}"); - if let Err(e) = conn.insert_benchmark_request(&try_request).await { - log::error!("Failed to insert try benchmark request: {}", e); - } + let try_request = BenchmarkRequest::create_try_without_artifacts(pr, backends, ""); + log::info!("Inserting try benchmark request {try_request:?}"); + if let Err(e) = conn.insert_benchmark_request(&try_request).await { + log::error!("Failed to insert try benchmark request: {}", e); } } @@ -120,20 +117,23 @@ async fn handle_rust_timer( Ok(cmd) => { let conn = ctxt.conn().await; - record_try_benchmark_request_without_artifacts( - &*conn, - issue.number, - cmd.params.backends.unwrap_or(""), - ) - .await; - conn.queue_pr( - issue.number, - cmd.params.include, - cmd.params.exclude, - cmd.params.runs, - cmd.params.backends, - ) - .await; + if should_use_new_system(issue.number) { + record_try_benchmark_request_without_artifacts( + &*conn, + issue.number, + cmd.params.backends.unwrap_or(""), + ) + .await; + } else { + conn.queue_pr( + issue.number, + cmd.params.include, + cmd.params.exclude, + cmd.params.runs, + cmd.params.backends, + ) + .await; + } format!( "Awaiting bors try build completion. @@ -166,20 +166,23 @@ async fn handle_rust_timer( { let conn = ctxt.conn().await; for command in &valid_build_cmds { - record_try_benchmark_request_without_artifacts( - &*conn, - issue.number, - command.params.backends.unwrap_or(""), - ) - .await; - conn.queue_pr( - issue.number, - command.params.include, - command.params.exclude, - command.params.runs, - command.params.backends, - ) - .await; + if should_use_new_system(issue.number) { + record_try_benchmark_request_without_artifacts( + &*conn, + issue.number, + command.params.backends.unwrap_or(""), + ) + .await; + } else { + conn.queue_pr( + issue.number, + command.params.include, + command.params.exclude, + command.params.runs, + command.params.backends, + ) + .await; + } } } From 21929934668d59577ee2dc17cad960bf216e885c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Thu, 25 Sep 2025 18:42:03 +0200 Subject: [PATCH 2/4] Disable creating master/release benchmark requests and enqueuing parent jobs --- site/src/github.rs | 2 +- site/src/job_queue/mod.rs | 117 ++++++++++++++++------------ site/src/main.rs | 4 +- site/src/request_handlers/github.rs | 2 +- 4 files changed, 71 insertions(+), 54 deletions(-) diff --git a/site/src/github.rs b/site/src/github.rs index 2916b5a74..1ff5347a7 100644 --- a/site/src/github.rs +++ b/site/src/github.rs @@ -2,7 +2,7 @@ pub mod client; pub mod comparison_summary; use crate::api::github::Commit; -use crate::job_queue::{run_new_queue, should_use_new_system}; +use crate::job_queue::should_use_new_system; use crate::load::{MissingReason, SiteCtxt, TryCommit}; use chrono::{DateTime, Utc}; use serde::Deserialize; diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index eab007e70..660ccceec 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -32,7 +32,7 @@ pub fn should_use_new_system(pr: u32) -> bool { /// Returns `true` if at least one benchmark request was inserted. async fn create_benchmark_request_master_commits( ctxt: &SiteCtxt, - conn: &dyn database::pool::Connection, + _conn: &dyn database::pool::Connection, index: &BenchmarkRequestIndex, ) -> anyhow::Result { let now = Utc::now(); @@ -47,7 +47,7 @@ async fn create_benchmark_request_master_commits( // TODO; delete at some point in the future let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?; - let mut inserted = false; + let inserted = false; for master_commit in master_commits { // We don't want to add masses of obsolete data if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) { @@ -59,11 +59,14 @@ async fn create_benchmark_request_master_commits( master_commit.time, ); log::info!("Inserting master benchmark request {benchmark:?}"); - if let Err(error) = conn.insert_benchmark_request(&benchmark).await { - log::error!("Failed to insert master benchmark request: {error:?}"); - } else { - inserted = true; - } + + // Do not create benchmark requests on production, to allow running in parallel with + // the old system. + // if let Err(error) = conn.insert_benchmark_request(&benchmark).await { + // log::error!("Failed to insert master benchmark request: {error:?}"); + // } else { + // inserted = true; + // } } } Ok(inserted) @@ -73,7 +76,7 @@ async fn create_benchmark_request_master_commits( /// already in the database /// Returns `true` if at least one benchmark request was inserted. async fn create_benchmark_request_releases( - conn: &dyn database::pool::Connection, + _conn: &dyn database::pool::Connection, index: &BenchmarkRequestIndex, ) -> anyhow::Result { let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt") @@ -89,16 +92,19 @@ async fn create_benchmark_request_releases( .filter_map(parse_release_string) .take(20); - let mut inserted = false; + let inserted = false; for (name, commit_date) in releases { if commit_date >= cutoff && !index.contains_tag(&name) { let release_request = BenchmarkRequest::create_release(&name, commit_date); log::info!("Inserting release benchmark request {release_request:?}"); - if let Err(error) = conn.insert_benchmark_request(&release_request).await { - log::error!("Failed to insert release benchmark request: {error}"); - } else { - inserted = true; - } + + // Do not create benchmark requests on production, to allow running in parallel with + // the old system. + // if let Err(error) = conn.insert_benchmark_request(&release_request).await { + // log::error!("Failed to insert release benchmark request: {error}"); + // } else { + // inserted = true; + // } } } Ok(inserted) @@ -211,18 +217,18 @@ pub async fn build_queue( /// This is performed atomically, in a transaction. pub async fn enqueue_benchmark_request( conn: &mut dyn database::pool::Connection, - benchmark_request: &BenchmarkRequest, + request: &BenchmarkRequest, ) -> anyhow::Result<()> { let mut tx = conn.transaction().await; - let Some(request_tag) = benchmark_request.tag() else { - panic!("Benchmark request {benchmark_request:?} has no tag"); + let Some(request_tag) = request.tag() else { + panic!("Benchmark request {request:?} has no tag"); }; - log::info!("Enqueuing jobs for request {benchmark_request:?}"); + log::info!("Enqueuing jobs for request {request:?}"); - let backends = benchmark_request.backends()?; - let profiles = benchmark_request.profiles()?; + let backends = request.backends()?; + let profiles = request.profiles()?; // Prevent the error from spamming the logs let mut has_emitted_parent_sha_error = false; @@ -245,32 +251,36 @@ pub async fn enqueue_benchmark_request( // If the parent job has been deleted from the database // but was already benchmarked then the collector will ignore // it as it will see it already has results. - if let Some(parent_sha) = benchmark_request.parent_sha() { - let (is_foreign_key_violation, result) = tx - .conn() - .enqueue_parent_benchmark_job( - parent_sha, - target, - backend, - profile, - benchmark_set as u32, - ) - .await; - - // At some point in time the parent_sha may not refer - // to a `benchmark_request` and we want to be able to - // see that error. - if let Err(e) = result { - if is_foreign_key_violation && !has_emitted_parent_sha_error { - log::error!("Failed to create job for parent sha {e:?}"); - has_emitted_parent_sha_error = true; - } else if has_emitted_parent_sha_error && is_foreign_key_violation { - continue; - } else { - return Err(e); - } - } - } + + // Do not enqueue parent jobs to allow parallel execution with the old system + // If the parent artifact wouldn't be benchmarked yet, we would benchmark the + // parent with the new system. + // if let Some(parent_sha) = request.parent_sha() { + // let (is_foreign_key_violation, result) = tx + // .conn() + // .enqueue_parent_benchmark_job( + // parent_sha, + // target, + // backend, + // profile, + // benchmark_set as u32, + // ) + // .await; + // + // // At some point in time the parent_sha may not refer + // // to a `benchmark_request` and we want to be able to + // // see that error. + // if let Err(e) = result { + // if is_foreign_key_violation && !has_emitted_parent_sha_error { + // log::error!("Failed to create job for parent sha {e:?}"); + // has_emitted_parent_sha_error = true; + // } else if has_emitted_parent_sha_error && is_foreign_key_violation { + // continue; + // } else { + // return Err(e); + // } + // } + // } } } } @@ -294,12 +304,15 @@ async fn process_benchmark_requests( ) -> anyhow::Result> { let queue = build_queue(conn).await?; + log::debug!("Current queue: {queue:?}"); + let mut completed = vec![]; for request in queue { match request.status() { BenchmarkRequestStatus::InProgress => { let tag = request.tag().expect("In progress request without a tag"); if conn.maybe_mark_benchmark_request_as_completed(tag).await? { + log::info!("Request {tag} marked as completed"); completed.push(request); continue; } @@ -318,8 +331,9 @@ async fn process_benchmark_requests( Ok(completed) } -/// For queueing jobs, add the jobs you want to queue to this function -async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> { +/// Creates new benchmark requests, enqueues jobs for ready benchmark requests and +/// finishes completed benchmark requests. +async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> { let mut conn = ctxt.conn().await; let index = ctxt.known_benchmark_requests.load(); @@ -394,7 +408,10 @@ async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> { } /// Entry point for the cron job that manages the benchmark request and job queue. -pub async fn cron_main(site_ctxt: Arc>>>, run_interval: Duration) { +pub async fn create_queue_process( + site_ctxt: Arc>>>, + run_interval: Duration, +) { let mut interval = time::interval(run_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -405,7 +422,7 @@ pub async fn cron_main(site_ctxt: Arc>>>, run_interv let guard = ctxt.read(); guard.as_ref().cloned() } { - match cron_enqueue_jobs(&ctxt_clone).await { + match perform_queue_tick(&ctxt_clone).await { Ok(_) => log::info!("Cron job finished"), Err(e) => log::error!("Cron job failed to execute: {e:?}"), } diff --git a/site/src/main.rs b/site/src/main.rs index c74792242..31aba0e7b 100644 --- a/site/src/main.rs +++ b/site/src/main.rs @@ -1,6 +1,6 @@ use futures::future::FutureExt; use parking_lot::RwLock; -use site::job_queue::{cron_main, run_new_queue}; +use site::job_queue::{create_queue_process, run_new_queue}; use site::load; use std::env; use std::sync::Arc; @@ -61,7 +61,7 @@ async fn main() { if run_new_queue() { task::spawn(async move { - cron_main( + create_queue_process( ctxt.clone(), Duration::from_secs(queue_update_interval_seconds), ) diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs index d9b87818f..2ff2e4594 100644 --- a/site/src/request_handlers/github.rs +++ b/site/src/request_handlers/github.rs @@ -3,7 +3,7 @@ use crate::github::{ client, enqueue_shas, parse_homu_comment, rollup_pr_number, unroll_rollup, COMMENT_MARK_TEMPORARY, RUST_REPO_GITHUB_API_URL, }; -use crate::job_queue::{run_new_queue, should_use_new_system}; +use crate::job_queue::should_use_new_system; use crate::load::SiteCtxt; use database::BenchmarkRequest; From 98dee6dc11d9893cd426e6b93e5149b4e7d3f242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Thu, 25 Sep 2025 18:43:31 +0200 Subject: [PATCH 3/4] Enable the job queue by default --- site/src/github.rs | 4 ++-- site/src/job_queue/mod.rs | 14 +++++++------- site/src/main.rs | 4 ++-- site/src/request_handlers/github.rs | 6 +++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/site/src/github.rs b/site/src/github.rs index 1ff5347a7..a2a6e2dad 100644 --- a/site/src/github.rs +++ b/site/src/github.rs @@ -2,7 +2,7 @@ pub mod client; pub mod comparison_summary; use crate::api::github::Commit; -use crate::job_queue::should_use_new_system; +use crate::job_queue::should_use_job_queue; use crate::load::{MissingReason, SiteCtxt, TryCommit}; use chrono::{DateTime, Utc}; use serde::Deserialize; @@ -279,7 +279,7 @@ pub async fn enqueue_shas( }; let conn = ctxt.conn().await; - let queued = if should_use_new_system(pr_number) { + let queued = if should_use_job_queue(pr_number) { attach_shas_to_try_benchmark_request( &*conn, pr_number, diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 660ccceec..58a5d1307 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -13,18 +13,18 @@ use parking_lot::RwLock; use std::{str::FromStr, sync::Arc}; use tokio::time::{self, Duration, MissedTickBehavior}; -pub fn run_new_queue() -> bool { - std::env::var("RUN_CRON") +pub fn is_job_queue_enabled() -> bool { + std::env::var("USE_JOB_QUEUE") .ok() .and_then(|x| x.parse().ok()) - .unwrap_or(false) + .unwrap_or(true) } -/// rust-lang/rust PR that will be used for testing the new system. -const TEST_PR_FOR_NEW_SYSTEM: u32 = 147039; +/// rust-lang/rust PR that will be used for testing the job queue. +const TEST_PR_FOR_JOB_QUEUE: u32 = 147039; -pub fn should_use_new_system(pr: u32) -> bool { - run_new_queue() && pr == TEST_PR_FOR_NEW_SYSTEM +pub fn should_use_job_queue(pr: u32) -> bool { + is_job_queue_enabled() && pr == TEST_PR_FOR_JOB_QUEUE } /// Store the latest master commits or do nothing if all of them are diff --git a/site/src/main.rs b/site/src/main.rs index 31aba0e7b..7affbd7de 100644 --- a/site/src/main.rs +++ b/site/src/main.rs @@ -1,6 +1,6 @@ use futures::future::FutureExt; use parking_lot::RwLock; -use site::job_queue::{create_queue_process, run_new_queue}; +use site::job_queue::{create_queue_process, is_job_queue_enabled}; use site::load; use std::env; use std::sync::Arc; @@ -59,7 +59,7 @@ async fn main() { let server = site::server::start(ctxt.clone(), port).fuse(); - if run_new_queue() { + if is_job_queue_enabled() { task::spawn(async move { create_queue_process( ctxt.clone(), diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs index 2ff2e4594..1275136de 100644 --- a/site/src/request_handlers/github.rs +++ b/site/src/request_handlers/github.rs @@ -3,7 +3,7 @@ use crate::github::{ client, enqueue_shas, parse_homu_comment, rollup_pr_number, unroll_rollup, COMMENT_MARK_TEMPORARY, RUST_REPO_GITHUB_API_URL, }; -use crate::job_queue::should_use_new_system; +use crate::job_queue::should_use_job_queue; use crate::load::SiteCtxt; use database::BenchmarkRequest; @@ -117,7 +117,7 @@ async fn handle_rust_timer( Ok(cmd) => { let conn = ctxt.conn().await; - if should_use_new_system(issue.number) { + if should_use_job_queue(issue.number) { record_try_benchmark_request_without_artifacts( &*conn, issue.number, @@ -166,7 +166,7 @@ async fn handle_rust_timer( { let conn = ctxt.conn().await; for command in &valid_build_cmds { - if should_use_new_system(issue.number) { + if should_use_job_queue(issue.number) { record_try_benchmark_request_without_artifacts( &*conn, issue.number, From a491dd3b97240c1bfcab6f0d099d31de4712dcbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Fri, 26 Sep 2025 14:09:05 +0200 Subject: [PATCH 4/4] Fix Clippy --- site/src/job_queue/mod.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 58a5d1307..29e12e631 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -51,14 +51,14 @@ async fn create_benchmark_request_master_commits( for master_commit in master_commits { // We don't want to add masses of obsolete data if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) { - let pr = master_commit.pr.unwrap_or(0); - let benchmark = BenchmarkRequest::create_master( - &master_commit.sha, - &master_commit.parent_sha, - pr, - master_commit.time, - ); - log::info!("Inserting master benchmark request {benchmark:?}"); + // let pr = master_commit.pr.unwrap_or(0); + // let benchmark = BenchmarkRequest::create_master( + // &master_commit.sha, + // &master_commit.parent_sha, + // pr, + // master_commit.time, + // ); + // log::info!("Inserting master benchmark request {benchmark:?}"); // Do not create benchmark requests on production, to allow running in parallel with // the old system. @@ -95,8 +95,8 @@ async fn create_benchmark_request_releases( let inserted = false; for (name, commit_date) in releases { if commit_date >= cutoff && !index.contains_tag(&name) { - let release_request = BenchmarkRequest::create_release(&name, commit_date); - log::info!("Inserting release benchmark request {release_request:?}"); + // let release_request = BenchmarkRequest::create_release(&name, commit_date); + // log::info!("Inserting release benchmark request {release_request:?}"); // Do not create benchmark requests on production, to allow running in parallel with // the old system. @@ -230,7 +230,7 @@ pub async fn enqueue_benchmark_request( let backends = request.backends()?; let profiles = request.profiles()?; // Prevent the error from spamming the logs - let mut has_emitted_parent_sha_error = false; + // let mut has_emitted_parent_sha_error = false; // Target x benchmark_set x backend x profile -> BenchmarkJob for target in Target::all() {