Skip to content

Commit cb7e4e7

Browse files
committed
feat: Add InOrderIter to 'parallel' module (#293)
This iterator identifies results by a sequence id and yields them only in consecutive order. Use it to collect unordered results produced by threads. Its advantage over collecting everything yourself and sorting afterwards is the potential for a smaller memory footprint of in-flight results: one doesn't necessarily have to collect them all before putting them in order.
1 parent de84a3a commit cb7e4e7

File tree

4 files changed

+144
-0
lines changed

4 files changed

+144
-0
lines changed

git-features/src/parallel/in_order.rs

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use std::{cmp::Ordering, collections::BTreeMap};
2+
3+
/// A counter for chunks to be able to put them back into original order later.
pub type ChunkId = usize;

/// An iterator which holds iterated items with a **sequential** ID starting at 0 long enough to dispense them in order.
///
/// Items that arrive ahead of their turn are buffered, keyed by their [`ChunkId`], and handed out as soon as
/// all of their predecessors have been yielded. Buffered items are always served before the inner iterator is
/// asked for more, so no more items are kept in flight than the input order requires.
///
/// The first error yielded by the inner iterator is passed on, after which this iterator only returns `None`.
///
/// Note that this iterator is made specifically to support the signature of the iterator returned
/// by [from_counts_iter(…)][super::entry::iter_from_counts()].
///
/// # Panics
///
/// Iteration panics if the inner iterator yields an ID that is lower than the next expected one, or yields the
/// same out-of-order ID twice. In debug builds it also panics if the inner iterator ends while items are still
/// buffered, i.e. if the sequence of IDs had a gap.
pub struct InOrderIter<T, I> {
    /// The iterator yielding the out-of-order elements we are to yield in order.
    pub inner: I,
    /// Items that arrived too early, waiting for `next_chunk` to catch up with their ID.
    store: BTreeMap<ChunkId, T>,
    /// The ID of the item that has to be yielded next.
    next_chunk: ChunkId,
    /// Set once an error was yielded or the input ended, after which `next()` only returns `None`.
    is_done: bool,
}

impl<T, E, I> From<I> for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(ChunkId, T), E>>,
{
    /// Wrap `iter`, expecting the first item to be yielded to carry the ID `0`.
    fn from(iter: I) -> Self {
        InOrderIter {
            inner: iter,
            store: Default::default(),
            next_chunk: 0,
            is_done: false,
        }
    }
}

impl<T, E, I> Iterator for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(ChunkId, T), E>>,
{
    type Item = Result<T, E>;

    /// Return the item with the next ID in sequence, pulling from the inner iterator only if it isn't buffered yet.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_done {
            return None;
        }
        // Serve what we already hold before asking for more: pulling first would read ahead needlessly,
        // growing the buffer, and could surface an error while deliverable items are still waiting.
        if let Some(v) = self.store.remove(&self.next_chunk) {
            self.next_chunk += 1;
            return Some(Ok(v));
        }
        // From here on `next_chunk` is known not to be buffered, and inserting larger IDs can't change that,
        // so the only way to make progress is to receive it from the inner iterator.
        loop {
            match self.inner.next() {
                Some(Ok((c, v))) => match c.cmp(&self.next_chunk) {
                    Ordering::Equal => {
                        self.next_chunk += 1;
                        return Some(Ok(v));
                    }
                    Ordering::Less => {
                        unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c)
                    }
                    Ordering::Greater => {
                        let previous = self.store.insert(c, v);
                        assert!(
                            previous.is_none(),
                            "Chunks are returned only once, input is an invalid sequence"
                        );
                    }
                },
                Some(Err(e)) => {
                    self.is_done = true;
                    self.store.clear();
                    return Some(Err(e));
                }
                None => {
                    // Fuse ourselves so an exhausted inner iterator is never polled again.
                    self.is_done = true;
                    debug_assert!(
                        self.store.is_empty(),
                        "When iteration is done we should not have stored items left"
                    );
                    return None;
                }
            }
        }
    }
}

git-features/src/parallel/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ mod serial;
4141
#[cfg(not(feature = "parallel"))]
4242
pub use serial::{in_parallel, join, threads};
4343

44+
mod in_order;
45+
pub use in_order::InOrderIter;
46+
4447
mod eager_iter;
4548
pub use eager_iter::{EagerIter, EagerIterIf};
4649

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use git_features::parallel::InOrderIter;
2+
use std::convert::Infallible;
3+
4+
/// Input that is already sorted by its sequence id must pass through unchanged.
#[test]
fn in_order_stays_in_order() {
    let input = vec![Ok::<_, Infallible>((0usize, 'a')), Ok((1, 'b')), Ok((2, 'c'))];
    let actual = InOrderIter::from(input.into_iter())
        .collect::<Result<Vec<_>, _>>()
        .expect("infallible");
    assert_eq!(actual, vec!['a', 'b', 'c']);
}
13+
14+
/// Items arriving ahead of their turn are kept back and only yielded once all predecessors were seen.
#[test]
fn out_of_order_items_are_held_until_the_sequence_is_complete() {
    let shuffled = vec![
        Ok::<_, Infallible>((2usize, 'c')),
        Ok((1, 'b')),
        Ok((0, 'a')),
        Ok((3, 'd')),
    ];
    let actual = InOrderIter::from(shuffled.into_iter())
        .collect::<Result<Vec<_>, _>>()
        .expect("infallible");
    assert_eq!(actual, vec!['a', 'b', 'c', 'd']);
}
31+
32+
/// An error following an in-sequence item is passed on, and nothing is yielded after it.
#[test]
fn in_sequence_errors_immediately_trigger_a_fuse() {
    let input = vec![Ok::<_, &'static str>((0usize, 'a')), Err("err"), Ok((1, 'b'))];
    let mut ordered = InOrderIter::from(input.into_iter());

    let first = ordered.next();
    assert_eq!(first, Some(Ok('a')));

    let second = ordered.next();
    assert_eq!(second, Some(Err("err")));

    let after_error = ordered.next();
    assert_eq!(
        after_error, None,
        "fuse should have triggered so we don't see anything else"
    );
}
43+
44+
/// An error seen while still waiting for the first in-sequence item is passed on, and nothing is yielded after it.
#[test]
fn out_of_sequence_errors_immediately_trigger_a_fuse() {
    let input = vec![Ok::<_, &'static str>((1usize, 'b')), Err("err"), Ok((0, 'a'))];
    let mut ordered = InOrderIter::from(input.into_iter());

    let first = ordered.next();
    assert_eq!(first, Some(Err("err")));

    let after_error = ordered.next();
    assert_eq!(
        after_error, None,
        "fuse should have triggered so we don't see anything else"
    );
}

git-features/tests/parallel/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Tests that are working similarly in parallel and serial mode
22
use git_features::parallel;
33

4+
mod in_order_iter;
5+
46
#[derive(Default)]
57
struct Adder {
68
count: usize,

0 commit comments

Comments
 (0)