diff --git a/Cargo.lock b/Cargo.lock index 761c65495cb..8ee8ad92141 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,9 +783,9 @@ dependencies = [ [[package]] name = "crossterm" -version = "0.25.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67" +checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13" dependencies = [ "bitflags 1.3.2", "crossterm_winapi", @@ -809,9 +809,9 @@ dependencies = [ [[package]] name = "crosstermion" -version = "0.10.1" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99aabd9b02c2d5f72697f30ffb46f5a9ff4bd240d826049892cf62c31daeed04" +checksum = "152ef46d620d4614070109e076ffc6ab032f682a380ac2efce412100c5ee7749" dependencies = [ "ansi_term", "async-channel", @@ -819,7 +819,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-lite", - "tui", + "ratatui", "tui-react", ] @@ -1243,7 +1243,7 @@ dependencies = [ "gix-features 0.29.0", "is-terminal", "owo-colors", - "prodash", + "prodash 25.0.0", "tabled", "time", ] @@ -1322,7 +1322,7 @@ dependencies = [ "is_ci", "log", "once_cell", - "prodash", + "prodash 25.0.0", "regex", "reqwest", "serde", @@ -1628,7 +1628,7 @@ dependencies = [ "libc", "once_cell", "parking_lot", - "prodash", + "prodash 25.0.0", "sha1", "sha1_smol", "thiserror", @@ -1643,7 +1643,7 @@ checksum = "cf69b0f5c701cc3ae22d3204b671907668f6437ca88862d355eaf9bc47a4f897" dependencies = [ "gix-hash 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc", - "prodash", + "prodash 23.1.2", "sha1_smol", "walkdir", ] @@ -3355,6 +3355,12 @@ name = "prodash" version = "23.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9516b775656bc3e8985e19cd4b8c0c0de045095074e453d2c0a513b5f978392d" + +[[package]] +name = "prodash" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3236ce1618b6da4c7b618e0143c4d5b5dc190f75f81c49f248221382f7e9e9ae" dependencies = [ "async-io", "atty", @@ -3366,9 +3372,9 @@ dependencies = [ "humantime", "log", "parking_lot", + "ratatui", "signal-hook", "time", - "tui", "tui-react", "unicode-segmentation", "unicode-width", @@ -3440,6 +3446,19 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ratatui" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcc0d032bccba900ee32151ec0265667535c230169f5a011154cdcd984e16829" +dependencies = [ + "bitflags 1.3.2", + "cassowary", + "crossterm", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "rayon" version = "1.7.0" @@ -4215,27 +4234,14 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "tui" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccdd26cbd674007e649a272da4475fb666d3aa0ad0531da7136db6fab0e5bad1" -dependencies = [ - "bitflags 1.3.2", - "cassowary", - "crossterm", - "unicode-segmentation", - "unicode-width", -] - [[package]] name = "tui-react" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542c37309aaf01ddaea86891f7845a8b0124194c6ccae6dbae7d223752648f4d" +checksum = "11f13a7edfda102bf2bc3f9c1714904c961d513ebf4ae0c09decc4f17340f9a7" dependencies = [ 
"log", - "tui", + "ratatui", "unicode-segmentation", "unicode-width", ] diff --git a/Cargo.toml b/Cargo.toml index 90ffd9731bc..85ec00ce72e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,10 +157,10 @@ gix = { version = "^0.44.1", path = "gix", default-features = false } time = "0.3.19" clap = { version = "4.1.1", features = ["derive", "cargo"] } -prodash = { version = "23.1", optional = true, default-features = false } +prodash = { version = "25.0.0", optional = true, default-features = false } is-terminal = { version = "0.4.0", optional = true } env_logger = { version = "0.10.0", default-features = false } -crosstermion = { version = "0.10.1", optional = true, default-features = false } +crosstermion = { version = "0.11.0", optional = true, default-features = false } futures-lite = { version = "1.12.0", optional = true, default-features = false, features = ["std"] } # for progress diff --git a/Makefile b/Makefile index c6a0479f3f3..278f7fe4e47 100644 --- a/Makefile +++ b/Makefile @@ -151,6 +151,7 @@ unit-tests: ## run all unit tests cd gix-ref/tests && cargo test --all-features cd gix-odb && cargo test && cargo test --all-features cd gix-object && cargo test && cargo test --features verbose-object-parsing-errors + cd gix-pack && cargo test --all-features cd gix-pack/tests && cargo test --features internal-testing-to-avoid-being-run-by-cargo-test-all \ && cargo test --features "internal-testing-gix-features-parallel" cd gix-index/tests && cargo test --features internal-testing-to-avoid-being-run-by-cargo-test-all \ diff --git a/gitoxide-core/src/hours/mod.rs b/gitoxide-core/src/hours/mod.rs index 444230db0d8..68696f7d1d5 100644 --- a/gitoxide-core/src/hours/mod.rs +++ b/gitoxide-core/src/hours/mod.rs @@ -344,10 +344,10 @@ where } None => Vec::new(), }; - if let Some(mut progress) = change_progress { + if let Some(progress) = change_progress { progress.show_throughput(start); } - if let Some(mut progress) = lines_progress { + if let Some(progress) = lines_progress { progress.show_throughput(start); } diff --git a/gitoxide-core/src/organize.rs b/gitoxide-core/src/organize.rs index 087499f361c..89b9519c168 100644 --- a/gitoxide-core/src/organize.rs +++ b/gitoxide-core/src/organize.rs @@ -12,15 +12,12 @@ pub enum Mode { Simulate, } -fn find_git_repository_workdirs( +fn find_git_repository_workdirs( root: impl AsRef, - mut progress: P, + mut progress: impl Progress, debug: bool, threads: Option, -) -> impl Iterator -where - P::SubProgress: Sync, -{ +) -> impl Iterator { progress.init(None, progress::count("filesystem items")); fn is_repository(path: &Path) -> Option { // Can be git dir or worktree checkout (file) @@ -211,10 +208,7 @@ pub fn discover( mut progress: P, debug: bool, threads: Option, -) -> anyhow::Result<()> -where - ::SubProgress: Sync, -{ +) -> anyhow::Result<()> { for (git_workdir, _kind) in find_git_repository_workdirs(source_dir, progress.add_child("Searching repositories"), debug, threads) { @@ -229,10 +223,7 @@ pub fn run( destination: impl AsRef, mut progress: P, threads: Option, -) -> anyhow::Result<()> -where - ::SubProgress: Sync, -{ +) -> anyhow::Result<()> { let mut num_errors = 0usize; let destination = destination.as_ref().canonicalize()?; for (path_to_move, kind) in diff --git a/gitoxide-core/src/pack/explode.rs b/gitoxide-core/src/pack/explode.rs index 603f06c5c14..9f194b7da8b 100644 --- a/gitoxide-core/src/pack/explode.rs +++ b/gitoxide-core/src/pack/explode.rs @@ -90,6 +90,7 @@ enum Error { } #[allow(clippy::large_enum_variant)] +#[derive(Clone)] enum 
OutputWriter { Loose(loose::Store), Sink(odb::Sink), @@ -177,7 +178,7 @@ pub fn pack_or_pack_index( } }); - let pack::index::traverse::Outcome{mut progress, ..} = bundle + let pack::index::traverse::Outcome { progress, .. } = bundle .index .traverse( &bundle.pack, @@ -185,43 +186,49 @@ pub fn pack_or_pack_index( &should_interrupt, { let object_path = object_path.map(|p| p.as_ref().to_owned()); - move || { - let out = OutputWriter::new(object_path.clone(), sink_compress, object_hash); - let loose_odb = verify.then(|| object_path.as_ref().map(|path| loose::Store::at(path, object_hash))).flatten(); - let mut read_buf = Vec::new(); - move |object_kind, buf, index_entry, progress| { - let written_id = out.write_buf(object_kind, buf).map_err(|err| { - Error::Write{source: Box::new(err) as Box, + let out = OutputWriter::new(object_path.clone(), sink_compress, object_hash); + let loose_odb = verify + .then(|| object_path.as_ref().map(|path| loose::Store::at(path, object_hash))) + .flatten(); + let mut read_buf = Vec::new(); + move |object_kind, buf, index_entry, progress| { + let written_id = out.write_buf(object_kind, buf).map_err(|err| Error::Write { + source: Box::new(err) as Box, + kind: object_kind, + id: index_entry.oid, + })?; + if written_id != index_entry.oid { + if let object::Kind::Tree = object_kind { + progress.info(format!( + "The tree in pack named {} was written as {} due to modes 100664 and 100640 rewritten as 100644.", + index_entry.oid, written_id + )); + } else { + return Err(Error::ObjectEncodeMismatch { kind: object_kind, - id: index_entry.oid, - } - })?; - if written_id != index_entry.oid { - if let object::Kind::Tree = object_kind { - progress.info(format!( - "The tree in pack named {} was written as {} due to modes 100664 and 100640 rewritten as 100644.", - index_entry.oid, written_id - )); - } else { - return Err(Error::ObjectEncodeMismatch{kind: object_kind, actual: index_entry.oid, expected:written_id}); - } + actual: index_entry.oid, + expected: written_id, + }); } - if let Some(verifier) = loose_odb.as_ref() { - let obj = verifier - .try_find(written_id, &mut read_buf) - .map_err(|err| Error::WrittenFileCorrupt{source:err, id:written_id})? - .ok_or(Error::WrittenFileMissing{id:written_id})?; - obj.verify_checksum(written_id)?; - } - Ok(()) } + if let Some(verifier) = loose_odb.as_ref() { + let obj = verifier + .try_find(written_id, &mut read_buf) + .map_err(|err| Error::WrittenFileCorrupt { + source: err, + id: written_id, + })? 
+ .ok_or(Error::WrittenFileMissing { id: written_id })?; + obj.verify_checksum(written_id)?; + } + Ok(()) } }, pack::index::traverse::Options { traversal: algorithm, thread_limit, check: check.into(), - make_pack_lookup_cache: pack::cache::lru::StaticLinkedList::<64>::default, + make_pack_lookup_cache: pack::cache::lru::StaticLinkedList::<64>::default, }, ) .with_context(|| "Failed to explode the entire pack - some loose objects may have been created nonetheless")?; diff --git a/gitoxide-core/src/query/engine/update.rs b/gitoxide-core/src/query/engine/update.rs index 0da6a0eef12..c4ec3c5dc2b 100644 --- a/gitoxide-core/src/query/engine/update.rs +++ b/gitoxide-core/src/query/engine/update.rs @@ -46,14 +46,14 @@ pub fn update( }; let commit_counter = db_progress.counter().expect("shared counter available"); - let mut change_progress = { + let change_progress = { let mut p = progress.add_child("find changes"); p.init(None, progress::count("modified files")); p }; let change_counter = change_progress.counter().expect("shared counter available"); - let mut lines_progress = { + let lines_progress = { let mut p = progress.add_child("find changes"); p.init(None, progress::count("diff lines")); p diff --git a/gitoxide-core/src/repository/verify.rs b/gitoxide-core/src/repository/verify.rs index b5e6f422827..7e028fd2510 100644 --- a/gitoxide-core/src/repository/verify.rs +++ b/gitoxide-core/src/repository/verify.rs @@ -31,7 +31,7 @@ pub fn integrity( }: Context, ) -> anyhow::Result<()> { #[cfg_attr(not(feature = "serde"), allow(unused))] - let mut outcome = repo.objects.store_ref().verify_integrity( + let outcome = repo.objects.store_ref().verify_integrity( progress, should_interrupt, gix::odb::pack::index::verify::integrity::Options { diff --git a/gix-features/Cargo.toml b/gix-features/Cargo.toml index a8ae5737f6f..5db2023d226 100644 --- a/gix-features/Cargo.toml +++ b/gix-features/Cargo.toml @@ -5,7 +5,7 @@ repository = "https://github.com/Byron/gitoxide" version = "0.29.0" authors = ["Sebastian Thiel "] license = "MIT/Apache-2.0" -edition = "2018" +edition = "2021" rust-version = "1.64" [lib] @@ -123,7 +123,7 @@ crc32fast = { version = "1.2.1", optional = true } sha1 = { version = "0.10.0", optional = true } # progress -prodash = { version = "23.1", optional = true, default-features = false } +prodash = { version = "25.0.0", optional = true, default-features = false } bytesize = { version = "1.0.1", optional = true } # pipe diff --git a/gix-features/src/parallel/in_parallel.rs b/gix-features/src/parallel/in_parallel.rs index e1e2cc3e34a..241565b62d7 100644 --- a/gix-features/src/parallel/in_parallel.rs +++ b/gix-features/src/parallel/in_parallel.rs @@ -1,7 +1,10 @@ -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use crate::parallel::{num_threads, Reduce}; +/// A scope to start threads within. +pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>; + /// Runs `left` and `right` in parallel, returning their output when both are done. 
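Since `Scope` is now just a re-export of `std::thread::Scope`, scoped spawning can be written against the crate facade and behaves identically with either backend. A minimal sketch, assuming the `parallel` feature (function and names are illustrative):

    use gix_features::parallel;

    fn sum_in_two_halves(data: &[u32]) -> u32 {
        parallel::threads(|scope| {
            // `scope` is a `parallel::Scope<'_, '_>`, i.e. std's scoped-thread scope.
            let (a, b) = data.split_at(data.len() / 2);
            let left = scope.spawn(|| a.iter().sum::<u32>());
            let right: u32 = b.iter().sum();
            left.join().expect("summing integers does not panic") + right
        })
    }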
pub fn join(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { std::thread::scope(|s| { @@ -47,7 +50,7 @@ pub fn in_parallel( input: impl Iterator + Send, thread_limit: Option, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<::Output, ::Error> where @@ -67,7 +70,7 @@ where let send_result = send_result.clone(); let receive_input = receive_input.clone(); let new_thread_state = new_thread_state.clone(); - let consume = consume.clone(); + let mut consume = consume.clone(); move || { let mut state = new_thread_state(thread_id); for item in receive_input { @@ -103,12 +106,19 @@ where /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. /// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast. +/// `consume(&mut item, &mut stat, &threads_available, &should_interrupt)` is called for performing the actual computation. +/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`), +/// which allows callees to implement their own work-stealing in case the work is distributed unevenly. +/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice +/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads, +/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust +/// the thread-count accordingly. // TODO: better docs pub fn in_parallel_with_slice( input: &mut [I], thread_limit: Option, new_thread_state: impl FnMut(usize) -> S + Send + Clone, - consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone, + consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone, mut periodic: impl FnMut() -> Option + Send, state_to_rval: impl FnOnce(S) -> R + Send + Clone, ) -> Result, E> where @@ -121,8 +131,8 @@ where let mut results = Vec::with_capacity(num_threads); let stop_everything = &AtomicBool::default(); let index = &AtomicUsize::default(); + let threads_left = &AtomicIsize::new(num_threads as isize); - // TODO: use std::thread::scope() once Rust 1.63 is available. std::thread::scope({ move |s| { std::thread::Builder::new() @@ -163,29 +173,35 @@ where let mut consume = consume.clone(); let input = Input(input as *mut [I]); move || { + let _ = &input; + threads_left.fetch_sub(1, Ordering::SeqCst); let mut state = new_thread_state(thread_id); - while let Ok(input_index) = - index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - (x < input_len).then_some(x + 1) - }) - { - if stop_everything.load(Ordering::Relaxed) { - break; - } - // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding - // each item exactly once. - let item = { - #[allow(unsafe_code)] - unsafe { - &mut (&mut *input.0)[input_index] + let res = (|| { + while let Ok(input_index) = + index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { + (x < input_len).then_some(x + 1) + }) + { + if stop_everything.load(Ordering::Relaxed) { + break; + } + // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding + // each item exactly once.
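For callers, the widened callback makes the cooperative contract explicit. A hypothetical `consume` under the new signature that honors interruption but never lends its thread (names and error type are illustrative):

    use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};

    let consume = |item: &mut u64, sum: &mut u64, _threads_left: &AtomicIsize, should_interrupt: &AtomicBool| {
        if should_interrupt.load(Ordering::Relaxed) {
            return Err("interrupted");
        }
        *sum += *item; // the actual per-item computation
        Ok(())
    };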
+ let item = { + #[allow(unsafe_code)] + unsafe { + &mut (&mut *input.0)[input_index] + } + }; + if let Err(err) = consume(item, &mut state, threads_left, stop_everything) { + stop_everything.store(true, Ordering::Relaxed); + return Err(err); } - }; - if let Err(err) = consume(item, &mut state) { - stop_everything.store(true, Ordering::Relaxed); - return Err(err); } - } - Ok(state_to_rval(state)) + Ok(state_to_rval(state)) + })(); + threads_left.fetch_add(1, Ordering::SeqCst); + res } }) .expect("valid name") diff --git a/gix-features/src/parallel/mod.rs b/gix-features/src/parallel/mod.rs index c994cb3b8e9..fa54951060e 100644 --- a/gix-features/src/parallel/mod.rs +++ b/gix-features/src/parallel/mod.rs @@ -35,11 +35,11 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod in_order; pub use in_order::{InOrderIter, SequenceId}; @@ -137,7 +137,7 @@ pub fn in_parallel_if( input: impl Iterator + Send, thread_limit: Option, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, reducer: R, ) -> Result<::Output, ::Error> where @@ -163,7 +163,7 @@ pub fn in_parallel_if( input: impl Iterator, thread_limit: Option, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + consume: impl FnMut(I, &mut S) -> O, reducer: R, ) -> Result<::Output, ::Error> where diff --git a/gix-features/src/parallel/serial.rs b/gix-features/src/parallel/serial.rs index 00723b2c3c4..3511c73e3e9 100644 --- a/gix-features/src/parallel/serial.rs +++ b/gix-features/src/parallel/serial.rs @@ -2,14 +2,17 @@ use crate::parallel::Reduce; #[cfg(not(feature = "parallel"))] mod not_parallel { + use std::sync::atomic::{AtomicBool, AtomicIsize}; + /// Runs `left` and then `right`, one after another, returning their output when both are done. pub fn join(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { (left(), right()) } /// A scope for spawning threads. - pub struct Scope<'env> { - _marker: std::marker::PhantomData<&'env mut &'env ()>, + pub struct Scope<'scope, 'env: 'scope> { + _scope: std::marker::PhantomData<&'scope mut &'scope ()>, + _env: std::marker::PhantomData<&'env mut &'env ()>, } pub struct ThreadBuilder; @@ -20,32 +23,31 @@ mod not_parallel { } #[allow(unsafe_code)] - unsafe impl Sync for Scope<'_> {} + unsafe impl Sync for Scope<'_, '_> {} impl ThreadBuilder { pub fn name(self, _new: String) -> Self { self } - pub fn spawn_scoped<'a, 'env, F, T>( + pub fn spawn_scoped<'scope, 'env, F, T>( &self, - scope: &'a Scope<'env>, + scope: &'scope Scope<'scope, 'env>, f: F, - ) -> std::io::Result> + ) -> std::io::Result> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { Ok(scope.spawn(f)) } } - impl<'env> Scope<'env> { - pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + impl<'scope, 'env> Scope<'scope, 'env> { + /// Provided with this scope, let `f` start new threads that live within it. 
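Without the `parallel` feature the same surface degenerates to inline execution; a sketch of the observable behavior (values illustrative):

    let four = gix_features::parallel::threads(|scope| {
        let handle = scope.spawn(|| 2 + 2); // executes immediately on this thread
        debug_assert!(handle.is_finished()); // trivially true in serial mode
        handle.join().expect("no panic occurred")
    });
    assert_eq!(four, 4);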
+ pub fn spawn(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { ScopedJoinHandle { result: f(), @@ -58,10 +60,11 @@ mod not_parallel { /// Note that this implementation will run the spawned functions immediately. pub fn threads<'env, F, R>(f: F) -> R where - F: FnOnce(&Scope<'env>) -> R, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, { f(&Scope { - _marker: Default::default(), + _scope: Default::default(), + _env: Default::default(), }) } @@ -79,6 +82,9 @@ mod not_parallel { pub fn join(self) -> std::thread::Result { Ok(self.result) } + pub fn is_finished(&self) -> bool { + true + } } /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. @@ -89,13 +95,15 @@ mod not_parallel { input: &mut [I], _thread_limit: Option, mut new_thread_state: impl FnMut(usize) -> S + Clone, - mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone, + mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option, state_to_rval: impl FnOnce(S) -> R + Clone, ) -> Result, E> { let mut state = new_thread_state(0); + let should_interrupt = &AtomicBool::default(); + let threads_left = &AtomicIsize::default(); for item in input { - consume(item, &mut state)?; + consume(item, &mut state, threads_left, should_interrupt)?; if periodic().is_none() { break; } @@ -121,7 +129,7 @@ pub fn in_parallel( input: impl Iterator, _thread_limit: Option, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> where diff --git a/gix-features/src/progress.rs b/gix-features/src/progress.rs index b7aeda620fd..8d1e30bc450 100644 --- a/gix-features/src/progress.rs +++ b/gix-features/src/progress.rs @@ -7,7 +7,7 @@ pub use prodash::{ self, messages::MessageLevel, progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN}, - unit, Progress, Unit, + unit, Progress, RawProgress, Unit, }; /// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`. #[cfg(not(feature = "progress-unit-bytes"))] diff --git a/gix-features/src/zlib/stream/deflate/mod.rs b/gix-features/src/zlib/stream/deflate/mod.rs index 55f575ea4b7..567e8fece76 100644 --- a/gix-features/src/zlib/stream/deflate/mod.rs +++ b/gix-features/src/zlib/stream/deflate/mod.rs @@ -11,6 +11,19 @@ pub struct Write { buf: [u8; BUF_SIZE], } +impl Clone for Write +where + W: Clone, +{ + fn clone(&self) -> Self { + Write { + compressor: impls::new_compress(), + inner: self.inner.clone(), + buf: self.buf, + } + } +} + mod impls { use std::io; @@ -18,6 +31,10 @@ mod impls { use crate::zlib::stream::deflate; + pub(crate) fn new_compress() -> Compress { + Compress::new(Compression::fast(), true) + } + impl deflate::Write where W: io::Write, @@ -25,7 +42,7 @@ mod impls { /// Create a new instance writing compressed bytes to `inner`. 
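Because live zlib stream state cannot be duplicated meaningfully, the hand-written `Clone` above pairs the cloned destination with a brand-new compressor, so a clone is only good for starting an independent stream. A sketch, assuming the `zlib` feature:

    use std::io::Write as _;

    let original = gix_features::zlib::stream::deflate::Write::new(Vec::<u8>::new());
    let mut copy = original.clone(); // fresh `Compress`, cloned (still empty) output buffer
    copy.write_all(b"an independent, self-contained stream")
        .expect("writing into a Vec does not fail");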
pub fn new(inner: W) -> deflate::Write { deflate::Write { - compressor: Compress::new(Compression::fast(), true), + compressor: new_compress(), inner, buf: [0; deflate::BUF_SIZE], } diff --git a/gix-features/tests/parallel/mod.rs b/gix-features/tests/parallel/mod.rs index b4b4236f81c..cc824332dcc 100644 --- a/gix-features/tests/parallel/mod.rs +++ b/gix-features/tests/parallel/mod.rs @@ -45,7 +45,7 @@ fn in_parallel_with_mut_slice_in_chunks() { &mut input, None, |_| 0usize, - |item, acc| { + |item, acc, _threads_left, _should_interrupt| { *acc += *item; *item += 1; Ok::<_, ()>(()) diff --git a/gix-odb/src/lib.rs b/gix-odb/src/lib.rs index 08b14238c71..e0beac54848 100644 --- a/gix-odb/src/lib.rs +++ b/gix-odb/src/lib.rs @@ -52,6 +52,7 @@ pub mod cache; /// /// It can optionally compress the content, similarly to what would happen when using a [`loose::Store`][crate::loose::Store]. /// +#[derive(Clone)] pub struct Sink { compressor: Option>>, object_hash: gix_hash::Kind, diff --git a/gix-pack/Cargo.toml b/gix-pack/Cargo.toml index 0c2dafa6c86..02f73b770f4 100644 --- a/gix-pack/Cargo.toml +++ b/gix-pack/Cargo.toml @@ -5,7 +5,7 @@ repository = "https://github.com/Byron/gitoxide" authors = ["Sebastian Thiel "] license = "MIT/Apache-2.0" description = "Implements git packs and related data structures" -edition = "2018" +edition = "2021" include = ["src/**/*", "CHANGELOG.md"] rust-version = "1.64" autotests = false diff --git a/gix-pack/src/bundle/write/mod.rs b/gix-pack/src/bundle/write/mod.rs index fc0284b53a1..c5a048e85cc 100644 --- a/gix-pack/src/bundle/write/mod.rs +++ b/gix-pack/src/bundle/write/mod.rs @@ -62,17 +62,14 @@ impl crate::Bundle { /// * the resulting pack may be empty, that is, contains zero objects in some situations. This is a valid reply by a server and should /// be accounted for. /// - Empty packs always have the same name and not handling this case will result in at most one superfluous pack. - pub fn write_to_directory
<P>( + pub fn write_to_directory
( + pub fn write_to_directory( pack: impl io::BufRead, directory: Option>, - mut progress: P, + mut progress: impl Progress, should_interrupt: &AtomicBool, thin_pack_base_object_lookup_fn: Option, options: Options, - ) -> Result - where - P: Progress, - { + ) -> Result { let mut read_progress = progress.add_child_with_id("read pack", ProgressId::ReadPackBytes.into()); read_progress.init(None, progress::bytes()); let pack = progress::Read { @@ -354,20 +351,20 @@ impl crate::Bundle { } } +fn resolve_entry(range: data::EntryRange, mapped_file: &memmap2::Mmap) -> Option<&[u8]> { + mapped_file.get(range.start as usize..range.end as usize) +} + fn new_pack_file_resolver( data_file: SharedTempFile, -) -> io::Result) -> Option<()> + Send + Clone> { +) -> io::Result<( + impl Fn(data::EntryRange, &memmap2::Mmap) -> Option<&[u8]> + Send + Clone, + memmap2::Mmap, +)> { let mut guard = data_file.lock(); guard.flush()?; - let mapped_file = Arc::new(crate::mmap::read_only( - &guard.get_mut().with_mut(|f| f.path().to_owned())?, - )?); - let pack_data_lookup = move |range: std::ops::Range, out: &mut Vec| -> Option<()> { - mapped_file - .get(range.start as usize..range.end as usize) - .map(|pack_entry| out.copy_from_slice(pack_entry)) - }; - Ok(pack_data_lookup) + let mapped_file = crate::mmap::read_only(&guard.get_mut().with_mut(|f| f.path().to_owned())?)?; + Ok((resolve_entry, mapped_file)) } struct WriteOutcome { diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs index bfe2ec68769..daf219d32bc 100644 --- a/gix-pack/src/cache/delta/traverse/mod.rs +++ b/gix-pack/src/cache/delta/traverse/mod.rs @@ -1,9 +1,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use gix_features::threading::{Mutable, OwnShared}; use gix_features::{ parallel::in_parallel_with_slice, progress::{self, Progress}, - threading::{lock, Mutable, OwnShared}, + threading, }; use crate::{ @@ -36,18 +37,18 @@ pub enum Error { /// The base's offset which was from a resolved ref-delta that didn't actually get added to the tree base_pack_offset: crate::data::Offset, }, + #[error("Failed to spawn thread when switching to work-stealing mode")] + SpawnThread(#[from] std::io::Error), } /// Additional context passed to the `inspect_object(…)` function of the [`Tree::traverse()`] method. -pub struct Context<'a, S> { +pub struct Context<'a> { /// The pack entry describing the object pub entry: &'a crate::data::Entry, /// The offset at which `entry` ends in the pack, useful to learn about the exact range of `entry` within the pack. pub entry_end: u64, /// The decompressed object itself, ready to be decoded. pub decompressed: &'a [u8], - /// Custom state known to the function - pub state: &'a mut S, /// The depth at which this object resides in the delta-tree. It represents the amount of base objects, with 0 indicating /// an 'undeltified' object, and higher values indicating delta objects with the given amount of bases. pub level: u16, @@ -89,76 +90,87 @@ where /// operation as well. /// * `pack_entries_end` marks one-past-the-last byte of the last entry in the pack, as the last entries size would otherwise /// be unknown as it's not part of the index file. - /// * `new_thread_state() -> State` is a function to create state to be used in each thread, invoked once per thread. 
/// * `inspect_object(node_data: &mut T, progress: Progress, context: Context) -> Result<(), CustomError>` is a function /// running for each thread receiving fully decoded objects along with contextual information, which either succeeds with `Ok(())` /// or returns a `CustomError`. - /// Note that `node_data` can be modified to allow storing maintaining computation results on a per-object basis. + /// Note that `node_data` can be modified to allow storing maintaining computation results on a per-object basis. It should contain + /// its own mutable per-thread data as required. /// /// This method returns a vector of all tree items, along with their potentially modified custom node data. /// /// _Note_ that this method consumed the Tree to assure safe parallel traversal with mutation support. - pub fn traverse( + pub fn traverse( mut self, resolve: F, + resolve_data: &R, pack_entries_end: u64, - new_thread_state: impl Fn() -> S + Send + Clone, inspect_object: MBFN, Options { thread_limit, - object_progress, + mut object_progress, mut size_progress, should_interrupt, object_hash, }: Options<'_, P1, P2>, ) -> Result, Error> where - F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()> + Send + Clone, + F: for<'r> FnMut(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone, + R: Send + Sync, P1: Progress, P2: Progress, - MBFN: Fn(&mut T, &mut ::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone, + MBFN: FnMut(&mut T, &::SubProgress, Context<'_>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { self.set_pack_entries_end_and_resolve_ref_offsets(pack_entries_end)?; - let object_progress = OwnShared::new(Mutable::new(object_progress)); let num_objects = self.num_items(); let object_counter = { - let mut progress = lock(&object_progress); + let progress = &mut object_progress; progress.init(Some(num_objects), progress::count("objects")); progress.counter() }; size_progress.init(None, progress::bytes()); let size_counter = size_progress.counter(); let child_items = self.child_items.as_mut_slice(); + let object_progress = OwnShared::new(Mutable::new(object_progress)); let start = std::time::Instant::now(); in_parallel_with_slice( &mut self.root_items, thread_limit, { - let object_progress = object_progress.clone(); - let child_items = ItemSliceSend(child_items as *mut [Item]); - move |thread_index| { - ( - Vec::::with_capacity(4096), - lock(&object_progress) - .add_child_with_id(format!("thread {thread_index}"), gix_features::progress::UNKNOWN), - new_thread_state(), - resolve.clone(), - inspect_object.clone(), - ItemSliceSend(child_items.0), - ) + let child_items = ItemSliceSend(std::ptr::slice_from_raw_parts_mut( + child_items.as_mut_ptr(), + child_items.len(), + )); + { + let object_progress = object_progress.clone(); + move |thread_index| { + let _ = &child_items; + resolve::State { + bytes_buf: Vec::::with_capacity(4096), + delta_bytes: Vec::::with_capacity(4096), + fully_resolved_delta_bytes: Vec::::with_capacity(4096), + progress: threading::lock(&object_progress) + .add_child(format!("gix-pack.delta-traverse.inspect-object.{thread_index}")), + resolve: resolve.clone(), + modify_base: inspect_object.clone(), + child_items: child_items.clone(), + } + } } }, { - move |node, state| { + move |node, state, threads_left, should_interrupt| { resolve::deltas( object_counter.clone(), size_counter.clone(), node, state, + resolve_data, object_hash.len_in_bytes(), + threads_left, + should_interrupt, ) } }, @@ -166,7 +178,7 @@ where |_| (), )?; - 
lock(&object_progress).show_throughput(start); + threading::lock(&object_progress).show_throughput(start); size_progress.show_throughput(start); Ok(Outcome { diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index fc94d87ef20..7b08457d578 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -1,6 +1,8 @@ -use std::{cell::RefCell, collections::BTreeMap, sync::atomic::Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize}; +use std::{collections::BTreeMap, sync::atomic::Ordering}; -use gix_features::{progress::Progress, zlib}; +use gix_features::progress::Progress; +use gix_features::{threading, zlib}; use crate::{ cache::delta::{ @@ -10,47 +12,59 @@ use crate::{ }, Item, }, + data, data::EntryRange, }; -pub(crate) fn deltas( +pub(crate) struct State { + pub bytes_buf: Vec, + pub delta_bytes: Vec, + pub fully_resolved_delta_bytes: Vec, + pub progress: P, + pub resolve: F, + pub modify_base: MBFN, + pub child_items: ItemSliceSend>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) fn deltas( object_counter: Option, size_counter: Option, - node: &mut crate::cache::delta::Item, - (bytes_buf, ref mut progress, state, resolve, modify_base, child_items): &mut ( - Vec, - P, - S, - F, - MBFN, - ItemSliceSend>, - ), + node: &mut Item, + State { + bytes_buf, + delta_bytes, + fully_resolved_delta_bytes, + progress, + resolve, + modify_base, + child_items, + }: &mut State, + resolve_data: &R, hash_len: usize, + threads_left: &AtomicIsize, + should_interrupt: &AtomicBool, ) -> Result<(), Error> where T: Send, - F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()>, + R: Send + Sync, P: Progress, - MBFN: Fn(&mut T, &mut P, Context<'_, S>) -> Result<(), E>, + F: for<'r> FnMut(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone, + MBFN: FnMut(&mut T, &P, Context<'_>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { let mut decompressed_bytes_by_pack_offset = BTreeMap::new(); - let bytes_buf = RefCell::new(bytes_buf); - let decompress_from_resolver = |slice: EntryRange| -> Result<(crate::data::Entry, u64, Vec), Error> { - let mut bytes_buf = bytes_buf.borrow_mut(); - bytes_buf.resize((slice.end - slice.start) as usize, 0); - resolve(slice.clone(), &mut bytes_buf).ok_or(Error::ResolveFailed { + let mut decompress_from_resolver = |slice: EntryRange, out: &mut Vec| -> Result<(data::Entry, u64), Error> { + let bytes = resolve(slice.clone(), resolve_data).ok_or(Error::ResolveFailed { pack_offset: slice.start, })?; - let entry = crate::data::Entry::from_bytes(&bytes_buf, slice.start, hash_len); - let compressed = &bytes_buf[entry.header_size()..]; + let entry = data::Entry::from_bytes(bytes, slice.start, hash_len); + let compressed = &bytes[entry.header_size()..]; let decompressed_len = entry.decompressed_size as usize; - Ok((entry, slice.end, decompress_all_at_once(compressed, decompressed_len)?)) + decompress_all_at_once(compressed, decompressed_len, out)?; + Ok((entry, slice.end)) }; - // Traverse the tree breadth first and loose the data produced for the base as it won't be needed anymore. - progress.init(None, gix_features::progress::count_with_decimals("objects", 2)); - // each node is a base, and its children always start out as deltas which become a base after applying them. 
// These will be pushed onto our stack until all are processed let root_level = 0; @@ -58,16 +72,26 @@ where root_level, Node { item: node, - child_items: child_items.0, + child_items: child_items.clone(), }, )]; while let Some((level, mut base)) = nodes.pop() { + let storage; + if should_interrupt.load(Ordering::Relaxed) { + return Err(Error::Interrupted); + } let (base_entry, entry_end, base_bytes) = if level == root_level { - decompress_from_resolver(base.entry_slice())? + let (a, b) = decompress_from_resolver(base.entry_slice(), bytes_buf)?; + (a, b, &*bytes_buf) } else { - decompressed_bytes_by_pack_offset + if !bytes_buf.is_empty() { + *bytes_buf = Vec::new(); + } + let (a, b, v) = decompressed_bytes_by_pack_offset .remove(&base.offset()) - .expect("we store the resolved delta buffer when done") + .expect("we store the resolved delta buffer when done"); + storage = v; + (a, b, &storage) }; // anything done here must be repeated further down for leaf-nodes. @@ -80,8 +104,7 @@ where Context { entry: &base_entry, entry_end, - decompressed: &base_bytes, - state, + decompressed: base_bytes, level, }, ) @@ -93,20 +116,19 @@ where } for mut child in base.into_child_iter() { - let (mut child_entry, entry_end, delta_bytes) = decompress_from_resolver(child.entry_slice())?; - let (base_size, consumed) = crate::data::delta::decode_header_size(&delta_bytes); + let (mut child_entry, entry_end) = decompress_from_resolver(child.entry_slice(), delta_bytes)?; + let (base_size, consumed) = data::delta::decode_header_size(delta_bytes); let mut header_ofs = consumed; assert_eq!( base_bytes.len(), base_size as usize, - "recorded base size in delta does not match" + "recorded base size in delta does match the actual one" ); - let (result_size, consumed) = crate::data::delta::decode_header_size(&delta_bytes[consumed..]); + let (result_size, consumed) = data::delta::decode_header_size(&delta_bytes[consumed..]); header_ofs += consumed; - let mut fully_resolved_delta_bytes = bytes_buf.borrow_mut(); - fully_resolved_delta_bytes.resize(result_size as usize, 0); - crate::data::delta::apply(&base_bytes, &mut fully_resolved_delta_bytes, &delta_bytes[header_ofs..]); + resize_vec(fully_resolved_delta_bytes, result_size as usize); + data::delta::apply(base_bytes, fully_resolved_delta_bytes, &delta_bytes[header_ofs..]); // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers // at all @@ -114,7 +136,7 @@ where if child.has_children() { decompressed_bytes_by_pack_offset.insert( child.offset(), - (child_entry, entry_end, fully_resolved_delta_bytes.to_owned()), + (child_entry, entry_end, std::mem::take(fully_resolved_delta_bytes)), ); nodes.push((level + 1, child)); } else { @@ -124,8 +146,7 @@ where Context { entry: &child_entry, entry_end, - decompressed: &fully_resolved_delta_bytes, - state, + decompressed: fully_resolved_delta_bytes, level: level + 1, }, ) @@ -136,19 +157,273 @@ where .map(|c| c.fetch_add(base_bytes.len(), Ordering::SeqCst)); } } + + // After the first round, see if we can use additional threads, and if so we enter multi-threaded mode. + // In it we will keep using new threads as they become available while using this thread for coordination. + // We optimize for a low memory footprint as we are likely to get here if long delta-chains with large objects are involved. + // Try to avoid going into threaded mode if there isn't more than one unit of work anyway. 
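The hand-off into threaded mode hinges on a single compare-and-swap over the shared counter; distilled from the `fetch_update` call that follows:

    use std::sync::atomic::{AtomicIsize, Ordering};

    // Atomically take *all* currently available threads for ourselves, learning
    // how many were taken, or fail without side effects if none are free.
    fn try_claim_all_threads(threads_left: &AtomicIsize) -> Result<isize, isize> {
        threads_left.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |available| {
            (available > 0).then_some(0)
        })
    }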
+ if nodes.len() > 1 { + if let Ok(initial_threads) = + threads_left.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |threads_available| { + (threads_available > 0).then_some(0) + }) + { + // Assure no memory is held here. + *bytes_buf = Vec::new(); + *delta_bytes = Vec::new(); + *fully_resolved_delta_bytes = Vec::new(); + return deltas_mt( + initial_threads, + decompressed_bytes_by_pack_offset, + object_counter, + size_counter, + progress, + nodes, + resolve.clone(), + resolve_data, + modify_base.clone(), + hash_len, + threads_left, + should_interrupt, + ); + } + } } Ok(()) } -fn decompress_all_at_once(b: &[u8], decompressed_len: usize) -> Result, Error> { - let mut out = Vec::new(); - out.resize(decompressed_len, 0); +/// * `initial_threads` is the threads we may spawn, not accounting for our own thread which is still considered used by the parent +/// system. Since this thread will take a controlling function, we may spawn one more than that. In threaded mode, we will finish +/// all remaining work. +#[allow(clippy::too_many_arguments)] +pub(crate) fn deltas_mt( + mut threads_to_create: isize, + decompressed_bytes_by_pack_offset: BTreeMap)>, + object_counter: Option, + size_counter: Option, + progress: &P, + nodes: Vec<(u16, Node<'_, T>)>, + resolve: F, + resolve_data: &R, + modify_base: MBFN, + hash_len: usize, + threads_left: &AtomicIsize, + should_interrupt: &AtomicBool, +) -> Result<(), Error> +where + T: Send, + R: Send + Sync, + P: Progress, + F: for<'r> FnMut(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone, + MBFN: FnMut(&mut T, &P, Context<'_>) -> Result<(), E> + Send + Clone, + E: std::error::Error + Send + Sync + 'static, +{ + let nodes = gix_features::threading::Mutable::new(nodes); + let decompressed_bytes_by_pack_offset = gix_features::threading::Mutable::new(decompressed_bytes_by_pack_offset); + threads_to_create += 1; // ourselves + let mut returned_ourselves = false; + + gix_features::parallel::threads(|s| -> Result<(), Error> { + let mut threads = Vec::new(); + let poll_interval = std::time::Duration::from_millis(100); + loop { + for tid in 0..threads_to_create { + let thread = gix_features::parallel::build_thread() + .name(format!("gix-pack.traverse_deltas.{tid}")) + .spawn_scoped(s, { + let nodes = &nodes; + let decompressed_bytes_by_pack_offset = &decompressed_bytes_by_pack_offset; + let mut resolve = resolve.clone(); + let mut modify_base = modify_base.clone(); + let object_counter = object_counter.as_ref(); + let size_counter = size_counter.as_ref(); + + move || -> Result<(), Error> { + let mut bytes_buf = Vec::new(); + let mut delta_bytes = Vec::new(); + let mut fully_resolved_delta_bytes = Vec::new(); + let mut decompress_from_resolver = + |slice: EntryRange, out: &mut Vec| -> Result<(data::Entry, u64), Error> { + let bytes = resolve(slice.clone(), resolve_data).ok_or(Error::ResolveFailed { + pack_offset: slice.start, + })?; + let entry = data::Entry::from_bytes(bytes, slice.start, hash_len); + let compressed = &bytes[entry.header_size()..]; + let decompressed_len = entry.decompressed_size as usize; + decompress_all_at_once(compressed, decompressed_len, out)?; + Ok((entry, slice.end)) + }; + + loop { + let (level, mut base) = match threading::lock(nodes).pop() { + Some(v) => v, + None => break, + }; + if should_interrupt.load(Ordering::Relaxed) { + return Err(Error::Interrupted); + } + let storage; + let (base_entry, entry_end, base_bytes) = if level == 0 { + let (a, b) = decompress_from_resolver(base.entry_slice(), &mut bytes_buf)?; + (a, b, &*bytes_buf) 
+ } else { + if !bytes_buf.is_empty() { + bytes_buf = Vec::new(); + } + let (a, b, v) = threading::lock(decompressed_bytes_by_pack_offset) + .remove(&base.offset()) + .expect("we store the resolved delta buffer when done"); + storage = v; + (a, b, storage.as_slice()) + }; + + // anything done here must be repeated further down for leaf-nodes. + // This way we avoid retaining their decompressed memory longer than needed (they have no children, + // thus their memory can be released right away, using 18% less peak memory on the linux kernel). + { + modify_base( + base.data(), + progress, + Context { + entry: &base_entry, + entry_end, + decompressed: base_bytes, + level, + }, + ) + .map_err(|err| Box::new(err) as Box)?; + object_counter.as_ref().map(|c| c.fetch_add(1, Ordering::SeqCst)); + size_counter + .as_ref() + .map(|c| c.fetch_add(base_bytes.len(), Ordering::SeqCst)); + } + + for mut child in base.into_child_iter() { + let (mut child_entry, entry_end) = + decompress_from_resolver(child.entry_slice(), &mut delta_bytes)?; + let (base_size, consumed) = data::delta::decode_header_size(&delta_bytes); + let mut header_ofs = consumed; + assert_eq!( + base_bytes.len(), + base_size as usize, + "recorded base size in delta does match the actual one" + ); + let (result_size, consumed) = + data::delta::decode_header_size(&delta_bytes[consumed..]); + header_ofs += consumed; + + fully_resolved_delta_bytes.resize(result_size as usize, 0); + data::delta::apply( + base_bytes, + &mut fully_resolved_delta_bytes, + &delta_bytes[header_ofs..], + ); + + // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers + // at all + child_entry.header = base_entry.header; // assign the actual object type, instead of 'delta' + if child.has_children() { + threading::lock(decompressed_bytes_by_pack_offset).insert( + child.offset(), + (child_entry, entry_end, std::mem::take(&mut fully_resolved_delta_bytes)), + ); + threading::lock(nodes).push((level + 1, child)); + } else { + modify_base( + child.data(), + progress, + Context { + entry: &child_entry, + entry_end, + decompressed: &fully_resolved_delta_bytes, + level: level + 1, + }, + ) + .map_err(|err| Box::new(err) as Box)?; + object_counter.as_ref().map(|c| c.fetch_add(1, Ordering::SeqCst)); + size_counter + .as_ref() + .map(|c| c.fetch_add(base_bytes.len(), Ordering::SeqCst)); + } + } + } + Ok(()) + } + })?; + threads.push(thread); + } + if threads_left + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |threads_available: isize| { + (threads_available > 0).then(|| { + threads_to_create = threads_available.min(threading::lock(&nodes).len() as isize); + threads_available - threads_to_create + }) + }) + .is_err() + { + threads_to_create = 0; + } + + // What we really want to do is either wait for one of our threads to go down + // or for another scheduled thread to become available. Unfortunately we can't do that, + // but may instead find a good way to set the polling interval instead of hard-coding it. + std::thread::sleep(poll_interval); + // Get out of threads are already starving or they would be starving soon as no work is left. + if threads.iter().any(|t| t.is_finished()) { + let mut running_threads = Vec::new(); + for thread in threads.drain(..) 
{ + if thread.is_finished() { + match thread.join() { + Ok(Err(err)) => return Err(err), + Ok(Ok(())) => { + if !returned_ourselves { + returned_ourselves = true; + } else { + threads_left.fetch_add(1, Ordering::SeqCst); + } + } + Err(err) => { + std::panic::resume_unwind(err); + } + } + } else { + running_threads.push(thread); + } + } + if running_threads.is_empty() && threading::lock(&nodes).is_empty() { + break; + } + threads = running_threads; + } + } + + Ok(()) + }) +} + +fn resize_vec(v: &mut Vec, new_len: usize) { + if new_len > v.len() { + v.reserve_exact(new_len.saturating_sub(v.capacity()) + (v.capacity() - v.len())); + // SAFETY: + // 1. we have reserved enough capacity to fit `new_len` + // 2. the caller is trusted to write into `v` to completely fill `new_len`. + #[allow(unsafe_code, clippy::uninit_vec)] + unsafe { + v.set_len(new_len); + } + } else { + v.truncate(new_len) + } +} + +fn decompress_all_at_once(b: &[u8], decompressed_len: usize, out: &mut Vec) -> Result<(), Error> { + resize_vec(out, decompressed_len); zlib::Inflate::default() - .once(b, &mut out) + .once(b, out) .map_err(|err| Error::ZlibInflate { source: err, message: "Failed to decompress entry", })?; - Ok(out) + Ok(()) } diff --git a/gix-pack/src/cache/delta/traverse/util.rs b/gix-pack/src/cache/delta/traverse/util.rs index e7caf2ff53c..1b70153517f 100644 --- a/gix-pack/src/cache/delta/traverse/util.rs +++ b/gix-pack/src/cache/delta/traverse/util.rs @@ -4,6 +4,8 @@ pub struct ItemSliceSend(pub *mut [T]) where T: Send; +/// SAFETY: This would be unsafe if this would ever be abused, but it's used internally and only in a way that assure that the pointers +/// don't violate aliasing rules. impl Clone for ItemSliceSend where T: Send, @@ -18,12 +20,12 @@ where unsafe impl Send for ItemSliceSend where T: Send {} /// An item returned by `iter_root_chunks`, allowing access to the `data` stored alongside nodes in a [`Tree`]. -pub struct Node<'a, T> { +pub struct Node<'a, T: Send> { pub item: &'a mut Item, - pub child_items: *mut [Item], + pub child_items: ItemSliceSend>, } -impl<'a, T> Node<'a, T> { +impl<'a, T: Send> Node<'a, T> { /// Returns the offset into the pack at which the `Node`s data is located. pub fn offset(&self) -> u64 { self.item.offset @@ -55,8 +57,8 @@ impl<'a, T> Node<'a, T> { // SAFETY: The resulting mutable pointer cannot be yielded by any other node. #[allow(unsafe_code)] Node { - item: unsafe { &mut *(children as *mut Item).add(index as usize) }, - child_items: children, + item: &mut unsafe { &mut *children.0 }[index as usize], + child_items: children.clone(), } }) } diff --git a/gix-pack/src/cache/lru.rs b/gix-pack/src/cache/lru.rs index bba4f5d33b5..dc53375aca7 100644 --- a/gix-pack/src/cache/lru.rs +++ b/gix-pack/src/cache/lru.rs @@ -48,7 +48,7 @@ mod memory { impl DecodeEntry for MemoryCappedHashmap { fn put(&mut self, pack_id: u32, offset: u64, data: &[u8], kind: gix_object::Kind, compressed_size: usize) { self.debug.put(); - if let Ok(Some(previous_entry)) = self.inner.put_with_weight( + let res = self.inner.put_with_weight( (pack_id, offset), Entry { data: self @@ -64,8 +64,11 @@ mod memory { kind, compressed_size, }, - ) { - self.free_list.push(previous_entry.data) + ); + match res { + Ok(Some(previous_entry)) => self.free_list.push(previous_entry.data), + Ok(None) => {} + Err((_key, value)) => self.free_list.push(value.data), } } @@ -104,41 +107,74 @@ mod _static { /// Values of 64 seem to improve performance. 
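The additions below put a soft memory budget on top of the cache's fixed slot count. Construction, as a sketch (the figure mirrors the new default):

    use gix_pack::cache::lru::StaticLinkedList;

    // 64 slots as before, now bounded to ~96 MiB across all retained buffers;
    // a limit of 0 is promoted to usize::MAX, i.e. effectively unlimited.
    let cache = StaticLinkedList::<64>::new(96 * 1024 * 1024);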
pub struct StaticLinkedList { inner: uluru::LRUCache, - free_list: Vec>, + last_evicted: Vec, debug: gix_features::cache::Debug, + /// the number of bytes we are currently holding, taking into account the capacities of all Vecs we keep. + mem_used: usize, + /// The total amount of memory we should be able to hold with all entries combined. + mem_limit: usize, } - impl Default for StaticLinkedList { - fn default() -> Self { + impl StaticLinkedList { + /// Create a new list with a memory limit of `mem_limit` in bytes. If 0, there is no memory limit. + pub fn new(mem_limit: usize) -> Self { StaticLinkedList { inner: Default::default(), - free_list: Vec::new(), + last_evicted: Vec::new(), debug: gix_features::cache::Debug::new(format!("StaticLinkedList<{SIZE}>")), + mem_used: 0, + mem_limit: if mem_limit == 0 { usize::MAX } else { mem_limit }, } } } + impl Default for StaticLinkedList { + fn default() -> Self { + Self::new(96 * 1024 * 1024) + } + } + impl DecodeEntry for StaticLinkedList { fn put(&mut self, pack_id: u32, offset: u64, data: &[u8], kind: gix_object::Kind, compressed_size: usize) { + // We cannot possibly hold this much. + if data.len() > self.mem_limit { + return; + } + // If we could hold it but are at the limit, all we can do is make space. + let mem_free = self.mem_limit - self.mem_used; + if data.len() > mem_free { + // prefer freeing free-lists instead of clearing our cache + let free_list_cap = self.last_evicted.len(); + self.last_evicted = Vec::new(); + // still not enough? clear everything + if data.len() > mem_free + free_list_cap { + self.inner.clear(); + self.mem_used = 0; + } else { + self.mem_used -= free_list_cap; + } + } self.debug.put(); + let (prev_cap, cur_cap); if let Some(previous) = self.inner.insert(Entry { offset, pack_id, - data: self - .free_list - .pop() - .map(|mut v| { - v.clear(); - v.resize(data.len(), 0); - v.copy_from_slice(data); - v - }) - .unwrap_or_else(|| Vec::from(data)), + data: { + let mut v = std::mem::take(&mut self.last_evicted); + prev_cap = v.capacity(); + v.clear(); + v.resize(data.len(), 0); + v.copy_from_slice(data); + cur_cap = v.capacity(); + v + }, kind, compressed_size, }) { - self.free_list.push(previous.data) + // No need to adjust capacity as we already counted it.
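// The capacity of the evicted buffer was already counted in `mem_used` when it
// was inserted, and it remains counted while parked in `last_evicted` for reuse,
// which is why only the `cur_cap - prev_cap` delta is applied below.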
+ self.last_evicted = previous.data; } + self.mem_used = self.mem_used + cur_cap - prev_cap; } fn get(&mut self, pack_id: u32, offset: u64, out: &mut Vec) -> Option<(gix_object::Kind, usize)> { @@ -159,6 +195,56 @@ mod _static { res } } + + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn no_limit() { + let c = StaticLinkedList::<10>::new(0); + assert_eq!( + c.mem_limit, + usize::MAX, + "zero is automatically turned into a large limit that is equivalent to unlimited" + ); + } + + #[test] + fn journey() { + let mut c = StaticLinkedList::<10>::new(100); + assert_eq!(c.mem_limit, 100); + assert_eq!(c.mem_used, 0); + + // enough memory for normal operation + let mut last_mem_used = 0; + for _ in 0..10 { + c.put(0, 0, &[0], gix_object::Kind::Blob, 1); + assert!(c.mem_used > last_mem_used); + last_mem_used = c.mem_used; + } + assert_eq!(c.mem_used, 80, "there is a minimal vec size"); + assert_eq!(c.inner.len(), 10); + assert_eq!(c.last_evicted.len(), 0); + + c.put(0, 0, &(0..20).collect::>(), gix_object::Kind::Blob, 1); + assert_eq!(c.inner.len(), 10); + assert_eq!(c.mem_used, 80 + 20); + assert_eq!(c.last_evicted.len(), 1); + + c.put(0, 0, &(0..50).collect::>(), gix_object::Kind::Blob, 1); + assert_eq!(c.inner.len(), 1, "cache clearance wasn't necessary"); + assert_eq!(c.last_evicted.len(), 0, "the free list was cleared"); + assert_eq!(c.mem_used, 50); + + c.put(0, 0, &(0..101).collect::>(), gix_object::Kind::Blob, 1); + assert_eq!( + c.inner.len(), + 1, + "objects that won't ever fit within the memory limit are ignored" + ); + } + } } #[cfg(feature = "pack-cache-lru-static")] diff --git a/gix-pack/src/cache/object.rs b/gix-pack/src/cache/object.rs index e64f47a8c8d..5f83169d345 100644 --- a/gix-pack/src/cache/object.rs +++ b/gix-pack/src/cache/object.rs @@ -1,5 +1,3 @@ -//! # Note -//! //! This module is a bit 'misplaced' if spelled out like 'gix_pack::cache::object::*' but is best placed here for code re-use and //! general usefulness. use crate::cache; @@ -58,7 +56,7 @@ mod memory { /// Put the object going by `id` of `kind` with `data` into the cache. fn put(&mut self, id: gix_hash::ObjectId, kind: gix_object::Kind, data: &[u8]) { self.debug.put(); - if let Ok(Some(previous_entry)) = self.inner.put_with_weight( + let res = self.inner.put_with_weight( id, Entry { data: self @@ -73,8 +71,11 @@ mod memory { .unwrap_or_else(|| Vec::from(data)), kind, }, - ) { - self.free_list.push(previous_entry.data) + ); + match res { + Ok(Some(previous_entry)) => self.free_list.push(previous_entry.data), + Ok(None) => {} + Err((_key, value)) => self.free_list.push(value.data), } } diff --git a/gix-pack/src/data/file/decode/entry.rs b/gix-pack/src/data/file/decode/entry.rs index a467ccd8e6c..dedb31344fb 100644 --- a/gix-pack/src/data/file/decode/entry.rs +++ b/gix-pack/src/data/file/decode/entry.rs @@ -215,6 +215,8 @@ impl File { } break; } + // This is a pessimistic guess, as worst possible compression should not be bigger than the data itself. + // TODO: is this assumption actually true? 
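// For reference, the assumption is nearly, but not strictly, true: zlib falls
// back to stored deflate blocks (5 bytes of header per at most 64 KiB of raw
// data) plus 6 bytes of stream framing, so incompressible input grows by
// roughly 0.008% + 11 bytes rather than shrinking; close enough for an estimate.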
total_delta_data_size += cursor.decompressed_size; let decompressed_size = cursor .decompressed_size diff --git a/gix-pack/src/index/traverse/mod.rs b/gix-pack/src/index/traverse/mod.rs index 68a502bcadb..83173f90454 100644 --- a/gix-pack/src/index/traverse/mod.rs +++ b/gix-pack/src/index/traverse/mod.rs @@ -1,6 +1,9 @@ use std::sync::atomic::AtomicBool; -use gix_features::{parallel, progress::Progress}; +use gix_features::{ + parallel, + progress::{Progress, RawProgress}, +}; use crate::index; @@ -79,7 +82,7 @@ impl index::File { pack: &crate::data::File, progress: P, should_interrupt: &AtomicBool, - new_processor: impl Fn() -> Processor + Send + Clone, + processor: Processor, Options { traversal, thread_limit, @@ -91,17 +94,12 @@ impl index::File { P: Progress, C: crate::cache::DecodeEntry, E: std::error::Error + Send + Sync + 'static, - Processor: FnMut( - gix_object::Kind, - &[u8], - &index::Entry, - &mut ::SubProgress, - ) -> Result<(), E>, + Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn RawProgress) -> Result<(), E> + Send + Clone, F: Fn() -> C + Send + Clone, { match traversal { Algorithm::Lookup => self.traverse_with_lookup( - new_processor, + processor, pack, progress, should_interrupt, @@ -113,10 +111,10 @@ impl index::File { ), Algorithm::DeltaTreeLookup => self.traverse_with_index( pack, - new_processor, + processor, progress, should_interrupt, - crate::index::traverse::with_index::Options { check, thread_limit }, + with_index::Options { check, thread_limit }, ), } } @@ -151,19 +149,18 @@ impl index::File { } #[allow(clippy::too_many_arguments)] - fn decode_and_process_entry( + fn decode_and_process_entry( &self, check: SafetyCheck, pack: &crate::data::File, cache: &mut C, buf: &mut Vec, - progress: &mut P, - index_entry: &crate::index::Entry, - processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>, + progress: &mut dyn RawProgress, + index_entry: &index::Entry, + processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn RawProgress) -> Result<(), E>, ) -> Result> where C: crate::cache::DecodeEntry, - P: Progress, E: std::error::Error + Send + Sync + 'static, { let pack_entry = pack.entry(index_entry.pack_offset); @@ -192,9 +189,9 @@ impl index::File { check, object_kind, buf, - progress, index_entry, || pack.entry_crc32(index_entry.pack_offset, entry_len), + progress, processor, )?; Ok(entry_stats) @@ -202,17 +199,16 @@ impl index::File { } #[allow(clippy::too_many_arguments)] -fn process_entry( +fn process_entry( check: SafetyCheck, object_kind: gix_object::Kind, decompressed: &[u8], - progress: &mut P, - index_entry: &crate::index::Entry, + index_entry: &index::Entry, pack_entry_crc32: impl FnOnce() -> u32, - processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>, + progress: &dyn RawProgress, + processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn RawProgress) -> Result<(), E>, ) -> Result<(), Error> where - P: Progress, E: std::error::Error + Send + Sync + 'static, { if check.object_checksum() { diff --git a/gix-pack/src/index/traverse/with_index.rs b/gix-pack/src/index/traverse/with_index.rs index 769bbd07f49..884277c9dfe 100644 --- a/gix-pack/src/index/traverse/with_index.rs +++ b/gix-pack/src/index/traverse/with_index.rs @@ -59,19 +59,16 @@ impl index::File { pub fn traverse_with_index( &self, pack: &crate::data::File, - new_processor: impl Fn() -> Processor + Send + Clone, + mut processor: Processor, mut progress: P, should_interrupt: &AtomicBool, 
diff --git a/gix-pack/src/index/traverse/with_index.rs b/gix-pack/src/index/traverse/with_index.rs
index 769bbd07f49..884277c9dfe 100644
--- a/gix-pack/src/index/traverse/with_index.rs
+++ b/gix-pack/src/index/traverse/with_index.rs
@@ -59,19 +59,16 @@ impl index::File {
     pub fn traverse_with_index<P, Processor, E>(
         &self,
         pack: &crate::data::File,
-        new_processor: impl Fn() -> Processor + Send + Clone,
+        mut processor: Processor,
         mut progress: P,
         should_interrupt: &AtomicBool,
         Options { check, thread_limit }: Options,
     ) -> Result<index::traverse::Outcome<P>, Error<E>>
     where
         P: Progress,
-        Processor: FnMut(
-            gix_object::Kind,
-            &[u8],
-            &index::Entry,
-            &mut <P::SubProgress as Progress>::SubProgress,
-        ) -> Result<(), E>,
+        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn gix_features::progress::RawProgress) -> Result<(), E>
+            + Send
+            + Clone,
         E: std::error::Error + Send + Sync + 'static,
     {
         let (verify_result, traversal_result) = parallel::join(
@@ -113,29 +110,27 @@ impl index::File {
             self.object_hash,
         )?;
         let mut outcome = digest_statistics(tree.traverse(
-            |slice, out| pack.entry_slice(slice).map(|entry| out.copy_from_slice(entry)),
+            |slice, pack| pack.entry_slice(slice),
+            pack,
             pack.pack_end() as u64,
-            new_processor,
-            |data,
-             progress,
-             traverse::Context {
-                 entry: pack_entry,
-                 entry_end,
-                 decompressed: bytes,
-                 state: ref mut processor,
-                 level,
-             }| {
+            move |data,
+                  progress,
+                  traverse::Context {
+                      entry: pack_entry,
+                      entry_end,
+                      decompressed: bytes,
+                      level,
+                  }| {
                 let object_kind = pack_entry.header.as_kind().expect("non-delta object");
                 data.level = level;
                 data.decompressed_size = pack_entry.decompressed_size;
                 data.object_kind = object_kind;
                 data.compressed_size = entry_end - pack_entry.data_offset;
                 data.object_size = bytes.len() as u64;
-                let result = crate::index::traverse::process_entry(
+                let result = index::traverse::process_entry(
                     check,
                     object_kind,
                     bytes,
-                    progress,
                     &data.index_entry,
                     || {
                         // TODO: Fix this - we overwrite the header of 'data' which also changes the computed entry size,
@@ -146,7 +141,8 @@ impl index::File {
                             .expect("slice pointing into the pack (by now data is verified)"),
                     )
                 },
-                    processor,
+                    progress,
+                    &mut processor,
                 );
                 match result {
                     Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
@@ -156,7 +152,7 @@ impl index::File {
                     res => res,
                 }
             },
-            crate::cache::delta::traverse::Options {
+            traverse::Options {
                 object_progress: progress.add_child_with_id("Resolving", ProgressId::DecodedObjects.into()),
                 size_progress: progress.add_child_with_id("Decoding", ProgressId::DecodedBytes.into()),
                 thread_limit,
diff --git a/gix-pack/src/index/traverse/with_lookup.rs b/gix-pack/src/index/traverse/with_lookup.rs
index 509ae4e4f90..0165e4e01db 100644
--- a/gix-pack/src/index/traverse/with_lookup.rs
+++ b/gix-pack/src/index/traverse/with_lookup.rs
@@ -67,8 +67,8 @@ impl index::File {
     /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method.
     pub fn traverse_with_lookup<C, P, E, Processor, F>(
         &self,
-        new_processor: impl Fn() -> Processor + Send + Clone,
-        pack: &crate::data::File,
+        mut processor: Processor,
+        pack: &data::File,
         mut progress: P,
         should_interrupt: &AtomicBool,
         Options {
@@ -81,12 +81,9 @@ impl index::File {
         P: Progress,
         C: crate::cache::DecodeEntry,
         E: std::error::Error + Send + Sync + 'static,
-        Processor: FnMut(
-            gix_object::Kind,
-            &[u8],
-            &index::Entry,
-            &mut <P::SubProgress as Progress>::SubProgress,
-        ) -> Result<(), E>,
+        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn gix_features::progress::RawProgress) -> Result<(), E>
+            + Send
+            + Clone,
         F: Fn() -> C + Send + Clone,
     {
         let (verify_result, traversal_result) = parallel::join(
@@ -133,7 +130,6 @@ impl index::File {
             move |index| {
                 (
                     make_pack_lookup_cache(),
-                    new_processor(),
                     Vec::with_capacity(2048), // decode buffer
                     lock(&reduce_progress)
                         .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress
@@ -146,9 +142,9 @@ impl index::File {
                 input_chunks,
                 thread_limit,
                 state_per_thread,
-                |entries: &[index::Entry],
-                 (cache, ref mut processor, buf, progress)|
-                 -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
+                move |entries: &[index::Entry],
+                      (cache, buf, progress)|
+                      -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                     progress.init(
                         Some(entries.len()),
                         gix_features::progress::count_with_decimals("objects", 2),
@@ -163,7 +159,7 @@ impl index::File {
                             buf,
                             progress,
                             index_entry,
-                            processor,
+                            &mut processor,
                         );
                         progress.inc();
                         let stat = match result {
@@ -174,6 +170,9 @@ impl index::File {
                             res => res,
                         }?;
                         stats.push(stat);
+                        if should_interrupt.load(Ordering::Relaxed) {
+                            break;
+                        }
                     }
                     Ok(stats)
                 },
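Also worth highlighting in the hunk above: workers now poll the shared `should_interrupt` flag after every processed entry and leave the chunk loop early, rather than only noticing cancellation between chunks. A self-contained sketch of this cooperative-interruption pattern (all names are ours, not gix-pack's):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    fn process_entries(entries: &[u64], should_interrupt: &AtomicBool) -> Vec<u64> {
        let mut stats = Vec::with_capacity(entries.len());
        for &entry in entries {
            stats.push(entry.wrapping_mul(2)); // stand-in for decoding one pack entry
            if should_interrupt.load(Ordering::Relaxed) {
                break; // stop early; stats gathered so far are still returned
            }
        }
        stats
    }

    fn main() {
        let stop = AtomicBool::new(false);
        let entries: Vec<u64> = (0..1_000_000).collect();
        std::thread::scope(|s| {
            let worker = s.spawn(|| process_entries(&entries, &stop));
            std::thread::sleep(Duration::from_millis(1));
            stop.store(true, Ordering::Relaxed);
            let stats = worker.join().expect("worker does not panic");
            assert!(stats.len() <= entries.len());
        });
    }

`Ordering::Relaxed` suffices because the flag carries no data dependencies; it only needs to become visible eventually.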
diff --git a/gix-pack/src/index/verify.rs b/gix-pack/src/index/verify.rs
index 182f816bab1..6af352ac91a 100644
--- a/gix-pack/src/index/verify.rs
+++ b/gix-pack/src/index/verify.rs
@@ -198,7 +198,7 @@ impl index::File {
             pack,
             progress,
             should_interrupt,
-            || {
+            {
                 let mut encode_buf = Vec::with_capacity(2048);
                 move |kind, data, index_entry, progress| {
                     Self::verify_entry(verify_mode, &mut encode_buf, kind, data, index_entry, progress)
@@ -231,17 +231,14 @@ impl index::File {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn verify_entry<P>(
+    fn verify_entry(
         verify_mode: Mode,
         encode_buf: &mut Vec<u8>,
         object_kind: gix_object::Kind,
         buf: &[u8],
         index_entry: &index::Entry,
-        progress: &mut P,
-    ) -> Result<(), integrity::Error>
-    where
-        P: Progress,
-    {
+        progress: &dyn gix_features::progress::RawProgress,
+    ) -> Result<(), integrity::Error> {
         if let Mode::HashCrc32Decode | Mode::HashCrc32DecodeEncode = verify_mode {
             use gix_object::Kind::*;
             match object_kind {
@@ -260,7 +257,7 @@ impl index::File {
                     .expect("writing to a memory buffer never fails");
                 if encode_buf.as_slice() != buf {
                     let mut should_return_error = true;
-                    if let gix_object::Kind::Tree = object_kind {
+                    if let Tree = object_kind {
                         if buf.as_bstr().find(b"100664").is_some() || buf.as_bstr().find(b"100640").is_some() {
                             progress.info(format!("Tree object {} would be cleaned up during re-serialization, replacing mode '100664|100640' with '100644'", index_entry.oid));
                             should_return_error = false
diff --git a/gix-pack/src/index/write/mod.rs b/gix-pack/src/index/write/mod.rs
index 39ed0f31ed2..72a076a851b 100644
--- a/gix-pack/src/index/write/mod.rs
+++ b/gix-pack/src/index/write/mod.rs
@@ -83,20 +83,22 @@ impl crate::index::File {
     /// It should return `None` if the entry cannot be resolved from the pack that produced the `entries` iterator, causing
     /// the write operation to fail.
     #[allow(clippy::too_many_arguments)]
-    pub fn write_data_iter_to_stream<F, F2>(
+    pub fn write_data_iter_to_stream<F, F2, R, P>(
         version: crate::index::Version,
         make_resolver: F,
         entries: impl Iterator<Item = Result<crate::data::input::Entry, crate::data::input::Error>>,
         thread_limit: Option<usize>,
-        mut root_progress: impl Progress,
+        mut root_progress: P,
         out: impl io::Write,
         should_interrupt: &AtomicBool,
         object_hash: gix_hash::Kind,
         pack_version: crate::data::Version,
     ) -> Result<Outcome, Error>
     where
-        F: FnOnce() -> io::Result<F2>,
-        F2: for<'r> Fn(crate::data::EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
+        F: FnOnce() -> io::Result<(F2, R)>,
+        R: Send + Sync,
+        F2: for<'r> Fn(crate::data::EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone,
+        P: Progress,
     {
         if version != crate::index::Version::default() {
             return Err(Error::Unsupported(version));
@@ -180,12 +182,12 @@ impl crate::index::File {
 
         root_progress.inc();
 
-        let resolver = make_resolver()?;
+        let (resolver, pack) = make_resolver()?;
         let sorted_pack_offsets_by_oid = {
             let traverse::Outcome { roots, children } = tree.traverse(
                 resolver,
+                &pack,
                 pack_entries_end,
-                || (),
                 |data,
                  _progress,
                  traverse::Context {
@@ -250,14 +252,7 @@ impl crate::index::File {
     }
 
 fn modify_base(entry: &mut TreeEntry, pack_entry: &crate::data::Entry, decompressed: &[u8], hash: gix_hash::Kind) {
-    fn compute_hash(kind: gix_object::Kind, bytes: &[u8], object_hash: gix_hash::Kind) -> gix_hash::ObjectId {
-        let mut hasher = gix_features::hash::hasher(object_hash);
-        hasher.update(&gix_object::encode::loose_header(kind, bytes.len()));
-        hasher.update(bytes);
-        gix_hash::ObjectId::from(hasher.digest())
-    }
-
     let object_kind = pack_entry.header.as_kind().expect("base object as source of iteration");
-    let id = compute_hash(object_kind, decompressed, hash);
+    let id = gix_object::compute_hash(hash, object_kind, decompressed);
     entry.id = id;
 }
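The resolver contract of `write_data_iter_to_stream` above changed shape: `make_resolver` now returns the lookup function together with the backing storage `R` it borrows from, and lookups yield `Option<&[u8]>` subslices instead of copying bytes into a caller-provided `&mut Vec<u8>`. A minimal model of that zero-copy shape, with a `Vec<u8>` standing in for the `memmap2::Mmap` the tests below use (types and names here are illustrative only):

    use std::io;
    use std::ops::Range;

    // Stand-in for gix_pack::data::EntryRange.
    type EntryRange = Range<u64>;

    // The `F2` role: borrow a subslice out of the shared backing store,
    // mirroring `Fn(EntryRange, &R) -> Option<&[u8]>` with `R = Vec<u8>`.
    fn slice_map(entry: EntryRange, map: &Vec<u8>) -> Option<&[u8]> {
        map.get(entry.start as usize..entry.end as usize)
    }

    // The `F` role: construct the resolver together with the storage it reads from.
    fn make_resolver() -> io::Result<(fn(EntryRange, &Vec<u8>) -> Option<&[u8]>, Vec<u8>)> {
        let backing = b"PACK....entry-bytes....".to_vec(); // stand-in for an mmap of the pack
        Ok((slice_map, backing))
    }

    fn main() -> io::Result<()> {
        let (resolver, pack) = make_resolver()?;
        // With `R: Send + Sync`, many threads can resolve concurrently,
        // each borrowing from `pack` instead of copying.
        let slice = resolver(8..19, &pack).expect("range is in bounds");
        assert_eq!(slice, b"entry-bytes");
        Ok(())
    }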
diff --git a/gix-pack/tests/pack/index.rs b/gix-pack/tests/pack/index.rs
index e4a7701f57e..419bc27923b 100644
--- a/gix-pack/tests/pack/index.rs
+++ b/gix-pack/tests/pack/index.rs
@@ -115,28 +115,25 @@ mod version {
     use gix_features::progress;
     use gix_odb::pack;
-    use gix_pack::{
-        data::{input, EntryRange},
-        index,
-    };
+    use gix_pack::{data::input, index};
 
     use crate::{
         fixture_path,
         pack::{INDEX_V2, V2_PACKS_AND_INDICES},
     };
 
+    fn slice_map(entry: gix_pack::data::EntryRange, map: &memmap2::Mmap) -> Option<&[u8]> {
+        map.get(entry.start as usize..entry.end as usize)
+    }
+
     #[test]
     fn write_to_stream() -> Result<(), Box<dyn std::error::Error>> {
-        fn assert_index_write<F>(
+        fn assert_index_write(
             mode: &input::Mode,
             compressed: &input::EntryDataMode,
             index_path: &&str,
             data_path: &&str,
-            resolve: F,
-        ) -> Result<(), Box<dyn std::error::Error>>
-        where
-            F: Fn(pack::data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Clone,
-        {
+        ) -> Result<(), Box<dyn std::error::Error>> {
             let pack_iter = pack::data::input::BytesToEntriesIter::new_from_header(
                 io::BufReader::new(fs::File::open(fixture_path(data_path))?),
                 *mode,
@@ -150,7 +147,11 @@ mod version {
             let pack_version = pack_iter.version();
             let outcome = pack::index::File::write_data_iter_to_stream(
                 desired_kind,
-                move || Ok(resolve),
+                || {
+                    let file = std::fs::File::open(fixture_path(data_path))?;
+                    let map = unsafe { memmap2::Mmap::map(&file)? };
+                    Ok((slice_map, map))
+                },
                 pack_iter,
                 None,
                 progress::Discard,
@@ -217,17 +218,7 @@ mod version {
         for mode in &[input::Mode::AsIs, input::Mode::Verify, input::Mode::Restore] {
             for compressed in &[input::EntryDataMode::Crc32, input::EntryDataMode::KeepAndCrc32] {
                 for (index_path, data_path) in V2_PACKS_AND_INDICES {
-                    let resolve = {
-                        let buf = gix_features::threading::OwnShared::new({
-                            let file = std::fs::File::open(fixture_path(data_path))?;
-                            unsafe { memmap2::Mmap::map(&file)? }
-                        });
-                        move |entry: EntryRange, out: &mut Vec<u8>| {
-                            buf.get(entry.start as usize..entry.end as usize)
-                                .map(|slice| out.copy_from_slice(slice))
-                        }
-                    };
-                    assert_index_write(mode, compressed, index_path, data_path, resolve)?;
+                    assert_index_write(mode, compressed, index_path, data_path)?;
                 }
             }
         }
@@ -259,11 +250,9 @@ fn traverse_with_index_and_forward_ref_deltas() {
     let _it_should_work = index
         .traverse_with_index(
             &data,
-            || {
-                |_, _, _, _| {
-                    count.fetch_add(1, Ordering::SeqCst);
-                    Ok::<_, std::io::Error>(())
-                }
+            |_, _, _, _| {
+                count.fetch_add(1, Ordering::SeqCst);
+                Ok::<_, std::io::Error>(())
             },
             progress::Discard,
             &AtomicBool::new(false),
diff --git a/gix-worktree/src/status/content.rs b/gix-worktree/src/status/content.rs
index d47749ef8fb..aa775821a7a 100644
--- a/gix-worktree/src/status/content.rs
+++ b/gix-worktree/src/status/content.rs
@@ -1,7 +1,5 @@
-use gix_features::hash;
 use gix_hash::ObjectId;
 use gix_index as index;
-use gix_object::encode::loose_header;
 use index::Entry;
 
 /// Compares the content of two blobs in some way.
@@ -76,11 +74,7 @@ impl CompareBlobs for HashEq {
         _entry_blob: impl ReadDataOnce<'a, E>,
     ) -> Result<Option<ObjectId>, E> {
         let blob = worktree_blob.read_data()?;
-        let header = loose_header(gix_object::Kind::Blob, blob.len());
-        let mut hasher = hash::hasher(entry.id.kind());
-        hasher.update(&header);
-        hasher.update(blob);
-        let file_hash: ObjectId = hasher.digest().into();
+        let file_hash = gix_object::compute_hash(entry.id.kind(), gix_object::Kind::Blob, blob);
         Ok((entry.id != file_hash).then_some(file_hash))
     }
 }
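Both this hunk and the `modify_base` hunk earlier replace a hand-rolled "hash the loose header, then the payload" sequence with the shared `gix_object::compute_hash`. For reference, this is the computation being consolidated, sketched by hand under the assumption that the `sha1_smol` crate from this workspace's lock file is available; `compute_blob_id_hex` is our name, not gix_object API:

    // A git object id is the hash of `"<kind> <decimal size>\0"` followed by the payload.
    fn compute_blob_id_hex(data: &[u8]) -> String {
        let mut hasher = sha1_smol::Sha1::new();
        hasher.update(format!("blob {}\0", data.len()).as_bytes());
        hasher.update(data);
        hasher.digest().to_string()
    }

    fn main() {
        // `git hash-object` of the 12 bytes "hello world\n" yields this well-known id.
        assert_eq!(
            compute_blob_id_hex(b"hello world\n"),
            "3b18e512dba79e4c8300dd08aeb37f8e728b8dad"
        );
    }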
diff --git a/gix/Cargo.toml b/gix/Cargo.toml
index e7f6a062ef1..e015d6542fe 100644
--- a/gix/Cargo.toml
+++ b/gix/Cargo.toml
@@ -149,7 +149,7 @@
 gix-index = { version = "^0.16.1", path = "../gix-index" }
 gix-worktree = { version = "^0.17.1", path = "../gix-worktree" }
 gix-hashtable = { version = "^0.2.0", path = "../gix-hashtable" }
-prodash = { version = "23.1", optional = true, default-features = false, features = ["progress-tree"] }
+prodash = { version = "25.0", optional = true, default-features = false, features = ["progress-tree"] }
 once_cell = "1.14.0"
 signal-hook = { version = "0.3.9", default-features = false }
 thiserror = "1.0.26"
diff --git a/gix/src/config/cache/init.rs b/gix/src/config/cache/init.rs
index ee20e03542e..19eae0eea17 100644
--- a/gix/src/config/cache/init.rs
+++ b/gix/src/config/cache/init.rs
@@ -151,7 +151,7 @@ impl Cache {
             lenient_config,
         )?;
         let object_kind_hint = util::disambiguate_hint(&config, lenient_config)?;
-        let (pack_cache_bytes, object_cache_bytes) =
+        let (static_pack_cache_limit_bytes, pack_cache_bytes, object_cache_bytes) =
             util::parse_object_caches(&config, lenient_config, filter_config_section)?;
         // NOTE: When adding a new initial cache, consider adjusting `reread_values_and_clear_caches()` as well.
         Ok(Cache {
@@ -159,6 +159,7 @@ impl Cache {
             use_multi_pack_index,
             object_hash,
             object_kind_hint,
+            static_pack_cache_limit_bytes,
             pack_cache_bytes,
             object_cache_bytes,
             reflog,
@@ -222,8 +223,11 @@ impl Cache {
         self.url_rewrite = Default::default();
         self.diff_renames = Default::default();
         self.diff_algorithm = Default::default();
-        (self.pack_cache_bytes, self.object_cache_bytes) =
-            util::parse_object_caches(config, self.lenient_config, self.filter_config_section)?;
+        (
+            self.static_pack_cache_limit_bytes,
+            self.pack_cache_bytes,
+            self.object_cache_bytes,
+        ) = util::parse_object_caches(config, self.lenient_config, self.filter_config_section)?;
         #[cfg(any(feature = "blocking-network-client", feature = "async-network-client"))]
         {
             self.url_scheme = Default::default();
diff --git a/gix/src/config/cache/util.rs b/gix/src/config/cache/util.rs
index d5a0a4acbbf..84535093850 100644
--- a/gix/src/config/cache/util.rs
+++ b/gix/src/config/cache/util.rs
@@ -73,7 +73,12 @@ pub(crate) fn parse_object_caches(
     config: &gix_config::File<'static>,
     lenient: bool,
     mut filter_config_section: fn(&gix_config::file::Metadata) -> bool,
-) -> Result<(Option<usize>, usize), Error> {
+) -> Result<(Option<usize>, Option<usize>, usize), Error> {
+    let static_pack_cache_limit = config
+        .integer_filter_by_key("gitoxide.core.defaultPackCacheMemoryLimit", &mut filter_config_section)
+        .map(|res| gitoxide::Core::DEFAULT_PACK_CACHE_MEMORY_LIMIT.try_into_usize(res))
+        .transpose()
+        .with_leniency(lenient)?;
     let pack_cache_bytes = config
         .integer_filter_by_key("core.deltaBaseCacheLimit", &mut filter_config_section)
         .map(|res| Core::DELTA_BASE_CACHE_LIMIT.try_into_usize(res))
@@ -85,7 +90,7 @@ pub(crate) fn parse_object_caches(
         .transpose()
         .with_leniency(lenient)?
         .unwrap_or_default();
-    Ok((pack_cache_bytes, object_cache_bytes))
+    Ok((static_pack_cache_limit, pack_cache_bytes, object_cache_bytes))
 }
 
 pub(crate) fn parse_core_abbrev(
diff --git a/gix/src/config/mod.rs b/gix/src/config/mod.rs
index 5da56960551..5a2fa064213 100644
--- a/gix/src/config/mod.rs
+++ b/gix/src/config/mod.rs
@@ -470,6 +470,8 @@ pub(crate) struct Cache {
     pub(crate) pack_cache_bytes: Option<usize>,
     /// The amount of bytes to use for caching whole objects, or 0 to turn it off entirely.
     pub(crate) object_cache_bytes: usize,
+    /// The amount of bytes we can hold in our static LRU cache. Otherwise, go with the defaults.
+    pub(crate) static_pack_cache_limit_bytes: Option<usize>,
     /// The config section filter from the options used to initialize this instance. Keep these in sync!
     filter_config_section: fn(&gix_config::file::Metadata) -> bool,
     /// The object kind to pick if a prefix is ambiguous.
diff --git a/gix/src/config/tree/sections/gitoxide.rs b/gix/src/config/tree/sections/gitoxide.rs
index 7d60f128718..3d63e94867d 100644
--- a/gix/src/config/tree/sections/gitoxide.rs
+++ b/gix/src/config/tree/sections/gitoxide.rs
@@ -67,6 +67,11 @@ mod subsections {
     pub struct Core;
 
     impl Core {
+        /// The `gitoxide.core.defaultPackCacheMemoryLimit` key.
+        pub const DEFAULT_PACK_CACHE_MEMORY_LIMIT: keys::UnsignedInteger =
+            keys::UnsignedInteger::new_unsigned_integer("defaultPackCacheMemoryLimit", &Gitoxide::CORE).with_note(
+                "If unset, we default to 96MB memory cap for the default 64 slot LRU cache for object deltas.",
+            );
         /// The `gitoxide.core.useNsec` key.
         pub const USE_NSEC: keys::Boolean = keys::Boolean::new_boolean("useNsec", &Gitoxide::CORE)
             .with_note("A runtime version of the USE_NSEC build flag.");
@@ -89,7 +94,12 @@ mod subsections {
         }
 
         fn keys(&self) -> &[&dyn Key] {
-            &[&Self::USE_NSEC, &Self::USE_STDEV, &Self::SHALLOW_FILE]
+            &[
+                &Self::DEFAULT_PACK_CACHE_MEMORY_LIMIT,
+                &Self::USE_NSEC,
+                &Self::USE_STDEV,
+                &Self::SHALLOW_FILE,
+            ]
         }
 
         fn parent(&self) -> Option<&dyn Section> {
diff --git a/gix/src/repository/init.rs b/gix/src/repository/init.rs
index 16659a01367..255ff90d67e 100644
--- a/gix/src/repository/init.rs
+++ b/gix/src/repository/init.rs
@@ -37,7 +37,12 @@ fn setup_objects(mut objects: crate::OdbHandle, config: &crate::config::Cache) -> crate::OdbHandle {
     #[cfg(feature = "max-performance-safe")]
     {
         match config.pack_cache_bytes {
-            None => objects.set_pack_cache(|| Box::<gix_pack::cache::lru::StaticLinkedList<64>>::default()),
+            None => match config.static_pack_cache_limit_bytes {
+                None => objects.set_pack_cache(|| Box::<gix_pack::cache::lru::StaticLinkedList<64>>::default()),
+                Some(limit) => {
+                    objects.set_pack_cache(move || Box::new(gix_pack::cache::lru::StaticLinkedList::<64>::new(limit)))
+                }
+            },
             Some(0) => objects.unset_pack_cache(),
             Some(bytes) => objects.set_pack_cache(move || -> Box<dyn gix_pack::cache::DecodeEntry> {
                 Box::new(gix_pack::cache::lru::MemoryCappedHashmap::new(bytes))
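Taken together, the configuration plumbing above gives `setup_objects` a three-way decision: an explicit `core.deltaBaseCacheLimit` either disables the pack cache (`0`) or selects the memory-capped hashmap, while the new `gitoxide.core.defaultPackCacheMemoryLimit` merely re-caps the default 64-slot static LRU. Here is that fallback logic restated as a standalone sketch with stand-in types (the real code constructs `gix_pack::cache::lru` implementations):

    #[derive(Debug, PartialEq)]
    enum PackCache {
        /// The default 64-slot LRU list, optionally with a custom memory cap.
        StaticLru64 { memory_limit: Option<usize> },
        /// A hashmap cache capped at the configured number of bytes.
        MemoryCapped { bytes: usize },
        /// Pack caching disabled entirely.
        Off,
    }

    fn choose_pack_cache(pack_cache_bytes: Option<usize>, static_limit_bytes: Option<usize>) -> PackCache {
        match pack_cache_bytes {
            // `core.deltaBaseCacheLimit` unset: keep the static LRU, optionally
            // re-capped by `gitoxide.core.defaultPackCacheMemoryLimit`.
            None => PackCache::StaticLru64 { memory_limit: static_limit_bytes },
            Some(0) => PackCache::Off,
            Some(bytes) => PackCache::MemoryCapped { bytes },
        }
    }

    fn main() {
        assert_eq!(choose_pack_cache(None, None), PackCache::StaticLru64 { memory_limit: None });
        assert_eq!(
            choose_pack_cache(None, Some(96 * 1024 * 1024)),
            PackCache::StaticLru64 { memory_limit: Some(96 * 1024 * 1024) }
        );
        assert_eq!(choose_pack_cache(Some(0), None), PackCache::Off);
        assert_eq!(choose_pack_cache(Some(512), None), PackCache::MemoryCapped { bytes: 512 });
    }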