diff --git a/.travis.yml b/.travis.yml
index 7bee622df..480e123c9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,7 +31,7 @@ script:
     cargo test --release --verbose --no-default-features &&
     cargo build --verbose --features "$FEATURES" &&
     cargo test --verbose --features "$FEATURES" &&
+    CARGO_TARGET_DIR=target/ cargo test --manifest-path=parallel/Cargo.toml --verbose &&
     CARGO_TARGET_DIR=target/ cargo test --manifest-path=serialization-tests/Cargo.toml --verbose &&
     CARGO_TARGET_DIR=target/ cargo test --manifest-path=numeric-tests/Cargo.toml --verbose &&
-    CARGO_TARGET_DIR=target/ cargo test --manifest-path=parallel/Cargo.toml --verbose &&
     ([ "$BENCH" != 1 ] || cargo bench --no-run --verbose --features "$FEATURES")
diff --git a/parallel/Cargo.toml b/parallel/Cargo.toml
index ca80ff8c1..4fb0b33a0 100644
--- a/parallel/Cargo.toml
+++ b/parallel/Cargo.toml
@@ -13,7 +13,7 @@ keywords = ["data-structure", "multidimensional", "parallel", "concurrent"]
 categories = ["data-structures", "science", "concurrency"]
 
 [dependencies]
-rayon = "0.6"
+rayon = { version = "0.7.0" }
 ndarray = { version = "0.9.0-alpha.1", path = "../" }
 
 [dev-dependencies]
diff --git a/parallel/README.rst b/parallel/README.rst
index 9c01a2cff..d7785df5f 100644
--- a/parallel/README.rst
+++ b/parallel/README.rst
@@ -47,6 +47,12 @@ How to use with cargo::
 Recent Changes (ndarray-parallel)
 ---------------------------------
 
+- 0.3.0-alpha.1
+
+  - ParallelIterator for Zip, including ``.par_apply``.
+  - ``.par_map_inplace`` and ``.par_mapv_inplace`` for arrays
+  - Require ndarray 0.9 (when released) and rayon 0.7
+
 - 0.2.0
 
   - Require ndarray 0.8
diff --git a/parallel/benches/rayon.rs b/parallel/benches/rayon.rs
index 91044b5d3..ee27fa17b 100644
--- a/parallel/benches/rayon.rs
+++ b/parallel/benches/rayon.rs
@@ -12,13 +12,17 @@ extern crate ndarray_parallel;
 use ndarray::prelude::*;
 use ndarray_parallel::prelude::*;
 
+use ndarray::Zip;
+
 const EXP_N: usize = 128;
+const ADDN: usize = 1024;
 
 use std::cmp::max;
 
 fn set_threads() {
     let n = max(1, num_cpus::get() / 2);
-    let cfg = rayon::Configuration::new().set_num_threads(n);
+    //println!("Using {} threads", n);
+    let cfg = rayon::Configuration::new().num_threads(n);
     let _ = rayon::initialize(cfg);
 }
 
@@ -112,3 +116,42 @@ fn rayon_fastexp_by_axis(bench: &mut Bencher)
         .for_each(|mut sheet| sheet.mapv_inplace(fastexp));
     });
 }
+
+#[bench]
+fn rayon_fastexp_zip(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        Zip::from(&mut a).into_par_iter().for_each(|(elt, )| *elt = fastexp(*elt));
+    });
+}
+
+#[bench]
+fn add(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((ADDN, ADDN));
+    let b = Array2::<f64>::zeros((ADDN, ADDN));
+    let c = Array2::<f64>::zeros((ADDN, ADDN));
+    let d = Array2::<f64>::zeros((ADDN, ADDN));
+    bench.iter(|| {
+        Zip::from(&mut a).and(&b).and(&c).and(&d).apply(|a, &b, &c, &d| {
+            *a += b.exp() + c.exp() + d.exp();
+        })
+    });
+}
+
+#[bench]
+fn rayon_add(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((ADDN, ADDN));
+    let b = Array2::<f64>::zeros((ADDN, ADDN));
+    let c = Array2::<f64>::zeros((ADDN, ADDN));
+    let d = Array2::<f64>::zeros((ADDN, ADDN));
+    bench.iter(|| {
+        Zip::from(&mut a).and(&b).and(&c).and(&d).into_par_iter().for_each(|(a, &b, &c, &d)| {
+            *a += b.exp() + c.exp() + d.exp();
+        })
+    });
+}
diff --git a/parallel/src/ext_traits.rs b/parallel/src/ext_traits.rs
new file mode 100644
index 000000000..ad08848dd
--- /dev/null
+++ b/parallel/src/ext_traits.rs
@@ -0,0 +1,83 @@
+
+use ndarray::{
+    Dimension,
+    NdProducer,
+    Zip,
+    ArrayBase,
+    DataMut,
+};
+
+use prelude::*;
+
+// Arrays
+
+/// Parallel versions of `map_inplace` and `mapv_inplace`.
+pub trait ParMap {
+    type Item;
+    fn par_map_inplace<F>(&mut self, f: F)
+        where F: Fn(&mut Self::Item) + Sync;
+    fn par_mapv_inplace<F>(&mut self, f: F)
+        where F: Fn(Self::Item) -> Self::Item + Sync,
+              Self::Item: Clone;
+}
+
+impl<A, S, D> ParMap for ArrayBase<S, D>
+    where S: DataMut<Elem=A>,
+          D: Dimension,
+          A: Send + Sync,
+{
+    type Item = A;
+    fn par_map_inplace<F>(&mut self, f: F)
+        where F: Fn(&mut Self::Item) + Sync
+    {
+        self.view_mut().into_par_iter().for_each(f)
+    }
+    fn par_mapv_inplace<F>(&mut self, f: F)
+        where F: Fn(Self::Item) -> Self::Item + Sync,
+              Self::Item: Clone
+    {
+        self.view_mut().into_par_iter()
+            .for_each(move |x| *x = f(x.clone()))
+    }
+}
+
+
+
+
+// Zip
+
+macro_rules! zip_impl {
+    ($([$name:ident $($p:ident)*],)+) => {
+        $(
+        /// The `par_apply` method for `Zip`.
+        ///
+        /// This is a shorthand for using `.into_par_iter().for_each()` on
+        /// `Zip`.
+        pub trait $name<$($p),*> {
+            fn par_apply<F>(self, function: F)
+                where F: Fn($($p),*) + Sync;
+        }
+
+        #[allow(non_snake_case)]
+        impl<Dim: Dimension, $($p: NdProducer<Dim=Dim>),*> $name<$($p::Item),*> for Zip<($($p,)*), Dim>
+            where $($p::Item : Send , )*
+                  $($p : Send , )*
+        {
+            fn par_apply<F>(self, function: F)
+                where F: Fn($($p::Item),*) + Sync
+            {
+                self.into_par_iter().for_each(move |($($p,)*)| function($($p),*))
+            }
+        }
+        )+
+    }
+}
+
+zip_impl!{
+    [ParApply1 P1],
+    [ParApply2 P1 P2],
+    [ParApply3 P1 P2 P3],
+    [ParApply4 P1 P2 P3 P4],
+    [ParApply5 P1 P2 P3 P4 P5],
+    [ParApply6 P1 P2 P3 P4 P5 P6],
+}
diff --git a/parallel/src/into_traits.rs b/parallel/src/into_traits.rs
index a87004e99..9abc630ba 100644
--- a/parallel/src/into_traits.rs
+++ b/parallel/src/into_traits.rs
@@ -1,5 +1,5 @@
 
-use rayon::par_iter::ParallelIterator;
+use rayon::iter::ParallelIterator;
 
 pub trait NdarrayIntoParallelIterator {
     type Iter: ParallelIterator<Item=Self::Item>;
diff --git a/parallel/src/lib.rs b/parallel/src/lib.rs
index 9449aa5ff..c7a5f01e0 100644
--- a/parallel/src/lib.rs
+++ b/parallel/src/lib.rs
@@ -8,11 +8,17 @@
 //! `.axis_iter()` and `.axis_iter_mut()` also have parallel counterparts,
 //! and their parallel iterators are indexed (and thus ordered) and exact length.
 //!
+//! `Zip` also implements `NdarrayIntoParallelIterator`, and an extension
+//! trait adds a `.par_apply` method to it directly.
+//!
 //! (*) This regime of a custom trait instead of rayon’s own is since we
 //! use this intermediate ndarray-parallel crate.
 //!
 //! # Examples
 //!
+//!
+//! ## Arrays and array views
+//!
 //! Compute the exponential of each element in an array, parallelized.
 //!
 //! ```
@@ -24,10 +30,18 @@
 //!
 //! fn main() {
 //!     let mut a = Array2::<f64>::zeros((128, 128));
+//!
+//!     // Parallel versions of regular array methods (ParMap trait)
+//!     a.par_map_inplace(|x| *x = x.exp());
+//!     a.par_mapv_inplace(f64::exp);
+//!
+//!     // You can also use the parallel iterator directly
 //!     a.par_iter_mut().for_each(|x| *x = x.exp());
 //! }
 //! ```
 //!
+//! ## Axis iterators
+//!
 //! Use the parallel `.axis_iter()` to compute the sum of each row.
 //!
 //! ```
@@ -49,10 +63,39 @@
 //!     assert_eq!(sums, [120., 376., 632., 888.]);
 //! }
 //! ```
+//!
+//! ## Zip
+//!
+//! Use `Zip` for lock-step function application across several arrays.
+//!
+//! ```
+//! extern crate ndarray;
+//! extern crate ndarray_parallel;
+//!
+//! use ndarray::Array3;
+//! use ndarray::Zip;
+//! use ndarray_parallel::prelude::*;
+//!
+//! type Array3f64 = Array3<f64>;
+//!
+//! fn main() {
+//!     const N: usize = 128;
+//!     let a = Array3f64::from_elem((N, N, N), 1.);
+//!     let b = Array3f64::from_elem(a.dim(), 2.);
+//!     let mut c = Array3f64::zeros(a.dim());
+//!
+//!     Zip::from(&mut c)
+//!         .and(&a)
+//!         .and(&b)
+//!         .par_apply(|c, &a, &b| {
+//!             *c += a - b;
+//!         });
+//! }
+//! ```
 
-extern crate ndarray;
-extern crate rayon;
+pub extern crate ndarray;
+pub extern crate rayon;
 
 /// Into- traits for creating parallelized iterators.
 pub mod prelude {
@@ -63,6 +106,16 @@ pub mod prelude {
 
     #[doc(no_inline)]
     pub use rayon::prelude::{ParallelIterator, IndexedParallelIterator, ExactParallelIterator};
+
+    pub use ext_traits::{
+        ParApply1,
+        ParApply2,
+        ParApply3,
+        ParApply4,
+        ParApply5,
+        ParApply6,
+    };
+    pub use ext_traits::ParMap;
 }
 
 pub use par::Parallel;
@@ -73,5 +126,6 @@ pub use into_traits::{
 };
 
 mod par;
+mod ext_traits;
 mod into_traits;
 mod into_impls;
diff --git a/parallel/src/par.rs b/parallel/src/par.rs
index 0afa2d5b4..7aecdfaea 100644
--- a/parallel/src/par.rs
+++ b/parallel/src/par.rs
@@ -1,14 +1,15 @@
 
-use rayon::par_iter::ParallelIterator;
-use rayon::par_iter::IndexedParallelIterator;
-use rayon::par_iter::ExactParallelIterator;
-use rayon::par_iter::BoundedParallelIterator;
-use rayon::par_iter::internal::{Consumer, UnindexedConsumer};
-use rayon::par_iter::internal::bridge;
-use rayon::par_iter::internal::ProducerCallback;
-use rayon::par_iter::internal::Producer;
-use rayon::par_iter::internal::UnindexedProducer;
-use rayon::par_iter::internal::bridge_unindexed;
+use rayon::iter::ParallelIterator;
+use rayon::iter::IndexedParallelIterator;
+use rayon::iter::ExactParallelIterator;
+use rayon::iter::BoundedParallelIterator;
+use rayon::iter::internal::{Consumer, UnindexedConsumer};
+use rayon::iter::internal::bridge;
+use rayon::iter::internal::ProducerCallback;
+use rayon::iter::internal::Producer;
+use rayon::iter::internal::UnindexedProducer;
+use rayon::iter::internal::bridge_unindexed;
+use rayon::iter::internal::Folder;
 
 use ndarray::iter::AxisIter;
 use ndarray::iter::AxisIterMut;
@@ -110,9 +111,11 @@
     where D: Dimension,
           A: $($thread_bounds)*,
 {
-    fn cost(&mut self, len: usize) -> f64 {
-        // FIXME: No idea about what this is
-        len as f64
+    type IntoIter = $iter_name<'a, A, D>;
+    type Item = <Self::IntoIter as Iterator>::Item;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0
     }
 
     fn split_at(self, i: usize) -> (Self, Self) {
@@ -167,19 +170,18 @@
     where D: Dimension,
          A: $($thread_bounds)*,
 {
-    fn can_split(&self) -> bool {
-        self.0.len() > 1
-    }
-
-    fn split(self) -> (Self, Self) {
+    type Item = <$view_name<'a, A, D> as IntoIterator>::Item;
+    fn split(self) -> (Self, Option<Self>) {
+        if self.0.len() <= 1 {
+            return (self, None)
+        }
         let array = self.0;
         let max_axis = array.max_stride_axis();
         let mid = array.len_of(max_axis) / 2;
         let (a, b) = array.split_at(max_axis, mid);
-        (ParallelProducer(a), ParallelProducer(b))
+        (ParallelProducer(a), Some(ParallelProducer(b)))
     }
 
-    #[cfg(rayon_fold_with)]
     fn fold_with<F>(self, folder: F) -> F
         where F: Folder<Self::Item>,
     {
@@ -203,3 +205,82 @@
 
 par_iter_view_wrapper!(ArrayView, [Sync]);
 par_iter_view_wrapper!(ArrayViewMut, [Sync + Send]);
+
+
+use ndarray::{Zip, NdProducer, FoldWhile};
+
+macro_rules! zip_impl {
+    ($([$($p:ident)*],)+) => {
+        $(
+        #[allow(non_snake_case)]
+        impl<Dim: Dimension, $($p: NdProducer<Dim=Dim>),*> NdarrayIntoParallelIterator for Zip<($($p,)*), Dim>
+            where $($p::Item : Send , )*
+                  $($p : Send , )*
+        {
+            type Item = ($($p::Item ,)*);
+            type Iter = Parallel<Self>;
+            fn into_par_iter(self) -> Self::Iter {
+                Parallel {
+                    iter: self,
+                }
+            }
+        }
+
+        #[allow(non_snake_case)]
+        impl<Dim: Dimension, $($p: NdProducer<Dim=Dim>),*> ParallelIterator for Parallel<Zip<($($p,)*), Dim>>
+            where $($p::Item : Send , )*
+                  $($p : Send , )*
+        {
+            type Item = ($($p::Item ,)*);
+
+            fn drive_unindexed<Cons>(self, consumer: Cons) -> Cons::Result
+                where Cons: UnindexedConsumer<Self::Item>
+            {
+                bridge_unindexed(ParallelProducer(self.iter), consumer)
+            }
+
+            fn opt_len(&mut self) -> Option<usize> {
+                None
+            }
+        }
+
+        #[allow(non_snake_case)]
+        impl<Dim: Dimension, $($p: NdProducer<Dim=Dim>),*> UnindexedProducer for ParallelProducer<Zip<($($p,)*), Dim>>
+            where $($p : Send , )*
+                  $($p::Item : Send , )*
+        {
+            type Item = ($($p::Item ,)*);
+
+            fn split(self) -> (Self, Option<Self>) {
+                if self.0.size() <= 1 {
+                    return (self, None)
+                }
+                let (a, b) = self.0.split();
+                (ParallelProducer(a), Some(ParallelProducer(b)))
+            }
+
+            fn fold_with<Fold>(self, folder: Fold) -> Fold
+                where Fold: Folder<Self::Item>,
+            {
+                self.0.fold_while(folder, |mut folder, $($p),*| {
+                    folder = folder.consume(($($p ,)*));
+                    if folder.full() {
+                        FoldWhile::Done(folder)
+                    } else {
+                        FoldWhile::Continue(folder)
+                    }
+                }).into_inner()
+            }
+        }
+        )+
+    }
+}
+
+zip_impl!{
+    [P1],
+    [P1 P2],
+    [P1 P2 P3],
+    [P1 P2 P3 P4],
+    [P1 P2 P3 P4 P5],
+    [P1 P2 P3 P4 P5 P6],
+}
diff --git a/parallel/tests/rayon.rs b/parallel/tests/rayon.rs
index 543118f2b..4739e016d 100644
--- a/parallel/tests/rayon.rs
+++ b/parallel/tests/rayon.rs
@@ -16,7 +16,7 @@ fn test_axis_iter() {
         v.fill(i as _);
     }
     assert_eq!(a.axis_iter(Axis(0)).len(), M);
-    let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum();
+    let s: f64 = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum();
     println!("{:?}", a.slice(s![..10, ..5]));
     assert_eq!(s, a.scalar_sum());
 }
@@ -36,7 +36,7 @@ fn test_regular_iter() {
     for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() {
         v.fill(i as _);
     }
-    let s = a.view().into_par_iter().map(|&x| x).sum();
+    let s: f64 = a.view().into_par_iter().map(|&x| x).sum();
     println!("{:?}", a.slice(s![..10, ..5]));
     assert_eq!(s, a.scalar_sum());
 }
diff --git a/parallel/tests/zip.rs b/parallel/tests/zip.rs
new file mode 100644
index 000000000..cca0cacd5
--- /dev/null
+++ b/parallel/tests/zip.rs
@@ -0,0 +1,83 @@
+
+extern crate ndarray;
+extern crate ndarray_parallel;
+
+use ndarray::prelude::*;
+use ndarray_parallel::prelude::*;
+
+use ndarray::Zip;
+
+const M: usize = 1024 * 10;
+const N: usize = 100;
+
+#[test]
+fn test_zip_1() {
+    let mut a = Array2::<f64>::zeros((M, N));
+
+    Zip::from(&mut a)
+        .par_apply(|x| {
+            *x = x.exp()
+        });
+}
+
+#[test]
+fn test_zip_index_1() {
+    let mut a = Array2::default((10, 10));
+
+    Zip::indexed(&mut a)
+        .par_apply(|i, x| {
+            *x = i;
+        });
+
+    for (i, elt) in a.indexed_iter() {
+        assert_eq!(*elt, i);
+    }
+}
+
+#[test]
+fn test_zip_index_2() {
+    let mut a = Array2::default((M, N));
+
+    Zip::indexed(&mut a)
+        .par_apply(|i, x| {
+            *x = i;
+        });
+
+    for (i, elt) in a.indexed_iter() {
+        assert_eq!(*elt, i);
+    }
+}
+
+#[test]
+fn test_zip_index_3() {
+    let mut a = Array::default((1, 2, 1, 2, 3));
+
+    Zip::indexed(&mut a)
+        .par_apply(|i, x| {
+            *x = i;
+        });
+
+    for (i, elt) in a.indexed_iter() {
+        assert_eq!(*elt, i);
+    }
+}
+
+#[test]
+fn test_zip_index_4() {
+    let mut a = Array2::zeros((M, N));
+    let mut b = Array2::zeros((M, N));
+
+    Zip::indexed(&mut a)
+        .and(&mut b)
+        .par_apply(|(i, j), x, y| {
+            *x = i;
+            *y = j;
+        });
+
+    for ((i, _), elt) in a.indexed_iter() {
+        assert_eq!(*elt, i);
+    }
+    for ((_, j), elt) in b.indexed_iter() {
+        assert_eq!(*elt, j);
+    }
+}
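
For orientation, here is a minimal sketch of how the entry points this patch introduces would be called from a downstream crate. It assumes the dependency versions pinned in the Cargo.toml hunk above (ndarray 0.9.0-alpha.1, rayon 0.7); the array shapes and values are illustrative only and are not part of the patch.

```
extern crate ndarray;
extern crate ndarray_parallel;

use ndarray::{Array2, Zip};
use ndarray_parallel::prelude::*;

fn main() {
    let mut a = Array2::<f64>::zeros((64, 64));
    let b = Array2::<f64>::from_elem((64, 64), 1.0);

    // ParMap: parallel in-place map over every element of an array.
    a.par_mapv_inplace(f64::exp);

    // ParApply2 (via the prelude): lock-step parallel application across
    // zipped producers, shorthand for `.into_par_iter().for_each(..)`.
    Zip::from(&mut a)
        .and(&b)
        .par_apply(|a, &b| *a += b);
}
```

As the `ext_traits.rs` hunk shows, `par_apply` simply forwards to `.into_par_iter().for_each(..)`, so it inherits rayon's work splitting through the `UnindexedProducer` impl added for `Zip` in `par.rs`.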