diff --git a/pylops_mpi/Distributed.py b/pylops_mpi/Distributed.py new file mode 100644 index 00000000..7e616a3a --- /dev/null +++ b/pylops_mpi/Distributed.py @@ -0,0 +1,114 @@ +from mpi4py import MPI +from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils +from pylops_mpi.utils._mpi import mpi_allreduce, mpi_allgather, mpi_bcast, mpi_send, mpi_recv, _prepare_allgather_inputs, _unroll_allgather_recv +from pylops_mpi.utils import deps + +cupy_message = pylops_deps.cupy_import("the DistributedArray module") +nccl_message = deps.nccl_import("the DistributedArray module") + +if nccl_message is None and cupy_message is None: + from pylops_mpi.utils._nccl import ( + nccl_allgather, nccl_allreduce, nccl_bcast, nccl_send, nccl_recv + ) + + +class DistributedMixIn: + r"""Distributed Mixin class + + This class implements all methods associated with communication primitives + from MPI and NCCL. It is mostly charged to identifying which commuicator + to use and whether the buffered or object MPI primitives should be used + (the former in the case of NumPy arrays or CuPy arrays when a CUDA-Aware + MPI installation is available, the latter with CuPy arrays when a CUDA-Aware + MPI installation is not available). + """ + def _allreduce(self, base_comm, base_comm_nccl, + send_buf, recv_buf=None, op: MPI.Op = MPI.SUM, + engine="numpy"): + """Allreduce operation + """ + if deps.nccl_enabled and base_comm_nccl is not None: + return nccl_allreduce(base_comm_nccl, send_buf, recv_buf, op) + else: + return mpi_allreduce(base_comm, send_buf, + recv_buf, engine, op) + + def _allreduce_subcomm(self, sub_comm, base_comm_nccl, + send_buf, recv_buf=None, op: MPI.Op = MPI.SUM, + engine="numpy"): + """Allreduce operation with subcommunicator + """ + if deps.nccl_enabled and base_comm_nccl is not None: + return nccl_allreduce(sub_comm, send_buf, recv_buf, op) + else: + return mpi_allreduce(sub_comm, send_buf, + recv_buf, engine, op) + + def _allgather(self, base_comm, base_comm_nccl, + send_buf, recv_buf=None, + engine="numpy"): + """Allgather operation + """ + if deps.nccl_enabled and base_comm_nccl is not None: + if isinstance(send_buf, (tuple, list, int)): + return nccl_allgather(base_comm_nccl, send_buf, recv_buf) + else: + send_shapes = base_comm.allgather(send_buf.shape) + (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") + raw_recv = nccl_allgather(base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv) + return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) + else: + if isinstance(send_buf, (tuple, list, int)): + return base_comm.allgather(send_buf) + return mpi_allgather(base_comm, send_buf, recv_buf, engine) + + def _allgather_subcomm(self, send_buf, recv_buf=None): + """Allgather operation with subcommunicator + """ + if deps.nccl_enabled and getattr(self, "base_comm_nccl"): + if isinstance(send_buf, (tuple, list, int)): + return nccl_allgather(self.sub_comm, send_buf, recv_buf) + else: + send_shapes = self._allgather_subcomm(send_buf.shape) + (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine="cupy") + raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf else padded_recv) + return _unroll_allgather_recv(raw_recv, padded_send.shape, send_shapes) + else: + return mpi_allgather(self.sub_comm, send_buf, recv_buf, self.engine) + + def _bcast(self, local_array, index, value): + """BCast operation + """ + if deps.nccl_enabled and getattr(self, "base_comm_nccl"): + nccl_bcast(self.base_comm_nccl, local_array, index, value) + else: + # self.local_array[index] = self.base_comm.bcast(value) + mpi_bcast(self.base_comm, self.rank, self.local_array, index, value, + engine=self.engine) + + def _send(self, send_buf, dest, count=None, tag=0): + """Send operation + """ + if deps.nccl_enabled and self.base_comm_nccl: + if count is None: + count = send_buf.size + nccl_send(self.base_comm_nccl, send_buf, dest, count) + else: + mpi_send(self.base_comm, + send_buf, dest, count, tag=tag, + engine=self.engine) + + def _recv(self, recv_buf=None, source=0, count=None, tag=0): + """Receive operation + """ + if deps.nccl_enabled and self.base_comm_nccl: + if recv_buf is None: + raise ValueError("recv_buf must be supplied when using NCCL") + if count is None: + count = recv_buf.size + nccl_recv(self.base_comm_nccl, recv_buf, source, count) + return recv_buf + else: + return mpi_recv(self.base_comm, + recv_buf, source, count, tag=tag, + engine=self.engine) diff --git a/pylops_mpi/DistributedArray.py b/pylops_mpi/DistributedArray.py index 979882c0..da7712d7 100644 --- a/pylops_mpi/DistributedArray.py +++ b/pylops_mpi/DistributedArray.py @@ -4,6 +4,7 @@ import numpy as np from mpi4py import MPI +from pylops_mpi.Distributed import DistributedMixIn from pylops.utils import DTypeLike, NDArray from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils from pylops.utils._internal import _value_or_sized_to_tuple @@ -14,7 +15,7 @@ nccl_message = deps.nccl_import("the DistributedArray module") if nccl_message is None and cupy_message is None: - from pylops_mpi.utils._nccl import nccl_allgather, nccl_allreduce, nccl_asarray, nccl_bcast, nccl_split, nccl_send, nccl_recv, _prepare_nccl_allgather_inputs, _unroll_nccl_allgather_recv + from pylops_mpi.utils._nccl import nccl_asarray, nccl_split from cupy.cuda.nccl import NcclCommunicator else: NcclCommunicator = Any @@ -99,7 +100,7 @@ def subcomm_split(mask, comm: Optional[Union[MPI.Comm, NcclCommunicatorType]] = return sub_comm -class DistributedArray: +class DistributedArray(DistributedMixIn): r"""Distributed Numpy Arrays Multidimensional NumPy-like distributed arrays. @@ -203,10 +204,7 @@ def __setitem__(self, index, value): the specified index positions. """ if self.partition is Partition.BROADCAST: - if deps.nccl_enabled and getattr(self, "base_comm_nccl"): - nccl_bcast(self.base_comm_nccl, self.local_array, index, value) - else: - self.local_array[index] = self.base_comm.bcast(value) + self._bcast(self.local_array, index, value) else: self.local_array[index] = value @@ -342,7 +340,9 @@ def local_shapes(self): if deps.nccl_enabled and getattr(self, "base_comm_nccl"): return self._nccl_local_shapes(False) else: - return self._allgather(self.local_shape) + return self._allgather(self.base_comm, + self.base_comm_nccl, + self.local_shape) @property def sub_comm(self): @@ -382,7 +382,10 @@ def asarray(self, masked: bool = False): if masked: final_array = self._allgather_subcomm(self.local_array) else: - final_array = self._allgather(self.local_array) + final_array = self._allgather(self.base_comm, + self.base_comm_nccl, + self.local_array, + engine=self.engine) return np.concatenate(final_array, axis=self.axis) @classmethod @@ -432,6 +435,7 @@ def to_dist(cls, x: NDArray, else: slices = [slice(None)] * x.ndim local_shapes = np.append([0], dist_array._allgather( + base_comm, base_comm_nccl, dist_array.local_shape[axis])) sum_shapes = np.cumsum(local_shapes) slices[axis] = slice(sum_shapes[dist_array.rank], @@ -472,92 +476,6 @@ def _check_mask(self, dist_array): if not np.array_equal(self.mask, dist_array.mask): raise ValueError("Mask of both the arrays must be same") - def _allreduce(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM): - """Allreduce operation - """ - if deps.nccl_enabled and getattr(self, "base_comm_nccl"): - return nccl_allreduce(self.base_comm_nccl, send_buf, recv_buf, op) - else: - if recv_buf is None: - return self.base_comm.allreduce(send_buf, op) - # For MIN and MAX which require recv_buf - self.base_comm.Allreduce(send_buf, recv_buf, op) - return recv_buf - - def _allreduce_subcomm(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM): - """Allreduce operation with subcommunicator - """ - if deps.nccl_enabled and getattr(self, "base_comm_nccl"): - return nccl_allreduce(self.sub_comm, send_buf, recv_buf, op) - else: - if recv_buf is None: - return self.sub_comm.allreduce(send_buf, op) - # For MIN and MAX which require recv_buf - self.sub_comm.Allreduce(send_buf, recv_buf, op) - return recv_buf - - def _allgather(self, send_buf, recv_buf=None): - """Allgather operation - """ - if deps.nccl_enabled and self.base_comm_nccl: - if isinstance(send_buf, (tuple, list, int)): - return nccl_allgather(self.base_comm_nccl, send_buf, recv_buf) - else: - send_shapes = self.base_comm.allgather(send_buf.shape) - (padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes) - raw_recv = nccl_allgather(self.base_comm_nccl, padded_send, recv_buf if recv_buf else padded_recv) - return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes) - else: - if recv_buf is None: - return self.base_comm.allgather(send_buf) - self.base_comm.Allgather(send_buf, recv_buf) - return recv_buf - - def _allgather_subcomm(self, send_buf, recv_buf=None): - """Allgather operation with subcommunicator - """ - if deps.nccl_enabled and getattr(self, "base_comm_nccl"): - if isinstance(send_buf, (tuple, list, int)): - return nccl_allgather(self.sub_comm, send_buf, recv_buf) - else: - send_shapes = self._allgather_subcomm(send_buf.shape) - (padded_send, padded_recv) = _prepare_nccl_allgather_inputs(send_buf, send_shapes) - raw_recv = nccl_allgather(self.sub_comm, padded_send, recv_buf if recv_buf else padded_recv) - return _unroll_nccl_allgather_recv(raw_recv, padded_send.shape, send_shapes) - else: - if recv_buf is None: - return self.sub_comm.allgather(send_buf) - self.sub_comm.Allgather(send_buf, recv_buf) - - def _send(self, send_buf, dest, count=None, tag=None): - """ Send operation - """ - if deps.nccl_enabled and self.base_comm_nccl: - if count is None: - # assuming sending the whole array - count = send_buf.size - nccl_send(self.base_comm_nccl, send_buf, dest, count) - else: - self.base_comm.send(send_buf, dest, tag) - - def _recv(self, recv_buf=None, source=0, count=None, tag=None): - """ Receive operation - """ - # NCCL must be called with recv_buf. Size cannot be inferred from - # other arguments and thus cannot be dynamically allocated - if deps.nccl_enabled and self.base_comm_nccl and recv_buf is not None: - if recv_buf is not None: - if count is None: - # assuming data will take a space of the whole buffer - count = recv_buf.size - nccl_recv(self.base_comm_nccl, recv_buf, source, count) - return recv_buf - else: - raise ValueError("Using recv with NCCL must also supply receiver buffer ") - else: - # MPI allows a receiver buffer to be optional and receives as a Python Object - return self.base_comm.recv(source=source, tag=tag) - def _nccl_local_shapes(self, masked: bool): """Get the the list of shapes of every GPU in the communicator """ @@ -565,7 +483,9 @@ def _nccl_local_shapes(self, masked: bool): if masked: all_tuples = self._allgather_subcomm(self.local_shape).get() else: - all_tuples = self._allgather(self.local_shape).get() + all_tuples = self._allgather(self.base_comm, + self.base_comm_nccl, + self.local_shape).get() # NCCL returns the flat array that packs every tuple as 1-dimensional array # unpack each tuple from each rank tuple_len = len(self.local_shape) @@ -663,7 +583,9 @@ def dot(self, dist_array): y = DistributedArray.to_dist(x=dist_array.local_array, base_comm=self.base_comm, base_comm_nccl=self.base_comm_nccl) \ if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST] else dist_array # Flatten the local arrays and calculate dot product - return self._allreduce_subcomm(ncp.dot(x.local_array.flatten(), y.local_array.flatten())) + return self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + ncp.dot(x.local_array.flatten(), y.local_array.flatten()), + engine=self.engine) def _compute_vector_norm(self, local_array: NDArray, axis: int, ord: Optional[int] = None): @@ -691,31 +613,49 @@ def _compute_vector_norm(self, local_array: NDArray, raise ValueError(f"norm-{ord} not possible for vectors") elif ord == 0: # Count non-zero then sum reduction - recv_buf = self._allreduce_subcomm(ncp.count_nonzero(local_array, axis=axis).astype(ncp.float64)) + recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + ncp.count_nonzero(local_array, axis=axis).astype(ncp.float64), + engine=self.engine) elif ord == ncp.inf: # Calculate max followed by max reduction - # TODO (tharitt): currently CuPy + MPI does not work well with buffered communication, particularly + # CuPy + non-CUDA-aware MPI does not work well with buffered communication, particularly # with MAX, MIN operator. Here we copy the array back to CPU, transfer, and copy them back to GPUs send_buf = ncp.max(ncp.abs(local_array), axis=axis).astype(ncp.float64) - if self.engine == "cupy" and self.base_comm_nccl is None: - recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MAX) + if self.engine == "cupy" and self.base_comm_nccl is None and not deps.cuda_aware_mpi_enabled: + # CuPy + non-CUDA-aware MPI: This will call non-buffered communication + # which return a list of object - must be copied back to a GPU memory. + recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + send_buf.get(), recv_buf.get(), + op=MPI.MAX, engine=self.engine) recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis)) else: - recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MAX) - recv_buf = ncp.squeeze(recv_buf, axis=axis) + recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + send_buf, recv_buf, op=MPI.MAX, + engine=self.engine) + # TODO (tharitt): In current implementation, there seems to be a semantic difference between Buffered MPI and NCCL + # the (1, size) is collapsed to (size, ) with buffered MPI while NCCL retains it. + # There may be a way to unify it - may be something to do with how we allocate the recv_buf. + if self.base_comm_nccl: + recv_buf = ncp.squeeze(recv_buf, axis=axis) elif ord == -ncp.inf: # Calculate min followed by min reduction - # TODO (tharitt): see the comment above in infinity norm + # See the comment above in +infinity norm send_buf = ncp.min(ncp.abs(local_array), axis=axis).astype(ncp.float64) - if self.engine == "cupy" and self.base_comm_nccl is None: - recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MIN) + if self.engine == "cupy" and self.base_comm_nccl is None and not deps.cuda_aware_mpi_enabled: + recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + send_buf.get(), recv_buf.get(), + op=MPI.MIN, engine=self.engine) recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis)) else: - recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MIN) - recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis)) - + recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + send_buf, recv_buf, + op=MPI.MIN, engine=self.engine) + if self.base_comm_nccl: + recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis)) else: - recv_buf = self._allreduce_subcomm(ncp.sum(ncp.abs(ncp.float_power(local_array, ord)), axis=axis)) + recv_buf = self._allreduce_subcomm(self.sub_comm, self.base_comm_nccl, + ncp.sum(ncp.abs(ncp.float_power(local_array, ord)), axis=axis), + engine=self.engine) recv_buf = ncp.power(recv_buf, 1.0 / ord) return recv_buf diff --git a/pylops_mpi/basicoperators/VStack.py b/pylops_mpi/basicoperators/VStack.py index 58581565..de66c342 100644 --- a/pylops_mpi/basicoperators/VStack.py +++ b/pylops_mpi/basicoperators/VStack.py @@ -6,7 +6,6 @@ from pylops import LinearOperator from pylops.utils import DTypeLike from pylops.utils.backend import get_module -from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils from pylops_mpi import ( MPILinearOperator, @@ -15,17 +14,11 @@ Partition, StackedDistributedArray ) +from pylops_mpi.Distributed import DistributedMixIn from pylops_mpi.utils.decorators import reshaped -from pylops_mpi.utils import deps -cupy_message = pylops_deps.cupy_import("the VStack module") -nccl_message = deps.nccl_import("the VStack module") -if nccl_message is None and cupy_message is None: - from pylops_mpi.utils._nccl import nccl_allreduce - - -class MPIVStack(MPILinearOperator): +class MPIVStack(DistributedMixIn, MPILinearOperator): r"""MPI VStack Operator Create a vertical stack of a set of linear operators using MPI. Each rank must @@ -141,16 +134,18 @@ def _matvec(self, x: DistributedArray) -> DistributedArray: @reshaped(forward=False, stacking=True) def _rmatvec(self, x: DistributedArray) -> DistributedArray: ncp = get_module(x.engine) - y = DistributedArray(global_shape=self.shape[1], base_comm=x.base_comm, base_comm_nccl=x.base_comm_nccl, partition=Partition.BROADCAST, - engine=x.engine, dtype=self.dtype) + y = DistributedArray(global_shape=self.shape[1], + base_comm=x.base_comm, + base_comm_nccl=x.base_comm_nccl, + partition=Partition.BROADCAST, + engine=x.engine, + dtype=self.dtype) y1 = [] for iop, oper in enumerate(self.ops): y1.append(oper.rmatvec(x.local_array[self.nnops[iop]: self.nnops[iop + 1]])) y1 = ncp.sum(ncp.vstack(y1), axis=0) - if deps.nccl_enabled and x.base_comm_nccl: - y[:] = nccl_allreduce(x.base_comm_nccl, y1, op=MPI.SUM) - else: - y[:] = self.base_comm.allreduce(y1, op=MPI.SUM) + y[:] = self._allreduce(x.base_comm, x.base_comm_nccl, + y1, op=MPI.SUM, engine=x.engine) return y diff --git a/pylops_mpi/signalprocessing/Fredholm1.py b/pylops_mpi/signalprocessing/Fredholm1.py index 6ccd9d21..2969e3c9 100644 --- a/pylops_mpi/signalprocessing/Fredholm1.py +++ b/pylops_mpi/signalprocessing/Fredholm1.py @@ -128,7 +128,8 @@ def _matvec(self, x: DistributedArray) -> DistributedArray: for isl in range(self.nsls[self.rank]): y1[isl] = ncp.dot(self.G[isl], x[isl]) # gather results - y[:] = ncp.vstack(y._allgather(y1)).ravel() + y[:] = ncp.vstack(y._allgather(y.base_comm, y.base_comm_nccl, y1, + engine=y.engine)).ravel() return y def _rmatvec(self, x: NDArray) -> NDArray: @@ -165,5 +166,6 @@ def _rmatvec(self, x: NDArray) -> NDArray: y1[isl] = ncp.dot(x[isl].T.conj(), self.G[isl]).T.conj() # gather results - y[:] = ncp.vstack(y._allgather(y1)).ravel() + y[:] = ncp.vstack(y._allgather(y.base_comm, y.base_comm_nccl, y1, + engine=y.engine)).ravel() return y diff --git a/pylops_mpi/utils/_common.py b/pylops_mpi/utils/_common.py new file mode 100644 index 00000000..895265df --- /dev/null +++ b/pylops_mpi/utils/_common.py @@ -0,0 +1,89 @@ +__all__ = [ + "_prepare_allgather_inputs", + "_unroll_allgather_recv" +] + + +import numpy as np +from pylops.utils.backend import get_module + + +# TODO: return type annotation for both cupy and numpy +def _prepare_allgather_inputs(send_buf, send_buf_shapes, engine): + r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) + + Buffered Allgather (MPI and NCCL) requires the sending buffer to have the same size for every device. + Therefore, padding is required when the array is not evenly partitioned across + all the ranks. The padding is applied such that the each dimension of the sending buffers + is equal to the max size of that dimension across all ranks. + + Similarly, each receiver buffer (recv_buf) is created with size equal to :math:n_rank \cdot send_buf.size + + Parameters + ---------- + send_buf : :obj: `numpy.ndarray` or `cupy.ndarray` or array-like + The data buffer from the local GPU to be sent for allgather. + send_buf_shapes: :obj:`list` + A list of shapes for each GPU send_buf (used to calculate padding size) + engine : :obj:`str` + Engine used to store array (``numpy`` or ``cupy``) + + Returns + ------- + send_buf: :obj:`cupy.ndarray` + A buffer containing the data and padded elements to be sent by this rank. + recv_buf : :obj:`cupy.ndarray` + An empty, padded buffer to gather data from all GPUs. + """ + ncp = get_module(engine) + sizes_each_dim = list(zip(*send_buf_shapes)) + send_shape = tuple(map(max, sizes_each_dim)) + pad_size = [ + (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape) + ] + + send_buf = ncp.pad( + send_buf, pad_size, mode="constant", constant_values=0 + ) + + ndev = len(send_buf_shapes) + recv_buf = ncp.zeros(ndev * send_buf.size, dtype=send_buf.dtype) + + return send_buf, recv_buf + + +def _unroll_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list: + r"""Unrolll recv_buf after Buffered Allgather (MPI and NCCL) + + Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays + Each GPU may send array with a different shape, so the return type has to be a list of array + instead of the concatenated array. + + Parameters + ---------- + recv_buf: :obj:`cupy.ndarray` or array-like + The data buffer returned from nccl_allgather call + padded_send_buf_shape: :obj:`tuple`:int + The size of send_buf after padding used in nccl_allgather + send_buf_shapes: :obj:`list` + A list of original shapes for each GPU send_buf prior to padding + + Returns + ------- + chunks: :obj:`list` + A list of `cupy.ndarray` from each GPU with the padded element removed + """ + ndev = len(send_buf_shapes) + # extract an individual array from each device + chunk_size = np.prod(padded_send_buf_shape) + chunks = [ + recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) + ] + + # Remove padding from each array: the padded value may appear somewhere + # in the middle of the flat array and thus the reshape and slicing for each dimension is required + for i in range(ndev): + slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) + chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing] + + return chunks diff --git a/pylops_mpi/utils/_mpi.py b/pylops_mpi/utils/_mpi.py new file mode 100644 index 00000000..89304b8c --- /dev/null +++ b/pylops_mpi/utils/_mpi.py @@ -0,0 +1,165 @@ +__all__ = [ + "mpi_allgather", + "mpi_allreduce", + "mpi_bcast", + # "mpi_asarray", + "mpi_send", + "mpi_recv", +] + +from typing import Optional + +import numpy as np +from mpi4py import MPI +from pylops.utils.backend import get_module +from pylops_mpi.utils import deps +from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv + + +def mpi_allgather(base_comm: MPI.Comm, + send_buf, recv_buf=None, + engine: Optional[str] = "numpy") -> np.ndarray: + + if deps.cuda_aware_mpi_enabled or engine == "numpy": + send_shapes = base_comm.allgather(send_buf.shape) + (padded_send, padded_recv) = _prepare_allgather_inputs(send_buf, send_shapes, engine=engine) + recv_buffer_to_use = recv_buf if recv_buf else padded_recv + base_comm.Allgather(padded_send, recv_buffer_to_use) + return _unroll_allgather_recv(recv_buffer_to_use, padded_send.shape, send_shapes) + + else: + # CuPy with non-CUDA-aware MPI + if recv_buf is None: + return base_comm.allgather(send_buf) + base_comm.Allgather(send_buf, recv_buf) + return recv_buf + + +def mpi_allreduce(base_comm: MPI.Comm, + send_buf, recv_buf=None, + engine: Optional[str] = "numpy", + op: MPI.Op = MPI.SUM) -> np.ndarray: + """MPI_Allreduce/allreduce + + Dispatch allreduce routine based on type of input and availability of + CUDA-Aware MPI + + Parameters + ---------- + base_comm : :obj:`MPI.Comm` + Base MPI Communicator. + send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` + The data buffer from the local GPU to be reduced. + recv_buf : :obj:`cupy.ndarray`, optional + The buffer to store the result of the reduction. If None, + a new buffer will be allocated with the appropriate shape. + engine : :obj:`str`, optional + Engine used to store array (``numpy`` or ``cupy``) + op : :obj:mpi4py.MPI.Op, optional + The reduction operation to apply. Defaults to MPI.SUM. + + Returns + ------- + recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` + A buffer containing the result of the reduction, broadcasted + to all GPUs. + + """ + if deps.cuda_aware_mpi_enabled or engine == "numpy": + ncp = get_module(engine) + recv_buf = ncp.zeros(send_buf.size, dtype=send_buf.dtype) + base_comm.Allreduce(send_buf, recv_buf, op) + return recv_buf + else: + # CuPy with non-CUDA-aware MPI + if recv_buf is None: + return base_comm.allreduce(send_buf, op) + # For MIN and MAX which require recv_buf + base_comm.Allreduce(send_buf, recv_buf, op) + return recv_buf + + +def mpi_bcast(base_comm: MPI.Comm, + rank, local_array, index, value, + engine: Optional[str] = "numpy") -> np.ndarray: + if deps.cuda_aware_mpi_enabled or engine == "numpy": + if rank == 0: + local_array[index] = value + base_comm.Bcast(local_array[index]) + else: + # CuPy with non-CUDA-aware MPI + local_array[index] = base_comm.bcast(value) + + +def mpi_send(base_comm: MPI.Comm, + send_buf, dest, count, tag=0, + engine: Optional[str] = "numpy", + ) -> None: + """MPI_Send/send + + Dispatch send routine based on type of input and availability of + CUDA-Aware MPI + + Parameters + ---------- + base_comm : :obj:`MPI.Comm` + Base MPI Communicator. + send_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray` + The array containing data to send. + dest: :obj:`int` + The rank of the destination CPU/GPU device. + count : :obj:`int` + Number of elements to send from `send_buf`. + tag : :obj:`int` + Tag of the message to be sent. + engine : :obj:`str`, optional + Engine used to store array (``numpy`` or ``cupy``) + """ + if deps.cuda_aware_mpi_enabled or engine == "numpy": + # Determine MPI type based on array dtype + mpi_type = MPI._typedict[send_buf.dtype.char] + if count is None: + count = send_buf.size + base_comm.Send([send_buf, count, mpi_type], dest=dest, tag=tag) + else: + # Uses CuPy without CUDA-aware MPI + base_comm.send(send_buf, dest, tag) + + +def mpi_recv(base_comm: MPI.Comm, + recv_buf=None, source=0, count=None, tag=0, + engine: Optional[str] = "numpy") -> np.ndarray: + """ MPI_Recv/recv + Dispatch receive routine based on type of input and availability of + CUDA-Aware MPI + + Parameters + ---------- + base_comm : :obj:`MPI.Comm` + Base MPI Communicator. + recv_buf : :obj:`numpy.ndarray` or :obj:`cupy.ndarray`, optional + The buffered array to receive data. + source : :obj:`int` + The rank of the sending CPU/GPU device. + count : :obj:`int` + Number of elements to receive. + tag : :obj:`int` + Tag of the message to be sent. + engine : :obj:`str`, optional + Engine used to store array (``numpy`` or ``cupy``) + """ + if deps.cuda_aware_mpi_enabled or engine == "numpy": + ncp = get_module(engine) + if recv_buf is None: + if count is None: + raise ValueError("Must provide either recv_buf or count for MPI receive") + # Default to int32 works currently because add_ghost_cells() is called + # with recv_buf and is not affected by this branch. The int32 is for when + # dimension or shape-related integers are send/recv + recv_buf = ncp.zeros(count, dtype=ncp.int32) + mpi_type = MPI._typedict[recv_buf.dtype.char] + base_comm.Recv([recv_buf, recv_buf.size, mpi_type], source=source, tag=tag) + else: + # Uses CuPy without CUDA-aware MPI + recv_buf = base_comm.recv(source=source, tag=tag) + return recv_buf diff --git a/pylops_mpi/utils/_nccl.py b/pylops_mpi/utils/_nccl.py index 19c09922..cac5b61c 100644 --- a/pylops_mpi/utils/_nccl.py +++ b/pylops_mpi/utils/_nccl.py @@ -1,6 +1,4 @@ __all__ = [ - "_prepare_nccl_allgather_inputs", - "_unroll_nccl_allgather_recv", "_nccl_sync", "initialize_nccl_comm", "nccl_split", @@ -13,12 +11,11 @@ ] from enum import IntEnum -from typing import Tuple from mpi4py import MPI import os -import numpy as np import cupy as cp import cupy.cuda.nccl as nccl +from pylops_mpi.utils._common import _prepare_allgather_inputs, _unroll_allgather_recv cupy_to_nccl_dtype = { "float32": nccl.NCCL_FLOAT32, @@ -70,85 +67,6 @@ def _nccl_sync(): cp.cuda.runtime.deviceSynchronize() -def _prepare_nccl_allgather_inputs(send_buf, send_buf_shapes) -> Tuple[cp.ndarray, cp.ndarray]: - r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather) - - NCCL's allGather requires the sending buffer to have the same size for every device. - Therefore, padding is required when the array is not evenly partitioned across - all the ranks. The padding is applied such that the each dimension of the sending buffers - is equal to the max size of that dimension across all ranks. - - Similarly, each receiver buffer (recv_buf) is created with size equal to :math:n_rank \cdot send_buf.size - - Parameters - ---------- - send_buf : :obj:`cupy.ndarray` or array-like - The data buffer from the local GPU to be sent for allgather. - send_buf_shapes: :obj:`list` - A list of shapes for each GPU send_buf (used to calculate padding size) - - Returns - ------- - send_buf: :obj:`cupy.ndarray` - A buffer containing the data and padded elements to be sent by this rank. - recv_buf : :obj:`cupy.ndarray` - An empty, padded buffer to gather data from all GPUs. - """ - sizes_each_dim = list(zip(*send_buf_shapes)) - send_shape = tuple(map(max, sizes_each_dim)) - pad_size = [ - (0, s_shape - l_shape) for s_shape, l_shape in zip(send_shape, send_buf.shape) - ] - - send_buf = cp.pad( - send_buf, pad_size, mode="constant", constant_values=0 - ) - - # NCCL recommends to use one MPI Process per GPU and so size of receiving buffer can be inferred - ndev = len(send_buf_shapes) - recv_buf = cp.zeros(ndev * send_buf.size, dtype=send_buf.dtype) - - return send_buf, recv_buf - - -def _unroll_nccl_allgather_recv(recv_buf, padded_send_buf_shape, send_buf_shapes) -> list: - """Unrolll recv_buf after NCCL allgather (nccl_allgather) - - Remove the padded elements in recv_buff, extract an individual array from each device and return them as a list of arrays - Each GPU may send array with a different shape, so the return type has to be a list of array - instead of the concatenated array. - - Parameters - ---------- - recv_buf: :obj:`cupy.ndarray` or array-like - The data buffer returned from nccl_allgather call - padded_send_buf_shape: :obj:`tuple`:int - The size of send_buf after padding used in nccl_allgather - send_buf_shapes: :obj:`list` - A list of original shapes for each GPU send_buf prior to padding - - Returns - ------- - chunks: :obj:`list` - A list of `cupy.ndarray` from each GPU with the padded element removed - """ - - ndev = len(send_buf_shapes) - # extract an individual array from each device - chunk_size = np.prod(padded_send_buf_shape) - chunks = [ - recv_buf[i * chunk_size:(i + 1) * chunk_size] for i in range(ndev) - ] - - # Remove padding from each array: the padded value may appear somewhere - # in the middle of the flat array and thus the reshape and slicing for each dimension is required - for i in range(ndev): - slicing = tuple(slice(0, end) for end in send_buf_shapes[i]) - chunks[i] = chunks[i].reshape(padded_send_buf_shape)[slicing] - - return chunks - - def mpi_op_to_nccl(mpi_op) -> NcclOp: """ Map MPI reduction operation to NCCL equivalent @@ -363,9 +281,9 @@ def nccl_asarray(nccl_comm, local_array, local_shapes, axis) -> cp.ndarray: Global array gathered from all GPUs and concatenated along `axis`. """ - send_buf, recv_buf = _prepare_nccl_allgather_inputs(local_array, local_shapes) + send_buf, recv_buf = _prepare_allgather_inputs(local_array, local_shapes, engine="cupy") nccl_allgather(nccl_comm, send_buf, recv_buf) - chunks = _unroll_nccl_allgather_recv(recv_buf, send_buf.shape, local_shapes) + chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, local_shapes) # combine back to single global array return cp.concatenate(chunks, axis=axis) diff --git a/pylops_mpi/utils/deps.py b/pylops_mpi/utils/deps.py index 9d983f60..f0279ceb 100644 --- a/pylops_mpi/utils/deps.py +++ b/pylops_mpi/utils/deps.py @@ -39,6 +39,10 @@ def nccl_import(message: Optional[str] = None) -> str: return nccl_message +cuda_aware_mpi_enabled: bool = ( + True if int(os.getenv("PYLOPS_MPI_CUDA_AWARE", 1)) == 1 else False +) + nccl_enabled: bool = ( True if (nccl_import() is None and int(os.getenv("NCCL_PYLOPS_MPI", 1)) == 1) else False ) diff --git a/tests_nccl/test_ncclutils_nccl.py b/tests_nccl/test_ncclutils_nccl.py index 21b28ca3..52502afc 100644 --- a/tests_nccl/test_ncclutils_nccl.py +++ b/tests_nccl/test_ncclutils_nccl.py @@ -8,7 +8,8 @@ from numpy.testing import assert_allclose import pytest -from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather, _prepare_nccl_allgather_inputs, _unroll_nccl_allgather_recv +from pylops_mpi.utils._nccl import initialize_nccl_comm, nccl_allgather +from pylops_mpi.utils._mpi import _prepare_allgather_inputs, _unroll_allgather_recv np.random.seed(42) @@ -83,9 +84,9 @@ def test_allgather_differentsize_withrecbuf(par): # Gathered array send_shapes = MPI.COMM_WORLD.allgather(local_array.shape) - send_buf, recv_buf = _prepare_nccl_allgather_inputs(local_array, send_shapes) + send_buf, recv_buf = _prepare_allgather_inputs(local_array, send_shapes, engine="cupy") recv_buf = nccl_allgather(nccl_comm, send_buf, recv_buf) - chunks = _unroll_nccl_allgather_recv(recv_buf, send_buf.shape, send_shapes) + chunks = _unroll_allgather_recv(recv_buf, send_buf.shape, send_shapes) gathered_array = cp.concatenate(chunks) # Compare with global array created in rank0