114 changes: 114 additions & 0 deletions pylops_mpi/Distributed.py
@@ -0,0 +1,114 @@
from mpi4py import MPI
from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils
from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, _prepare_allgather_inputs, _unroll_allgather_recv
from pylops_mpi.utils import deps

cupy_message = pylops_deps.cupy_import("the DistributedArray module")
nccl_message = deps.nccl_import("the DistributedArray module")

if nccl_message is None and cupy_message is None:
from pylops_mpi.utils._nccl import (
nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv
)


class DistributedMixIn:
r"""Distributed Mixin class

This class implements all methods associated with communication primitives
from MPI and NCCL. It is mostly in charge of identifying which communicator
to use and whether buffered or object MPI primitives should be used
(the former in the case of NumPy arrays, or CuPy arrays when a CUDA-aware
MPI installation is available; the latter with CuPy arrays when a CUDA-aware
MPI installation is not available).
"""
def _allreduce(self, base_comm, base_comm_nccl,
send_buf, recv_buf=None, op: MPI.Op = MPI.SUM,
engine="numpy"):
"""Allreduce operation
"""
if deps.nccl_enabled and base_comm_nccl is not None:
return nccl_allreduce(base_comm_nccl, send_buf, recv_buf, op)
else:
return mpi_allreduce(base_comm, send_buf,
recv_buf, engine, op)

def _allreduce_subcomm(self, sub_comm, base_comm_nccl,
send_buf, recv_buf=None, op: MPI.Op = MPI.SUM,
engine="numpy"):
"""Allreduce operation with subcommunicator
"""
if deps.nccl_enabled and base_comm_nccl is not None:
return nccl_allreduce(sub_comm, send_buf, recv_buf, op)
else:
return mpi_allreduce(sub_comm, send_buf,
recv_buf, engine, op)

def _allgather(self, base_comm, base_comm_nccl,
send_buf, recv_buf=None,
engine="numpy"):
"""Allgather operation
"""
if deps.nccl_enabled and base_comm_nccl is not None:
if isinstance(send_buf, (tuple, list, int)):
return nccl_allgather(base_comm_nccl, send_buf, recv_buf)
else:
send_shapes = base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
if isinstance(send_buf, (tuple, list, int)):
return base_comm.allgather(send_buf)
return mpi_allgather(base_comm, send_buf, recv_buf, engine)

def _allgather_subcomm(self, send_buf, recv_buf=None):
"""Allgather operation with subcommunicator
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
if isinstance(send_buf, (tuple, list, int)):
return nccl_allgather(self.sub_comm, send_buf, recv_buf)
else:
send_shapes = self._allgather_subcomm(send_buf.shape)
(padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy")
raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
return mpi_allgather(self.sub_comm, send_buf, recv_buf, self.engine)

def _bcast(self, local_array, index, value):
"""BCast operation
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
nccl_bcast(self.base_comm_nccl, local_array, index, value)
else:
# self.local_array[index] = self.base_comm.bcast(value)
mpi_bcast(self.base_comm, self.rank, self.local_array, index, value,
engine=self.engine)

def _send(self, send_buf, dest, count=None, tag=0):
"""Send operation
"""
if deps.nccl_enabled and self.base_comm_nccl:
if count is None:
count = send_buf.size
nccl_send(self.base_comm_nccl, send_buf, dest, count)
else:
mpi_send(self.base_comm,
send_buf, dest, count, tag=tag,
engine=self.engine)

def _recv(self, recv_buf=None, source=0, count=None, tag=0):
"""Receive operation
"""
if deps.nccl_enabled and self.base_comm_nccl:
if recv_buf is None:
raise ValueError("recv_buf must be supplied when using NCCL")
if count is None:
count = recv_buf.size
nccl_recv(self.base_comm_nccl, recv_buf, source, count)
return recv_buf
else:
return mpi_recv(self.base_comm,
recv_buf, source, count, tag=tag,
engine=self.engine)
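
As a usage illustration (an editor's sketch, not part of this PR's files): a toy subclass can inherit DistributedMixIn, leave the NCCL communicator unset to force the MPI branch, and reuse _allreduce to sum a per-rank value. The class name ToyReducer and its attributes are hypothetical; only DistributedMixIn and the _allreduce signature come from the diff above.

import numpy as np
from mpi4py import MPI

from pylops_mpi.Distributed import DistributedMixIn


class ToyReducer(DistributedMixIn):
    """Hypothetical helper reusing the mixin's communication primitives."""

    def __init__(self, base_comm=MPI.COMM_WORLD):
        self.base_comm = base_comm
        self.base_comm_nccl = None  # no NCCL communicator: the mixin takes the MPI path

    def global_sum(self, local_value):
        # with engine="numpy" the mixin dispatches to mpi_allreduce on base_comm
        return self._allreduce(self.base_comm, self.base_comm_nccl,
                               local_value, engine="numpy")


if __name__ == "__main__":
    # run with, e.g., `mpiexec -n 4 python toy_reducer.py`
    toy = ToyReducer()
    print(toy.global_sum(np.float64(MPI.COMM_WORLD.Get_rank())))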
158 changes: 49 additions & 109 deletions pylops_mpi/DistributedArray.py
@@ -4,6 +4,7 @@

import numpy as np
from mpi4py import MPI
from pylops_mpi.Distributed import DistributedMixIn
from pylops.utils import DTypeLike, NDArray
from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils
from pylops.utils._internal import _value_or_sized_to_tuple
@@ -14,7 +15,7 @@
nccl_message = deps.nccl_import("the DistributedArray module")

if nccl_message is None and cupy_message is None:
from pylops_mpi.utils._nccl import nccl_allgather, nccl_allreduce, nccl_asarray, nccl_bcast, nccl_split, nccl_send, nccl_recv, _prepare_nccl_allgather_inputs, _unroll_nccl_allgather_recv
from pylops_mpi.utils._nccl import nccl_asarray, nccl_split
from cupy.cuda.nccl import NcclCommunicator
else:
NcclCommunicator = Any
@@ -99,7 +100,7 @@ def subcomm_split(mask, comm: Optional[Union[MPI.Comm, NcclCommunicatorType]] =
return sub_comm


class DistributedArray:
class DistributedArray(DistributedMixIn):
r"""Distributed Numpy Arrays

Multidimensional NumPy-like distributed arrays.
Expand Down Expand Up @@ -203,10 +204,7 @@ def __setitem__(self, index, value):
the specified index positions.
"""
if self.partition is Partition.BROADCAST:
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
nccl_bcast(self.base_comm_nccl, self.local_array, index, value)
else:
self.local_array[index] = self.base_comm.bcast(value)
self._bcast(self.local_array, index, value)
else:
self.local_array[index] = value

@@ -342,7 +340,9 @@ def local_shapes(self):
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
return self._nccl_local_shapes(False)
else:
return self._allgather(self.local_shape)
return self._allgather(self.base_comm,
self.base_comm_nccl,
self.local_shape)

@property
def sub_comm(self):
@@ -382,7 +382,10 @@ def asarray(self, masked: bool = False):
if masked:
final_array = self._allgather_subcomm(self.local_array)
else:
final_array = self._allgather(self.local_array)
final_array = self._allgather(self.base_comm,
self.base_comm_nccl,
self.local_array,
engine=self.engine)
return np.concatenate(final_array, axis=self.axis)

@classmethod
@@ -432,6 +435,7 @@ def to_dist(cls, x: NDArray,
else:
slices = [slice(None)] * x.ndim
local_shapes = np.append([0], dist_array._allgather(
base_comm, base_comm_nccl,
dist_array.local_shape[axis]))
sum_shapes = np.cumsum(local_shapes)
slices[axis] = slice(sum_shapes[dist_array.rank],
@@ -472,100 +476,16 @@ def _check_mask(self, dist_array):
if not np.array_equal(self.mask, dist_array.mask):
raise ValueError("Mask of both the arrays must be same")

def _allreduce(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM):
"""Allreduce operation
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
return nccl_allreduce(self.base_comm_nccl, send_buf, recv_buf, op)
else:
if recv_buf is None:
return self.base_comm.allreduce(send_buf, op)
# For MIN and MAX which require recv_buf
self.base_comm.Allreduce(send_buf, recv_buf, op)
return recv_buf

def _allreduce_subcomm(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM):
"""Allreduce operation with subcommunicator
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
return nccl_allreduce(self.sub_comm, send_buf, recv_buf, op)
else:
if recv_buf is None:
return self.sub_comm.allreduce(send_buf, op)
# For MIN and MAX which require recv_buf
self.sub_comm.Allreduce(send_buf, recv_buf, op)
return recv_buf

def _allgather(self, send_buf, recv_buf=None):
"""Allgather operation
"""
if deps.nccl_enabled and self.base_comm_nccl:
if isinstance(send_buf, (tuple, list, int)):
return nccl_allgather(self.base_comm_nccl, send_buf, recv_buf)
else:
send_shapes = self.base_comm.allgather(send_buf.shape)
(padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes)
raw_recv = nccl_allgather(self.base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
if recv_buf is None:
return self.base_comm.allgather(send_buf)
self.base_comm.Allgather(send_buf, recv_buf)
return recv_buf

def _allgather_subcomm(self, send_buf, recv_buf=None):
"""Allgather operation with subcommunicator
"""
if deps.nccl_enabled and getattr(self, "base_comm_nccl"):
if isinstance(send_buf, (tuple, list, int)):
return nccl_allgather(self.sub_comm, send_buf, recv_buf)
else:
send_shapes = self._allgather_subcomm(send_buf.shape)
(padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes)
raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf else padded_recv)
return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes)
else:
if recv_buf is None:
return self.sub_comm.allgather(send_buf)
self.sub_comm.Allgather(send_buf, recv_buf)

def _send(self, send_buf, dest, count=None, tag=None):
""" Send operation
"""
if deps.nccl_enabled and self.base_comm_nccl:
if count is None:
# assuming sending the whole array
count = send_buf.size
nccl_send(self.base_comm_nccl, send_buf, dest, count)
else:
self.base_comm.send(send_buf, dest, tag)

def _recv(self, recv_buf=None, source=0, count=None, tag=None):
""" Receive operation
"""
# NCCL must be called with recv_buf. Size cannot be inferred from
# other arguments and thus cannot be dynamically allocated
if deps.nccl_enabled and self.base_comm_nccl and recv_buf is not None:
if recv_buf is not None:
if count is None:
# assuming data will take a space of the whole buffer
count = recv_buf.size
nccl_recv(self.base_comm_nccl, recv_buf, source, count)
return recv_buf
else:
raise ValueError("Using recv with NCCL must also supply receiver buffer ")
else:
# MPI allows a receiver buffer to be optional and receives as a Python Object
return self.base_comm.recv(source=source, tag=tag)

def _nccl_local_shapes(self, masked: bool):
Get the list of shapes of every GPU in the communicator
"""
# gather the tuple of shapes from every rank within the communicator and copy it from GPU to CPU
if masked:
all_tuples = self._allgather_subcomm(self.local_shape).get()
else:
all_tuples = self._allgather(self.local_shape).get()
all_tuples = self._allgather(self.base_comm,
self.base_comm_nccl,
self.local_shape).get()
# NCCL returns the flat array that packs every tuple as 1-dimensional array
# unpack each tuple from each rank
tuple_len = len(self.local_shape)
@@ -663,7 +583,9 @@ def dot(self, dist_array):
y = DistributedArray.to_dist(x=dist_array.local_array, base_comm=self.base_comm, base_comm_nccl=self.base_comm_nccl) \
if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST] else dist_array
# Flatten the local arrays and calculate dot product
return self._allreduce_subcomm(ncp.dot(x.local_array.flatten(), y.local_array.flatten()))
return self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
ncp.dot(x.local_array.flatten(), y.local_array.flatten()),
engine=self.engine)

def _compute_vector_norm(self, local_array: NDArray,
axis: int, ord: Optional[int] = None):
@@ -691,31 +613,49 @@ def _compute_vector_norm(self, local_array: NDArray,
raise ValueError(f"norm-{ord} not possible for vectors")
elif ord == 0:
# Count non-zero then sum reduction
recv_buf = self._allreduce_subcomm(ncp.count_nonzero(local_array, axis=axis).astype(ncp.float64))
recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
ncp.count_nonzero(local_array, axis=axis).astype(ncp.float64),
engine=self.engine)
elif ord == ncp.inf:
# Calculate max followed by max reduction
# TODO (tharitt): currently CuPy + MPI does not work well with buffered communication, particularly
# CuPy + non-CUDA-aware MPI does not work well with buffered communication, particularly
# with the MAX and MIN operators. Here we copy the array back to the CPU, transfer it, and copy it back to the GPU
send_buf = ncp.max(ncp.abs(local_array), axis=axis).astype(ncp.float64)
if self.engine == "cupy" and self.base_comm_nccl is None:
recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MAX)
if self.engine == "cupy" and self.base_comm_nccl is None and not deps.cuda_aware_mpi_enabled:
# CuPy + non-CUDA-aware MPI: this will call non-buffered communication,
# which returns a list of objects that must be copied back to GPU memory.
recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
send_buf.get(), recv_buf.get(),
op=MPI.MAX, engine=self.engine)
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
else:
recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MAX)
recv_buf = ncp.squeeze(recv_buf, axis=axis)
recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
send_buf, recv_buf, op=MPI.MAX,
engine=self.engine)
# TODO (tharitt): In the current implementation, there seems to be a semantic difference between buffered MPI and NCCL:
# the (1, size) shape is collapsed to (size, ) with buffered MPI, while NCCL retains it.
# There may be a way to unify this, perhaps related to how we allocate the recv_buf.
if self.base_comm_nccl:
recv_buf = ncp.squeeze(recv_buf, axis=axis)
elif ord == -ncp.inf:
# Calculate min followed by min reduction
# TODO (tharitt): see the comment above in infinity norm
# See the comment above in +infinity norm
send_buf = ncp.min(ncp.abs(local_array), axis=axis).astype(ncp.float64)
if self.engine == "cupy" and self.base_comm_nccl is None:
recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MIN)
if self.engine == "cupy" and self.base_comm_nccl is None and not deps.cuda_aware_mpi_enabled:
recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
send_buf.get(), recv_buf.get(),
op=MPI.MIN, engine=self.engine)
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
else:
recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MIN)
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))

recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
send_buf, recv_buf,
op=MPI.MIN, engine=self.engine)
if self.base_comm_nccl:
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
else:
recv_buf = self._allreduce_subcomm(ncp.sum(ncp.abs(ncp.float_power(local_array, ord)), axis=axis))
recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl,
ncp.sum(ncp.abs(ncp.float_power(local_array, ord)), axis=axis),
engine=self.engine)
recv_buf = ncp.power(recv_buf, 1.0 / ord)
return recv_buf
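
For context on the reduction paths refactored above, a short end-to-end sketch (assuming the public DistributedArray API, i.e. to_dist, norm, and dot, which this PR leaves unchanged; run under MPI, e.g. `mpiexec -n 2 python example.py`):

import numpy as np
from pylops_mpi import DistributedArray

x = np.arange(10, dtype=np.float64)
dist_x = DistributedArray.to_dist(x=x)  # scatter x across the ranks of MPI.COMM_WORLD

print(dist_x.norm(ord=2))        # 2-norm: local sums combined via _allreduce_subcomm
print(dist_x.norm(ord=np.inf))   # max-abs: the MPI.MAX / NCCL reduction path shown above
print(dist_x.dot(dist_x))        # dot product: flatten locally, then allreduce over the subcommunicator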
