Skip to content

Commit de84a3a

Browse files
committed
parallel loading of entries right before reducing them (#293)
1 parent 6fea17d commit de84a3a

File tree

4 files changed

+133
-52
lines changed

4 files changed

+133
-52
lines changed

git-index/src/decode/entries.rs

+9-19
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::ops::Range;
22

33
use crate::{
4-
decode::{header, Error},
4+
decode::{self, header},
55
entry,
66
util::{read_u32, split_at_byte_exclusive, split_at_pos},
77
Entry, Version,
@@ -11,8 +11,6 @@ use crate::{
1111
pub const AVERAGE_V4_DELTA_PATH_LEN_IN_BYTES: usize = 80;
1212

1313
pub struct Outcome {
14-
pub entries: Vec<Entry>,
15-
pub path_backing: Vec<u8>,
1614
pub is_sparse: bool,
1715
}
1816

@@ -42,15 +40,14 @@ pub fn estimate_path_storage_requirements_in_bytes(
4240
}
4341

4442
/// Note that `data` must point to the beginning of the entries, right past the header.
45-
pub fn load_all(
46-
mut data: &[u8],
43+
pub fn load_chunk<'a>(
44+
mut data: &'a [u8],
45+
entries: &mut Vec<Entry>,
46+
path_backing: &mut Vec<u8>,
4747
num_entries: u32,
48-
path_backing_capacity: usize,
4948
object_hash: git_hash::Kind,
5049
version: Version,
51-
) -> Result<(Outcome, &[u8]), Error> {
52-
let mut path_backing = Vec::<u8>::with_capacity(path_backing_capacity);
53-
let mut entries = Vec::<Entry>::with_capacity(num_entries as usize);
50+
) -> Result<(Outcome, &'a [u8]), decode::Error> {
5451
let mut is_sparse = false;
5552
let has_delta_paths = version == Version::V4;
5653
let mut prev_path = None;
@@ -59,12 +56,12 @@ pub fn load_all(
5956
for idx in 0..num_entries {
6057
let (entry, remaining) = load_one(
6158
data,
62-
&mut path_backing,
59+
path_backing,
6360
object_hash.len_in_bytes(),
6461
has_delta_paths,
6562
prev_path,
6663
)
67-
.ok_or(Error::Entry(idx))?;
64+
.ok_or(decode::Error::Entry(idx))?;
6865

6966
data = remaining;
7067
if entry::mode::is_sparse(entry.stat.mode) {
@@ -77,14 +74,7 @@ pub fn load_all(
7774
prev_path = entries.last().map(|e| (e.path.clone(), &mut delta_buf));
7875
}
7976

80-
Ok((
81-
Outcome {
82-
entries,
83-
path_backing,
84-
is_sparse,
85-
},
86-
data,
87-
))
77+
Ok((Outcome { is_sparse }, data))
8878
}
8979

9080
/// Note that `prev_path` is only useful if the version is V4

git-index/src/decode/mod.rs

+106-26
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use filetime::FileTime;
22

3-
use crate::{extension, State};
3+
use crate::{extension, Entry, State, Version};
44

55
mod entries;
66
pub mod header;
@@ -49,13 +49,13 @@ impl State {
4949
Options {
5050
object_hash,
5151
thread_limit,
52-
min_extension_block_in_bytes_for_threading: _,
52+
min_extension_block_in_bytes_for_threading,
5353
}: Options,
5454
) -> Result<(Self, git_hash::ObjectId), Error> {
5555
let (version, num_entries, post_header_data) = header::decode(data, object_hash)?;
5656
let start_of_extensions = extension::end_of_index_entry::decode(data, object_hash);
5757

58-
let num_threads = git_features::parallel::num_threads(thread_limit);
58+
let mut num_threads = git_features::parallel::num_threads(thread_limit);
5959
let path_backing_buffer_size = entries::estimate_path_storage_requirements_in_bytes(
6060
num_entries,
6161
data.len(),
@@ -66,37 +66,82 @@ impl State {
6666

6767
let (entries, ext, data) = match start_of_extensions {
6868
Some(offset) if num_threads > 1 => {
69-
let start_of_extensions = &data[offset..];
70-
let index_offsets_table = extension::index_entry_offset_table::find(start_of_extensions, object_hash);
71-
let (entries_res, (ext, data)) = git_features::parallel::threads(|_scope| {
72-
match index_offsets_table {
69+
let extensions_data = &data[offset..];
70+
let index_offsets_table = extension::index_entry_offset_table::find(extensions_data, object_hash);
71+
let (entries_res, (ext, data)) = git_features::parallel::threads(|scope| {
72+
let extension_loading =
73+
(extensions_data.len() > min_extension_block_in_bytes_for_threading).then({
74+
num_threads -= 1;
75+
|| scope.spawn(|_| extension::decode::all(extensions_data, object_hash))
76+
});
77+
let entries_res = match index_offsets_table {
7378
Some(entry_offsets) => {
74-
dbg!(entry_offsets);
75-
todo!("threaded entry loading if its worth it")
79+
let chunk_size = (entry_offsets.len() as f32 / num_threads as f32).ceil() as usize;
80+
let num_chunks = entry_offsets.chunks(chunk_size).count();
81+
let mut threads = Vec::with_capacity(num_chunks);
82+
for (id, chunks) in entry_offsets.chunks(chunk_size).enumerate() {
83+
let chunks = chunks.to_vec();
84+
threads.push(scope.spawn(move |_| {
85+
let num_entries = chunks.iter().map(|c| c.num_entries).sum::<u32>() as usize;
86+
let mut entries = Vec::with_capacity(num_entries);
87+
let path_backing_buffer_size = entries::estimate_path_storage_requirements_in_bytes(
88+
num_entries as u32,
89+
data.len() / num_chunks,
90+
start_of_extensions.map(|ofs| ofs / num_chunks),
91+
object_hash,
92+
version,
93+
);
94+
let mut path_backing = Vec::with_capacity(path_backing_buffer_size);
95+
let mut is_sparse = false;
96+
for offset in chunks {
97+
let (
98+
entries::Outcome {
99+
is_sparse: chunk_is_sparse,
100+
},
101+
_data,
102+
) = entries::load_chunk(
103+
&data[offset.from_beginning_of_file as usize..],
104+
&mut entries,
105+
&mut path_backing,
106+
offset.num_entries,
107+
object_hash,
108+
version,
109+
)?;
110+
is_sparse |= chunk_is_sparse;
111+
}
112+
Ok::<_, Error>((
113+
id,
114+
EntriesOutcome {
115+
entries,
116+
path_backing,
117+
is_sparse,
118+
},
119+
))
120+
}));
121+
}
122+
todo!("combined thread results in order ")
76123
}
77-
None => {
78-
// TODO load all extensions in scoped, then get IEOT, then possibly multi-threaded entry parsing
79-
(
80-
entries::load_all(
81-
post_header_data,
82-
num_entries,
83-
path_backing_buffer_size,
84-
object_hash,
85-
version,
86-
),
87-
extension::decode::all(start_of_extensions, object_hash),
88-
)
89-
}
90-
}
124+
None => load_entries(
125+
post_header_data,
126+
path_backing_buffer_size,
127+
num_entries,
128+
object_hash,
129+
version,
130+
),
131+
};
132+
let ext_res = extension_loading
133+
.map(|thread| thread.join().unwrap())
134+
.unwrap_or_else(|| extension::decode::all(extensions_data, object_hash));
135+
(entries_res, ext_res)
91136
})
92137
.unwrap(); // this unwrap is for panics - if these happened we are done anyway.
93138
(entries_res?.0, ext, data)
94139
}
95140
None | Some(_) => {
96-
let (entries, data) = entries::load_all(
141+
let (entries, data) = load_entries(
97142
post_header_data,
98-
num_entries,
99143
path_backing_buffer_size,
144+
num_entries,
100145
object_hash,
101146
version,
102147
)?;
@@ -113,7 +158,7 @@ impl State {
113158
}
114159

115160
let checksum = git_hash::ObjectId::from(data);
116-
let entries::Outcome {
161+
let EntriesOutcome {
117162
entries,
118163
path_backing,
119164
is_sparse,
@@ -133,3 +178,38 @@ impl State {
133178
))
134179
}
135180
}
181+
182+
struct EntriesOutcome {
183+
pub entries: Vec<Entry>,
184+
pub path_backing: Vec<u8>,
185+
pub is_sparse: bool,
186+
}
187+
188+
fn load_entries(
189+
post_header_data: &[u8],
190+
path_backing_buffer_size: usize,
191+
num_entries: u32,
192+
object_hash: git_hash::Kind,
193+
version: Version,
194+
) -> Result<(EntriesOutcome, &[u8]), Error> {
195+
let mut entries = Vec::with_capacity(num_entries as usize);
196+
let mut path_backing = Vec::with_capacity(path_backing_buffer_size);
197+
entries::load_chunk(
198+
post_header_data,
199+
&mut entries,
200+
&mut path_backing,
201+
num_entries,
202+
object_hash,
203+
version,
204+
)
205+
.map(|(entries::Outcome { is_sparse }, data): (entries::Outcome, &[u8])| {
206+
(
207+
EntriesOutcome {
208+
entries,
209+
path_backing,
210+
is_sparse,
211+
},
212+
data,
213+
)
214+
})
215+
}

git-index/src/extension/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub(crate) mod end_of_index_entry;
3131
pub(crate) mod index_entry_offset_table {
3232
use crate::{extension, extension::Signature, util::read_u32};
3333

34-
#[derive(Debug)]
34+
#[derive(Debug, Clone, Copy)]
3535
pub struct Offset {
3636
pub from_beginning_of_file: u32,
3737
pub num_entries: u32,

git-index/tests/index/file/mod.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,28 @@ mod init {
55
fn file(name: &str) -> git_index::File {
66
git_index::File::at(crate::index::fixture_path(name), git_index::decode::Options::default()).unwrap()
77
}
8+
fn file_opt(name: &str, opts: git_index::decode::Options) -> git_index::File {
9+
git_index::File::at(crate::index::fixture_path(name), opts).unwrap()
10+
}
811

912
#[test]
1013
fn read_v2_with_single_entry_tree_and_eoie_ext() {
11-
let file = file("v2");
12-
assert_eq!(file.version(), Version::V2);
14+
let file_disallow_threaded_loading = file_opt(
15+
"v2",
16+
git_index::decode::Options {
17+
min_extension_block_in_bytes_for_threading: 100000,
18+
..Default::default()
19+
},
20+
);
21+
for file in [file("v2"), file_disallow_threaded_loading] {
22+
assert_eq!(file.version(), Version::V2);
1323

14-
assert_eq!(file.entries().len(), 1);
24+
assert_eq!(file.entries().len(), 1);
1525

16-
let entry = &file.entries()[0];
17-
assert_eq!(entry.id, hex_to_id("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"));
18-
assert_eq!(entry.path(&file.state), "a");
26+
let entry = &file.entries()[0];
27+
assert_eq!(entry.id, hex_to_id("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"));
28+
assert_eq!(entry.path(&file.state), "a");
29+
}
1930
}
2031

2132
#[test]

0 commit comments

Comments
 (0)