Skip to content

Commit 61f2fc2

Browse files
pieternfacebook-github-bot
authored andcommitted
Add new style reduce collective (#140)
Summary: This implements a ring reduction resulting the result scattered across processes, followed by a gather operation to the root process. The halving/doubling implementation is slated to be ported to this stateless style later and provided as an option through RedudeOptions. Rebase needed after #137 is merged. Pull Request resolved: #140 Reviewed By: manojkris Differential Revision: D10376422 Pulled By: pietern fbshipit-source-id: 2f32f693b8437cdff82a6528f18e576966984395
1 parent 7a26270 commit 61f2fc2

File tree

7 files changed

+428
-0
lines changed

7 files changed

+428
-0
lines changed

gloo/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ list(APPEND GLOO_SRCS
1111
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.cc"
1212
"${CMAKE_CURRENT_SOURCE_DIR}/context.cc"
1313
"${CMAKE_CURRENT_SOURCE_DIR}/gather.cc"
14+
"${CMAKE_CURRENT_SOURCE_DIR}/reduce.cc"
1415
"${CMAKE_CURRENT_SOURCE_DIR}/types.cc"
1516
)
1617

@@ -28,6 +29,7 @@ list(APPEND GLOO_HDRS
2829
"${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_one.h"
2930
"${CMAKE_CURRENT_SOURCE_DIR}/broadcast_one_to_all.h"
3031
"${CMAKE_CURRENT_SOURCE_DIR}/gather.h"
32+
"${CMAKE_CURRENT_SOURCE_DIR}/reduce.h"
3133
"${CMAKE_CURRENT_SOURCE_DIR}/reduce_scatter.h"
3234
"${CMAKE_CURRENT_SOURCE_DIR}/context.h"
3335
"${CMAKE_CURRENT_SOURCE_DIR}/math.h"

gloo/math.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ void min(T* x, const T* y, size_t n) {
4141
}
4242
}
4343

44+
template <typename T>
45+
T roundUp(T value, T multiple) {
46+
T remainder = value % multiple;
47+
if (remainder == 0) {
48+
return value;
49+
}
50+
return value + multiple - remainder;
51+
}
52+
4453
#if GLOO_USE_AVX
4554

4655
// Assumes x and y are either both aligned to 32 bytes or unaligned by the same

gloo/reduce.cc

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/**
2+
* Copyright (c) 2018-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree. An additional grant
7+
* of patent rights can be found in the PATENTS file in the same directory.
8+
*/
9+
10+
#include "gloo/reduce.h"
11+
12+
#include <algorithm>
13+
#include <cstring>
14+
15+
#include "gloo/common/logging.h"
16+
#include "gloo/math.h"
17+
#include "gloo/types.h"
18+
19+
namespace gloo {
20+
21+
void reduce(const std::shared_ptr<Context>& context, ReduceOptions& opts) {
22+
std::unique_ptr<transport::UnboundBuffer> tmpInBuffer;
23+
std::unique_ptr<transport::UnboundBuffer> tmpOutBuffer;
24+
transport::UnboundBuffer* in = nullptr;
25+
transport::UnboundBuffer* out = nullptr;
26+
const auto slot = Slot::build(kReduceSlotPrefix, opts.tag);
27+
28+
// Sanity checks
29+
GLOO_ENFORCE(opts.elements > 0);
30+
GLOO_ENFORCE(opts.elementSize > 0);
31+
GLOO_ENFORCE(opts.root >= 0 && opts.root < context->size);
32+
GLOO_ENFORCE(opts.reduce != nullptr);
33+
const auto recvRank = (context->size + context->rank + 1) % context->size;
34+
GLOO_ENFORCE(
35+
context->getPair(recvRank),
36+
"missing connection between rank " + std::to_string(context->rank) +
37+
" (this process) and rank " + std::to_string(recvRank));
38+
const auto sendRank = (context->size + context->rank - 1) % context->size;
39+
GLOO_ENFORCE(
40+
context->getPair(sendRank),
41+
"missing connection between rank " + std::to_string(context->rank) +
42+
" (this process) and rank " + std::to_string(sendRank));
43+
44+
// Figure out pointer to output buffer
45+
if (opts.outBuffer) {
46+
out = opts.outBuffer.get();
47+
} else {
48+
GLOO_ENFORCE(opts.outPtr != nullptr);
49+
tmpOutBuffer = context->createUnboundBuffer(
50+
opts.outPtr, opts.elements * opts.elementSize);
51+
out = tmpOutBuffer.get();
52+
}
53+
54+
// Figure out pointer to input buffer
55+
if (opts.inBuffer) {
56+
in = opts.inBuffer.get();
57+
} else if (opts.inPtr != nullptr) {
58+
tmpInBuffer = context->createUnboundBuffer(
59+
opts.inPtr, opts.elements * opts.elementSize);
60+
in = tmpInBuffer.get();
61+
} else {
62+
in = out;
63+
}
64+
65+
GLOO_ENFORCE_EQ(in->size, opts.elements * opts.elementSize);
66+
GLOO_ENFORCE_EQ(out->size, opts.elements * opts.elementSize);
67+
68+
// The ring algorithm works as follows.
69+
//
70+
// The given input is split into a number of chunks equal to the
71+
// number of processes. Once the algorithm has finished, every
72+
// process hosts one chunk of reduced output, in sequential order
73+
// (rank 0 has chunk 0, rank 1 has chunk 1, etc.). As the input may
74+
// not be divisible by the number of processes, the chunk on the
75+
// final ranks may have partial output or may be empty.
76+
//
77+
// As a chunk is passed along the ring and contains the reduction of
78+
// successively more ranks, we have to alternate between performing
79+
// I/O for that chunk and computing the reduction between the
80+
// received chunk and the local chunk. To avoid this alternating
81+
// pattern, we split up a chunk into multiple segments (>= 2), and
82+
// ensure we have one segment in flight while computing a reduction
83+
// on the other. The segment size has an upper bound to minimize
84+
// memory usage and avoid poor cache behavior. This means we may
85+
// have many segments per chunk when dealing with very large inputs.
86+
//
87+
// The nomenclature here is reflected in the variable naming below
88+
// (one chunk per rank and many segments per chunk).
89+
//
90+
const size_t totalBytes = opts.elements * opts.elementSize;
91+
92+
// Ensure that maximum segment size is a multiple of the element size.
93+
// Otherwise, the segment size can exceed the maximum segment size after
94+
// rounding it up to the nearest multiple of the element size.
95+
// For example, if maxSegmentSize = 10, and elementSize = 4,
96+
// then after rounding up: segmentSize = 12;
97+
const size_t maxSegmentSize =
98+
opts.elementSize * (opts.maxSegmentSize / opts.elementSize);
99+
100+
// The number of bytes per segment must be a multiple of the bytes
101+
// per element for the reduction to work; round up if necessary.
102+
const size_t segmentBytes = roundUp(
103+
std::min(
104+
// Rounded division to have >= 2 segments per chunk.
105+
(totalBytes + (context->size * 2 - 1)) / (context->size * 2),
106+
// Configurable segment size limit
107+
maxSegmentSize),
108+
opts.elementSize);
109+
110+
// Compute how many segments make up the input buffer.
111+
//
112+
// Round up to the nearest multiple of the context size such that
113+
// there is an equal number of segments per process and execution is
114+
// symmetric across processes.
115+
//
116+
// The minimum is twice the context size, because the algorithm
117+
// below overlaps sending/receiving a segment with computing the
118+
// reduction of the another segment.
119+
//
120+
const size_t numSegments = roundUp(
121+
std::max(
122+
(totalBytes + (segmentBytes - 1)) / segmentBytes,
123+
(size_t)context->size * 2),
124+
(size_t)context->size);
125+
GLOO_ENFORCE_EQ(numSegments % context->size, 0);
126+
GLOO_ENFORCE_GE(numSegments, context->size * 2);
127+
const size_t numSegmentsPerRank = numSegments / context->size;
128+
const size_t chunkBytes = numSegmentsPerRank * segmentBytes;
129+
130+
// Allocate scratch space to hold two chunks
131+
std::unique_ptr<uint8_t[]> tmpAllocation(new uint8_t[segmentBytes * 2]);
132+
std::unique_ptr<transport::UnboundBuffer> tmpBuffer =
133+
context->createUnboundBuffer(tmpAllocation.get(), segmentBytes * 2);
134+
transport::UnboundBuffer* tmp = tmpBuffer.get();
135+
136+
// Use dynamic lookup for chunk offset in the temporary buffer.
137+
// With two operations in flight we need two offsets.
138+
// They can be indexed using the loop counter.
139+
std::array<size_t, 2> segmentOffset;
140+
segmentOffset[0] = 0;
141+
segmentOffset[1] = segmentBytes;
142+
143+
// Function computes the offsets and lengths of the chunks to be
144+
// sent and received for a given chunk iteration.
145+
auto computeReduceScatterOffsets = [&](size_t i) {
146+
struct {
147+
size_t sendOffset;
148+
size_t recvOffset;
149+
ssize_t sendLength;
150+
ssize_t recvLength;
151+
} result;
152+
153+
// Compute segment index to send from (to rank - 1) and segment
154+
// index to receive into (from rank + 1). Multiply by the number
155+
// of bytes in a chunk to get to an offset. The offset is allowed
156+
// to be out of range (>= totalBytes) and this is taken into
157+
// account when computing the associated length.
158+
result.sendOffset =
159+
((((context->rank + 1) * numSegmentsPerRank) + i) * segmentBytes) %
160+
(numSegments * segmentBytes);
161+
result.recvOffset =
162+
((((context->rank + 2) * numSegmentsPerRank) + i) * segmentBytes) %
163+
(numSegments * segmentBytes);
164+
165+
// If the segment is entirely in range, the following statement is
166+
// equal to segmentBytes. If it isn't, it will be less, or even
167+
// negative. This is why the ssize_t typecasts are needed.
168+
result.sendLength = std::min(
169+
(ssize_t)segmentBytes,
170+
(ssize_t)totalBytes - (ssize_t)result.sendOffset);
171+
result.recvLength = std::min(
172+
(ssize_t)segmentBytes,
173+
(ssize_t)totalBytes - (ssize_t)result.recvOffset);
174+
175+
return result;
176+
};
177+
178+
for (auto i = 0; i < numSegments; i++) {
179+
if (i >= 2) {
180+
// Compute send and receive offsets and lengths two iterations
181+
// ago. Needed so we know when to wait for an operation and when
182+
// to ignore (when the offset was out of bounds), and know where
183+
// to reduce the contents of the temporary buffer.
184+
auto prev = computeReduceScatterOffsets(i - 2);
185+
if (prev.recvLength > 0) {
186+
tmp->waitRecv();
187+
opts.reduce(
188+
static_cast<uint8_t*>(out->ptr) + prev.recvOffset,
189+
static_cast<const uint8_t*>(in->ptr) + prev.recvOffset,
190+
static_cast<const uint8_t*>(tmp->ptr) + segmentOffset[i & 0x1],
191+
prev.recvLength / opts.elementSize);
192+
}
193+
if (prev.sendLength > 0) {
194+
if ((i - 2) < numSegmentsPerRank) {
195+
in->waitSend();
196+
} else {
197+
out->waitSend();
198+
}
199+
}
200+
}
201+
202+
// Issue new send and receive operation in all but the final two
203+
// iterations. At that point we have already sent all data we
204+
// needed to and only have to wait for the final segments to be
205+
// reduced into the output.
206+
if (i < (numSegments - 2)) {
207+
// Compute send and receive offsets and lengths for this iteration.
208+
auto cur = computeReduceScatterOffsets(i);
209+
if (cur.recvLength > 0) {
210+
tmp->recv(recvRank, slot, segmentOffset[i & 0x1], cur.recvLength);
211+
}
212+
if (cur.sendLength > 0) {
213+
if (i < numSegmentsPerRank) {
214+
in->send(sendRank, slot, cur.sendOffset, cur.sendLength);
215+
} else {
216+
out->send(sendRank, slot, cur.sendOffset, cur.sendLength);
217+
}
218+
}
219+
}
220+
}
221+
222+
// Gather to root rank.
223+
//
224+
// Beware: totalBytes <= (numSegments * segmentBytes), which is
225+
// incompatible with the generic gather algorithm where the
226+
// contribution is identical across processes.
227+
//
228+
if (context->rank == opts.root) {
229+
size_t numRecv = 0;
230+
for (size_t rank = 0; rank < context->size; rank++) {
231+
if (rank == context->rank) {
232+
continue;
233+
}
234+
size_t recvOffset = rank * numSegmentsPerRank * segmentBytes;
235+
ssize_t recvLength = std::min(
236+
(ssize_t)chunkBytes, (ssize_t)totalBytes - (ssize_t)recvOffset);
237+
if (recvLength > 0) {
238+
out->recv(rank, slot, recvOffset, recvLength);
239+
numRecv++;
240+
}
241+
}
242+
for (size_t i = 0; i < numRecv; i++) {
243+
out->waitRecv();
244+
}
245+
} else {
246+
size_t sendOffset = context->rank * numSegmentsPerRank * segmentBytes;
247+
ssize_t sendLength = std::min(
248+
(ssize_t)chunkBytes, (ssize_t)totalBytes - (ssize_t)sendOffset);
249+
if (sendLength > 0) {
250+
out->send(opts.root, slot, sendOffset, sendLength);
251+
out->waitSend();
252+
}
253+
}
254+
}
255+
256+
} // namespace gloo

gloo/reduce.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2018-present, Facebook, Inc.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree. An additional grant
7+
* of patent rights can be found in the PATENTS file in the same directory.
8+
*/
9+
10+
#pragma once
11+
12+
#include "gloo/context.h"
13+
#include "gloo/transport/unbound_buffer.h"
14+
15+
namespace gloo {
16+
17+
struct ReduceOptions {
18+
// The input and output buffers can either be specified as a unbound
19+
// buffer (that can be cached and reused by the caller), or a
20+
// literal pointer and number of elements stored at that pointer.
21+
std::unique_ptr<transport::UnboundBuffer> inBuffer;
22+
void* inPtr = nullptr;
23+
std::unique_ptr<transport::UnboundBuffer> outBuffer;
24+
void* outPtr = nullptr;
25+
26+
// Number of elements.
27+
size_t elements = 0;
28+
29+
// Number of bytes per element.
30+
size_t elementSize = 0;
31+
32+
// Rank of process to reduce to.
33+
int root = 0;
34+
35+
// Reduction function (output, input 1, input 2, number of elements).
36+
std::function<void(void*, const void*, const void*, size_t)> reduce;
37+
38+
// Tag for this gather operation.
39+
// Must be unique across operations executing in parallel.
40+
uint32_t tag = 0;
41+
42+
// This is the maximum size of each I/O operation (send/recv) of which
43+
// two are in flight at all times. A smaller value leads to more
44+
// overhead and a larger value leads to poor cache behavior.
45+
static constexpr size_t kMaxSegmentSize = 1024 * 1024;
46+
47+
// Internal use only. This is used to exercise code paths where we
48+
// have more than 2 segments per rank without making the tests slow
49+
// (because they would require millions of elements if the default
50+
// were not configurable).
51+
size_t maxSegmentSize = kMaxSegmentSize;
52+
};
53+
54+
void reduce(const std::shared_ptr<Context>& context, ReduceOptions& opts);
55+
56+
} // namespace gloo

gloo/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ set(GLOO_TEST_SRCS
88
"${CMAKE_CURRENT_SOURCE_DIR}/gather_test.cc"
99
"${CMAKE_CURRENT_SOURCE_DIR}/linux_test.cc"
1010
"${CMAKE_CURRENT_SOURCE_DIR}/main.cc"
11+
"${CMAKE_CURRENT_SOURCE_DIR}/reduce_test.cc"
1112
"${CMAKE_CURRENT_SOURCE_DIR}/send_recv_test.cc"
1213
)
1314

0 commit comments

Comments
 (0)