diff --git a/.travis.yml b/.travis.yml
index a9c2e8f8b..05b642267 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,4 +32,5 @@ script:
   cargo test --release --verbose --features "" &&
   CARGO_TARGET_DIR=target/ cargo test --manifest-path=serialization-tests/Cargo.toml --verbose &&
   CARGO_TARGET_DIR=target/ cargo test --manifest-path=numeric-tests/Cargo.toml --verbose &&
+  CARGO_TARGET_DIR=target/ cargo test --manifest-path=parallel/Cargo.toml --verbose &&
   ([ "$BENCH" != 1 ] || cargo bench --no-run --verbose --features "$FEATURES")
diff --git a/parallel/Cargo.toml b/parallel/Cargo.toml
new file mode 100644
index 000000000..a3c55bf39
--- /dev/null
+++ b/parallel/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "ndarray-parallel"
+version = "0.1.0"
+authors = ["bluss"]
+license = "MIT/Apache-2.0"
+
+repository = "https://github.com/bluss/rust-ndarray"
+documentation = "https://docs.rs/ndarray-parallel/"
+
+description = "Parallelization for ndarray (using rayon)."
+
+keywords = ["data-structure", "multidimensional", "parallel", "concurrent"]
+
+[dependencies]
+rayon = "0.6"
+ndarray = { version = "0.7.2", path = "../" }
+
+[dev-dependencies]
+num_cpus = "1.2"
+
diff --git a/parallel/README.rst b/parallel/README.rst
new file mode 100644
index 000000000..8e7a50502
--- /dev/null
+++ b/parallel/README.rst
@@ -0,0 +1,65 @@
+ndarray-parallel
+================
+
+``ndarray-parallel`` integrates ndarray with rayon__ for simple parallelization.
+
+__ https://github.com/nikomatsakis/rayon
+
+Please read the `API documentation here`__
+
+__ http://docs.rs/ndarray-parallel/
+
+|build_status|_ |crates|_
+
+.. |build_status| image:: https://travis-ci.org/bluss/rust-ndarray.svg?branch=master
+.. _build_status: https://travis-ci.org/bluss/rust-ndarray
+
+.. |crates| image:: http://meritbadge.herokuapp.com/ndarray-parallel
+.. _crates: https://crates.io/crates/ndarray-parallel
+
+Highlights
+----------
+
+- Parallel elementwise (no order) iterator
+- Parallel ``.axis_iter()`` (and ``_mut``)
+
+Status and Lookout
+------------------
+
+- Still iterating on and evolving the crate
+
+  + A separate crate is less convenient (it uses its own trait rather than
+    rayon's ``IntoParallelIterator``), but it allows rapid iteration and lets
+    us follow the evolution of rayon's internals.
+    This crate moves at double pace: for every ndarray or rayon major
+    version, this crate goes up one major version.
+
+- Performance:
+
+  + TBD. Tell me about your experience.
+  + You'll need a big chunk of data (or an expensive operation per data
+    point) to gain from parallelization.
+
+How to use with cargo::
+
+    [dependencies]
+    ndarray-parallel = "0.1"
+
+Recent Changes (ndarray-parallel)
+---------------------------------
+
+- 0.1.0
+
+  - Not yet released
+
+License
+=======
+
+Dual-licensed to be compatible with the Rust project.
+
+Licensed under the Apache License, Version 2.0
+http://www.apache.org/licenses/LICENSE-2.0 or the MIT license
+http://opensource.org/licenses/MIT, at your
+option. This file may not be copied, modified, or distributed
+except according to those terms.
+
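For orientation, the two highlights above amount to downstream code roughly like the
following minimal sketch. It is not itself part of this patch and only uses calls that
appear elsewhere in the diff (``par_iter_mut()``, ``axis_iter_mut()`` with
``into_par_iter()``, and ``fill()``)::

    extern crate ndarray;
    extern crate ndarray_parallel;

    use ndarray::prelude::*;
    use ndarray_parallel::prelude::*;

    fn main() {
        let mut a = Array2::<f64>::zeros((64, 64));

        // Parallel elementwise map (no ordering guarantee).
        a.par_iter_mut().for_each(|x| *x = x.exp());

        // Parallel iteration over the subviews along Axis(0).
        a.axis_iter_mut(Axis(0))
         .into_par_iter()
         .for_each(|mut row| row.fill(1.));
    }

No explicit ``rayon::initialize`` call is needed for the default thread pool; the
benchmarks below only call it to halve the thread count.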
diff --git a/parallel/benches/rayon.rs b/parallel/benches/rayon.rs
new file mode 100644
index 000000000..91044b5d3
--- /dev/null
+++ b/parallel/benches/rayon.rs
@@ -0,0 +1,114 @@
+
+#![feature(test)]
+
+extern crate num_cpus;
+extern crate test;
+use test::Bencher;
+
+extern crate rayon;
+#[macro_use(s)]
+extern crate ndarray;
+extern crate ndarray_parallel;
+use ndarray::prelude::*;
+use ndarray_parallel::prelude::*;
+
+const EXP_N: usize = 128;
+
+use std::cmp::max;
+
+fn set_threads() {
+    let n = max(1, num_cpus::get() / 2);
+    let cfg = rayon::Configuration::new().set_num_threads(n);
+    let _ = rayon::initialize(cfg);
+}
+
+#[bench]
+fn map_exp_regular(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((EXP_N, EXP_N));
+    a.swap_axes(0, 1);
+    bench.iter(|| {
+        a.mapv_inplace(|x| x.exp());
+    });
+}
+
+#[bench]
+fn rayon_exp_regular(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((EXP_N, EXP_N));
+    a.swap_axes(0, 1);
+    bench.iter(|| {
+        a.view_mut().into_par_iter().for_each(|x| *x = x.exp());
+    });
+}
+
+const FASTEXP: usize = 800;
+
+#[inline]
+fn fastexp(x: f64) -> f64 {
+    let x = 1. + x / 1024.;
+    x.powi(1024)
+}
+
+#[bench]
+fn map_fastexp_regular(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        a.mapv_inplace(|x| fastexp(x))
+    });
+}
+
+#[bench]
+fn rayon_fastexp_regular(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x));
+    });
+}
+
+#[bench]
+fn map_fastexp_cut(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    let mut a = a.slice_mut(s![.., ..-1]);
+    bench.iter(|| {
+        a.mapv_inplace(|x| fastexp(x))
+    });
+}
+
+#[bench]
+fn rayon_fastexp_cut(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    let mut a = a.slice_mut(s![.., ..-1]);
+    bench.iter(|| {
+        a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x));
+    });
+}
+
+#[bench]
+fn map_fastexp_by_axis(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        for mut sheet in a.axis_iter_mut(Axis(0)) {
+            sheet.mapv_inplace(fastexp)
+        }
+    });
+}
+
+#[bench]
+fn rayon_fastexp_by_axis(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        a.axis_iter_mut(Axis(0)).into_par_iter()
+            .for_each(|mut sheet| sheet.mapv_inplace(fastexp));
+    });
+}
diff --git a/parallel/src/into_impls.rs b/parallel/src/into_impls.rs
new file mode 100644
index 000000000..082a4b960
--- /dev/null
+++ b/parallel/src/into_impls.rs
@@ -0,0 +1,50 @@
+use ndarray::{Array, RcArray, Dimension, ArrayView, ArrayViewMut};
+
+use NdarrayIntoParallelIterator as IntoParallelIterator;
+use Parallel;
+
+impl<'a, A, D> IntoParallelIterator for &'a Array<A, D>
+    where D: Dimension,
+          A: Sync
+{
+    type Item = &'a A;
+    type Iter = Parallel<ArrayView<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view().into_par_iter()
+    }
+}
+
+// This is allowed: goes through `.view()`
+impl<'a, A, D> IntoParallelIterator for &'a RcArray<A, D>
+    where D: Dimension,
+          A: Sync
+{
+    type Item = &'a A;
+    type Iter = Parallel<ArrayView<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view().into_par_iter()
+    }
+}
+
+impl<'a, A, D> IntoParallelIterator for &'a mut Array<A, D>
+    where D: Dimension,
+          A: Sync + Send
+{
+    type Item = &'a mut A;
+    type Iter = Parallel<ArrayViewMut<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view_mut().into_par_iter()
+    }
+}
+
+// This is allowed: goes through `.view_mut()`, which is unique access
+impl<'a, A, D> IntoParallelIterator for &'a mut RcArray<A, D>
+    where D: Dimension,
+          A: Sync + Send + Clone,
+{
+    type Item = &'a mut A;
+    type Iter = Parallel<ArrayViewMut<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view_mut().into_par_iter()
+    }
+}
diff --git a/parallel/src/into_traits.rs b/parallel/src/into_traits.rs
new file mode 100644
index 000000000..a87004e99
--- /dev/null
+++ b/parallel/src/into_traits.rs
@@ -0,0 +1,42 @@
+
+use rayon::par_iter::ParallelIterator;
+
+pub trait NdarrayIntoParallelIterator {
+    type Iter: ParallelIterator<Item = Self::Item>;
+    type Item: Send;
+    fn into_par_iter(self) -> Self::Iter;
+}
+
+pub trait NdarrayIntoParallelRefIterator<'x> {
+    type Iter: ParallelIterator<Item = Self::Item>;
+    type Item: Send + 'x;
+    fn par_iter(&'x self) -> Self::Iter;
+}
+
+pub trait NdarrayIntoParallelRefMutIterator<'x> {
+    type Iter: ParallelIterator<Item = Self::Item>;
+    type Item: Send + 'x;
+    fn par_iter_mut(&'x mut self) -> Self::Iter;
+}
+
+impl<'data, I: 'data + ?Sized> NdarrayIntoParallelRefIterator<'data> for I
+    where &'data I: NdarrayIntoParallelIterator
+{
+    type Iter = <&'data I as NdarrayIntoParallelIterator>::Iter;
+    type Item = <&'data I as NdarrayIntoParallelIterator>::Item;
+
+    fn par_iter(&'data self) -> Self::Iter {
+        self.into_par_iter()
+    }
+}
+
+impl<'data, I: 'data + ?Sized> NdarrayIntoParallelRefMutIterator<'data> for I
+    where &'data mut I: NdarrayIntoParallelIterator
+{
+    type Iter = <&'data mut I as NdarrayIntoParallelIterator>::Iter;
+    type Item = <&'data mut I as NdarrayIntoParallelIterator>::Item;
+
+    fn par_iter_mut(&'data mut self) -> Self::Iter {
+        self.into_par_iter()
+    }
+}
diff --git a/parallel/src/lib.rs b/parallel/src/lib.rs
new file mode 100644
index 000000000..f92a32082
--- /dev/null
+++ b/parallel/src/lib.rs
@@ -0,0 +1,76 @@
+//! Parallelization features for ndarray.
+//!
+//! The array views and references to owned arrays all implement
+//! `NdarrayIntoParallelIterator` (*); the default parallel iterators (each
+//! element by reference or mutable reference) have no ordering guarantee in
+//! their parallel implementations.
+//!
+//! `.axis_iter()` and `.axis_iter_mut()` also have parallel counterparts.
+//!
+//! (*) A custom trait is used instead of rayon's own because the
+//! implementations live in this intermediate ndarray-parallel crate.
+//!
+//! # Examples
+//!
+//! Compute the exponential of each element in an array, parallelized.
+//!
+//! ```
+//! extern crate ndarray;
+//! extern crate ndarray_parallel;
+//!
+//! use ndarray::Array2;
+//! use ndarray_parallel::prelude::*;
+//!
+//! fn main() {
+//!     let mut a = Array2::<f64>::zeros((128, 128));
+//!     a.par_iter_mut().for_each(|x| *x = x.exp());
+//! }
+//! ```
+//!
+//! Use the parallel `.axis_iter()` to compute the sum of each row.
+//!
+//! ```
+//! extern crate ndarray;
+//! extern crate ndarray_parallel;
+//!
+//! use ndarray::Array;
+//! use ndarray::Axis;
+//! use ndarray_parallel::prelude::*;
+//!
+//! fn main() {
+//!     let a = Array::linspace(0., 63., 64).into_shape((4, 16)).unwrap();
+//!     let mut sums = Vec::new();
+//!     a.axis_iter(Axis(0))
+//!      .into_par_iter()
+//!      .map(|row| row.scalar_sum())
+//!      .collect_into(&mut sums);
+//!
+//!     assert_eq!(sums, [120., 376., 632., 888.]);
+//! }
+//! ```
+
+
+extern crate ndarray;
+extern crate rayon;
+
+/// Into- traits for creating parallelized iterators.
+pub mod prelude {
+    // happy and insane; ignorance is bluss
+    pub use NdarrayIntoParallelIterator;
+    pub use NdarrayIntoParallelRefIterator;
+    pub use NdarrayIntoParallelRefMutIterator;
+
+    #[doc(no_inline)]
+    pub use rayon::prelude::{ParallelIterator, IndexedParallelIterator, ExactParallelIterator};
+}
+
+pub use par::Parallel;
+pub use into_traits::{
+    NdarrayIntoParallelIterator,
+    NdarrayIntoParallelRefIterator,
+    NdarrayIntoParallelRefMutIterator,
+};
+
+mod par;
+mod into_traits;
+mod into_impls;
diff --git a/parallel/src/par.rs b/parallel/src/par.rs
new file mode 100644
index 000000000..b28129f07
--- /dev/null
+++ b/parallel/src/par.rs
@@ -0,0 +1,208 @@
+
+use rayon::par_iter::ParallelIterator;
+use rayon::par_iter::IndexedParallelIterator;
+use rayon::par_iter::ExactParallelIterator;
+use rayon::par_iter::BoundedParallelIterator;
+use rayon::par_iter::internal::{Consumer, UnindexedConsumer};
+use rayon::par_iter::internal::bridge;
+use rayon::par_iter::internal::ProducerCallback;
+use rayon::par_iter::internal::Producer;
+use rayon::par_iter::internal::UnindexedProducer;
+use rayon::par_iter::internal::bridge_unindexed;
+
+use ndarray::AxisIter;
+use ndarray::AxisIterMut;
+use ndarray::{Dimension};
+use ndarray::{ArrayView, ArrayViewMut};
+
+use super::NdarrayIntoParallelIterator;
+
+/// Parallel iterator wrapper.
+#[derive(Copy, Clone, Debug)]
+pub struct Parallel<I> {
+    iter: I,
+}
+
+/// Parallel producer wrapper.
+#[derive(Copy, Clone, Debug)]
+struct ParallelProducer<I>(I);
+
+macro_rules! par_iter_wrapper {
+    // thread_bounds are either Sync or Send + Sync
+    ($iter_name:ident, [$($thread_bounds:tt)*]) => {
+    impl<'a, A, D> NdarrayIntoParallelIterator for $iter_name<'a, A, D>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <Self as Iterator>::Item;
+        type Iter = Parallel<Self>;
+        fn into_par_iter(self) -> Self::Iter {
+            Parallel {
+                iter: self,
+            }
+        }
+    }
+
+    impl<'a, A, D> ParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <$iter_name<'a, A, D> as Iterator>::Item;
+        fn drive_unindexed<C>(self, consumer: C) -> C::Result
+            where C: UnindexedConsumer<Self::Item>
+        {
+            bridge(self, consumer)
+        }
+
+        fn opt_len(&mut self) -> Option<usize> {
+            Some(self.iter.len())
+        }
+    }
+
+    impl<'a, A, D> IndexedParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn with_producer<Cb>(self, callback: Cb) -> Cb::Output
+            where Cb: ProducerCallback<Self::Item>
+        {
+            callback.callback(ParallelProducer(self.iter))
+        }
+    }
+
+    impl<'a, A, D> ExactParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn len(&mut self) -> usize {
+            ExactSizeIterator::len(&self.iter)
+        }
+    }
+
+    impl<'a, A, D> BoundedParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn upper_bound(&mut self) -> usize {
+            ExactSizeIterator::len(&self.iter)
+        }
+
+        fn drive<C>(self, consumer: C) -> C::Result
+            where C: Consumer<Self::Item>
+        {
+            bridge(self, consumer)
+        }
+    }
+
+    impl<'a, A, D> Iterator for ParallelProducer<$iter_name<'a, A, D>>
+        where D: Dimension,
+    {
+        type Item = <$iter_name<'a, A, D> as Iterator>::Item;
+        #[inline(always)]
+        fn next(&mut self) -> Option<Self::Item> {
+            self.0.next()
+        }
+
+        fn size_hint(&self) -> (usize, Option<usize>) {
+            self.0.size_hint()
+        }
+    }
+
+    // This is the real magic, I guess
+    impl<'a, A, D> Producer for ParallelProducer<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn cost(&mut self, len: usize) -> f64 {
+            // FIXME: No idea about what this is
+            len as f64
+        }
+
+        fn split_at(self, i: usize) -> (Self, Self) {
+            let (a, b) = self.0.split_at(i);
+            (ParallelProducer(a), ParallelProducer(b))
+        }
+    }
+
+    }
+}
+
+
+par_iter_wrapper!(AxisIter, [Sync]);
+par_iter_wrapper!(AxisIterMut, [Send + Sync]);
+
+
+
+macro_rules! par_iter_view_wrapper {
+    // thread_bounds are either Sync or Send + Sync
+    ($view_name:ident, [$($thread_bounds:tt)*]) => {
+    impl<'a, A, D> NdarrayIntoParallelIterator for $view_name<'a, A, D>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <Self as IntoIterator>::Item;
+        type Iter = Parallel<Self>;
+        fn into_par_iter(self) -> Self::Iter {
+            Parallel {
+                iter: self,
+            }
+        }
+    }
+
+
+    impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <$view_name<'a, A, D> as IntoIterator>::Item;
+        fn drive_unindexed<C>(self, consumer: C) -> C::Result
+            where C: UnindexedConsumer<Self::Item>
+        {
+            bridge_unindexed(ParallelProducer(self.iter), consumer)
+        }
+
+        fn opt_len(&mut self) -> Option<usize> {
+            Some(self.iter.len())
+        }
+    }
+
+    impl<'a, A, D> UnindexedProducer for ParallelProducer<$view_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn can_split(&self) -> bool {
+            self.0.len() > 1
+        }
+
+        fn split(self) -> (Self, Self) {
+            let array = self.0;
+            let max_axis = array.max_stride_axis();
+            let mid = array.len_of(max_axis) / 2;
+            let (a, b) = array.split_at(max_axis, mid);
+            (ParallelProducer(a), ParallelProducer(b))
+        }
+
+        #[cfg(rayon_fold_with)]
+        fn fold_with<F>(self, folder: F) -> F
+            where F: Folder<Self::Item>,
+        {
+            self.into_iter().fold(folder, move |f, elt| f.consume(elt))
+        }
+    }
+
+    impl<'a, A, D> IntoIterator for ParallelProducer<$view_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <$view_name<'a, A, D> as IntoIterator>::Item;
+        type IntoIter = <$view_name<'a, A, D> as IntoIterator>::IntoIter;
+        fn into_iter(self) -> Self::IntoIter {
+            self.0.into_iter()
+        }
+    }
+
+    }
+}
+
+par_iter_view_wrapper!(ArrayView, [Sync]);
+par_iter_view_wrapper!(ArrayViewMut, [Sync + Send]);
diff --git a/parallel/tests/rayon.rs b/parallel/tests/rayon.rs
new file mode 100644
index 000000000..543118f2b
--- /dev/null
+++ b/parallel/tests/rayon.rs
@@ -0,0 +1,42 @@
+
+extern crate rayon;
+#[macro_use(s)] extern crate ndarray;
+extern crate ndarray_parallel;
+
+use ndarray::prelude::*;
+use ndarray_parallel::prelude::*;
+
+const M: usize = 1024 * 10;
+const N: usize = 100;
+
+#[test]
+fn test_axis_iter() {
+    let mut a = Array2::<f64>::zeros((M, N));
+    for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() {
+        v.fill(i as _);
+    }
+    assert_eq!(a.axis_iter(Axis(0)).len(), M);
+    let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum();
+    println!("{:?}", a.slice(s![..10, ..5]));
+    assert_eq!(s, a.scalar_sum());
+}
+
+#[test]
+fn test_axis_iter_mut() {
+    let mut a = Array::linspace(0., 1.0f64, M * N).into_shape((M, N)).unwrap();
+    let b = a.mapv(|x| x.exp());
+    a.axis_iter_mut(Axis(0)).into_par_iter().for_each(|mut v| v.mapv_inplace(|x| x.exp()));
+    println!("{:?}", a.slice(s![..10, ..5]));
+    assert!(a.all_close(&b, 0.001));
+}
+
+#[test]
+fn test_regular_iter() {
+    let mut a = Array2::<f64>::zeros((M, N));
+    for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() {
+        v.fill(i as _);
+    }
+    let s = a.view().into_par_iter().map(|&x| x).sum();
+    println!("{:?}", a.slice(s![..10, ..5]));
+    assert_eq!(s, a.scalar_sum());
+}
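The core of the elementwise parallelism is the ``UnindexedProducer`` impl in
``par.rs``: a view is halved along its axis of greatest stride, which keeps each half
as close to contiguous in memory as possible, and rayon keeps splitting roughly as
long as ``can_split`` reports more than one element. A minimal standalone sketch of
that splitting step, not part of the patch and using only ndarray calls that appear
above (``max_stride_axis``, ``len_of``, ``split_at``)::

    extern crate ndarray;

    use ndarray::prelude::*;

    fn main() {
        // A 4-by-6 array in standard (row-major) layout: Axis(0) has the largest stride.
        let a = Array::linspace(0., 23., 24).into_shape((4, 6)).unwrap();

        let view = a.view();
        let axis = view.max_stride_axis();
        let mid = view.len_of(axis) / 2;

        // Split into two halves that are each contiguous blocks of memory.
        let (left, right) = view.split_at(axis, mid);
        assert_eq!(left.shape(), [2, 6]);
        assert_eq!(right.shape(), [2, 6]);
    }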