Skip to content

Commit ca095ed

Browse files
committed
feat: Make a scope-like abstraction available (#293)
This allows more delicate threading control like is required for the index.
1 parent a22cb0f commit ca095ed

File tree

3 files changed

+69
-5
lines changed

3 files changed

+69
-5
lines changed

git-features/src/parallel/in_parallel.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@ pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl Fn
1010
.unwrap()
1111
}
1212

13+
/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
14+
/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with.
15+
///
16+
/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely.
17+
pub fn threads<'env, F, R>(f: F) -> std::thread::Result<R>
18+
where
19+
F: FnOnce(&crossbeam_utils::thread::Scope<'env>) -> R,
20+
{
21+
crossbeam_utils::thread::scope(f)
22+
}
23+
1324
/// Read items from `input` and `consume` them in multiple threads,
1425
/// whose output output is collected by a `reducer`. Its task is to
1526
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.

git-features/src/parallel/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@
3535
#[cfg(feature = "parallel")]
3636
mod in_parallel;
3737
#[cfg(feature = "parallel")]
38-
pub use in_parallel::{in_parallel, join};
38+
pub use in_parallel::{in_parallel, join, threads};
3939

4040
mod serial;
4141
#[cfg(not(feature = "parallel"))]
42-
pub use serial::{in_parallel, join};
42+
pub use serial::{in_parallel, join, threads};
4343

4444
mod eager_iter;
4545
pub use eager_iter::{EagerIter, EagerIterIf};

git-features/src/parallel/serial.rs

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,63 @@
11
use crate::parallel::Reduce;
22

3-
/// Runs `left` and then `right`, one after another, returning their output when both are done.
43
#[cfg(not(feature = "parallel"))]
5-
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
6-
(left(), right())
4+
mod not_parallel {
5+
/// Runs `left` and then `right`, one after another, returning their output when both are done.
6+
pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
7+
(left(), right())
8+
}
9+
10+
/// A scope for spawning threads.
11+
pub struct Scope<'env> {
12+
_marker: std::marker::PhantomData<&'env mut &'env ()>,
13+
}
14+
15+
#[allow(unsafe_code)]
16+
unsafe impl Sync for Scope<'_> {}
17+
18+
impl<'env> Scope<'env> {
19+
pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
20+
where
21+
F: FnOnce(&Scope<'env>) -> T,
22+
F: Send + 'env,
23+
T: Send + 'env,
24+
{
25+
ScopedJoinHandle {
26+
result: f(self),
27+
_marker: Default::default(),
28+
}
29+
}
30+
}
31+
32+
/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
33+
/// Note that this implementation will run the spawned functions immediately.
34+
pub fn threads<'env, F, R>(f: F) -> std::thread::Result<R>
35+
where
36+
F: FnOnce(&Scope<'env>) -> R,
37+
{
38+
Ok(f(&Scope {
39+
_marker: Default::default(),
40+
}))
41+
}
42+
43+
/// A handle that can be used to join its scoped thread.
44+
///
45+
/// This struct is created by the [`Scope::spawn`] method and the
46+
/// [`ScopedThreadBuilder::spawn`] method.
47+
pub struct ScopedJoinHandle<'scope, T> {
48+
/// Holds the result of the inner closure.
49+
result: T,
50+
_marker: std::marker::PhantomData<&'scope mut &'scope ()>,
51+
}
52+
53+
impl<T> ScopedJoinHandle<'_, T> {
54+
pub fn join(self) -> std::thread::Result<T> {
55+
Ok(self.result)
56+
}
57+
}
758
}
59+
#[cfg(not(feature = "parallel"))]
60+
pub use not_parallel::{join, threads, Scope, ScopedJoinHandle};
861

962
/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
1063
/// whose task is to aggregate these outputs into the final result returned by this function.

0 commit comments

Comments
 (0)