Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions site/src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod client;
pub mod comparison_summary;

use crate::api::github::Commit;
use crate::job_queue::run_new_queue;
use crate::job_queue::should_use_job_queue;
use crate::load::{MissingReason, SiteCtxt, TryCommit};
use chrono::{DateTime, Utc};
use serde::Deserialize;
Expand Down Expand Up @@ -240,18 +240,16 @@ async fn attach_shas_to_try_benchmark_request(
commit: &TryCommit,
commit_date: DateTime<Utc>,
) {
if run_new_queue() {
if let Err(e) = conn
.attach_shas_to_try_benchmark_request(
pr_number,
&commit.sha,
&commit.parent_sha,
commit_date,
)
.await
{
log::error!("Failed to add shas to try commit: {e:?}");
}
if let Err(e) = conn
.attach_shas_to_try_benchmark_request(
pr_number,
&commit.sha,
&commit.parent_sha,
commit_date,
)
.await
{
log::error!("Failed to add shas to try commit: {e:?}");
}
}

Expand Down Expand Up @@ -281,22 +279,24 @@ pub async fn enqueue_shas(
};
let conn = ctxt.conn().await;

attach_shas_to_try_benchmark_request(
&*conn,
pr_number,
&try_commit,
commit_response.commit.committer.date,
)
.await;

let queued = conn
.pr_attach_commit(
let queued = if should_use_job_queue(pr_number) {
attach_shas_to_try_benchmark_request(
&*conn,
pr_number,
&try_commit,
commit_response.commit.committer.date,
)
.await;
true
} else {
conn.pr_attach_commit(
pr_number,
&try_commit.sha,
&try_commit.parent_sha,
Some(commit_response.commit.committer.date),
)
.await;
.await
};
if queued {
if !msg.is_empty() {
msg.push('\n');
Expand Down
152 changes: 88 additions & 64 deletions site/src/job_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,26 @@ use parking_lot::RwLock;
use std::{str::FromStr, sync::Arc};
use tokio::time::{self, Duration, MissedTickBehavior};

pub fn run_new_queue() -> bool {
std::env::var("RUN_CRON")
pub fn is_job_queue_enabled() -> bool {
std::env::var("USE_JOB_QUEUE")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(false)
.unwrap_or(true)
}

/// rust-lang/rust PR that will be used for testing the job queue.
const TEST_PR_FOR_JOB_QUEUE: u32 = 147039;

pub fn should_use_job_queue(pr: u32) -> bool {
is_job_queue_enabled() && pr == TEST_PR_FOR_JOB_QUEUE
}

/// Store the latest master commits or do nothing if all of them are
/// already in the database.
/// Returns `true` if at least one benchmark request was inserted.
async fn create_benchmark_request_master_commits(
ctxt: &SiteCtxt,
conn: &dyn database::pool::Connection,
_conn: &dyn database::pool::Connection,
index: &BenchmarkRequestIndex,
) -> anyhow::Result<bool> {
let now = Utc::now();
Expand All @@ -40,23 +47,26 @@ async fn create_benchmark_request_master_commits(
// TODO; delete at some point in the future
let cutoff: chrono::DateTime<Utc> = chrono::DateTime::from_str("2025-08-27T00:00:00.000Z")?;

let mut inserted = false;
let inserted = false;
for master_commit in master_commits {
// We don't want to add masses of obsolete data
if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) {
let pr = master_commit.pr.unwrap_or(0);
let benchmark = BenchmarkRequest::create_master(
&master_commit.sha,
&master_commit.parent_sha,
pr,
master_commit.time,
);
log::info!("Inserting master benchmark request {benchmark:?}");
if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
log::error!("Failed to insert master benchmark request: {error:?}");
} else {
inserted = true;
}
// let pr = master_commit.pr.unwrap_or(0);
// let benchmark = BenchmarkRequest::create_master(
// &master_commit.sha,
// &master_commit.parent_sha,
// pr,
// master_commit.time,
// );
// log::info!("Inserting master benchmark request {benchmark:?}");

// Do not create benchmark requests on production, to allow running in parallel with
// the old system.
// if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
// log::error!("Failed to insert master benchmark request: {error:?}");
// } else {
// inserted = true;
// }
}
}
Ok(inserted)
Expand All @@ -66,7 +76,7 @@ async fn create_benchmark_request_master_commits(
/// already in the database
/// Returns `true` if at least one benchmark request was inserted.
async fn create_benchmark_request_releases(
conn: &dyn database::pool::Connection,
_conn: &dyn database::pool::Connection,
index: &BenchmarkRequestIndex,
) -> anyhow::Result<bool> {
let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt")
Expand All @@ -82,16 +92,19 @@ async fn create_benchmark_request_releases(
.filter_map(parse_release_string)
.take(20);

let mut inserted = false;
let inserted = false;
for (name, commit_date) in releases {
if commit_date >= cutoff && !index.contains_tag(&name) {
let release_request = BenchmarkRequest::create_release(&name, commit_date);
log::info!("Inserting release benchmark request {release_request:?}");
if let Err(error) = conn.insert_benchmark_request(&release_request).await {
log::error!("Failed to insert release benchmark request: {error}");
} else {
inserted = true;
}
// let release_request = BenchmarkRequest::create_release(&name, commit_date);
// log::info!("Inserting release benchmark request {release_request:?}");

// Do not create benchmark requests on production, to allow running in parallel with
// the old system.
// if let Err(error) = conn.insert_benchmark_request(&release_request).await {
// log::error!("Failed to insert release benchmark request: {error}");
// } else {
// inserted = true;
// }
}
}
Ok(inserted)
Expand Down Expand Up @@ -204,20 +217,20 @@ pub async fn build_queue(
/// This is performed atomically, in a transaction.
pub async fn enqueue_benchmark_request(
conn: &mut dyn database::pool::Connection,
benchmark_request: &BenchmarkRequest,
request: &BenchmarkRequest,
) -> anyhow::Result<()> {
let mut tx = conn.transaction().await;

let Some(request_tag) = benchmark_request.tag() else {
panic!("Benchmark request {benchmark_request:?} has no tag");
let Some(request_tag) = request.tag() else {
panic!("Benchmark request {request:?} has no tag");
};

log::info!("Enqueuing jobs for request {benchmark_request:?}");
log::info!("Enqueuing jobs for request {request:?}");

let backends = benchmark_request.backends()?;
let profiles = benchmark_request.profiles()?;
let backends = request.backends()?;
let profiles = request.profiles()?;
// Prevent the error from spamming the logs
let mut has_emitted_parent_sha_error = false;
// let mut has_emitted_parent_sha_error = false;

// Target x benchmark_set x backend x profile -> BenchmarkJob
for target in Target::all() {
Expand All @@ -238,32 +251,36 @@ pub async fn enqueue_benchmark_request(
// If the parent job has been deleted from the database
// but was already benchmarked then the collector will ignore
// it as it will see it already has results.
if let Some(parent_sha) = benchmark_request.parent_sha() {
let (is_foreign_key_violation, result) = tx
.conn()
.enqueue_parent_benchmark_job(
parent_sha,
target,
backend,
profile,
benchmark_set as u32,
)
.await;

// At some point in time the parent_sha may not refer
// to a `benchmark_request` and we want to be able to
// see that error.
if let Err(e) = result {
if is_foreign_key_violation && !has_emitted_parent_sha_error {
log::error!("Failed to create job for parent sha {e:?}");
has_emitted_parent_sha_error = true;
} else if has_emitted_parent_sha_error && is_foreign_key_violation {
continue;
} else {
return Err(e);
}
}
}

// Do not enqueue parent jobs to allow parallel execution with the old system
// If the parent artifact wouldn't be benchmarked yet, we would benchmark the
// parent with the new system.
// if let Some(parent_sha) = request.parent_sha() {
// let (is_foreign_key_violation, result) = tx
// .conn()
// .enqueue_parent_benchmark_job(
// parent_sha,
// target,
// backend,
// profile,
// benchmark_set as u32,
// )
// .await;
//
// // At some point in time the parent_sha may not refer
// // to a `benchmark_request` and we want to be able to
// // see that error.
// if let Err(e) = result {
// if is_foreign_key_violation && !has_emitted_parent_sha_error {
// log::error!("Failed to create job for parent sha {e:?}");
// has_emitted_parent_sha_error = true;
// } else if has_emitted_parent_sha_error && is_foreign_key_violation {
// continue;
// } else {
// return Err(e);
// }
// }
// }
}
}
}
Expand All @@ -287,12 +304,15 @@ async fn process_benchmark_requests(
) -> anyhow::Result<Vec<BenchmarkRequest>> {
let queue = build_queue(conn).await?;

log::debug!("Current queue: {queue:?}");

let mut completed = vec![];
for request in queue {
match request.status() {
BenchmarkRequestStatus::InProgress => {
let tag = request.tag().expect("In progress request without a tag");
if conn.maybe_mark_benchmark_request_as_completed(tag).await? {
log::info!("Request {tag} marked as completed");
completed.push(request);
continue;
}
Expand All @@ -311,8 +331,9 @@ async fn process_benchmark_requests(
Ok(completed)
}

/// For queueing jobs, add the jobs you want to queue to this function
async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
/// Creates new benchmark requests, enqueues jobs for ready benchmark requests and
/// finishes completed benchmark requests.
async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> {
let mut conn = ctxt.conn().await;

let index = ctxt.known_benchmark_requests.load();
Expand Down Expand Up @@ -387,7 +408,10 @@ async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
}

/// Entry point for the cron job that manages the benchmark request and job queue.
pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interval: Duration) {
pub async fn create_queue_process(
site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>,
run_interval: Duration,
) {
let mut interval = time::interval(run_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

Expand All @@ -398,7 +422,7 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interv
let guard = ctxt.read();
guard.as_ref().cloned()
} {
match cron_enqueue_jobs(&ctxt_clone).await {
match perform_queue_tick(&ctxt_clone).await {
Ok(_) => log::info!("Cron job finished"),
Err(e) => log::error!("Cron job failed to execute: {e:?}"),
}
Expand Down
6 changes: 3 additions & 3 deletions site/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::future::FutureExt;
use parking_lot::RwLock;
use site::job_queue::{cron_main, run_new_queue};
use site::job_queue::{create_queue_process, is_job_queue_enabled};
use site::load;
use std::env;
use std::sync::Arc;
Expand Down Expand Up @@ -59,9 +59,9 @@ async fn main() {

let server = site::server::start(ctxt.clone(), port).fuse();

if run_new_queue() {
if is_job_queue_enabled() {
task::spawn(async move {
cron_main(
create_queue_process(
ctxt.clone(),
Duration::from_secs(queue_update_interval_seconds),
)
Expand Down
Loading
Loading