@@ -5,16 +5,8 @@ module DistributedArrays
 using Compat
 import Compat.view
 
-if VERSION >= v"0.5.0-dev+4340"
-    using Primes
-    using Primes: factor
-end
-
-if VERSION < v"0.5.0-"
-    typealias Future RemoteRef
-    typealias RemoteChannel RemoteRef
-    typealias AbstractSerializer SerializationState # On 0.4 fallback to the only concrete implementation
-end
+using Primes
+using Primes: factor
 
 importall Base
 import Base.Callable
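
Primes.factor becomes a hard dependency here rather than a 0.5-only conditional import. For orientation, a quick sketch of what `factor` returns; that the package uses it to factor worker counts when laying out chunks is my assumption, not something this hunk shows:

    using Primes: factor
    f = factor(12)   # prime factorization of 12, i.e. 2^2 * 3
    Dict(f)          # => Dict(2 => 2, 3 => 1)
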
@@ -195,40 +187,24 @@ function DArray(refs)
 
     DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts)
 end
-if VERSION < v"0.5.0-"
-    macro DArray(ex::Expr)
-        if ex.head !== :comprehension
-            throw(ArgumentError("invalid @DArray syntax"))
-        end
-        ex.args[1] = esc(ex.args[1])
-        ndim = length(ex.args) - 1
-        ranges = map(r->esc(r.args[2]), ex.args[2:end])
-        for d = 1:ndim
-            var = ex.args[d+1].args[1]
-            ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
-        end
-        return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex),
-                    tuple($(map(r->:(length($r)), ranges)...))) )
+
+macro DArray(ex0::Expr)
+    if ex0.head !== :comprehension
+        throw(ArgumentError("invalid @DArray syntax"))
     end
-else
-    macro DArray(ex0::Expr)
-        if ex0.head !== :comprehension
-            throw(ArgumentError("invalid @DArray syntax"))
-        end
-        ex = ex0.args[1]
-        if ex.head !== :generator
-            throw(ArgumentError("invalid @DArray syntax"))
-        end
-        ex.args[1] = esc(ex.args[1])
-        ndim = length(ex.args) - 1
-        ranges = map(r->esc(r.args[2]), ex.args[2:end])
-        for d = 1:ndim
-            var = ex.args[d+1].args[1]
-            ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
-        end
-        return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0),
-                    tuple($(map(r->:(length($r)), ranges)...))) )
+    ex = ex0.args[1]
+    if ex.head !== :generator
+        throw(ArgumentError("invalid @DArray syntax"))
     end
+    ex.args[1] = esc(ex.args[1])
+    ndim = length(ex.args) - 1
+    ranges = map(r->esc(r.args[2]), ex.args[2:end])
+    for d = 1:ndim
+        var = ex.args[d+1].args[1]
+        ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
+    end
+    return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0),
+                tuple($(map(r->:(length($r)), ranges)...))) )
 end
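
The surviving macro keeps the same expansion as before; only the 0.4 comprehension-AST branch is dropped. As a sketch of what it produces (the expansion below is hand-derived from the macro body above, not verified output):

    # a distributed 8x8 array built from a comprehension
    d = @DArray [i + j for i = 1:8, j = 1:8]

    # roughly equivalent hand-written form: each worker evaluates the
    # comprehension only over its local index ranges I
    d = DArray((I::Tuple{Vararg{UnitRange{Int}}}) ->
                   [i + j for i = (1:8)[I[1]], j = (1:8)[I[2]]],
               (8, 8))
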
 
 # new DArray similar to an existing one
@@ -658,131 +634,130 @@ end
 
 # We also want to optimize setindex! with a SubDArray source, but this is hard
 # and only works on 0.5.
-if VERSION > v"0.5.0-dev+5230"
-    # Similar to Base.indexin, but just create a logical mask. Note that this
-    # must return a logical mask in order to support merging multiple masks
-    # together into one linear index since we need to know how many elements to
-    # skip at the end. In many cases range intersection would be much faster
-    # than generating a logical mask, but that loses the endpoint information.
-    indexin_mask(a, b::Number) = a .== b
-    indexin_mask(a, r::Range{Int}) = [i in r for i in a]
-    indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b))
-    indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b))
-    indexin_mask(a, b) = [i in b for i in a]
-
-    import Base: tail
-    # Given a tuple of indices and a tuple of masks, restrict the indices to the
-    # valid regions. This is, effectively, reversing Base.setindex_shape_check.
-    # We can't just use indexing into MergedIndices here because getindex is much
-    # pickier about singleton dimensions than setindex! is.
-    restrict_indices(::Tuple{}, ::Tuple{}) = ()
-    function restrict_indices(a::Tuple{Any, Vararg{Any}}, b::Tuple{Any, Vararg{Any}})
-        if (length(a[1]) == length(b[1]) == 1) || (length(a[1]) > 1 && length(b[1]) > 1)
-            (vec(a[1])[vec(b[1])], restrict_indices(tail(a), tail(b))...)
-        elseif length(a[1]) == 1
-            (a[1], restrict_indices(tail(a), b))
-        elseif length(b[1]) == 1 && b[1][1]
-            restrict_indices(a, tail(b))
-        else
-            throw(DimensionMismatch("this should be caught by setindex_shape_check; please submit an issue"))
-        end
-    end
-    # The final indices are funky - they're allowed to accumulate together.
-    # An easy (albeit very inefficient) fix for too many masks is to use the
-    # outer product to merge them. But we can do that lazily with a custom type:
-    function restrict_indices(a::Tuple{Any}, b::Tuple{Any, Any, Vararg{Any}})
-        (vec(a[1])[vec(ProductIndices(b, map(length, b)))],)
-    end
-    # But too many indices is much harder; this requires merging the indices
-    # in `a` before applying the final mask in `b`.
-    function restrict_indices(a::Tuple{Any, Any, Vararg{Any}}, b::Tuple{Any})
-        if length(a[1]) == 1
-            (a[1], restrict_indices(tail(a), b))
-        else
-            # When one mask spans multiple indices, we need to merge the indices
-            # together. At this point, we can just use indexing to merge them since
-            # there's no longer special handling of singleton dimensions
-            (view(MergedIndices(a, map(length, a)), b[1]),)
-        end
-    end
 
-    immutable ProductIndices{I,N} <: AbstractArray{Bool, N}
-        indices::I
-        sz::NTuple{N,Int}
-    end
-    Base.size(P::ProductIndices) = P.sz
-    # This gets passed to map to avoid breaking propagation of inbounds
-    Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...]
-    Base.@propagate_inbounds Base.getindex{_,N}(P::ProductIndices{_,N}, I::Vararg{Int, N}) =
-        Bool((&)(map(propagate_getindex, P.indices, I)...))
-
-    immutable MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N}
-        indices::I
-        sz::NTuple{N,Int}
-    end
-    Base.size(M::MergedIndices) = M.sz
-    Base.@propagate_inbounds Base.getindex{_,N}(M::MergedIndices{_,N}, I::Vararg{Int, N}) =
-        CartesianIndex(map(propagate_getindex, M.indices, I))
-    # Additionally, we optimize bounds checking when using MergedIndices as an
-    # array index since checking, e.g., A[1:500, 1:500] is *way* faster than
-    # checking an array of 500^2 elements of CartesianIndex{2}. This optimization
-    # also applies to reshapes of MergedIndices since the outer shape of the
-    # container doesn't affect the index elements themselves. We can go even
-    # farther and say that even restricted views of MergedIndices must be valid
-    # over the entire array. This is overly strict in general, but in this
-    # use-case all the merged indices must be valid at some point, so it's ok.
-    typealias ReshapedMergedIndices{T,N,M<:MergedIndices} Base.ReshapedArray{T,N,M}
-    typealias SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} SubArray{T,N,M}
-    typealias MergedIndicesOrSub Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices}
-    import Base: checkbounds_indices
-    @inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
-        checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
-    @inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
-        checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
-    @inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
-        checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
-
-    # The tricky thing here is that we want to optimize the accesses into the
-    # distributed array, but in doing so, we lose track of which indices in I we
-    # should be using.
-    #
-    # I’ve come to the conclusion that the function is utterly insane.
-    # There are *6* flavors of indices with four different reference points:
-    # 1. Find the indices of each portion of the DArray.
-    # 2. Find the valid subset of indices for the SubArray into that portion.
-    # 3. Find the portion of the `I` indices that should be used when you access the
-    #    `K` indices in the subarray. This guy is nasty. It’s totally backwards
-    #    from all other arrays, wherein we simply iterate over the source array’s
-    #    elements. You need to *both* know which elements in `J` were skipped
-    #    (`indexin_mask`) and which dimensions should match up (`restrict_indices`)
-    # 4. If `K` doesn’t correspond to an entire chunk, reinterpret `K` in terms of
-    #    the local portion of the source array
-    function Base.setindex!(a::Array, s::SubDArray,
-            I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...)
-        Base.setindex_shape_check(s, Base.index_lengths(a, I...)...)
-        n = length(I)
-        d = s.parent
-        J = Base.decolon(d, s.indexes...)
-        @sync for i = 1:length(d.pids)
-            K_c = d.indexes[i]
-            K = map(intersect, J, K_c)
-            if !any(isempty, K)
-                K_mask = map(indexin_mask, J, K_c)
-                idxs = restrict_indices(Base.decolon(a, I...), K_mask)
-                if isequal(K, K_c)
-                    # whole chunk
-                    @async a[idxs...] = chunk(d, i)
-                else
-                    # partial chunk
-                    @async a[idxs...] =
-                        remotecall_fetch(d.pids[i]) do
-                            view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...)
-                        end
+# Similar to Base.indexin, but just create a logical mask. Note that this
+# must return a logical mask in order to support merging multiple masks
+# together into one linear index since we need to know how many elements to
+# skip at the end. In many cases range intersection would be much faster
+# than generating a logical mask, but that loses the endpoint information.
+indexin_mask(a, b::Number) = a .== b
+indexin_mask(a, r::Range{Int}) = [i in r for i in a]
+indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b))
+indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b))
+indexin_mask(a, b) = [i in b for i in a]
+
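
A concrete illustration of the mask contract described in the comment above (hand-evaluated from these definitions, so treat it as a sketch):

    indexin_mask(4:8, 6:10)      # => [false, false, true, true, true]
    indexin_mask(4:8, [5, 7])    # => [false, true, false, true, false]
    # a range intersection would give 6:8, losing how many of 4:8 were skipped
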
+import Base: tail
+# Given a tuple of indices and a tuple of masks, restrict the indices to the
+# valid regions. This is, effectively, reversing Base.setindex_shape_check.
+# We can't just use indexing into MergedIndices here because getindex is much
+# pickier about singleton dimensions than setindex! is.
+restrict_indices(::Tuple{}, ::Tuple{}) = ()
+function restrict_indices(a::Tuple{Any, Vararg{Any}}, b::Tuple{Any, Vararg{Any}})
+    if (length(a[1]) == length(b[1]) == 1) || (length(a[1]) > 1 && length(b[1]) > 1)
+        (vec(a[1])[vec(b[1])], restrict_indices(tail(a), tail(b))...)
+    elseif length(a[1]) == 1
+        (a[1], restrict_indices(tail(a), b))
+    elseif length(b[1]) == 1 && b[1][1]
+        restrict_indices(a, tail(b))
+    else
+        throw(DimensionMismatch("this should be caught by setindex_shape_check; please submit an issue"))
+    end
+end
+# The final indices are funky - they're allowed to accumulate together.
+# An easy (albeit very inefficient) fix for too many masks is to use the
+# outer product to merge them. But we can do that lazily with a custom type:
+function restrict_indices(a::Tuple{Any}, b::Tuple{Any, Any, Vararg{Any}})
+    (vec(a[1])[vec(ProductIndices(b, map(length, b)))],)
+end
+# But too many indices is much harder; this requires merging the indices
+# in `a` before applying the final mask in `b`.
+function restrict_indices(a::Tuple{Any, Any, Vararg{Any}}, b::Tuple{Any})
+    if length(a[1]) == 1
+        (a[1], restrict_indices(tail(a), b))
+    else
+        # When one mask spans multiple indices, we need to merge the indices
+        # together. At this point, we can just use indexing to merge them since
+        # there's no longer special handling of singleton dimensions
+        (view(MergedIndices(a, map(length, a)), b[1]),)
+    end
+end
+
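
A worked example of the dimension-by-dimension restriction (hand-traced against the first method above; an illustration, not a test from the repo):

    # the mask keeps elements 1 and 3 of the first index; both entries of the
    # second dimension are singletons, so they take the first branch as well
    restrict_indices((2:4, 5:5), ([true, false, true], [true]))
    # => ([2, 4], [5])
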
+immutable ProductIndices{I,N} <: AbstractArray{Bool, N}
+    indices::I
+    sz::NTuple{N,Int}
+end
+Base.size(P::ProductIndices) = P.sz
+# This gets passed to map to avoid breaking propagation of inbounds
+Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...]
+Base.@propagate_inbounds Base.getindex{_,N}(P::ProductIndices{_,N}, I::Vararg{Int, N}) =
+    Bool((&)(map(propagate_getindex, P.indices, I)...))
+
+immutable MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N}
+    indices::I
+    sz::NTuple{N,Int}
+end
+Base.size(M::MergedIndices) = M.sz
+Base.@propagate_inbounds Base.getindex{_,N}(M::MergedIndices{_,N}, I::Vararg{Int, N}) =
+    CartesianIndex(map(propagate_getindex, M.indices, I))
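
Both wrappers are lazy: nothing is materialized until an element is read. A hand-evaluated sketch of their behavior (my own illustration):

    M = MergedIndices((2:4, 10:12), (3, 3))
    M[1, 3]    # => CartesianIndex(2, 12), built on the fly from both axes

    P = ProductIndices(([true, false], [false, true]), (2, 2))
    P[1, 2]    # => true; the outer product of the two masks, element by element
    P[2, 2]    # => false
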
+# Additionally, we optimize bounds checking when using MergedIndices as an
+# array index since checking, e.g., A[1:500, 1:500] is *way* faster than
+# checking an array of 500^2 elements of CartesianIndex{2}. This optimization
+# also applies to reshapes of MergedIndices since the outer shape of the
+# container doesn't affect the index elements themselves. We can go even
+# farther and say that even restricted views of MergedIndices must be valid
+# over the entire array. This is overly strict in general, but in this
+# use-case all the merged indices must be valid at some point, so it's ok.
+typealias ReshapedMergedIndices{T,N,M<:MergedIndices} Base.ReshapedArray{T,N,M}
+typealias SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} SubArray{T,N,M}
+typealias MergedIndicesOrSub Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices}
+import Base: checkbounds_indices
+@inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
+    checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
+@inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
+    checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
+@inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
+    checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
+
+# The tricky thing here is that we want to optimize the accesses into the
+# distributed array, but in doing so, we lose track of which indices in I we
+# should be using.
+#
+# I’ve come to the conclusion that the function is utterly insane.
+# There are *6* flavors of indices with four different reference points:
+# 1. Find the indices of each portion of the DArray.
+# 2. Find the valid subset of indices for the SubArray into that portion.
+# 3. Find the portion of the `I` indices that should be used when you access the
+#    `K` indices in the subarray. This guy is nasty. It’s totally backwards
+#    from all other arrays, wherein we simply iterate over the source array’s
+#    elements. You need to *both* know which elements in `J` were skipped
+#    (`indexin_mask`) and which dimensions should match up (`restrict_indices`)
+# 4. If `K` doesn’t correspond to an entire chunk, reinterpret `K` in terms of
+#    the local portion of the source array
+function Base.setindex!(a::Array, s::SubDArray,
+        I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...)
+    Base.setindex_shape_check(s, Base.index_lengths(a, I...)...)
+    n = length(I)
+    d = s.parent
+    J = Base.decolon(d, s.indexes...)
+    @sync for i = 1:length(d.pids)
+        K_c = d.indexes[i]
+        K = map(intersect, J, K_c)
+        if !any(isempty, K)
+            K_mask = map(indexin_mask, J, K_c)
+            idxs = restrict_indices(Base.decolon(a, I...), K_mask)
+            if isequal(K, K_c)
+                # whole chunk
+                @async a[idxs...] = chunk(d, i)
+            else
+                # partial chunk
+                @async a[idxs...] =
+                    remotecall_fetch(d.pids[i]) do
+                        view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...)
+                    end
+            end
+        end
+    end
-        return a
+    return a
 end
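
End to end, this method lets a distributed right-hand side be copied into a local array chunk by chunk, with whole chunks moved in one shot. A hedged usage sketch (it assumes a running worker pool; `drand` is the package's distributed random-array constructor):

    using DistributedArrays
    d = drand(200)                # distributed vector spread over the workers
    a = zeros(100)
    a[1:100] = view(d, 51:150)    # dispatches to the SubDArray method above
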
 
 Base.fill!(A::DArray, x) = begin
@@ -1494,16 +1469,7 @@ function compute_boundaries{T}(d::DVector{T}; kwargs...)
     np = length(pids)
     sample_sz_on_wrkr = 512
 
-    if VERSION < v"0.5.0-"
-        results = Array(Any,np)
-        @sync begin
-            for (i,p) in enumerate(pids)
-                @async results[i] = remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...)
-            end
-        end
-    else
-        results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids)
-    end
+    results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids)
 
     samples = Array(T,0)
     for x in results
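
The same simplification recurs in the next two hunks: an explicit `@sync`/`@async` fill of a preallocated results array collapses to `asyncmap` (or `Base.asyncmap!` into a preallocated array, in the last hunk), which runs the closure concurrently across `pids` and returns results in argument order. A minimal illustration of the equivalence, with a hypothetical `work` function:

    results = Array(Any, length(pids))
    @sync for (i, p) in enumerate(pids)
        @async results[i] = remotecall_fetch(work, p)
    end
    # is equivalent to the one-liner available from 0.5 on:
    results = asyncmap(p -> remotecall_fetch(work, p), pids)
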
@@ -1554,14 +1520,7 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...)
 
     elseif sample==false
         # Assume a uniform distribution between min and max values
-        if VERSION < v"0.5.0-"
-            minmax=Array(Tuple, np)
-            @sync for (i,p) in enumerate(pids)
-                @async minmax[i] = remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d)
-            end
-        else
-            minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids)
-        end
+        minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids)
         min_d = minimum(T[x[1] for x in minmax])
         max_d = maximum(T[x[2] for x in minmax])
 
@@ -1602,19 +1561,10 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...)
     end
 
     local_sort_results = Array(Tuple, np)
-    if VERSION < v"0.5.0-"
-        @sync begin
-            for (i,p) in enumerate(pids)
-                @async local_sort_results[i] =
-                    remotecall_fetch(
-                        scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...)
-            end
-        end
-    else
-        Base.asyncmap!((i,p) -> remotecall_fetch(
+
+    Base.asyncmap!((i,p) -> remotecall_fetch(
                     scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...),
                     local_sort_results, 1:np, pids)
-    end
 
     # Construct a new DArray from the sorted refs. Remove parts with 0-length since
     # the DArray constructor_from_refs does not yet support it. This implies that