Skip to content

gix corpus improvements #902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
rustflags = [
# Rustc lints
# "-W", "warning_name"
"-A", "clippy::let_unit_value", # in 'small' builds this triggers as the `span!` macro yields `let x = ()`. No way to prevent it in macro apparently.

# Clippy lints
"-W", "clippy::cloned_instead_of_copied",
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion gitoxide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ estimate-hours = ["dep:itertools", "dep:fs-err", "dep:crossbeam-channel", "dep:s
query = ["dep:rusqlite"]
## Run algorithms on a corpus of repositories and store their results for later comparison and intelligence gathering.
## *Note that* we need `organize` for finding git repositories fast.
corpus = [ "dep:rusqlite", "dep:sysinfo", "organize", "dep:crossbeam-channel", "dep:serde_json", "dep:tracing-forest", "dep:tracing-subscriber", "dep:tracing" ]
corpus = [ "dep:rusqlite", "dep:sysinfo", "organize", "dep:crossbeam-channel", "dep:serde_json", "dep:tracing-forest", "dep:tracing-subscriber", "dep:tracing", "dep:parking_lot" ]

#! ### Mutually Exclusive Networking
#! If both are set, _blocking-client_ will take precedence, allowing `--all-features` to be used.
Expand Down Expand Up @@ -72,6 +72,7 @@ smallvec = { version = "1.10.0", optional = true }
rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }

# for 'corpus'
parking_lot = { version = "0.12.1", optional = true }
sysinfo = { version = "0.29.2", optional = true, default-features = false }
serde_json = { version = "1.0.65", optional = true }
tracing-forest = { version = "0.1.5", features = ["serde"], optional = true }
Expand Down
19 changes: 16 additions & 3 deletions gitoxide-core/src/corpus/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,25 @@ impl Engine {
.con
.query_row(
"INSERT INTO gitoxide_version (version) VALUES (?1) ON CONFLICT DO UPDATE SET version = version RETURNING id",
[&self.gitoxide_version],
[&self.state.gitoxide_version],
|r| r.get(0),
)?)
}
pub(crate) fn tasks_or_insert(&self) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL.iter().map(|task| (0, task)).collect();
pub(crate) fn tasks_or_insert(
&self,
allowed_short_names: &[String],
) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
let mut out: Vec<_> = super::run::ALL
.iter()
.filter(|task| {
if allowed_short_names.is_empty() {
true
} else {
allowed_short_names.iter().any(|allowed| task.short_name == allowed)
}
})
.map(|task| (0, task))
.collect();
for (id, task) in &mut out {
*id = self.con.query_row(
"INSERT INTO task (short_name, description) VALUES (?1, ?2) ON CONFLICT DO UPDATE SET short_name = short_name, description = ?2 RETURNING id",
Expand Down
124 changes: 91 additions & 33 deletions gitoxide-core/src/corpus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,47 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};

/// The progress handle type stored in [`State`] and used by the corpus engine for naming, counting and logging its work.
pub type ProgressItem = gix::progress::DoOrDiscard<gix::progress::prodash::tree::Item>;

/// Settings and shared handles that an `Engine` is created with via `Engine::open_or_create()`.
pub struct State {
/// Progress handle used to name, count and log the engine's work.
pub progress: ProgressItem,
/// Version string that is inserted into the `gitoxide_version` table; the resulting id is attached to each recorded run.
pub gitoxide_version: String,
/// If `true`, single-threaded runs hand a "trace" child of the progress to the per-thread trace subscriber.
/// Multi-threaded runs always pass `None` instead.
pub trace_to_progress: bool,
/// Forwarded to the per-thread trace subscriber in single-threaded runs (multi-threaded runs always pass `false`).
/// NOTE(review): presumably reverses the order of emitted trace lines — confirm in `corpus::trace`.
pub reverse_trace_lines: bool,
}

impl Engine {
/// Open the corpus DB or create it.
pub fn open_or_create(db: PathBuf, gitoxide_version: String, progress: corpus::Progress) -> anyhow::Result<Engine> {
pub fn open_or_create(db: PathBuf, state: State) -> anyhow::Result<Engine> {
let con = crate::corpus::db::create(db).context("Could not open or create database")?;
Ok(Engine {
progress,
con,
gitoxide_version,
})
Ok(Engine { con, state })
}

/// Run on the existing set of repositories we have already seen, or obtain them from `corpus_path` if there are none yet.
pub fn run(&mut self, corpus_path: PathBuf, threads: Option<usize>) -> anyhow::Result<()> {
pub fn run(
&mut self,
corpus_path: PathBuf,
threads: Option<usize>,
dry_run: bool,
repo_sql_suffix: Option<String>,
allowed_task_names: Vec<String>,
) -> anyhow::Result<()> {
let tasks = self.tasks_or_insert(&allowed_task_names)?;
if tasks.is_empty() {
bail!("Cannot run without any task to perform on the repositories");
}
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let gitoxide_id = self.gitoxide_version_id_or_insert()?;
let runner_id = self.runner_id_or_insert()?;
let repos = self.find_repos_or_insert(&corpus_path, corpus_id)?;
let tasks = self.tasks_or_insert()?;
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads)
let repos = self.find_repos_or_insert(&corpus_path, corpus_id, repo_sql_suffix)?;
self.perform_run(&corpus_path, gitoxide_id, runner_id, &tasks, repos, threads, dry_run)
}

pub fn refresh(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
let repos = self.refresh_repos(&corpus_path, corpus_id)?;
self.progress.set_name("refresh repos");
self.progress.info(format!(
self.state.progress.set_name("refresh repos");
self.state.progress.info(format!(
"Added or updated {} repositories under {corpus_path:?}",
repos.len()
));
Expand All @@ -44,6 +59,7 @@ impl Engine {
}

impl Engine {
#[allow(clippy::too_many_arguments)]
fn perform_run(
&mut self,
corpus_path: &Path,
Expand All @@ -52,21 +68,43 @@ impl Engine {
tasks: &[(db::Id, &'static Task)],
mut repos: Vec<db::Repo>,
threads: Option<usize>,
dry_run: bool,
) -> anyhow::Result<()> {
let start = Instant::now();
let task_progress = &mut self.progress;
let task_progress = &mut self.state.progress;
task_progress.set_name("run");
task_progress.init(Some(tasks.len()), gix::progress::count("tasks"));
let threads = gix::parallel::num_threads(threads);
let db_path = self.con.path().expect("opened from path on disk").to_owned();
for (task_id, task) in tasks {
'tasks_loop: for (task_id, task) in tasks {
let task_start = Instant::now();
let mut repo_progress = task_progress.add_child(format!("run '{}'", task.short_name));
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));

if task.execute_exclusive || threads == 1 {
if task.execute_exclusive || threads == 1 || dry_run {
if dry_run {
task_progress.set_name("WOULD run");
for repo in &repos {
task_progress.info(format!(
"{}",
repo.path
.strip_prefix(corpus_path)
.expect("corpus contains repo")
.display()
));
task_progress.inc();
}
task_progress.info(format!("with {} tasks", tasks.len()));
for (_, task) in tasks {
task_progress.info(format!("task '{}' ({})", task.description, task.short_name))
}
break 'tasks_loop;
}
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));
let mut run_progress = repo_progress.add_child("set later");
let (_guard, current_id) = corpus::trace::override_thread_subscriber(db_path.as_str())?;
let (_guard, current_id) = corpus::trace::override_thread_subscriber(
db_path.as_str(),
self.state.trace_to_progress.then(|| task_progress.add_child("trace")),
self.state.reverse_trace_lines,
)?;

for repo in &repos {
if gix::interrupt::is_triggered() {
Expand All @@ -80,7 +118,7 @@ impl Engine {
.display()
));

// TODO: wait for new release to be able to provide run_id via span attributes
// TODO: wait for new release of `tracing-forest` to be able to provide run_id via span attributes
let mut run = Self::insert_run(&self.con, gitoxide_id, runner_id, *task_id, repo.id)?;
current_id.store(run.id, Ordering::SeqCst);
tracing::info_span!("run", run_id = run.id).in_scope(|| {
Expand All @@ -106,9 +144,11 @@ impl Engine {
let shared_repo_progress = repo_progress.clone();
let db_path = db_path.clone();
move |tid| {
let mut progress = gix::threading::lock(&shared_repo_progress);
(
corpus::trace::override_thread_subscriber(db_path.as_str()),
gix::threading::lock(&shared_repo_progress).add_child(format!("{tid}")),
// threaded printing is usually spammy, and lines interleave so it's useless.
corpus::trace::override_thread_subscriber(db_path.as_str(), None, false),
progress.add_child(format!("{tid}")),
rusqlite::Connection::open(&db_path),
)
}
Expand Down Expand Up @@ -166,13 +206,21 @@ impl Engine {
Ok((corpus_path, corpus_id))
}

fn find_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
self.progress.set_name("query db-repos");
self.progress.init(None, gix::progress::count("repos"));
fn find_repos(
&mut self,
corpus_path: &Path,
corpus_id: db::Id,
sql_suffix: Option<&str>,
) -> anyhow::Result<Vec<db::Repo>> {
self.state.progress.set_name("query db-repos");
self.state.progress.init(None, gix::progress::count("repos"));

Ok(self
.con
.prepare("SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1")?
.prepare(&format!(
"SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1 {}",
sql_suffix.unwrap_or_default()
))?
.query_map([corpus_id], |r| {
Ok(db::Repo {
id: r.get(0)?,
Expand All @@ -182,17 +230,17 @@ impl Engine {
num_references: r.get(4)?,
})
})?
.inspect(|_| self.progress.inc())
.inspect(|_| self.state.progress.inc())
.collect::<Result<_, _>>()?)
}

fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
let start = Instant::now();
self.progress.set_name("refresh");
self.progress.init(None, gix::progress::count("repos"));
self.state.progress.set_name("refresh");
self.state.progress.init(None, gix::progress::count("repos"));

let repos = std::thread::scope({
let progress = &mut self.progress;
let progress = &mut self.state.progress;
let con = &mut self.con;
|scope| -> anyhow::Result<_> {
let threads = std::thread::available_parallelism()
Expand Down Expand Up @@ -264,13 +312,23 @@ impl Engine {
Ok(repos)
}

fn find_repos_or_insert(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
fn find_repos_or_insert(
&mut self,
corpus_path: &Path,
corpus_id: db::Id,
sql_suffix: Option<String>,
) -> anyhow::Result<Vec<db::Repo>> {
let start = Instant::now();
let repos = self.find_repos(corpus_path, corpus_id)?;
let repos = self.find_repos(corpus_path, corpus_id, sql_suffix.as_deref())?;
if repos.is_empty() {
self.refresh_repos(corpus_path, corpus_id)
let res = self.refresh_repos(corpus_path, corpus_id);
if sql_suffix.is_some() {
self.find_repos(corpus_path, corpus_id, sql_suffix.as_deref())
} else {
res
}
} else {
self.progress.show_throughput(start);
self.state.progress.show_throughput(start);
Ok(repos)
}
}
Expand Down
Loading