From 923692bcda698b45d3d1ad518b29f6d30b23fbc0 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sun, 14 May 2023 10:15:56 +0200
Subject: [PATCH 01/12] fix: memory capped hashmap as pack delta cache won't
 thrash memory as much. (#851)

Previously it would take a buffer from the free-list, copy data into it,
and when exceeding the capacity lose it entirely.
Now the free-list is handled correctly.
---
 gix-pack/src/cache/lru.rs    | 9 ++++++---
 gix-pack/src/cache/object.rs | 9 ++++++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/gix-pack/src/cache/lru.rs b/gix-pack/src/cache/lru.rs
index bba4f5d33b5..32216433d2b 100644
--- a/gix-pack/src/cache/lru.rs
+++ b/gix-pack/src/cache/lru.rs
@@ -48,7 +48,7 @@ mod memory {
     impl DecodeEntry for MemoryCappedHashmap {
         fn put(&mut self, pack_id: u32, offset: u64, data: &[u8], kind: gix_object::Kind, compressed_size: usize) {
             self.debug.put();
-            if let Ok(Some(previous_entry)) = self.inner.put_with_weight(
+            let res = self.inner.put_with_weight(
                 (pack_id, offset),
                 Entry {
                     data: self
@@ -64,8 +64,11 @@ mod memory {
                     kind,
                     compressed_size,
                 },
-            ) {
-                self.free_list.push(previous_entry.data)
+            );
+            match res {
+                Ok(Some(previous_entry)) => self.free_list.push(previous_entry.data),
+                Ok(None) => {}
+                Err((_key, value)) => self.free_list.push(value.data),
             }
         }
 
diff --git a/gix-pack/src/cache/object.rs b/gix-pack/src/cache/object.rs
index e64f47a8c8d..d85c52a4e07 100644
--- a/gix-pack/src/cache/object.rs
+++ b/gix-pack/src/cache/object.rs
@@ -58,7 +58,7 @@ mod memory {
         /// Put the object going by `id` of `kind` with `data` into the cache.
         fn put(&mut self, id: gix_hash::ObjectId, kind: gix_object::Kind, data: &[u8]) {
             self.debug.put();
-            if let Ok(Some(previous_entry)) = self.inner.put_with_weight(
+            let res = self.inner.put_with_weight(
                 id,
                 Entry {
                     data: self
@@ -73,8 +73,11 @@ mod memory {
                         .unwrap_or_else(|| Vec::from(data)),
                     kind,
                 },
-            ) {
-                self.free_list.push(previous_entry.data)
+            );
+            match res {
+                Ok(Some(previous_entry)) => self.free_list.push(previous_entry.data),
+                Ok(None) => {}
+                Err((_key, value)) => self.free_list.push(value.data),
             }
         }
 

From f89cbc675b0acc67322e289e7b3a17288b9eae27 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sun, 14 May 2023 15:57:40 +0200
Subject: [PATCH 02/12] fix: check for interrupt more often (#851)

Previously, when traversing a pack it could appear to hang, as interrupt
checks were performed only once per chunk or per delta-tree base.
Now interrupt checks are performed more often so that all work stops much
more quickly.
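The change itself is mechanical: hot loops now poll the `should_interrupt`
flag once per unit of work. A minimal sketch of the pattern, with
illustrative names rather than the actual gix-pack types:

    use std::sync::atomic::{AtomicBool, Ordering};

    enum Error {
        Interrupted,
    }

    fn process(nodes: &[u32], should_interrupt: &AtomicBool) -> Result<(), Error> {
        for node in nodes {
            // Poll per node instead of per chunk so an abort takes effect
            // after at most one unit of work.
            if should_interrupt.load(Ordering::Relaxed) {
                return Err(Error::Interrupted);
            }
            let _ = node; // the actual decoding work would happen here
        }
        Ok(())
    }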
--- gix-pack/src/cache/delta/traverse/mod.rs | 1 + gix-pack/src/cache/delta/traverse/resolve.rs | 7 ++++++- gix-pack/src/index/traverse/with_index.rs | 4 ++-- gix-pack/src/index/traverse/with_lookup.rs | 5 ++++- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs index bfe2ec68769..69ec2010c5c 100644 --- a/gix-pack/src/cache/delta/traverse/mod.rs +++ b/gix-pack/src/cache/delta/traverse/mod.rs @@ -159,6 +159,7 @@ where node, state, object_hash.len_in_bytes(), + should_interrupt, ) } }, diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index fc94d87ef20..dca8b9a7394 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::AtomicBool; use std::{cell::RefCell, collections::BTreeMap, sync::atomic::Ordering}; use gix_features::{progress::Progress, zlib}; @@ -16,7 +17,7 @@ use crate::{ pub(crate) fn deltas( object_counter: Option, size_counter: Option, - node: &mut crate::cache::delta::Item, + node: &mut Item, (bytes_buf, ref mut progress, state, resolve, modify_base, child_items): &mut ( Vec, P, @@ -26,6 +27,7 @@ pub(crate) fn deltas( ItemSliceSend>, ), hash_len: usize, + should_interrupt: &AtomicBool, ) -> Result<(), Error> where T: Send, @@ -62,6 +64,9 @@ where }, )]; while let Some((level, mut base)) = nodes.pop() { + if should_interrupt.load(Ordering::Relaxed) { + return Err(Error::Interrupted); + } let (base_entry, entry_end, base_bytes) = if level == root_level { decompress_from_resolver(base.entry_slice())? } else { diff --git a/gix-pack/src/index/traverse/with_index.rs b/gix-pack/src/index/traverse/with_index.rs index 769bbd07f49..9b84d55440a 100644 --- a/gix-pack/src/index/traverse/with_index.rs +++ b/gix-pack/src/index/traverse/with_index.rs @@ -131,7 +131,7 @@ impl index::File { data.object_kind = object_kind; data.compressed_size = entry_end - pack_entry.data_offset; data.object_size = bytes.len() as u64; - let result = crate::index::traverse::process_entry( + let result = index::traverse::process_entry( check, object_kind, bytes, @@ -156,7 +156,7 @@ impl index::File { res => res, } }, - crate::cache::delta::traverse::Options { + traverse::Options { object_progress: progress.add_child_with_id("Resolving", ProgressId::DecodedObjects.into()), size_progress: progress.add_child_with_id("Decoding", ProgressId::DecodedBytes.into()), thread_limit, diff --git a/gix-pack/src/index/traverse/with_lookup.rs b/gix-pack/src/index/traverse/with_lookup.rs index 509ae4e4f90..5a5d7957ebc 100644 --- a/gix-pack/src/index/traverse/with_lookup.rs +++ b/gix-pack/src/index/traverse/with_lookup.rs @@ -68,7 +68,7 @@ impl index::File { pub fn traverse_with_lookup( &self, new_processor: impl Fn() -> Processor + Send + Clone, - pack: &crate::data::File, + pack: &data::File, mut progress: P, should_interrupt: &AtomicBool, Options { @@ -174,6 +174,9 @@ impl index::File { res => res, }?; stats.push(stat); + if should_interrupt.load(Ordering::Relaxed) { + break; + } } Ok(stats) }, From 977e135bfa00bdbc1a8f8324f85347ec9078c84f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 14 May 2023 10:44:59 +0200 Subject: [PATCH 03/12] fix: static linked list delta cache with memory cap (#851) Previously, the 64 slot big LRU cache for pack deltas didn't use any memory limit which could lead to memory exhaustion in the face of untypical, large objects. 
Now we add a generous default limit to do *better* in such situations. It's worth noting though that that even without any cache, the working set of buffers to do delta resolution takes considerable memory, despite trying to keep it minimal. Note that for bigger objects, the cache is now not used at all, which probably leads to terrible performance as not even the base object can be cached. --- Makefile | 1 + gix-pack/src/cache/lru.rs | 113 ++++++++++++++++++++++++++++++----- gix-pack/src/cache/object.rs | 2 - 3 files changed, 99 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index c6a0479f3f3..278f7fe4e47 100644 --- a/Makefile +++ b/Makefile @@ -151,6 +151,7 @@ unit-tests: ## run all unit tests cd gix-ref/tests && cargo test --all-features cd gix-odb && cargo test && cargo test --all-features cd gix-object && cargo test && cargo test --features verbose-object-parsing-errors + cd gix-pack && cargo test --all-features cd gix-pack/tests && cargo test --features internal-testing-to-avoid-being-run-by-cargo-test-all \ && cargo test --features "internal-testing-gix-features-parallel" cd gix-index/tests && cargo test --features internal-testing-to-avoid-being-run-by-cargo-test-all \ diff --git a/gix-pack/src/cache/lru.rs b/gix-pack/src/cache/lru.rs index 32216433d2b..dc53375aca7 100644 --- a/gix-pack/src/cache/lru.rs +++ b/gix-pack/src/cache/lru.rs @@ -107,41 +107,74 @@ mod _static { /// Values of 64 seem to improve performance. pub struct StaticLinkedList { inner: uluru::LRUCache, - free_list: Vec>, + last_evicted: Vec, debug: gix_features::cache::Debug, + /// the amount of bytes we are currently holding, taking into account the capacities of all Vecs we keep. + mem_used: usize, + /// The total amount of memory we should be able to hold with all entries combined. + mem_limit: usize, } - impl Default for StaticLinkedList { - fn default() -> Self { + impl StaticLinkedList { + /// Create a new list with a memory limit of `mem_limit` in bytes. If 0, there is no memory limit. + pub fn new(mem_limit: usize) -> Self { StaticLinkedList { inner: Default::default(), - free_list: Vec::new(), + last_evicted: Vec::new(), debug: gix_features::cache::Debug::new(format!("StaticLinkedList<{SIZE}>")), + mem_used: 0, + mem_limit: if mem_limit == 0 { usize::MAX } else { mem_limit }, } } } + impl Default for StaticLinkedList { + fn default() -> Self { + Self::new(96 * 1024 * 1024) + } + } + impl DecodeEntry for StaticLinkedList { fn put(&mut self, pack_id: u32, offset: u64, data: &[u8], kind: gix_object::Kind, compressed_size: usize) { + // We cannot possibly hold this much. + if data.len() > self.mem_limit { + return; + } + // If we could hold it but are are at limit, all we can do is make space. + let mem_free = self.mem_limit - self.mem_used; + if data.len() > mem_free { + // prefer freeing free-lists instead of clearing our cache + let free_list_cap = self.last_evicted.len(); + self.last_evicted = Vec::new(); + // still not enough? 
clear everything + if data.len() > mem_free + free_list_cap { + self.inner.clear(); + self.mem_used = 0; + } else { + self.mem_used -= free_list_cap; + } + } self.debug.put(); + let (prev_cap, cur_cap); if let Some(previous) = self.inner.insert(Entry { offset, pack_id, - data: self - .free_list - .pop() - .map(|mut v| { - v.clear(); - v.resize(data.len(), 0); - v.copy_from_slice(data); - v - }) - .unwrap_or_else(|| Vec::from(data)), + data: { + let mut v = std::mem::take(&mut self.last_evicted); + prev_cap = v.capacity(); + v.clear(); + v.resize(data.len(), 0); + v.copy_from_slice(data); + cur_cap = v.capacity(); + v + }, kind, compressed_size, }) { - self.free_list.push(previous.data) + // No need to adjust capacity as we already counted it. + self.last_evicted = previous.data; } + self.mem_used = self.mem_used + cur_cap - prev_cap; } fn get(&mut self, pack_id: u32, offset: u64, out: &mut Vec) -> Option<(gix_object::Kind, usize)> { @@ -162,6 +195,56 @@ mod _static { res } } + + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn no_limit() { + let c = StaticLinkedList::<10>::new(0); + assert_eq!( + c.mem_limit, + usize::MAX, + "zero is automatically turned into a large limit that is equivalent to unlimited" + ); + } + + #[test] + fn journey() { + let mut c = StaticLinkedList::<10>::new(100); + assert_eq!(c.mem_limit, 100); + assert_eq!(c.mem_used, 0); + + // enough memory for normal operation + let mut last_mem_used = 0; + for _ in 0..10 { + c.put(0, 0, &[0], gix_object::Kind::Blob, 1); + assert!(c.mem_used > last_mem_used); + last_mem_used = c.mem_used; + } + assert_eq!(c.mem_used, 80, "there is a minimal vec size"); + assert_eq!(c.inner.len(), 10); + assert_eq!(c.last_evicted.len(), 0); + + c.put(0, 0, &(0..20).collect::>(), gix_object::Kind::Blob, 1); + assert_eq!(c.inner.len(), 10); + assert_eq!(c.mem_used, 80 + 20); + assert_eq!(c.last_evicted.len(), 1); + + c.put(0, 0, &(0..50).collect::>(), gix_object::Kind::Blob, 1); + assert_eq!(c.inner.len(), 1, "cache clearance wasn't necessary"); + assert_eq!(c.last_evicted.len(), 0, "the free list was cleared"); + assert_eq!(c.mem_used, 50); + + c.put(0, 0, &(0..101).collect::>(), gix_object::Kind::Blob, 1); + assert_eq!( + c.inner.len(), + 1, + "objects that won't ever fit within the memory limit are ignored" + ); + } + } } #[cfg(feature = "pack-cache-lru-static")] diff --git a/gix-pack/src/cache/object.rs b/gix-pack/src/cache/object.rs index d85c52a4e07..5f83169d345 100644 --- a/gix-pack/src/cache/object.rs +++ b/gix-pack/src/cache/object.rs @@ -1,5 +1,3 @@ -//! # Note -//! //! This module is a bit 'misplaced' if spelled out like 'gix_pack::cache::object::*' but is best placed here for code re-use and //! general usefulness. use crate::cache; From 969cc77ec7855fc8c23c2b50353813e6a04b779d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 14 May 2023 16:35:54 +0200 Subject: [PATCH 04/12] try to get improved hitrate for delta-cache by using it more, and fail (#851) This change will put more of the delta-chain into the cache which possibly leads to increased chances of cache-hits if objects aren't queried in random order, but in pack-offset order. Note that in general, it tends to be faster to not use any cache at all. This change was pruned back right away as the major difference to git, which does it by storing every object of the chain in the cache, is that we don't share the cache among threads. This leaves a much smaller per-thread cache size which really is a problem if the objects are large. 
So instead of slowing pack access down by trying it, with the default cache being unsuitable as it would evict all the time due to memory overruns, we do nothing here and rather improve the performance when dealing with pathological cases during pack traversal. --- gix-pack/src/data/file/decode/entry.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gix-pack/src/data/file/decode/entry.rs b/gix-pack/src/data/file/decode/entry.rs index a467ccd8e6c..dedb31344fb 100644 --- a/gix-pack/src/data/file/decode/entry.rs +++ b/gix-pack/src/data/file/decode/entry.rs @@ -215,6 +215,8 @@ impl File { } break; } + // This is a pessimistic guess, as worst possible compression should not be bigger than the data itself. + // TODO: is this assumption actually true? total_delta_data_size += cursor.decompressed_size; let decompressed_size = cursor .decompressed_size From 020ff4e383fc76a255eabf099bb9cf5116a95afa Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 14 May 2023 17:04:56 +0200 Subject: [PATCH 05/12] feat: Add `gitoxide.core.defaultPackCacheMemoryLimit` to control memory limits. (#851) Previously the 64 slot LRU cache didn't have any limit, now one is implemented that defaults to about 96MB. --- gix/src/config/cache/init.rs | 10 +++++++--- gix/src/config/cache/util.rs | 9 +++++++-- gix/src/config/mod.rs | 2 ++ gix/src/config/tree/sections/gitoxide.rs | 12 +++++++++++- gix/src/repository/init.rs | 7 ++++++- 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/gix/src/config/cache/init.rs b/gix/src/config/cache/init.rs index ee20e03542e..19eae0eea17 100644 --- a/gix/src/config/cache/init.rs +++ b/gix/src/config/cache/init.rs @@ -151,7 +151,7 @@ impl Cache { lenient_config, )?; let object_kind_hint = util::disambiguate_hint(&config, lenient_config)?; - let (pack_cache_bytes, object_cache_bytes) = + let (static_pack_cache_limit_bytes, pack_cache_bytes, object_cache_bytes) = util::parse_object_caches(&config, lenient_config, filter_config_section)?; // NOTE: When adding a new initial cache, consider adjusting `reread_values_and_clear_caches()` as well. 
Ok(Cache { @@ -159,6 +159,7 @@ impl Cache { use_multi_pack_index, object_hash, object_kind_hint, + static_pack_cache_limit_bytes, pack_cache_bytes, object_cache_bytes, reflog, @@ -222,8 +223,11 @@ impl Cache { self.url_rewrite = Default::default(); self.diff_renames = Default::default(); self.diff_algorithm = Default::default(); - (self.pack_cache_bytes, self.object_cache_bytes) = - util::parse_object_caches(config, self.lenient_config, self.filter_config_section)?; + ( + self.static_pack_cache_limit_bytes, + self.pack_cache_bytes, + self.object_cache_bytes, + ) = util::parse_object_caches(config, self.lenient_config, self.filter_config_section)?; #[cfg(any(feature = "blocking-network-client", feature = "async-network-client"))] { self.url_scheme = Default::default(); diff --git a/gix/src/config/cache/util.rs b/gix/src/config/cache/util.rs index d5a0a4acbbf..84535093850 100644 --- a/gix/src/config/cache/util.rs +++ b/gix/src/config/cache/util.rs @@ -73,7 +73,12 @@ pub(crate) fn parse_object_caches( config: &gix_config::File<'static>, lenient: bool, mut filter_config_section: fn(&gix_config::file::Metadata) -> bool, -) -> Result<(Option, usize), Error> { +) -> Result<(Option, Option, usize), Error> { + let static_pack_cache_limit = config + .integer_filter_by_key("core.deltaBaseCacheLimit", &mut filter_config_section) + .map(|res| gitoxide::Core::DEFAULT_PACK_CACHE_MEMORY_LIMIT.try_into_usize(res)) + .transpose() + .with_leniency(lenient)?; let pack_cache_bytes = config .integer_filter_by_key("core.deltaBaseCacheLimit", &mut filter_config_section) .map(|res| Core::DELTA_BASE_CACHE_LIMIT.try_into_usize(res)) @@ -85,7 +90,7 @@ pub(crate) fn parse_object_caches( .transpose() .with_leniency(lenient)? .unwrap_or_default(); - Ok((pack_cache_bytes, object_cache_bytes)) + Ok((static_pack_cache_limit, pack_cache_bytes, object_cache_bytes)) } pub(crate) fn parse_core_abbrev( diff --git a/gix/src/config/mod.rs b/gix/src/config/mod.rs index 5da56960551..5a2fa064213 100644 --- a/gix/src/config/mod.rs +++ b/gix/src/config/mod.rs @@ -470,6 +470,8 @@ pub(crate) struct Cache { pub(crate) pack_cache_bytes: Option, /// The amount of bytes to use for caching whole objects, or 0 to turn it off entirely. pub(crate) object_cache_bytes: usize, + /// The amount of bytes we can hold in our static LRU cache. Otherwise, go with the defaults. + pub(crate) static_pack_cache_limit_bytes: Option, /// The config section filter from the options used to initialize this instance. Keep these in sync! filter_config_section: fn(&gix_config::file::Metadata) -> bool, /// The object kind to pick if a prefix is ambiguous. diff --git a/gix/src/config/tree/sections/gitoxide.rs b/gix/src/config/tree/sections/gitoxide.rs index 7d60f128718..3d63e94867d 100644 --- a/gix/src/config/tree/sections/gitoxide.rs +++ b/gix/src/config/tree/sections/gitoxide.rs @@ -67,6 +67,11 @@ mod subsections { pub struct Core; impl Core { + /// The `gitoxide.core.defaultPackCacheMemoryLimit` key. + pub const DEFAULT_PACK_CACHE_MEMORY_LIMIT: keys::UnsignedInteger = + keys::UnsignedInteger::new_unsigned_integer("defaultPackCacheMemoryLimit", &Gitoxide::CORE).with_note( + "If unset, we default to 96MB memory cap for the default 64 slot LRU cache for object deltas.", + ); /// The `gitoxide.core.useNsec` key. 
pub const USE_NSEC: keys::Boolean = keys::Boolean::new_boolean("useNsec", &Gitoxide::CORE) .with_note("A runtime version of the USE_NSEC build flag."); @@ -89,7 +94,12 @@ mod subsections { } fn keys(&self) -> &[&dyn Key] { - &[&Self::USE_NSEC, &Self::USE_STDEV, &Self::SHALLOW_FILE] + &[ + &Self::DEFAULT_PACK_CACHE_MEMORY_LIMIT, + &Self::USE_NSEC, + &Self::USE_STDEV, + &Self::SHALLOW_FILE, + ] } fn parent(&self) -> Option<&dyn Section> { diff --git a/gix/src/repository/init.rs b/gix/src/repository/init.rs index 16659a01367..255ff90d67e 100644 --- a/gix/src/repository/init.rs +++ b/gix/src/repository/init.rs @@ -37,7 +37,12 @@ fn setup_objects(mut objects: crate::OdbHandle, config: &crate::config::Cache) - #[cfg(feature = "max-performance-safe")] { match config.pack_cache_bytes { - None => objects.set_pack_cache(|| Box::>::default()), + None => match config.static_pack_cache_limit_bytes { + None => objects.set_pack_cache(|| Box::>::default()), + Some(limit) => { + objects.set_pack_cache(move || Box::new(gix_pack::cache::lru::StaticLinkedList::<64>::new(limit))) + } + }, Some(0) => objects.unset_pack_cache(), Some(bytes) => objects.set_pack_cache(move || -> Box { Box::new(gix_pack::cache::lru::MemoryCappedHashmap::new(bytes)) From 0fa04bcbdf3102c5435e64cfef894a1bfc8d6e7b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 14 May 2023 20:17:41 +0200 Subject: [PATCH 06/12] feat!: make current thread-count accessible in slice-workers. (#851) Threads started for working on an entry in a slice can now see the amount of threads left for use (and manipulate that variable) which effectively allows them to implement their own parallelization on top of the current one. This is useful if there is there is very imbalanced work within the slice itself. While at it, we also make consumer functions mutable as they exsit per thread. --- gix-features/Cargo.toml | 2 +- gix-features/src/parallel/in_parallel.rs | 66 +++++++++++++-------- gix-features/src/parallel/mod.rs | 8 +-- gix-features/src/parallel/serial.rs | 46 ++++++++------ gix-features/src/zlib/stream/deflate/mod.rs | 19 +++++- gix-features/tests/parallel/mod.rs | 2 +- gix-pack/src/cache/delta/traverse/mod.rs | 7 ++- 7 files changed, 97 insertions(+), 53 deletions(-) diff --git a/gix-features/Cargo.toml b/gix-features/Cargo.toml index a8ae5737f6f..893f76b1f68 100644 --- a/gix-features/Cargo.toml +++ b/gix-features/Cargo.toml @@ -5,7 +5,7 @@ repository = "https://github.com/Byron/gitoxide" version = "0.29.0" authors = ["Sebastian Thiel "] license = "MIT/Apache-2.0" -edition = "2018" +edition = "2021" rust-version = "1.64" [lib] diff --git a/gix-features/src/parallel/in_parallel.rs b/gix-features/src/parallel/in_parallel.rs index e1e2cc3e34a..241565b62d7 100644 --- a/gix-features/src/parallel/in_parallel.rs +++ b/gix-features/src/parallel/in_parallel.rs @@ -1,7 +1,10 @@ -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use crate::parallel::{num_threads, Reduce}; +/// A scope to start threads within. +pub type Scope<'scope, 'env> = std::thread::Scope<'scope, 'env>; + /// Runs `left` and `right` in parallel, returning their output when both are done. 
pub fn join(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { std::thread::scope(|s| { @@ -47,7 +50,7 @@ pub fn in_parallel( input: impl Iterator + Send, thread_limit: Option, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<::Output, ::Error> where @@ -67,7 +70,7 @@ where let send_result = send_result.clone(); let receive_input = receive_input.clone(); let new_thread_state = new_thread_state.clone(); - let consume = consume.clone(); + let mut consume = consume.clone(); move || { let mut state = new_thread_state(thread_id); for item in receive_input { @@ -103,12 +106,19 @@ where /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. /// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast. +/// `consume(&mut item, &mut stat, &Scope, &threads_available, &should_interrupt)` is called for performing the actual computation. +/// Note that `threads_available` should be decremented to start a thread that can steal your own work (as stored in `item`), +/// which allows callees to implement their own work-stealing in case the work is distributed unevenly. +/// Work stealing should only start after having processed at least one item to give all threads naturally operating on the slice +/// some time to start. Starting threads while slice-workers are still starting up would lead to over-allocation of threads, +/// which is why the number of threads left may turn negative. Once threads are started and stopped, be sure to adjust +/// the thread-count accordingly. // TODO: better docs pub fn in_parallel_with_slice( input: &mut [I], thread_limit: Option, new_thread_state: impl FnMut(usize) -> S + Send + Clone, - consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Send + Clone, + consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Send + Clone, mut periodic: impl FnMut() -> Option + Send, state_to_rval: impl FnOnce(S) -> R + Send + Clone, ) -> Result, E> @@ -121,8 +131,8 @@ where let mut results = Vec::with_capacity(num_threads); let stop_everything = &AtomicBool::default(); let index = &AtomicUsize::default(); + let threads_left = &AtomicIsize::new(num_threads as isize); - // TODO: use std::thread::scope() once Rust 1.63 is available. std::thread::scope({ move |s| { std::thread::Builder::new() @@ -163,29 +173,35 @@ where let mut consume = consume.clone(); let input = Input(input as *mut [I]); move || { + let _ = &input; + threads_left.fetch_sub(1, Ordering::SeqCst); let mut state = new_thread_state(thread_id); - while let Ok(input_index) = - index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - (x < input_len).then_some(x + 1) - }) - { - if stop_everything.load(Ordering::Relaxed) { - break; - } - // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding - // each item exactly once. - let item = { - #[allow(unsafe_code)] - unsafe { - &mut (&mut *input.0)[input_index] + let res = (|| { + while let Ok(input_index) = + index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { + (x < input_len).then_some(x + 1) + }) + { + if stop_everything.load(Ordering::Relaxed) { + break; + } + // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding + // each item exactly once. 
+ let item = { + #[allow(unsafe_code)] + unsafe { + &mut (&mut *input.0)[input_index] + } + }; + if let Err(err) = consume(item, &mut state, threads_left, stop_everything) { + stop_everything.store(true, Ordering::Relaxed); + return Err(err); } - }; - if let Err(err) = consume(item, &mut state) { - stop_everything.store(true, Ordering::Relaxed); - return Err(err); } - } - Ok(state_to_rval(state)) + Ok(state_to_rval(state)) + })(); + threads_left.fetch_add(1, Ordering::SeqCst); + res } }) .expect("valid name") diff --git a/gix-features/src/parallel/mod.rs b/gix-features/src/parallel/mod.rs index c994cb3b8e9..fa54951060e 100644 --- a/gix-features/src/parallel/mod.rs +++ b/gix-features/src/parallel/mod.rs @@ -35,11 +35,11 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod serial; #[cfg(not(feature = "parallel"))] -pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads}; +pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads, Scope}; mod in_order; pub use in_order::{InOrderIter, SequenceId}; @@ -137,7 +137,7 @@ pub fn in_parallel_if( input: impl Iterator + Send, thread_limit: Option, new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, reducer: R, ) -> Result<::Output, ::Error> where @@ -163,7 +163,7 @@ pub fn in_parallel_if( input: impl Iterator, thread_limit: Option, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + consume: impl FnMut(I, &mut S) -> O, reducer: R, ) -> Result<::Output, ::Error> where diff --git a/gix-features/src/parallel/serial.rs b/gix-features/src/parallel/serial.rs index 00723b2c3c4..3511c73e3e9 100644 --- a/gix-features/src/parallel/serial.rs +++ b/gix-features/src/parallel/serial.rs @@ -2,14 +2,17 @@ use crate::parallel::Reduce; #[cfg(not(feature = "parallel"))] mod not_parallel { + use std::sync::atomic::{AtomicBool, AtomicIsize}; + /// Runs `left` and then `right`, one after another, returning their output when both are done. pub fn join(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { (left(), right()) } /// A scope for spawning threads. - pub struct Scope<'env> { - _marker: std::marker::PhantomData<&'env mut &'env ()>, + pub struct Scope<'scope, 'env: 'scope> { + _scope: std::marker::PhantomData<&'scope mut &'scope ()>, + _env: std::marker::PhantomData<&'env mut &'env ()>, } pub struct ThreadBuilder; @@ -20,32 +23,31 @@ mod not_parallel { } #[allow(unsafe_code)] - unsafe impl Sync for Scope<'_> {} + unsafe impl Sync for Scope<'_, '_> {} impl ThreadBuilder { pub fn name(self, _new: String) -> Self { self } - pub fn spawn_scoped<'a, 'env, F, T>( + pub fn spawn_scoped<'scope, 'env, F, T>( &self, - scope: &'a Scope<'env>, + scope: &'scope Scope<'scope, 'env>, f: F, - ) -> std::io::Result> + ) -> std::io::Result> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { Ok(scope.spawn(f)) } } - impl<'env> Scope<'env> { - pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + impl<'scope, 'env> Scope<'scope, 'env> { + /// Provided with this scope, let `f` start new threads that live within it. 
+ pub fn spawn(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where - F: FnOnce() -> T, - F: Send + 'env, - T: Send + 'env, + F: FnOnce() -> T + 'scope, + T: 'scope, { ScopedJoinHandle { result: f(), @@ -58,10 +60,11 @@ mod not_parallel { /// Note that this implementation will run the spawned functions immediately. pub fn threads<'env, F, R>(f: F) -> R where - F: FnOnce(&Scope<'env>) -> R, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, { f(&Scope { - _marker: Default::default(), + _scope: Default::default(), + _env: Default::default(), }) } @@ -79,6 +82,9 @@ mod not_parallel { pub fn join(self) -> std::thread::Result { Ok(self.result) } + pub fn is_finished(&self) -> bool { + true + } } /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. @@ -89,13 +95,15 @@ mod not_parallel { input: &mut [I], _thread_limit: Option, mut new_thread_state: impl FnMut(usize) -> S + Clone, - mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone, + mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option, state_to_rval: impl FnOnce(S) -> R + Clone, ) -> Result, E> { let mut state = new_thread_state(0); + let should_interrupt = &AtomicBool::default(); + let threads_left = &AtomicIsize::default(); for item in input { - consume(item, &mut state)?; + consume(item, &mut state, threads_left, should_interrupt)?; if periodic().is_none() { break; } @@ -121,7 +129,7 @@ pub fn in_parallel( input: impl Iterator, _thread_limit: Option, new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> where diff --git a/gix-features/src/zlib/stream/deflate/mod.rs b/gix-features/src/zlib/stream/deflate/mod.rs index 55f575ea4b7..567e8fece76 100644 --- a/gix-features/src/zlib/stream/deflate/mod.rs +++ b/gix-features/src/zlib/stream/deflate/mod.rs @@ -11,6 +11,19 @@ pub struct Write { buf: [u8; BUF_SIZE], } +impl Clone for Write +where + W: Clone, +{ + fn clone(&self) -> Self { + Write { + compressor: impls::new_compress(), + inner: self.inner.clone(), + buf: self.buf, + } + } +} + mod impls { use std::io; @@ -18,6 +31,10 @@ mod impls { use crate::zlib::stream::deflate; + pub(crate) fn new_compress() -> Compress { + Compress::new(Compression::fast(), true) + } + impl deflate::Write where W: io::Write, @@ -25,7 +42,7 @@ mod impls { /// Create a new instance writing compressed bytes to `inner`. 
pub fn new(inner: W) -> deflate::Write { deflate::Write { - compressor: Compress::new(Compression::fast(), true), + compressor: new_compress(), inner, buf: [0; deflate::BUF_SIZE], } diff --git a/gix-features/tests/parallel/mod.rs b/gix-features/tests/parallel/mod.rs index b4b4236f81c..cc824332dcc 100644 --- a/gix-features/tests/parallel/mod.rs +++ b/gix-features/tests/parallel/mod.rs @@ -45,7 +45,7 @@ fn in_parallel_with_mut_slice_in_chunks() { &mut input, None, |_| 0usize, - |item, acc| { + |item, acc, _threads_eft, _should_interrupt| { *acc += *item; *item += 1; Ok::<_, ()>(()) diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs index 69ec2010c5c..40e3c958596 100644 --- a/gix-pack/src/cache/delta/traverse/mod.rs +++ b/gix-pack/src/cache/delta/traverse/mod.rs @@ -138,7 +138,10 @@ where thread_limit, { let object_progress = object_progress.clone(); - let child_items = ItemSliceSend(child_items as *mut [Item]); + let child_items = ItemSliceSend(std::ptr::slice_from_raw_parts_mut( + child_items.as_mut_ptr(), + child_items.len(), + )); move |thread_index| { ( Vec::::with_capacity(4096), @@ -147,7 +150,7 @@ where new_thread_state(), resolve.clone(), inspect_object.clone(), - ItemSliceSend(child_items.0), + child_items.clone(), ) } }, From 3db18c45e8b26243907521ffd11156afed28a0a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 15 May 2023 14:33:17 +0200 Subject: [PATCH 07/12] feat: implement `Clone` for `Sink`. --- gix-odb/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/gix-odb/src/lib.rs b/gix-odb/src/lib.rs index 08b14238c71..e0beac54848 100644 --- a/gix-odb/src/lib.rs +++ b/gix-odb/src/lib.rs @@ -52,6 +52,7 @@ pub mod cache; /// /// It can optionally compress the content, similarly to what would happen when using a [`loose::Store`][crate::loose::Store]. 
/// +#[derive(Clone)] pub struct Sink { compressor: Option>>, object_hash: gix_hash::Kind, From c48bbe330e5e99fa357a87a4aa210317ab7c8143 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 14 May 2023 20:35:46 +0200 Subject: [PATCH 08/12] adjust to changes in `gix-features` --- gitoxide-core/src/pack/explode.rs | 53 ++++++++++++------------ gix-pack/src/cache/delta/traverse/mod.rs | 2 +- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/gitoxide-core/src/pack/explode.rs b/gitoxide-core/src/pack/explode.rs index 603f06c5c14..8d6369b62ce 100644 --- a/gitoxide-core/src/pack/explode.rs +++ b/gitoxide-core/src/pack/explode.rs @@ -90,6 +90,7 @@ enum Error { } #[allow(clippy::large_enum_variant)] +#[derive(Clone)] enum OutputWriter { Loose(loose::Store), Sink(odb::Sink), @@ -185,36 +186,34 @@ pub fn pack_or_pack_index( &should_interrupt, { let object_path = object_path.map(|p| p.as_ref().to_owned()); - move || { - let out = OutputWriter::new(object_path.clone(), sink_compress, object_hash); - let loose_odb = verify.then(|| object_path.as_ref().map(|path| loose::Store::at(path, object_hash))).flatten(); - let mut read_buf = Vec::new(); - move |object_kind, buf, index_entry, progress| { - let written_id = out.write_buf(object_kind, buf).map_err(|err| { - Error::Write{source: Box::new(err) as Box, - kind: object_kind, - id: index_entry.oid, - } - })?; - if written_id != index_entry.oid { - if let object::Kind::Tree = object_kind { - progress.info(format!( - "The tree in pack named {} was written as {} due to modes 100664 and 100640 rewritten as 100644.", - index_entry.oid, written_id - )); - } else { - return Err(Error::ObjectEncodeMismatch{kind: object_kind, actual: index_entry.oid, expected:written_id}); - } + let out = OutputWriter::new(object_path.clone(), sink_compress, object_hash); + let loose_odb = verify.then(|| object_path.as_ref().map(|path| loose::Store::at(path, object_hash))).flatten(); + let mut read_buf = Vec::new(); + move |object_kind, buf, index_entry, progress| { + let written_id = out.write_buf(object_kind, buf).map_err(|err| { + Error::Write{source: Box::new(err) as Box, + kind: object_kind, + id: index_entry.oid, } - if let Some(verifier) = loose_odb.as_ref() { - let obj = verifier - .try_find(written_id, &mut read_buf) - .map_err(|err| Error::WrittenFileCorrupt{source:err, id:written_id})? - .ok_or(Error::WrittenFileMissing{id:written_id})?; - obj.verify_checksum(written_id)?; + })?; + if written_id != index_entry.oid { + if let object::Kind::Tree = object_kind { + progress.info(format!( + "The tree in pack named {} was written as {} due to modes 100664 and 100640 rewritten as 100644.", + index_entry.oid, written_id + )); + } else { + return Err(Error::ObjectEncodeMismatch{kind: object_kind, actual: index_entry.oid, expected:written_id}); } - Ok(()) } + if let Some(verifier) = loose_odb.as_ref() { + let obj = verifier + .try_find(written_id, &mut read_buf) + .map_err(|err| Error::WrittenFileCorrupt{source:err, id:written_id})? 
+                    .ok_or(Error::WrittenFileMissing{id:written_id})?;
+                obj.verify_checksum(written_id)?;
+            }
+            Ok(())
         }
     },
     pack::index::traverse::Options {
diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs
index 40e3c958596..26a03811d52 100644
--- a/gix-pack/src/cache/delta/traverse/mod.rs
+++ b/gix-pack/src/cache/delta/traverse/mod.rs
@@ -155,7 +155,7 @@ where
             }
         },
         {
-            move |node, state| {
+            move |node, state, _threads_left, should_interrupt| {
                 resolve::deltas(
                     object_counter.clone(),
                     size_counter.clone(),

From 14e7ea0217af8a04ed2b50ff7b13c28335c29022 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sun, 14 May 2023 21:52:40 +0200
Subject: [PATCH 09/12] feat: improve performance by avoiding zeroing
 buffers. (#851)

Previously we would use `resize(new_len, 0)` to resize buffers, even though
these values would then be overwritten (or the buffer isn't available).
Now we use `set_len(new_len)` after calling `reserve` to do the same,
but save a memset.
---
 gix-pack/src/cache/delta/traverse/resolve.rs | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs
index dca8b9a7394..03daa8239fd 100644
--- a/gix-pack/src/cache/delta/traverse/resolve.rs
+++ b/gix-pack/src/cache/delta/traverse/resolve.rs
@@ -14,6 +14,7 @@ use crate::{
     data::EntryRange,
 };
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) fn deltas(
     object_counter: Option,
     size_counter: Option,
@@ -147,8 +148,14 @@ where
 }
 
 fn decompress_all_at_once(b: &[u8], decompressed_len: usize) -> Result<Vec<u8>, Error> {
-    let mut out = Vec::new();
-    out.resize(decompressed_len, 0);
+    let mut out = Vec::with_capacity(decompressed_len);
+    // SAFETY:
+    // 1. we have reserved `decompressed_len`
+    // 2. zlib is going to write all of `decompressed_len`. On error, none of the buffer is made available to the user.
+    #[allow(unsafe_code, clippy::uninit_vec)]
+    unsafe {
+        out.set_len(decompressed_len);
+    }
     zlib::Inflate::default()
         .once(b, &mut out)
         .map_err(|err| Error::ZlibInflate {

From add5ea8b83d00972b560536da82f9914ef6080d3 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 16 May 2023 18:50:43 +0200
Subject: [PATCH 10/12] feat: make `prodash::RawProgress` available.

It's an object-safe version of the `Progress` trait.
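For illustration only, here is the difference object-safety makes, using
stand-in traits rather than prodash's actual definitions: a trait with a
generic method cannot be used behind `dyn`, while one restricted to concrete
signatures can.

    // Not object-safe: the generic parameter rules out `dyn Progress`.
    trait Progress {
        fn message(&self, msg: impl Into<String>);
    }

    // Object-safe stand-in: concrete signatures only.
    trait RawProgress {
        fn message(&self, msg: String);
    }

    // Compiles only because `RawProgress` is object-safe.
    fn report(progress: &dyn RawProgress) {
        progress.message("done".into());
    }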
--- Cargo.lock | 20 ++++++++++++++------ gix-features/Cargo.toml | 2 +- gix-features/src/progress.rs | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 761c65495cb..49b4d5aebd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,7 +1243,7 @@ dependencies = [ "gix-features 0.29.0", "is-terminal", "owo-colors", - "prodash", + "prodash 23.1.2", "tabled", "time", ] @@ -1322,7 +1322,7 @@ dependencies = [ "is_ci", "log", "once_cell", - "prodash", + "prodash 23.1.2", "regex", "reqwest", "serde", @@ -1628,7 +1628,7 @@ dependencies = [ "libc", "once_cell", "parking_lot", - "prodash", + "prodash 25.0.0", "sha1", "sha1_smol", "thiserror", @@ -1643,7 +1643,7 @@ checksum = "cf69b0f5c701cc3ae22d3204b671907668f6437ca88862d355eaf9bc47a4f897" dependencies = [ "gix-hash 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc", - "prodash", + "prodash 23.1.2", "sha1_smol", "walkdir", ] @@ -3358,11 +3358,9 @@ checksum = "9516b775656bc3e8985e19cd4b8c0c0de045095074e453d2c0a513b5f978392d" dependencies = [ "async-io", "atty", - "bytesize", "crosstermion", "futures-core", "futures-lite", - "human_format", "humantime", "log", "parking_lot", @@ -3374,6 +3372,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "prodash" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3236ce1618b6da4c7b618e0143c4d5b5dc190f75f81c49f248221382f7e9e9ae" +dependencies = [ + "bytesize", + "human_format", +] + [[package]] name = "ptyprocess" version = "0.4.1" diff --git a/gix-features/Cargo.toml b/gix-features/Cargo.toml index 893f76b1f68..5db2023d226 100644 --- a/gix-features/Cargo.toml +++ b/gix-features/Cargo.toml @@ -123,7 +123,7 @@ crc32fast = { version = "1.2.1", optional = true } sha1 = { version = "0.10.0", optional = true } # progress -prodash = { version = "23.1", optional = true, default-features = false } +prodash = { version = "25.0.0", optional = true, default-features = false } bytesize = { version = "1.0.1", optional = true } # pipe diff --git a/gix-features/src/progress.rs b/gix-features/src/progress.rs index b7aeda620fd..8d1e30bc450 100644 --- a/gix-features/src/progress.rs +++ b/gix-features/src/progress.rs @@ -7,7 +7,7 @@ pub use prodash::{ self, messages::MessageLevel, progress::{Discard, DoOrDiscard, Either, Id, Step, StepShared, Task, ThroughputOnDrop, Value, UNKNOWN}, - unit, Progress, Unit, + unit, Progress, RawProgress, Unit, }; /// A stub for the portions of the `bytesize` crate that we use internally in `gitoxide`. #[cfg(not(feature = "progress-unit-bytes"))] From d22dd8fcc22e8dbe30524a1bdddc09bc841db341 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 14 May 2023 20:36:07 +0200 Subject: [PATCH 11/12] feat!: index-backed tree traversal with a form of work-stealing. (#851) When delta-trees are unbalanced, in pathological cases it's possible that that one thread ends up with more than half of the work. In this case it's required that it manages to spawn its own threads to parallelize the work it has. 
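The coordination this needs boils down to a shared signed counter of
available threads. A sketch of the claim/release protocol with made-up
helper names; the real code inlines roughly this logic around its scoped
threads, and additionally lets starting workers decrement the counter
unconditionally, which is why it may turn negative:

    use std::sync::atomic::{AtomicIsize, Ordering};

    // Claim one thread from the shared budget, but only if any budget is left.
    fn try_claim(threads_left: &AtomicIsize) -> bool {
        threads_left
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
                (n > 0).then(|| n - 1)
            })
            .is_ok()
    }

    // Return the budget once the spawned helper has finished.
    fn release(threads_left: &AtomicIsize) {
        threads_left.fetch_add(1, Ordering::SeqCst);
    }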
--- gix-pack/Cargo.toml | 2 +- gix-pack/src/bundle/write/mod.rs | 29 +- gix-pack/src/cache/delta/traverse/mod.rs | 60 +-- gix-pack/src/cache/delta/traverse/resolve.rs | 365 ++++++++++++++++--- gix-pack/src/cache/delta/traverse/util.rs | 12 +- gix-pack/src/index/traverse/mod.rs | 40 +- gix-pack/src/index/traverse/with_index.rs | 36 +- gix-pack/src/index/traverse/with_lookup.rs | 20 +- gix-pack/src/index/verify.rs | 13 +- gix-pack/src/index/write/mod.rs | 23 +- gix-pack/tests/pack/index.rs | 43 +-- 11 files changed, 441 insertions(+), 202 deletions(-) diff --git a/gix-pack/Cargo.toml b/gix-pack/Cargo.toml index 0c2dafa6c86..02f73b770f4 100644 --- a/gix-pack/Cargo.toml +++ b/gix-pack/Cargo.toml @@ -5,7 +5,7 @@ repository = "https://github.com/Byron/gitoxide" authors = ["Sebastian Thiel "] license = "MIT/Apache-2.0" description = "Implements git packs and related data structures" -edition = "2018" +edition = "2021" include = ["src/**/*", "CHANGELOG.md"] rust-version = "1.64" autotests = false diff --git a/gix-pack/src/bundle/write/mod.rs b/gix-pack/src/bundle/write/mod.rs index fc0284b53a1..c5a048e85cc 100644 --- a/gix-pack/src/bundle/write/mod.rs +++ b/gix-pack/src/bundle/write/mod.rs @@ -62,17 +62,14 @@ impl crate::Bundle { /// * the resulting pack may be empty, that is, contains zero objects in some situations. This is a valid reply by a server and should /// be accounted for. /// - Empty packs always have the same name and not handling this case will result in at most one superfluous pack. - pub fn write_to_directory
<P>
( + pub fn write_to_directory( pack: impl io::BufRead, directory: Option>, - mut progress: P, + mut progress: impl Progress, should_interrupt: &AtomicBool, thin_pack_base_object_lookup_fn: Option, options: Options, - ) -> Result - where - P: Progress, - { + ) -> Result { let mut read_progress = progress.add_child_with_id("read pack", ProgressId::ReadPackBytes.into()); read_progress.init(None, progress::bytes()); let pack = progress::Read { @@ -354,20 +351,20 @@ impl crate::Bundle { } } +fn resolve_entry(range: data::EntryRange, mapped_file: &memmap2::Mmap) -> Option<&[u8]> { + mapped_file.get(range.start as usize..range.end as usize) +} + fn new_pack_file_resolver( data_file: SharedTempFile, -) -> io::Result) -> Option<()> + Send + Clone> { +) -> io::Result<( + impl Fn(data::EntryRange, &memmap2::Mmap) -> Option<&[u8]> + Send + Clone, + memmap2::Mmap, +)> { let mut guard = data_file.lock(); guard.flush()?; - let mapped_file = Arc::new(crate::mmap::read_only( - &guard.get_mut().with_mut(|f| f.path().to_owned())?, - )?); - let pack_data_lookup = move |range: std::ops::Range, out: &mut Vec| -> Option<()> { - mapped_file - .get(range.start as usize..range.end as usize) - .map(|pack_entry| out.copy_from_slice(pack_entry)) - }; - Ok(pack_data_lookup) + let mapped_file = crate::mmap::read_only(&guard.get_mut().with_mut(|f| f.path().to_owned())?)?; + Ok((resolve_entry, mapped_file)) } struct WriteOutcome { diff --git a/gix-pack/src/cache/delta/traverse/mod.rs b/gix-pack/src/cache/delta/traverse/mod.rs index 26a03811d52..daf219d32bc 100644 --- a/gix-pack/src/cache/delta/traverse/mod.rs +++ b/gix-pack/src/cache/delta/traverse/mod.rs @@ -1,9 +1,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use gix_features::threading::{Mutable, OwnShared}; use gix_features::{ parallel::in_parallel_with_slice, progress::{self, Progress}, - threading::{lock, Mutable, OwnShared}, + threading, }; use crate::{ @@ -36,18 +37,18 @@ pub enum Error { /// The base's offset which was from a resolved ref-delta that didn't actually get added to the tree base_pack_offset: crate::data::Offset, }, + #[error("Failed to spawn thread when switching to work-stealing mode")] + SpawnThread(#[from] std::io::Error), } /// Additional context passed to the `inspect_object(…)` function of the [`Tree::traverse()`] method. -pub struct Context<'a, S> { +pub struct Context<'a> { /// The pack entry describing the object pub entry: &'a crate::data::Entry, /// The offset at which `entry` ends in the pack, useful to learn about the exact range of `entry` within the pack. pub entry_end: u64, /// The decompressed object itself, ready to be decoded. pub decompressed: &'a [u8], - /// Custom state known to the function - pub state: &'a mut S, /// The depth at which this object resides in the delta-tree. It represents the amount of base objects, with 0 indicating /// an 'undeltified' object, and higher values indicating delta objects with the given amount of bases. pub level: u16, @@ -89,79 +90,86 @@ where /// operation as well. /// * `pack_entries_end` marks one-past-the-last byte of the last entry in the pack, as the last entries size would otherwise /// be unknown as it's not part of the index file. - /// * `new_thread_state() -> State` is a function to create state to be used in each thread, invoked once per thread. 
/// * `inspect_object(node_data: &mut T, progress: Progress, context: Context) -> Result<(), CustomError>` is a function /// running for each thread receiving fully decoded objects along with contextual information, which either succeeds with `Ok(())` /// or returns a `CustomError`. - /// Note that `node_data` can be modified to allow storing maintaining computation results on a per-object basis. + /// Note that `node_data` can be modified to allow storing maintaining computation results on a per-object basis. It should contain + /// its own mutable per-thread data as required. /// /// This method returns a vector of all tree items, along with their potentially modified custom node data. /// /// _Note_ that this method consumed the Tree to assure safe parallel traversal with mutation support. - pub fn traverse( + pub fn traverse( mut self, resolve: F, + resolve_data: &R, pack_entries_end: u64, - new_thread_state: impl Fn() -> S + Send + Clone, inspect_object: MBFN, Options { thread_limit, - object_progress, + mut object_progress, mut size_progress, should_interrupt, object_hash, }: Options<'_, P1, P2>, ) -> Result, Error> where - F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()> + Send + Clone, + F: for<'r> FnMut(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone, + R: Send + Sync, P1: Progress, P2: Progress, - MBFN: Fn(&mut T, &mut ::SubProgress, Context<'_, S>) -> Result<(), E> + Send + Clone, + MBFN: FnMut(&mut T, &::SubProgress, Context<'_>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { self.set_pack_entries_end_and_resolve_ref_offsets(pack_entries_end)?; - let object_progress = OwnShared::new(Mutable::new(object_progress)); let num_objects = self.num_items(); let object_counter = { - let mut progress = lock(&object_progress); + let progress = &mut object_progress; progress.init(Some(num_objects), progress::count("objects")); progress.counter() }; size_progress.init(None, progress::bytes()); let size_counter = size_progress.counter(); let child_items = self.child_items.as_mut_slice(); + let object_progress = OwnShared::new(Mutable::new(object_progress)); let start = std::time::Instant::now(); in_parallel_with_slice( &mut self.root_items, thread_limit, { - let object_progress = object_progress.clone(); let child_items = ItemSliceSend(std::ptr::slice_from_raw_parts_mut( child_items.as_mut_ptr(), child_items.len(), )); - move |thread_index| { - ( - Vec::::with_capacity(4096), - lock(&object_progress) - .add_child_with_id(format!("thread {thread_index}"), gix_features::progress::UNKNOWN), - new_thread_state(), - resolve.clone(), - inspect_object.clone(), - child_items.clone(), - ) + { + let object_progress = object_progress.clone(); + move |thread_index| { + let _ = &child_items; + resolve::State { + bytes_buf: Vec::::with_capacity(4096), + delta_bytes: Vec::::with_capacity(4096), + fully_resolved_delta_bytes: Vec::::with_capacity(4096), + progress: threading::lock(&object_progress) + .add_child(format!("gix-pack.delta-traverse.inspect-object.{thread_index}")), + resolve: resolve.clone(), + modify_base: inspect_object.clone(), + child_items: child_items.clone(), + } + } } }, { - move |node, state, _threads_left, should_interrupt| { + move |node, state, threads_left, should_interrupt| { resolve::deltas( object_counter.clone(), size_counter.clone(), node, state, + resolve_data, object_hash.len_in_bytes(), + threads_left, should_interrupt, ) } @@ -170,7 +178,7 @@ where |_| (), )?; - lock(&object_progress).show_throughput(start); + 
threading::lock(&object_progress).show_throughput(start); size_progress.show_throughput(start); Ok(Outcome { diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs index 03daa8239fd..7b08457d578 100644 --- a/gix-pack/src/cache/delta/traverse/resolve.rs +++ b/gix-pack/src/cache/delta/traverse/resolve.rs @@ -1,7 +1,8 @@ -use std::sync::atomic::AtomicBool; -use std::{cell::RefCell, collections::BTreeMap, sync::atomic::Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize}; +use std::{collections::BTreeMap, sync::atomic::Ordering}; -use gix_features::{progress::Progress, zlib}; +use gix_features::progress::Progress; +use gix_features::{threading, zlib}; use crate::{ cache::delta::{ @@ -11,49 +12,59 @@ use crate::{ }, Item, }, + data, data::EntryRange, }; +pub(crate) struct State { + pub bytes_buf: Vec, + pub delta_bytes: Vec, + pub fully_resolved_delta_bytes: Vec, + pub progress: P, + pub resolve: F, + pub modify_base: MBFN, + pub child_items: ItemSliceSend>, +} + #[allow(clippy::too_many_arguments)] -pub(crate) fn deltas( +pub(crate) fn deltas( object_counter: Option, size_counter: Option, node: &mut Item, - (bytes_buf, ref mut progress, state, resolve, modify_base, child_items): &mut ( - Vec, - P, - S, - F, - MBFN, - ItemSliceSend>, - ), + State { + bytes_buf, + delta_bytes, + fully_resolved_delta_bytes, + progress, + resolve, + modify_base, + child_items, + }: &mut State, + resolve_data: &R, hash_len: usize, + threads_left: &AtomicIsize, should_interrupt: &AtomicBool, ) -> Result<(), Error> where T: Send, - F: for<'r> Fn(EntryRange, &'r mut Vec) -> Option<()>, + R: Send + Sync, P: Progress, - MBFN: Fn(&mut T, &mut P, Context<'_, S>) -> Result<(), E>, + F: for<'r> FnMut(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone, + MBFN: FnMut(&mut T, &P, Context<'_>) -> Result<(), E> + Send + Clone, E: std::error::Error + Send + Sync + 'static, { let mut decompressed_bytes_by_pack_offset = BTreeMap::new(); - let bytes_buf = RefCell::new(bytes_buf); - let decompress_from_resolver = |slice: EntryRange| -> Result<(crate::data::Entry, u64, Vec), Error> { - let mut bytes_buf = bytes_buf.borrow_mut(); - bytes_buf.resize((slice.end - slice.start) as usize, 0); - resolve(slice.clone(), &mut bytes_buf).ok_or(Error::ResolveFailed { + let mut decompress_from_resolver = |slice: EntryRange, out: &mut Vec| -> Result<(data::Entry, u64), Error> { + let bytes = resolve(slice.clone(), resolve_data).ok_or(Error::ResolveFailed { pack_offset: slice.start, })?; - let entry = crate::data::Entry::from_bytes(&bytes_buf, slice.start, hash_len); - let compressed = &bytes_buf[entry.header_size()..]; + let entry = data::Entry::from_bytes(bytes, slice.start, hash_len); + let compressed = &bytes[entry.header_size()..]; let decompressed_len = entry.decompressed_size as usize; - Ok((entry, slice.end, decompress_all_at_once(compressed, decompressed_len)?)) + decompress_all_at_once(compressed, decompressed_len, out)?; + Ok((entry, slice.end)) }; - // Traverse the tree breadth first and loose the data produced for the base as it won't be needed anymore. - progress.init(None, gix_features::progress::count_with_decimals("objects", 2)); - // each node is a base, and its children always start out as deltas which become a base after applying them. 
// These will be pushed onto our stack until all are processed let root_level = 0; @@ -61,19 +72,26 @@ where root_level, Node { item: node, - child_items: child_items.0, + child_items: child_items.clone(), }, )]; while let Some((level, mut base)) = nodes.pop() { + let storage; if should_interrupt.load(Ordering::Relaxed) { return Err(Error::Interrupted); } let (base_entry, entry_end, base_bytes) = if level == root_level { - decompress_from_resolver(base.entry_slice())? + let (a, b) = decompress_from_resolver(base.entry_slice(), bytes_buf)?; + (a, b, &*bytes_buf) } else { - decompressed_bytes_by_pack_offset + if !bytes_buf.is_empty() { + *bytes_buf = Vec::new(); + } + let (a, b, v) = decompressed_bytes_by_pack_offset .remove(&base.offset()) - .expect("we store the resolved delta buffer when done") + .expect("we store the resolved delta buffer when done"); + storage = v; + (a, b, &storage) }; // anything done here must be repeated further down for leaf-nodes. @@ -86,8 +104,7 @@ where Context { entry: &base_entry, entry_end, - decompressed: &base_bytes, - state, + decompressed: base_bytes, level, }, ) @@ -99,20 +116,19 @@ where } for mut child in base.into_child_iter() { - let (mut child_entry, entry_end, delta_bytes) = decompress_from_resolver(child.entry_slice())?; - let (base_size, consumed) = crate::data::delta::decode_header_size(&delta_bytes); + let (mut child_entry, entry_end) = decompress_from_resolver(child.entry_slice(), delta_bytes)?; + let (base_size, consumed) = data::delta::decode_header_size(delta_bytes); let mut header_ofs = consumed; assert_eq!( base_bytes.len(), base_size as usize, - "recorded base size in delta does not match" + "recorded base size in delta does match the actual one" ); - let (result_size, consumed) = crate::data::delta::decode_header_size(&delta_bytes[consumed..]); + let (result_size, consumed) = data::delta::decode_header_size(&delta_bytes[consumed..]); header_ofs += consumed; - let mut fully_resolved_delta_bytes = bytes_buf.borrow_mut(); - fully_resolved_delta_bytes.resize(result_size as usize, 0); - crate::data::delta::apply(&base_bytes, &mut fully_resolved_delta_bytes, &delta_bytes[header_ofs..]); + resize_vec(fully_resolved_delta_bytes, result_size as usize); + data::delta::apply(base_bytes, fully_resolved_delta_bytes, &delta_bytes[header_ofs..]); // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers // at all @@ -120,7 +136,7 @@ where if child.has_children() { decompressed_bytes_by_pack_offset.insert( child.offset(), - (child_entry, entry_end, fully_resolved_delta_bytes.to_owned()), + (child_entry, entry_end, std::mem::take(fully_resolved_delta_bytes)), ); nodes.push((level + 1, child)); } else { @@ -130,8 +146,7 @@ where Context { entry: &child_entry, entry_end, - decompressed: &fully_resolved_delta_bytes, - state, + decompressed: fully_resolved_delta_bytes, level: level + 1, }, ) @@ -142,25 +157,273 @@ where .map(|c| c.fetch_add(base_bytes.len(), Ordering::SeqCst)); } } + + // After the first round, see if we can use additional threads, and if so we enter multi-threaded mode. + // In it we will keep using new threads as they become available while using this thread for coordination. + // We optimize for a low memory footprint as we are likely to get here if long delta-chains with large objects are involved. + // Try to avoid going into threaded mode if there isn't more than one unit of work anyway. 
+ if nodes.len() > 1 { + if let Ok(initial_threads) = + threads_left.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |threads_available| { + (threads_available > 0).then_some(0) + }) + { + // Assure no memory is held here. + *bytes_buf = Vec::new(); + *delta_bytes = Vec::new(); + *fully_resolved_delta_bytes = Vec::new(); + return deltas_mt( + initial_threads, + decompressed_bytes_by_pack_offset, + object_counter, + size_counter, + progress, + nodes, + resolve.clone(), + resolve_data, + modify_base.clone(), + hash_len, + threads_left, + should_interrupt, + ); + } + } } Ok(()) } -fn decompress_all_at_once(b: &[u8], decompressed_len: usize) -> Result, Error> { - let mut out = Vec::with_capacity(decompressed_len); - // SAFETY: - // 1. we have reserved `decompressed_len` - // 2. zlib is going to write all of `decompressed_len`. On error, none of the the buffer is made available to the user. - #[allow(unsafe_code, clippy::uninit_vec)] - unsafe { - out.set_len(decompressed_len); +/// * `initial_threads` is the threads we may spawn, not accounting for our own thread which is still considered used by the parent +/// system. Since this thread will take a controlling function, we may spawn one more than that. In threaded mode, we will finish +/// all remaining work. +#[allow(clippy::too_many_arguments)] +pub(crate) fn deltas_mt( + mut threads_to_create: isize, + decompressed_bytes_by_pack_offset: BTreeMap)>, + object_counter: Option, + size_counter: Option, + progress: &P, + nodes: Vec<(u16, Node<'_, T>)>, + resolve: F, + resolve_data: &R, + modify_base: MBFN, + hash_len: usize, + threads_left: &AtomicIsize, + should_interrupt: &AtomicBool, +) -> Result<(), Error> +where + T: Send, + R: Send + Sync, + P: Progress, + F: for<'r> FnMut(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone, + MBFN: FnMut(&mut T, &P, Context<'_>) -> Result<(), E> + Send + Clone, + E: std::error::Error + Send + Sync + 'static, +{ + let nodes = gix_features::threading::Mutable::new(nodes); + let decompressed_bytes_by_pack_offset = gix_features::threading::Mutable::new(decompressed_bytes_by_pack_offset); + threads_to_create += 1; // ourselves + let mut returned_ourselves = false; + + gix_features::parallel::threads(|s| -> Result<(), Error> { + let mut threads = Vec::new(); + let poll_interval = std::time::Duration::from_millis(100); + loop { + for tid in 0..threads_to_create { + let thread = gix_features::parallel::build_thread() + .name(format!("gix-pack.traverse_deltas.{tid}")) + .spawn_scoped(s, { + let nodes = &nodes; + let decompressed_bytes_by_pack_offset = &decompressed_bytes_by_pack_offset; + let mut resolve = resolve.clone(); + let mut modify_base = modify_base.clone(); + let object_counter = object_counter.as_ref(); + let size_counter = size_counter.as_ref(); + + move || -> Result<(), Error> { + let mut bytes_buf = Vec::new(); + let mut delta_bytes = Vec::new(); + let mut fully_resolved_delta_bytes = Vec::new(); + let mut decompress_from_resolver = + |slice: EntryRange, out: &mut Vec| -> Result<(data::Entry, u64), Error> { + let bytes = resolve(slice.clone(), resolve_data).ok_or(Error::ResolveFailed { + pack_offset: slice.start, + })?; + let entry = data::Entry::from_bytes(bytes, slice.start, hash_len); + let compressed = &bytes[entry.header_size()..]; + let decompressed_len = entry.decompressed_size as usize; + decompress_all_at_once(compressed, decompressed_len, out)?; + Ok((entry, slice.end)) + }; + + loop { + let (level, mut base) = match threading::lock(nodes).pop() { + Some(v) => v, + None => break, + }; + 
+                                if should_interrupt.load(Ordering::Relaxed) {
+                                    return Err(Error::Interrupted);
+                                }
+                                let storage;
+                                let (base_entry, entry_end, base_bytes) = if level == 0 {
+                                    let (a, b) = decompress_from_resolver(base.entry_slice(), &mut bytes_buf)?;
+                                    (a, b, &*bytes_buf)
+                                } else {
+                                    if !bytes_buf.is_empty() {
+                                        bytes_buf = Vec::new();
+                                    }
+                                    let (a, b, v) = threading::lock(decompressed_bytes_by_pack_offset)
+                                        .remove(&base.offset())
+                                        .expect("we store the resolved delta buffer when done");
+                                    storage = v;
+                                    (a, b, storage.as_slice())
+                                };
+
+                                // anything done here must be repeated further down for leaf-nodes.
+                                // This way we avoid retaining their decompressed memory longer than needed (they have no children,
+                                // thus their memory can be released right away, using 18% less peak memory on the linux kernel).
+                                {
+                                    modify_base(
+                                        base.data(),
+                                        progress,
+                                        Context {
+                                            entry: &base_entry,
+                                            entry_end,
+                                            decompressed: base_bytes,
+                                            level,
+                                        },
+                                    )
+                                    .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)?;
+                                    object_counter.as_ref().map(|c| c.fetch_add(1, Ordering::SeqCst));
+                                    size_counter
+                                        .as_ref()
+                                        .map(|c| c.fetch_add(base_bytes.len(), Ordering::SeqCst));
+                                }
+
+                                for mut child in base.into_child_iter() {
+                                    let (mut child_entry, entry_end) =
+                                        decompress_from_resolver(child.entry_slice(), &mut delta_bytes)?;
+                                    let (base_size, consumed) = data::delta::decode_header_size(&delta_bytes);
+                                    let mut header_ofs = consumed;
+                                    assert_eq!(
+                                        base_bytes.len(),
+                                        base_size as usize,
+                                        "recorded base size in delta does not match the actual one"
+                                    );
+                                    let (result_size, consumed) =
+                                        data::delta::decode_header_size(&delta_bytes[consumed..]);
+                                    header_ofs += consumed;
+
+                                    fully_resolved_delta_bytes.resize(result_size as usize, 0);
+                                    data::delta::apply(
+                                        base_bytes,
+                                        &mut fully_resolved_delta_bytes,
+                                        &delta_bytes[header_ofs..],
+                                    );
+
+                                    // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers
+                                    // at all
+                                    child_entry.header = base_entry.header; // assign the actual object type, instead of 'delta'
+                                    if child.has_children() {
+                                        threading::lock(decompressed_bytes_by_pack_offset).insert(
+                                            child.offset(),
+                                            (child_entry, entry_end, std::mem::take(&mut fully_resolved_delta_bytes)),
+                                        );
+                                        threading::lock(nodes).push((level + 1, child));
+                                    } else {
+                                        modify_base(
+                                            child.data(),
+                                            progress,
+                                            Context {
+                                                entry: &child_entry,
+                                                entry_end,
+                                                decompressed: &fully_resolved_delta_bytes,
+                                                level: level + 1,
+                                            },
+                                        )
+                                        .map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send + Sync>)?;
+                                        object_counter.as_ref().map(|c| c.fetch_add(1, Ordering::SeqCst));
+                                        size_counter
+                                            .as_ref()
+                                            .map(|c| c.fetch_add(base_bytes.len(), Ordering::SeqCst));
+                                    }
+                                }
+                            }
+                            Ok(())
+                        }
+                    })?;
+                threads.push(thread);
+            }
+            if threads_left
+                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |threads_available: isize| {
+                    (threads_available > 0).then(|| {
+                        threads_to_create = threads_available.min(threading::lock(&nodes).len() as isize);
+                        threads_available - threads_to_create
+                    })
+                })
+                .is_err()
+            {
+                threads_to_create = 0;
+            }
+
+            // What we really want to do is either wait for one of our threads to go down
+            // or for another scheduled thread to become available. Unfortunately we can't do that,
+            // but may instead find a good way to set the polling interval instead of hard-coding it.
+            std::thread::sleep(poll_interval);
+            // Get out if threads are already starving, or if they would soon be starving as no work is left.
+            if threads.iter().any(|t| t.is_finished()) {
+                let mut running_threads = Vec::new();
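+                // Reap finished workers: the first successful join accounts for our own slot,
+                // while every later one returns a thread to the shared `threads_left` budget.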
+                for thread in threads.drain(..) {
+                    if thread.is_finished() {
+                        match thread.join() {
+                            Ok(Err(err)) => return Err(err),
+                            Ok(Ok(())) => {
+                                if !returned_ourselves {
+                                    returned_ourselves = true;
+                                } else {
+                                    threads_left.fetch_add(1, Ordering::SeqCst);
+                                }
+                            }
+                            Err(err) => {
+                                std::panic::resume_unwind(err);
+                            }
+                        }
+                    } else {
+                        running_threads.push(thread);
+                    }
+                }
+                if running_threads.is_empty() && threading::lock(&nodes).is_empty() {
+                    break;
+                }
+                threads = running_threads;
+            }
+        }
+
+        Ok(())
+    })
+}
+
+fn resize_vec(v: &mut Vec<u8>, new_len: usize) {
+    if new_len > v.len() {
+        v.reserve_exact(new_len.saturating_sub(v.capacity()) + (v.capacity() - v.len()));
+        // SAFETY:
+        // 1. we have reserved enough capacity to fit `new_len`
+        // 2. the caller is trusted to write into `v` to completely fill `new_len`.
+        #[allow(unsafe_code, clippy::uninit_vec)]
+        unsafe {
+            v.set_len(new_len);
+        }
+    } else {
+        v.truncate(new_len)
     }
+}
+
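+/// Decompress all of `b` into `out`, which is first resized to hold exactly `decompressed_len` bytes.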
+fn decompress_all_at_once(b: &[u8], decompressed_len: usize, out: &mut Vec<u8>) -> Result<(), Error> {
+    resize_vec(out, decompressed_len);
     zlib::Inflate::default()
-        .once(b, &mut out)
+        .once(b, out)
         .map_err(|err| Error::ZlibInflate {
             source: err,
             message: "Failed to decompress entry",
         })?;
-    Ok(out)
+    Ok(())
 }
diff --git a/gix-pack/src/cache/delta/traverse/util.rs b/gix-pack/src/cache/delta/traverse/util.rs
index e7caf2ff53c..1b70153517f 100644
--- a/gix-pack/src/cache/delta/traverse/util.rs
+++ b/gix-pack/src/cache/delta/traverse/util.rs
@@ -4,6 +4,8 @@ pub struct ItemSliceSend<T>(pub *mut [T])
 where
     T: Send;
 
+/// SAFETY: This would be unsafe if it were ever abused, but it's used internally and only in a way that assures the pointers
+/// don't violate aliasing rules.
 impl<T> Clone for ItemSliceSend<T>
 where
     T: Send,
@@ -18,12 +20,12 @@ where
 unsafe impl<T> Send for ItemSliceSend<T> where T: Send {}
 
 /// An item returned by `iter_root_chunks`, allowing access to the `data` stored alongside nodes in a [`Tree`].
-pub struct Node<'a, T> {
+pub struct Node<'a, T: Send> {
     pub item: &'a mut Item<T>,
-    pub child_items: *mut [Item<T>],
+    pub child_items: ItemSliceSend<Item<T>>,
 }
 
-impl<'a, T> Node<'a, T> {
+impl<'a, T: Send> Node<'a, T> {
     /// Returns the offset into the pack at which the `Node`s data is located.
     pub fn offset(&self) -> u64 {
         self.item.offset
@@ -55,8 +57,8 @@ impl<'a, T: Send> Node<'a, T> {
             // SAFETY: The resulting mutable pointer cannot be yielded by any other node.
             #[allow(unsafe_code)]
             Node {
-                item: unsafe { &mut *(children as *mut Item<T>).add(index as usize) },
-                child_items: children,
+                item: &mut unsafe { &mut *children.0 }[index as usize],
+                child_items: children.clone(),
             }
         })
     }
diff --git a/gix-pack/src/index/traverse/mod.rs b/gix-pack/src/index/traverse/mod.rs
index 68a502bcadb..83173f90454 100644
--- a/gix-pack/src/index/traverse/mod.rs
+++ b/gix-pack/src/index/traverse/mod.rs
@@ -1,6 +1,9 @@
 use std::sync::atomic::AtomicBool;
 
-use gix_features::{parallel, progress::Progress};
+use gix_features::{
+    parallel,
+    progress::{Progress, RawProgress},
+};
 
 use crate::index;
 
@@ -79,7 +82,7 @@ impl index::File {
         pack: &crate::data::File,
         progress: P,
         should_interrupt: &AtomicBool,
-        new_processor: impl Fn() -> Processor + Send + Clone,
+        processor: Processor,
         Options {
             traversal,
             thread_limit,
@@ -91,17 +94,12 @@ impl index::File {
         P: Progress,
         C: crate::cache::DecodeEntry,
         E: std::error::Error + Send + Sync + 'static,
-        Processor: FnMut(
-            gix_object::Kind,
-            &[u8],
-            &index::Entry,
-            &mut <P as Progress>::SubProgress,
-        ) -> Result<(), E>,
+        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn RawProgress) -> Result<(), E> + Send + Clone,
         F: Fn() -> C + Send + Clone,
     {
         match traversal {
             Algorithm::Lookup => self.traverse_with_lookup(
-                new_processor,
+                processor,
                 pack,
                 progress,
                 should_interrupt,
@@ -113,10 +111,10 @@ impl index::File {
             ),
             Algorithm::DeltaTreeLookup => self.traverse_with_index(
                 pack,
-                new_processor,
+                processor,
                 progress,
                 should_interrupt,
-                crate::index::traverse::with_index::Options { check, thread_limit },
+                with_index::Options { check, thread_limit },
             ),
         }
     }
@@ -151,19 +149,18 @@ impl index::File {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn decode_and_process_entry<C, P, E>(
+    fn decode_and_process_entry<C, E>(
         &self,
         check: SafetyCheck,
         pack: &crate::data::File,
         cache: &mut C,
         buf: &mut Vec<u8>,
-        progress: &mut P,
-        index_entry: &crate::index::Entry,
-        processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>,
+        progress: &mut dyn RawProgress,
+        index_entry: &index::Entry,
+        processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn RawProgress) -> Result<(), E>,
     ) -> Result<crate::data::decode::entry::Outcome, Error<E>>
     where
         C: crate::cache::DecodeEntry,
-        P: Progress,
         E: std::error::Error + Send + Sync + 'static,
     {
         let pack_entry = pack.entry(index_entry.pack_offset);
@@ -192,9 +189,9 @@ impl index::File {
             check,
             object_kind,
             buf,
-            progress,
             index_entry,
             || pack.entry_crc32(index_entry.pack_offset, entry_len),
+            progress,
             processor,
         )?;
         Ok(entry_stats)
@@ -202,17 +199,16 @@ impl index::File {
 }
 
 #[allow(clippy::too_many_arguments)]
-fn process_entry<P, E>(
+fn process_entry<E>(
     check: SafetyCheck,
     object_kind: gix_object::Kind,
     decompressed: &[u8],
-    progress: &mut P,
-    index_entry: &crate::index::Entry,
+    index_entry: &index::Entry,
     pack_entry_crc32: impl FnOnce() -> u32,
-    processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>,
+    progress: &dyn RawProgress,
+    processor: &mut impl FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn RawProgress) -> Result<(), E>,
 ) -> Result<(), Error<E>>
 where
-    P: Progress,
     E: std::error::Error + Send + Sync + 'static,
 {
     if check.object_checksum() {
diff --git a/gix-pack/src/index/traverse/with_index.rs b/gix-pack/src/index/traverse/with_index.rs
index 9b84d55440a..884277c9dfe 100644
--- a/gix-pack/src/index/traverse/with_index.rs
+++ b/gix-pack/src/index/traverse/with_index.rs
@@ -59,19 +59,16 @@ impl index::File {
     pub fn traverse_with_index<P, Processor, E>(
         &self,
         pack: &crate::data::File,
-        new_processor: impl Fn() -> Processor + Send + Clone,
+        mut processor: Processor,
         mut progress: P,
         should_interrupt: &AtomicBool,
         Options { check, thread_limit }: Options,
     ) -> Result<Outcome<P>, Error<E>>
     where
         P: Progress,
-        Processor: FnMut(
-            gix_object::Kind,
-            &[u8],
-            &index::Entry,
-            &mut <P as Progress>::SubProgress,
-        ) -> Result<(), E>,
+        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn gix_features::progress::RawProgress) -> Result<(), E>
+            + Send
+            + Clone,
         E: std::error::Error + Send + Sync + 'static,
     {
         let (verify_result, traversal_result) = parallel::join(
@@ -113,18 +110,17 @@ impl index::File {
             self.object_hash,
         )?;
         let mut outcome = digest_statistics(tree.traverse(
-            |slice, out| pack.entry_slice(slice).map(|entry| out.copy_from_slice(entry)),
+            |slice, pack| pack.entry_slice(slice),
+            pack,
             pack.pack_end() as u64,
-            new_processor,
-            |data,
-             progress,
-             traverse::Context {
-                 entry: pack_entry,
-                 entry_end,
-                 decompressed: bytes,
-                 state: ref mut processor,
-                 level,
-             }| {
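+            // `processor` is now captured by the closure directly (the former `state` field of
+            // `Context` is gone), hence the `move`.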
+            move |data,
+                  progress,
+                  traverse::Context {
+                      entry: pack_entry,
+                      entry_end,
+                      decompressed: bytes,
+                      level,
+                  }| {
                 let object_kind = pack_entry.header.as_kind().expect("non-delta object");
                 data.level = level;
                 data.decompressed_size = pack_entry.decompressed_size;
@@ -135,7 +131,6 @@ impl index::File {
                     check,
                     object_kind,
                     bytes,
-                    progress,
                     &data.index_entry,
                     || {
                         // TODO: Fix this - we overwrite the header of 'data' which also changes the computed entry size,
@@ -146,7 +141,8 @@ impl index::File {
                             .expect("slice pointing into the pack (by now data is verified)"),
                         )
                     },
-                    processor,
+                    progress,
+                    &mut processor,
                 );
                 match result {
                     Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
diff --git a/gix-pack/src/index/traverse/with_lookup.rs b/gix-pack/src/index/traverse/with_lookup.rs
index 5a5d7957ebc..0165e4e01db 100644
--- a/gix-pack/src/index/traverse/with_lookup.rs
+++ b/gix-pack/src/index/traverse/with_lookup.rs
@@ -67,7 +67,7 @@ impl index::File {
     /// For more details, see the documentation on the [`traverse()`][index::File::traverse()] method.
     pub fn traverse_with_lookup<C, P, E, F, Processor>(
         &self,
-        new_processor: impl Fn() -> Processor + Send + Clone,
+        mut processor: Processor,
         pack: &data::File,
         mut progress: P,
         should_interrupt: &AtomicBool,
@@ -81,12 +81,9 @@ impl index::File {
         P: Progress,
         C: crate::cache::DecodeEntry,
         E: std::error::Error + Send + Sync + 'static,
-        Processor: FnMut(
-            gix_object::Kind,
-            &[u8],
-            &index::Entry,
-            &mut <P as Progress>::SubProgress,
-        ) -> Result<(), E>,
+        Processor: FnMut(gix_object::Kind, &[u8], &index::Entry, &dyn gix_features::progress::RawProgress) -> Result<(), E>
+            + Send
+            + Clone,
         F: Fn() -> C + Send + Clone,
     {
         let (verify_result, traversal_result) = parallel::join(
@@ -133,7 +130,6 @@ impl index::File {
                 move |index| {
                     (
                         make_pack_lookup_cache(),
-                        new_processor(),
                         Vec::with_capacity(2048), // decode buffer
                         lock(&reduce_progress)
                             .add_child_with_id(format!("thread {index}"), gix_features::progress::UNKNOWN), // per thread progress
@@ -146,9 +142,9 @@ impl index::File {
                 input_chunks,
                 thread_limit,
                 state_per_thread,
-                |entries: &[index::Entry],
-                 (cache, ref mut processor, buf, progress)|
-                 -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
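+                // `processor` is moved in and cloned once per worker thread (the reason for its new
+                // `Send + Clone` bounds), replacing the previous per-thread `new_processor()` factory.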
+                move |entries: &[index::Entry],
+                      (cache, buf, progress)|
+                      -> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
                     progress.init(
                         Some(entries.len()),
                         gix_features::progress::count_with_decimals("objects", 2),
@@ -163,7 +159,7 @@ impl index::File {
                             buf,
                             progress,
                             index_entry,
-                            processor,
+                            &mut processor,
                         );
                         progress.inc();
                         let stat = match result {
diff --git a/gix-pack/src/index/verify.rs b/gix-pack/src/index/verify.rs
index 182f816bab1..6af352ac91a 100644
--- a/gix-pack/src/index/verify.rs
+++ b/gix-pack/src/index/verify.rs
@@ -198,7 +198,7 @@ impl index::File {
             pack,
             progress,
             should_interrupt,
-            || {
+            {
                 let mut encode_buf = Vec::with_capacity(2048);
                 move |kind, data, index_entry, progress| {
                     Self::verify_entry(verify_mode, &mut encode_buf, kind, data, index_entry, progress)
@@ -231,17 +231,14 @@ impl index::File {
     }
 
     #[allow(clippy::too_many_arguments)]

+    fn verify_entry(
         verify_mode: Mode,
         encode_buf: &mut Vec<u8>,
         object_kind: gix_object::Kind,
         buf: &[u8],
         index_entry: &index::Entry,
-        progress: &mut P,
-    ) -> Result<(), integrity::Error>
-    where
-        P: Progress,
-    {
+        progress: &dyn gix_features::progress::RawProgress,
+    ) -> Result<(), integrity::Error> {
         if let Mode::HashCrc32Decode | Mode::HashCrc32DecodeEncode = verify_mode {
             use gix_object::Kind::*;
             match object_kind {
@@ -260,7 +257,7 @@ impl index::File {
                         .expect("writing to a memory buffer never fails");
                     if encode_buf.as_slice() != buf {
                         let mut should_return_error = true;
-                        if let gix_object::Kind::Tree = object_kind {
+                        if let Tree = object_kind {
                             if buf.as_bstr().find(b"100664").is_some() || buf.as_bstr().find(b"100640").is_some() {
                                 progress.info(format!("Tree object {} would be cleaned up during re-serialization, replacing mode '100664|100640' with '100644'", index_entry.oid));
                                 should_return_error = false
diff --git a/gix-pack/src/index/write/mod.rs b/gix-pack/src/index/write/mod.rs
index 39ed0f31ed2..72a076a851b 100644
--- a/gix-pack/src/index/write/mod.rs
+++ b/gix-pack/src/index/write/mod.rs
@@ -83,20 +83,22 @@ impl crate::index::File {
     /// It should return `None` if the entry cannot be resolved from the pack that produced the `entries` iterator, causing
     /// the write operation to fail.
     #[allow(clippy::too_many_arguments)]
-    pub fn write_data_iter_to_stream<F, F2>(
+    pub fn write_data_iter_to_stream<F, F2, P, R>(
         version: crate::index::Version,
         make_resolver: F,
         entries: impl Iterator<Item = Result<crate::data::input::Entry, crate::data::input::Error>>,
         thread_limit: Option<usize>,
-        mut root_progress: impl Progress,
+        mut root_progress: P,
         out: impl io::Write,
         should_interrupt: &AtomicBool,
         object_hash: gix_hash::Kind,
         pack_version: crate::data::Version,
     ) -> Result<Outcome, Error>
     where
-        F: FnOnce() -> io::Result<F2>,
-        F2: for<'r> Fn(crate::data::EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
+        F: FnOnce() -> io::Result<(F2, R)>,
+        R: Send + Sync,
+        F2: for<'r> Fn(crate::data::EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone,
+        P: Progress,
     {
         if version != crate::index::Version::default() {
             return Err(Error::Unsupported(version));
@@ -180,12 +182,12 @@ impl crate::index::File {
 
         root_progress.inc();
 
-        let resolver = make_resolver()?;
+        let (resolver, pack) = make_resolver()?;
         let sorted_pack_offsets_by_oid = {
             let traverse::Outcome { roots, children } = tree.traverse(
                 resolver,
+                &pack,
                 pack_entries_end,
-                || (),
                 |data,
                  _progress,
                  traverse::Context {
@@ -250,14 +252,7 @@ impl crate::index::File {
 }
 
 fn modify_base(entry: &mut TreeEntry, pack_entry: &crate::data::Entry, decompressed: &[u8], hash: gix_hash::Kind) {
-    fn compute_hash(kind: gix_object::Kind, bytes: &[u8], object_hash: gix_hash::Kind) -> gix_hash::ObjectId {
-        let mut hasher = gix_features::hash::hasher(object_hash);
-        hasher.update(&gix_object::encode::loose_header(kind, bytes.len()));
-        hasher.update(bytes);
-        gix_hash::ObjectId::from(hasher.digest())
-    }
-
     let object_kind = pack_entry.header.as_kind().expect("base object as source of iteration");
-    let id = compute_hash(object_kind, decompressed, hash);
+    let id = gix_object::compute_hash(hash, object_kind, decompressed);
     entry.id = id;
 }
diff --git a/gix-pack/tests/pack/index.rs b/gix-pack/tests/pack/index.rs
index e4a7701f57e..419bc27923b 100644
--- a/gix-pack/tests/pack/index.rs
+++ b/gix-pack/tests/pack/index.rs
@@ -115,28 +115,25 @@ mod version {
     use gix_features::progress;
     use gix_odb::pack;
-    use gix_pack::{
-        data::{input, EntryRange},
-        index,
-    };
+    use gix_pack::{data::input, index};
 
     use crate::{
         fixture_path,
         pack::{INDEX_V2, V2_PACKS_AND_INDICES},
     };
 
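+    /// Map `entry`'s byte range into the memory-mapped pack, or `None` if the range is out of bounds.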
+    fn slice_map(entry: gix_pack::data::EntryRange, map: &memmap2::Mmap) -> Option<&[u8]> {
+        map.get(entry.start as usize..entry.end as usize)
+    }
+
     #[test]
     fn write_to_stream() -> Result<(), Box<dyn std::error::Error>> {
-        fn assert_index_write<F>(
+        fn assert_index_write(
             mode: &input::Mode,
             compressed: &input::EntryDataMode,
             index_path: &&str,
             data_path: &&str,
-            resolve: F,
-        ) -> Result<(), Box<dyn std::error::Error>>
-        where
-            F: Fn(pack::data::EntryRange, &mut Vec<u8>) -> Option<()> + Send + Clone,
-        {
+        ) -> Result<(), Box<dyn std::error::Error>> {
             let pack_iter = pack::data::input::BytesToEntriesIter::new_from_header(
                 io::BufReader::new(fs::File::open(fixture_path(data_path))?),
                 *mode,
@@ -150,7 +147,11 @@ mod version {
             let pack_version = pack_iter.version();
             let outcome = pack::index::File::write_data_iter_to_stream(
                 desired_kind,
-                move || Ok(resolve),
+                || {
+                    let file = std::fs::File::open(fixture_path(data_path))?;
+                    let map = unsafe { memmap2::Mmap::map(&file)? };
+                    Ok((slice_map, map))
+                },
                 pack_iter,
                 None,
                 progress::Discard,
@@ -217,17 +218,7 @@ mod version {
         for mode in &[input::Mode::AsIs, input::Mode::Verify, input::Mode::Restore] {
             for compressed in &[input::EntryDataMode::Crc32, input::EntryDataMode::KeepAndCrc32] {
                 for (index_path, data_path) in V2_PACKS_AND_INDICES {
-                    let resolve = {
-                        let buf = gix_features::threading::OwnShared::new({
-                            let file = std::fs::File::open(fixture_path(data_path))?;
-                            unsafe { memmap2::Mmap::map(&file)? }
-                        });
-                        move |entry: EntryRange, out: &mut Vec<u8>| {
-                            buf.get(entry.start as usize..entry.end as usize)
-                                .map(|slice| out.copy_from_slice(slice))
-                        }
-                    };
-                    assert_index_write(mode, compressed, index_path, data_path, resolve)?;
+                    assert_index_write(mode, compressed, index_path, data_path)?;
                 }
             }
         }
@@ -259,11 +250,9 @@ fn traverse_with_index_and_forward_ref_deltas() {
     let _it_should_work = index
         .traverse_with_index(
             &data,
-            || {
-                |_, _, _, _| {
-                    count.fetch_add(1, Ordering::SeqCst);
-                    Ok::<_, std::io::Error>(())
-                }
+            |_, _, _, _| {
+                count.fetch_add(1, Ordering::SeqCst);
+                Ok::<_, std::io::Error>(())
             },
             progress::Discard,
             &AtomicBool::new(false),

From 215889ceb976a59368c132aabfffb71a6a2ac9f8 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Mon, 15 May 2023 19:36:59 +0200
Subject: [PATCH 12/12] adjust to changes in `gix-pack`

---
 Cargo.lock                               | 66 ++++++++++++------------
 Cargo.toml                               |  4 +-
 gitoxide-core/src/hours/mod.rs           |  4 +-
 gitoxide-core/src/organize.rs            | 19 ++-----
 gitoxide-core/src/pack/explode.rs        | 30 +++++++----
 gitoxide-core/src/query/engine/update.rs |  4 +-
 gitoxide-core/src/repository/verify.rs   |  2 +-
 gix-worktree/src/status/content.rs       |  8 +--
 gix/Cargo.toml                           |  2 +-
 9 files changed, 65 insertions(+), 74 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 49b4d5aebd2..8ee8ad92141 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -783,9 +783,9 @@ dependencies = [
 
 [[package]]
 name = "crossterm"
-version = "0.25.0"
+version = "0.26.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
+checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13"
 dependencies = [
  "bitflags 1.3.2",
  "crossterm_winapi",
@@ -809,9 +809,9 @@ dependencies = [
 
 [[package]]
 name = "crosstermion"
-version = "0.10.1"
+version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "99aabd9b02c2d5f72697f30ffb46f5a9ff4bd240d826049892cf62c31daeed04"
+checksum = "152ef46d620d4614070109e076ffc6ab032f682a380ac2efce412100c5ee7749"
 dependencies = [
  "ansi_term",
"async-channel", @@ -819,7 +819,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-lite", - "tui", + "ratatui", "tui-react", ] @@ -1243,7 +1243,7 @@ dependencies = [ "gix-features 0.29.0", "is-terminal", "owo-colors", - "prodash 23.1.2", + "prodash 25.0.0", "tabled", "time", ] @@ -1322,7 +1322,7 @@ dependencies = [ "is_ci", "log", "once_cell", - "prodash 23.1.2", + "prodash 25.0.0", "regex", "reqwest", "serde", @@ -3355,33 +3355,31 @@ name = "prodash" version = "23.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9516b775656bc3e8985e19cd4b8c0c0de045095074e453d2c0a513b5f978392d" + +[[package]] +name = "prodash" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3236ce1618b6da4c7b618e0143c4d5b5dc190f75f81c49f248221382f7e9e9ae" dependencies = [ "async-io", "atty", + "bytesize", "crosstermion", "futures-core", "futures-lite", + "human_format", "humantime", "log", "parking_lot", + "ratatui", "signal-hook", "time", - "tui", "tui-react", "unicode-segmentation", "unicode-width", ] -[[package]] -name = "prodash" -version = "25.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3236ce1618b6da4c7b618e0143c4d5b5dc190f75f81c49f248221382f7e9e9ae" -dependencies = [ - "bytesize", - "human_format", -] - [[package]] name = "ptyprocess" version = "0.4.1" @@ -3448,6 +3446,19 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ratatui" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcc0d032bccba900ee32151ec0265667535c230169f5a011154cdcd984e16829" +dependencies = [ + "bitflags 1.3.2", + "cassowary", + "crossterm", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "rayon" version = "1.7.0" @@ -4223,27 +4234,14 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "tui" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccdd26cbd674007e649a272da4475fb666d3aa0ad0531da7136db6fab0e5bad1" -dependencies = [ - "bitflags 1.3.2", - "cassowary", - "crossterm", - "unicode-segmentation", - "unicode-width", -] - [[package]] name = "tui-react" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542c37309aaf01ddaea86891f7845a8b0124194c6ccae6dbae7d223752648f4d" +checksum = "11f13a7edfda102bf2bc3f9c1714904c961d513ebf4ae0c09decc4f17340f9a7" dependencies = [ "log", - "tui", + "ratatui", "unicode-segmentation", "unicode-width", ] diff --git a/Cargo.toml b/Cargo.toml index 90ffd9731bc..85ec00ce72e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,10 +157,10 @@ gix = { version = "^0.44.1", path = "gix", default-features = false } time = "0.3.19" clap = { version = "4.1.1", features = ["derive", "cargo"] } -prodash = { version = "23.1", optional = true, default-features = false } +prodash = { version = "25.0.0", optional = true, default-features = false } is-terminal = { version = "0.4.0", optional = true } env_logger = { version = "0.10.0", default-features = false } -crosstermion = { version = "0.10.1", optional = true, default-features = false } +crosstermion = { version = "0.11.0", optional = true, default-features = false } futures-lite = { version = "1.12.0", optional = true, default-features = false, features = ["std"] } # for progress diff --git 
a/gitoxide-core/src/hours/mod.rs b/gitoxide-core/src/hours/mod.rs
index 444230db0d8..68696f7d1d5 100644
--- a/gitoxide-core/src/hours/mod.rs
+++ b/gitoxide-core/src/hours/mod.rs
@@ -344,10 +344,10 @@ where
         }
         None => Vec::new(),
     };
-    if let Some(mut progress) = change_progress {
+    if let Some(progress) = change_progress {
         progress.show_throughput(start);
     }
-    if let Some(mut progress) = lines_progress {
+    if let Some(progress) = lines_progress {
         progress.show_throughput(start);
     }
 
diff --git a/gitoxide-core/src/organize.rs b/gitoxide-core/src/organize.rs
index 087499f361c..89b9519c168 100644
--- a/gitoxide-core/src/organize.rs
+++ b/gitoxide-core/src/organize.rs
@@ -12,15 +12,12 @@ pub enum Mode {
     Simulate,
 }
 
-fn find_git_repository_workdirs<P: Progress>(
+fn find_git_repository_workdirs(
     root: impl AsRef<Path>,
-    mut progress: P,
+    mut progress: impl Progress,
     debug: bool,
     threads: Option<usize>,
-) -> impl Iterator<Item = (PathBuf, Kind)>
-where
-    P::SubProgress: Sync,
-{
+) -> impl Iterator<Item = (PathBuf, Kind)> {
     progress.init(None, progress::count("filesystem items"));
     fn is_repository(path: &Path) -> Option<Kind> {
         // Can be git dir or worktree checkout (file)
@@ -211,10 +208,7 @@ pub fn discover(
     mut progress: P,
     debug: bool,
     threads: Option<usize>,
-) -> anyhow::Result<()>
-where
-    <P as Progress>::SubProgress: Sync,
-{
+) -> anyhow::Result<()> {
     for (git_workdir, _kind) in
         find_git_repository_workdirs(source_dir, progress.add_child("Searching repositories"), debug, threads)
     {
@@ -229,10 +223,7 @@ pub fn run(
     destination: impl AsRef<Path>,
     mut progress: P,
     threads: Option<usize>,
-) -> anyhow::Result<()>
-where
-    <P as Progress>::SubProgress: Sync,
-{
+) -> anyhow::Result<()> {
     let mut num_errors = 0usize;
     let destination = destination.as_ref().canonicalize()?;
     for (path_to_move, kind) in
diff --git a/gitoxide-core/src/pack/explode.rs b/gitoxide-core/src/pack/explode.rs
index 8d6369b62ce..9f194b7da8b 100644
--- a/gitoxide-core/src/pack/explode.rs
+++ b/gitoxide-core/src/pack/explode.rs
@@ -178,7 +178,7 @@ pub fn pack_or_pack_index(
         }
     });
 
-    let pack::index::traverse::Outcome{mut progress, ..} = bundle
+    let pack::index::traverse::Outcome { progress, .. } = bundle
         .index
         .traverse(
             &bundle.pack,
@@ -187,14 +187,15 @@ pub fn pack_or_pack_index(
             {
                 let object_path = object_path.map(|p| p.as_ref().to_owned());
                 let out = OutputWriter::new(object_path.clone(), sink_compress, object_hash);
-                let loose_odb = verify.then(|| object_path.as_ref().map(|path| loose::Store::at(path, object_hash))).flatten();
+                let loose_odb = verify
+                    .then(|| object_path.as_ref().map(|path| loose::Store::at(path, object_hash)))
+                    .flatten();
                 let mut read_buf = Vec::new();
                 move |object_kind, buf, index_entry, progress| {
-                    let written_id = out.write_buf(object_kind, buf).map_err(|err| {
-                        Error::Write{source: Box::new(err) as Box<dyn std::error::Error + Send + Sync>,
-                            kind: object_kind,
-                            id: index_entry.oid,
-                        }
+                    let written_id = out.write_buf(object_kind, buf).map_err(|err| Error::Write {
+                        source: Box::new(err) as Box<dyn std::error::Error + Send + Sync>,
+                        kind: object_kind,
+                        id: index_entry.oid,
                     })?;
                     if written_id != index_entry.oid {
                         if let object::Kind::Tree = object_kind {
@@ -203,14 +204,21 @@ pub fn pack_or_pack_index(
                                 index_entry.oid, written_id
                             ));
                         } else {
-                            return Err(Error::ObjectEncodeMismatch{kind: object_kind, actual: index_entry.oid, expected:written_id});
+                            return Err(Error::ObjectEncodeMismatch {
+                                kind: object_kind,
+                                actual: index_entry.oid,
+                                expected: written_id,
+                            });
                         }
                     }
                     if let Some(verifier) = loose_odb.as_ref() {
                         let obj = verifier
                             .try_find(written_id, &mut read_buf)
-                            .map_err(|err| Error::WrittenFileCorrupt{source:err, id:written_id})?
-                            .ok_or(Error::WrittenFileMissing{id:written_id})?;
+                            .map_err(|err| Error::WrittenFileCorrupt {
+                                source: err,
+                                id: written_id,
+                            })?
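+                            // The loose object was written just above, so `None` here indicates
+                            // a lost or corrupted write rather than a simple lookup miss.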
+                            .ok_or(Error::WrittenFileMissing { id: written_id })?;
                         obj.verify_checksum(written_id)?;
                     }
                     Ok(())
@@ -220,7 +228,7 @@ pub fn pack_or_pack_index(
                 traversal: algorithm,
                 thread_limit,
                 check: check.into(),
-                make_pack_lookup_cache: pack::cache::lru::StaticLinkedList::<64>::default,
+                make_pack_lookup_cache: pack::cache::lru::StaticLinkedList::<64>::default,
             },
         )
         .with_context(|| "Failed to explode the entire pack - some loose objects may have been created nonetheless")?;
diff --git a/gitoxide-core/src/query/engine/update.rs b/gitoxide-core/src/query/engine/update.rs
index 0da6a0eef12..c4ec3c5dc2b 100644
--- a/gitoxide-core/src/query/engine/update.rs
+++ b/gitoxide-core/src/query/engine/update.rs
@@ -46,14 +46,14 @@ pub fn update(
     };
     let commit_counter = db_progress.counter().expect("shared counter available");
 
-    let mut change_progress = {
+    let change_progress = {
         let mut p = progress.add_child("find changes");
         p.init(None, progress::count("modified files"));
         p
     };
     let change_counter = change_progress.counter().expect("shared counter available");
 
-    let mut lines_progress = {
+    let lines_progress = {
         let mut p = progress.add_child("find changes");
         p.init(None, progress::count("diff lines"));
         p
diff --git a/gitoxide-core/src/repository/verify.rs b/gitoxide-core/src/repository/verify.rs
index b5e6f422827..7e028fd2510 100644
--- a/gitoxide-core/src/repository/verify.rs
+++ b/gitoxide-core/src/repository/verify.rs
@@ -31,7 +31,7 @@ pub fn integrity(
     }: Context,
 ) -> anyhow::Result<()> {
     #[cfg_attr(not(feature = "serde"), allow(unused))]
-    let mut outcome = repo.objects.store_ref().verify_integrity(
+    let outcome = repo.objects.store_ref().verify_integrity(
         progress,
         should_interrupt,
         gix::odb::pack::index::verify::integrity::Options {
diff --git a/gix-worktree/src/status/content.rs b/gix-worktree/src/status/content.rs
index d47749ef8fb..aa775821a7a 100644
--- a/gix-worktree/src/status/content.rs
+++ b/gix-worktree/src/status/content.rs
@@ -1,7 +1,5 @@
-use gix_features::hash;
 use gix_hash::ObjectId;
 use gix_index as index;
-use gix_object::encode::loose_header;
 use index::Entry;
 
 /// Compares the content of two blobs in some way.
@@ -76,11 +74,7 @@ impl CompareBlobs for HashEq {
         _entry_blob: impl ReadDataOnce<'a, E>,
     ) -> Result<Option<ObjectId>, E> {
         let blob = worktree_blob.read_data()?;
-        let header = loose_header(gix_object::Kind::Blob, blob.len());
-        let mut hasher = hash::hasher(entry.id.kind());
-        hasher.update(&header);
-        hasher.update(blob);
-        let file_hash: ObjectId = hasher.digest().into();
+        let file_hash = gix_object::compute_hash(entry.id.kind(), gix_object::Kind::Blob, blob);
         Ok((entry.id != file_hash).then_some(file_hash))
     }
 }
diff --git a/gix/Cargo.toml b/gix/Cargo.toml
index e7f6a062ef1..e015d6542fe 100644
--- a/gix/Cargo.toml
+++ b/gix/Cargo.toml
@@ -149,7 +149,7 @@ gix-index = { version = "^0.16.1", path = "../gix-index" }
 gix-worktree = { version = "^0.17.1", path = "../gix-worktree" }
 gix-hashtable = { version = "^0.2.0", path = "../gix-hashtable" }
 
-prodash = { version = "23.1", optional = true, default-features = false, features = ["progress-tree"] }
+prodash = { version = "25.0", optional = true, default-features = false, features = ["progress-tree"] }
 once_cell = "1.14.0"
 signal-hook = { version = "0.3.9", default-features = false }
 thiserror = "1.0.26"