Skip to content

Commit 6eaab8c

Browse files
authored
MPMD Support (AMReX-Codes#2895)
Add support for multiple programs multiple data (MPMD). For now, we assume there are only two programs (i.e., executables) in the MPMD mode. During the initialization, MPI_COMM_WORLD is split into two communicators. The MPMD::Copier class can be used to copy FabArray/MultiFab data between two programs. This new capability can be used by FHDeX to couple FHD with SPPARKS.
1 parent 9469329 commit 6eaab8c

File tree

6 files changed

+427
-5
lines changed

6 files changed

+427
-5
lines changed

Src/Base/AMReX_BLBackTrace.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
#include <AMReX_AsyncOut.H>
66
#include <AMReX.H>
77
#include <AMReX_Utility.H>
8+
#ifdef AMREX_USE_MPI
9+
#include <AMReX_MPMD.H>
10+
#endif
811

912
#ifdef AMREX_TINY_PROFILING
1013
#include <AMReX_TinyProfiler.H>
@@ -71,7 +74,15 @@ BLBackTrace::handler(int s)
7174
std::string errfilename;
7275
{
7376
std::ostringstream ss;
74-
ss << "Backtrace." << ParallelDescriptor::MyProc();
77+
#ifdef AMREX_USE_MPI
78+
if (MPMD::Initialized()) {
79+
ss << "Backtrace.prog" << MPMD::MyProgId() << ".";
80+
} else
81+
#endif
82+
{
83+
ss << "Backtrace.";
84+
}
85+
ss << ParallelDescriptor::MyProc();
7586
#ifdef AMREX_USE_OMP
7687
ss << "." << omp_get_thread_num();
7788
#endif

Src/Base/AMReX_BoxList.H

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ public:
206206
BoxList& convert (IndexType typ) noexcept;
207207

208208
//! Returns a reference to the Vector<Box>.
209-
Vector<Box>& data() noexcept { return m_lbox; }
209+
Vector<Box>& data () noexcept { return m_lbox; }
210210
//! Returns a constant reference to the Vector<Box>.
211-
const Vector<Box>& data() const noexcept { return m_lbox; }
211+
const Vector<Box>& data () const noexcept { return m_lbox; }
212212

213213
void swap (BoxList& rhs) {
214214
std::swap(m_lbox, rhs.m_lbox);

Src/Base/AMReX_MPMD.H

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
#ifndef AMREX_MPMD_H_
2+
#define AMREX_MPMD_H_
3+
#include <AMReX_Config.H>
4+
5+
#ifdef AMREX_USE_MPI
6+
7+
#include <AMReX_FabArray.H>
8+
9+
#include <mpi.h>
10+
11+
namespace amrex { namespace MPMD {
12+
13+
MPI_Comm Initialize (int argc, char* argv[]);
14+
15+
void Finalize ();
16+
17+
bool Initialized ();
18+
19+
int MyProc (); //! Process ID in MPI_COMM_WORLD
20+
int NProcs (); //! Number of processes in MPI_COMM_WORLD
21+
int MyProgId (); //! Program ID
22+
23+
class Copier
24+
{
25+
public:
26+
Copier (BoxArray const& ba, DistributionMapping const& dm);
27+
28+
template <typename FAB>
29+
void send (FabArray<FAB> const& fa, int icomp, int ncomp) const;
30+
31+
template <typename FAB>
32+
void recv (FabArray<FAB>& fa, int icomp, int ncomp) const;
33+
34+
private:
35+
std::map<int,FabArrayBase::CopyComTagsContainer> m_SndTags;
36+
std::map<int,FabArrayBase::CopyComTagsContainer> m_RcvTags;
37+
};
38+
39+
template <typename FAB>
40+
void Copier::send (FabArray<FAB> const& mf, int icomp, int ncomp) const
41+
{
42+
const int N_snds = m_SndTags.size();
43+
44+
if (N_snds == 0) return;
45+
46+
// Prepare buffer
47+
48+
Vector<char*> send_data;
49+
Vector<std::size_t> send_size;
50+
Vector<int> send_rank;
51+
Vector<MPI_Request> send_reqs;
52+
Vector<FabArrayBase::CopyComTagsContainer const*> send_cctc;
53+
54+
Vector<std::size_t> offset;
55+
std::size_t total_volume = 0;
56+
for (auto const& kv : m_SndTags) {
57+
auto const& cctc = kv.second;
58+
59+
std::size_t nbytes = 0;
60+
for (auto const& cct : cctc) {
61+
nbytes += cct.sbox.numPts() * ncomp * sizeof(typename FAB::value_type);
62+
}
63+
64+
std::size_t acd = ParallelDescriptor::alignof_comm_data(nbytes);
65+
nbytes = amrex::aligned_size(acd, nbytes); // so that bytes are aligned
66+
67+
// Also need to align the offset properly
68+
total_volume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
69+
acd), total_volume);
70+
71+
offset.push_back(total_volume);
72+
total_volume += nbytes;
73+
74+
send_data.push_back(nullptr);
75+
send_size.push_back(nbytes);
76+
send_rank.push_back(kv.first);
77+
send_reqs.push_back(MPI_REQUEST_NULL);
78+
send_cctc.push_back(&cctc);
79+
}
80+
81+
Gpu::PinnedVector<char> send_buffer(total_volume);
82+
char* the_send_data = send_buffer.data();
83+
for (int i = 0; i < N_snds; ++i) {
84+
send_data[i] = the_send_data + offset[i];
85+
}
86+
87+
// Pack buffer
88+
#ifdef AMREX_USE_GPU
89+
if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
90+
mf.pack_send_buffer_gpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
91+
} else
92+
#endif
93+
{
94+
mf.pack_send_buffer_cpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
95+
}
96+
97+
// Send
98+
for (int i = 0; i < N_snds; ++i) {
99+
send_reqs[i] = ParallelDescriptor::Asend
100+
(send_data[i], send_size[i], send_rank[i], 100, MPI_COMM_WORLD).req();
101+
}
102+
Vector<MPI_Status> stats(N_snds);
103+
ParallelDescriptor::Waitall(send_reqs, stats);
104+
}
105+
106+
template <typename FAB>
107+
void Copier::recv (FabArray<FAB>& mf, int icomp, int ncomp) const
108+
{
109+
const int N_rcvs = m_RcvTags.size();
110+
111+
if (N_rcvs == 0) return;
112+
113+
// Prepare buffer
114+
115+
Vector<char*> recv_data;
116+
Vector<std::size_t> recv_size;
117+
Vector<int> recv_from;
118+
Vector<MPI_Request> recv_reqs;
119+
120+
Vector<std::size_t> offset;
121+
std::size_t TotalRcvsVolume = 0;
122+
for (auto const& kv : m_RcvTags) {
123+
std::size_t nbytes = 0;
124+
for (auto const& cct : kv.second) {
125+
nbytes += cct.dbox.numPts() * ncomp * sizeof(typename FAB::value_type);
126+
}
127+
128+
std::size_t acd = ParallelDescriptor::alignof_comm_data(nbytes);
129+
nbytes = amrex::aligned_size(acd, nbytes); // so that nbytes are aligned
130+
131+
// Also need to align the offset properly
132+
TotalRcvsVolume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
133+
acd), TotalRcvsVolume);
134+
135+
offset.push_back(TotalRcvsVolume);
136+
TotalRcvsVolume += nbytes;
137+
138+
recv_data.push_back(nullptr);
139+
recv_size.push_back(nbytes);
140+
recv_from.push_back(kv.first);
141+
recv_reqs.push_back(MPI_REQUEST_NULL);
142+
}
143+
144+
Gpu::PinnedVector<char> recv_buffer(TotalRcvsVolume);
145+
char* the_recv_data = recv_buffer.data();
146+
147+
// Recv
148+
for (int i = 0; i < N_rcvs; ++i) {
149+
recv_data[i] = the_recv_data + offset[i];
150+
recv_reqs[i] = ParallelDescriptor::Arecv
151+
(recv_data[i], recv_size[i], recv_from[i], 100, MPI_COMM_WORLD).req();
152+
}
153+
154+
Vector<FabArrayBase::CopyComTagsContainer const*> recv_cctc(N_rcvs, nullptr);
155+
for (int i = 0; i < N_rcvs; ++i) {
156+
recv_cctc[i] = &(m_RcvTags.at(recv_from[i]));
157+
}
158+
159+
Vector<MPI_Status> stats(N_rcvs);
160+
ParallelDescriptor::Waitall(recv_reqs, stats);
161+
162+
// Unpack buffer
163+
#ifdef AMREX_USE_GPU
164+
if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
165+
mf.unpack_recv_buffer_gpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
166+
FabArrayBase::COPY, true);
167+
} else
168+
#endif
169+
{
170+
mf.unpack_recv_buffer_cpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
171+
FabArrayBase::COPY, true);
172+
}
173+
}
174+
175+
}}
176+
177+
#endif
178+
#endif

0 commit comments

Comments
 (0)