Skip to content

Commit 0fe756d

Browse files
committed
mpi: retain operation and datatype in non blocking collectives
MPI standard states a user MPI_Op and/or user MPI_Datatype can be free'd after a call to a non blocking collective and before the non-blocking collective completes. Retain user (only) MPI_Op and MPI_Datatype when the non blocking call is invoked, and set a request callback so they are free'd when the MPI_Request completes. Thanks Thomas Ponweiser for reporting this Fixes #2151 Fixes #1304 Signed-off-by: Gilles Gouaillardet <[email protected]>
1 parent d0dc621 commit 0fe756d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+649
-139
lines changed

ompi/mca/coll/base/coll_base_util.c

Lines changed: 187 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2014-2017 Research Organization for Information Science
13-
* and Technology (RIST). All rights reserved.
12+
* Copyright (c) 2014-2019 Research Organization for Information Science
13+
* and Technology (RIST). All rights reserved.
1414
* $COPYRIGHT$
1515
*
1616
* Additional copyrights may follow
@@ -26,6 +26,7 @@
2626
#include "ompi/communicator/communicator.h"
2727
#include "ompi/mca/coll/base/coll_tags.h"
2828
#include "ompi/mca/coll/base/coll_base_functions.h"
29+
#include "ompi/mca/topo/base/base.h"
2930
#include "ompi/mca/pml/pml.h"
3031
#include "coll_base_util.h"
3132

@@ -103,3 +104,187 @@ int ompi_rounddown(int num, int factor)
103104
num /= factor;
104105
return num * factor; /* floor(num / factor) * factor */
105106
}
107+
108+
static void release_objs_callback(struct ompi_coll_base_nbc_request_t *request) {
109+
if (NULL != request->data.objs.objs[0]) {
110+
OBJ_RELEASE(request->data.objs.objs[0]);
111+
}
112+
if (NULL != request->data.objs.objs[1]) {
113+
OBJ_RELEASE(request->data.objs.objs[1]);
114+
}
115+
}
116+
117+
static int complete_objs_callback(struct ompi_request_t *req) {
118+
struct ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
119+
int rc = OMPI_SUCCESS;
120+
assert (NULL != request);
121+
if (NULL != request->cb.req_complete_cb) {
122+
rc = request->cb.req_complete_cb(request->req_complete_cb_data);
123+
}
124+
release_objs_callback(request);
125+
return rc;
126+
}
127+
128+
static int free_objs_callback(struct ompi_request_t **rptr) {
129+
struct ompi_coll_base_nbc_request_t *request = *(ompi_coll_base_nbc_request_t **)rptr;
130+
int rc = OMPI_SUCCESS;
131+
if (NULL != request->cb.req_free) {
132+
rc = request->cb.req_free(rptr);
133+
}
134+
release_objs_callback(request);
135+
return rc;
136+
}
137+
138+
int ompi_coll_base_retain_op( ompi_request_t *req, ompi_op_t *op,
139+
ompi_datatype_t *type) {
140+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
141+
bool retain = false;
142+
if (!ompi_op_is_intrinsic(op)) {
143+
OBJ_RETAIN(op);
144+
request->data.op.op = op;
145+
retain = true;
146+
}
147+
if (!ompi_datatype_is_predefined(type)) {
148+
OBJ_RETAIN(type);
149+
request->data.op.datatype = type;
150+
retain = true;
151+
}
152+
if (OPAL_UNLIKELY(retain)) {
153+
/* We need to consider two cases :
154+
* - non blocking collectives:
155+
* the objects can be released when MPI_Wait() completes
156+
* and we use the req_complete_cb callback
157+
* - persistent non blocking collectives:
158+
* the objects can only be released when the request is freed
159+
* (e.g. MPI_Request_free() completes) and we use req_free callback
160+
*/
161+
if (req->req_persistent) {
162+
request->cb.req_free = req->req_free;
163+
req->req_free = free_objs_callback;
164+
} else {
165+
request->cb.req_complete_cb = req->req_complete_cb;
166+
request->req_complete_cb_data = req->req_complete_cb_data;
167+
req->req_complete_cb = complete_objs_callback;
168+
req->req_complete_cb_data = request;
169+
}
170+
}
171+
return OMPI_SUCCESS;
172+
}
173+
174+
int ompi_coll_base_retain_datatypes( ompi_request_t *req, ompi_datatype_t *stype,
175+
ompi_datatype_t *rtype) {
176+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
177+
bool retain = false;
178+
if (NULL != stype && !ompi_datatype_is_predefined(stype)) {
179+
OBJ_RETAIN(stype);
180+
request->data.types.stype = stype;
181+
retain = true;
182+
}
183+
if (NULL != rtype && !ompi_datatype_is_predefined(rtype)) {
184+
OBJ_RETAIN(rtype);
185+
request->data.types.rtype = rtype;
186+
retain = true;
187+
}
188+
if (OPAL_UNLIKELY(retain)) {
189+
if (req->req_persistent) {
190+
request->cb.req_free = req->req_free;
191+
req->req_free = free_objs_callback;
192+
} else {
193+
request->cb.req_complete_cb = req->req_complete_cb;
194+
request->req_complete_cb_data = req->req_complete_cb_data;
195+
req->req_complete_cb = complete_objs_callback;
196+
req->req_complete_cb_data = request;
197+
}
198+
}
199+
return OMPI_SUCCESS;
200+
}
201+
202+
static void release_vecs_callback(ompi_coll_base_nbc_request_t *request) {
203+
ompi_communicator_t *comm = request->super.req_mpi_object.comm;
204+
int scount, rcount;
205+
if (OMPI_COMM_IS_TOPO(comm)) {
206+
(void)mca_topo_base_neighbor_count (comm, &rcount, &scount);
207+
} else {
208+
scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm);
209+
}
210+
for (int i=0; i<scount; i++) {
211+
if (NULL != request->data.vecs.stypes && NULL != request->data.vecs.stypes[i]) {
212+
OMPI_DATATYPE_RELEASE(request->data.vecs.stypes[i]);
213+
}
214+
}
215+
for (int i=0; i<rcount; i++) {
216+
if (NULL != request->data.vecs.rtypes && NULL != request->data.vecs.rtypes[i]) {
217+
OMPI_DATATYPE_RELEASE(request->data.vecs.rtypes[i]);
218+
}
219+
}
220+
}
221+
222+
static int complete_vecs_callback(struct ompi_request_t *req) {
223+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
224+
int rc = OMPI_SUCCESS;
225+
assert (NULL != request);
226+
if (NULL != request->cb.req_complete_cb) {
227+
rc = request->cb.req_complete_cb(request->req_complete_cb_data);
228+
}
229+
release_vecs_callback(request);
230+
return rc;
231+
}
232+
233+
static int free_vecs_callback(struct ompi_request_t **rptr) {
234+
struct ompi_coll_base_nbc_request_t *request = *(ompi_coll_base_nbc_request_t **)rptr;
235+
int rc = OMPI_SUCCESS;
236+
if (NULL != request->cb.req_free) {
237+
rc = request->cb.req_free(rptr);
238+
}
239+
release_vecs_callback(request);
240+
return rc;
241+
}
242+
243+
int ompi_coll_base_retain_datatypes_w( ompi_request_t *req,
244+
ompi_datatype_t *stypes[], ompi_datatype_t *rtypes[]) {
245+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
246+
bool retain = false;
247+
ompi_communicator_t *comm = request->super.req_mpi_object.comm;
248+
int scount, rcount;
249+
if (OMPI_COMM_IS_TOPO(comm)) {
250+
(void)mca_topo_base_neighbor_count (comm, &rcount, &scount);
251+
} else {
252+
scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm);
253+
}
254+
255+
for (int i=0; i<scount; i++) {
256+
if (NULL != stypes && NULL != stypes[i] && !ompi_datatype_is_predefined(stypes[i])) {
257+
OBJ_RETAIN(stypes[i]);
258+
retain = true;
259+
}
260+
}
261+
for (int i=0; i<rcount; i++) {
262+
if (NULL != rtypes && NULL != rtypes[i] && !ompi_datatype_is_predefined(rtypes[i])) {
263+
OBJ_RETAIN(rtypes[i]);
264+
retain = true;
265+
}
266+
}
267+
if (OPAL_UNLIKELY(retain)) {
268+
request->data.vecs.stypes = stypes;
269+
request->data.vecs.rtypes = rtypes;
270+
if (req->req_persistent) {
271+
request->cb.req_free = req->req_free;
272+
req->req_free = free_vecs_callback;
273+
} else {
274+
request->cb.req_complete_cb = req->req_complete_cb;
275+
request->req_complete_cb_data = req->req_complete_cb_data;
276+
req->req_complete_cb = complete_vecs_callback;
277+
req->req_complete_cb_data = request;
278+
}
279+
}
280+
return OMPI_SUCCESS;
281+
}
282+
283+
static void nbc_req_cons(ompi_coll_base_nbc_request_t *req) {
284+
req->cb.req_complete_cb = NULL;
285+
req->req_complete_cb_data = NULL;
286+
req->data.objs.objs[0] = NULL;
287+
req->data.objs.objs[1] = NULL;
288+
}
289+
290+
OBJ_CLASS_INSTANCE(ompi_coll_base_nbc_request_t, ompi_request_t, nbc_req_cons, NULL);

ompi/mca/coll/base/coll_base_util.h

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2014-2017 Research Organization for Information Science
13-
* and Technology (RIST). All rights reserved.
12+
* Copyright (c) 2014-2019 Research Organization for Information Science
13+
* and Technology (RIST). All rights reserved.
1414
* $COPYRIGHT$
1515
*
1616
* Additional copyrights may follow
@@ -27,10 +27,41 @@
2727
#include "ompi/mca/mca.h"
2828
#include "ompi/datatype/ompi_datatype.h"
2929
#include "ompi/request/request.h"
30+
#include "ompi/op/op.h"
3031
#include "ompi/mca/pml/pml.h"
3132

3233
BEGIN_C_DECLS
3334

35+
struct ompi_coll_base_nbc_request_t {
36+
ompi_request_t super;
37+
union {
38+
ompi_request_complete_fn_t req_complete_cb;
39+
ompi_request_free_fn_t req_free;
40+
} cb;
41+
void *req_complete_cb_data;
42+
union {
43+
struct {
44+
ompi_op_t *op;
45+
ompi_datatype_t *datatype;
46+
} op;
47+
struct {
48+
ompi_datatype_t *stype;
49+
ompi_datatype_t *rtype;
50+
} types;
51+
struct {
52+
opal_object_t *objs[2];
53+
} objs;
54+
struct {
55+
ompi_datatype_t **stypes;
56+
ompi_datatype_t **rtypes;
57+
} vecs;
58+
} data;
59+
};
60+
61+
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t);
62+
63+
typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t;
64+
3465
/**
3566
* A MPI_like function doing a send and a receive simultaneously.
3667
* If one of the communications results in a zero-byte message the
@@ -84,5 +115,17 @@ unsigned int ompi_mirror_perm(unsigned int x, int nbits);
84115
*/
85116
int ompi_rounddown(int num, int factor);
86117

118+
int ompi_coll_base_retain_op( ompi_request_t *request,
119+
ompi_op_t *op,
120+
ompi_datatype_t *type);
121+
122+
int ompi_coll_base_retain_datatypes( ompi_request_t *request,
123+
ompi_datatype_t *stype,
124+
ompi_datatype_t *rtype);
125+
126+
int ompi_coll_base_retain_datatypes_w( ompi_request_t *request,
127+
ompi_datatype_t *stypes[],
128+
ompi_datatype_t *rtypes[]);
129+
87130
END_C_DECLS
88131
#endif /* MCA_COLL_BASE_UTIL_EXPORT_H */

ompi/mca/coll/libnbc/coll_libnbc.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
1515
* reserved.
16-
* Copyright (c) 2014-2017 Research Organization for Information Science
17-
* and Technology (RIST). All rights reserved.
16+
* Copyright (c) 2014-2019 Research Organization for Information Science
17+
* and Technology (RIST). All rights reserved.
1818
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
1919
* Copyright (c) 2018 FUJITSU LIMITED. All rights reserved.
2020
* $COPYRIGHT$
@@ -28,7 +28,7 @@
2828
#define MCA_COLL_LIBNBC_EXPORT_H
2929

3030
#include "ompi/mca/coll/coll.h"
31-
#include "ompi/request/request.h"
31+
#include "ompi/mca/coll/base/coll_base_util.h"
3232
#include "opal/sys/atomic.h"
3333

3434
BEGIN_C_DECLS
@@ -121,7 +121,7 @@ typedef struct NBC_Schedule NBC_Schedule;
121121
OBJ_CLASS_DECLARATION(NBC_Schedule);
122122

123123
struct ompi_coll_libnbc_request_t {
124-
ompi_request_t super;
124+
ompi_coll_base_nbc_request_t super;
125125
MPI_Comm comm;
126126
long row_offset;
127127
bool nbc_complete; /* status in libnbc level */
@@ -145,13 +145,13 @@ typedef ompi_coll_libnbc_request_t NBC_Handle;
145145
opal_free_list_item_t *item; \
146146
item = opal_free_list_wait (&mca_coll_libnbc_component.requests); \
147147
req = (ompi_coll_libnbc_request_t*) item; \
148-
OMPI_REQUEST_INIT(&req->super, persistent); \
149-
req->super.req_mpi_object.comm = comm; \
148+
OMPI_REQUEST_INIT(&req->super.super, persistent); \
149+
req->super.super.req_mpi_object.comm = comm; \
150150
} while (0)
151151

152152
#define OMPI_COLL_LIBNBC_REQUEST_RETURN(req) \
153153
do { \
154-
OMPI_REQUEST_FINI(&(req)->super); \
154+
OMPI_REQUEST_FINI(&(req)->super.super); \
155155
opal_free_list_return (&mca_coll_libnbc_component.requests, \
156156
(opal_free_list_item_t*) (req)); \
157157
} while (0)

ompi/mca/coll/libnbc/coll_libnbc_component.c

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
1515
* reserved.
16-
* Copyright (c) 2016-2017 Research Organization for Information Science
17-
* and Technology (RIST). All rights reserved.
16+
* Copyright (c) 2016-2019 Research Organization for Information Science
17+
* and Technology (RIST). All rights reserved.
1818
* Copyright (c) 2016 IBM Corporation. All rights reserved.
1919
* Copyright (c) 2017 Ian Bradley Morgan and Anthony Skjellum. All
2020
* rights reserved.
@@ -448,21 +448,21 @@ ompi_coll_libnbc_progress(void)
448448
/* done, remove and complete */
449449
OPAL_THREAD_LOCK(&mca_coll_libnbc_component.lock);
450450
opal_list_remove_item(&mca_coll_libnbc_component.active_requests,
451-
&request->super.super.super);
451+
&request->super.super.super.super);
452452
OPAL_THREAD_UNLOCK(&mca_coll_libnbc_component.lock);
453453

454454
if( OMPI_SUCCESS == res || NBC_OK == res || NBC_SUCCESS == res ) {
455-
request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
455+
request->super.super.req_status.MPI_ERROR = OMPI_SUCCESS;
456456
}
457457
else {
458-
request->super.req_status.MPI_ERROR = res;
458+
request->super.super.req_status.MPI_ERROR = res;
459459
}
460-
if(request->super.req_persistent) {
460+
if(request->super.super.req_persistent) {
461461
/* reset for the next communication */
462462
request->row_offset = 0;
463463
}
464-
if(!request->super.req_persistent || !REQUEST_COMPLETE(&request->super)) {
465-
ompi_request_complete(&request->super, true);
464+
if(!request->super.super.req_persistent || !REQUEST_COMPLETE(&request->super.super)) {
465+
ompi_request_complete(&request->super.super, true);
466466
}
467467
}
468468
OPAL_THREAD_LOCK(&mca_coll_libnbc_component.lock);
@@ -527,7 +527,7 @@ request_start(size_t count, ompi_request_t ** requests)
527527
NBC_DEBUG(5, "tmpbuf address=%p size=%u\n", handle->tmpbuf, sizeof(handle->tmpbuf));
528528
NBC_DEBUG(5, "--------------------------------\n");
529529

530-
handle->super.req_complete = REQUEST_PENDING;
530+
handle->super.super.req_complete = REQUEST_PENDING;
531531
handle->nbc_complete = false;
532532

533533
res = NBC_Start(handle);
@@ -557,7 +557,7 @@ request_free(struct ompi_request_t **ompi_req)
557557
ompi_coll_libnbc_request_t *request =
558558
(ompi_coll_libnbc_request_t*) *ompi_req;
559559

560-
if( !REQUEST_COMPLETE(&request->super) ) {
560+
if( !REQUEST_COMPLETE(&request->super.super) ) {
561561
return MPI_ERR_REQUEST;
562562
}
563563

@@ -571,11 +571,11 @@ request_free(struct ompi_request_t **ompi_req)
571571
static void
572572
request_construct(ompi_coll_libnbc_request_t *request)
573573
{
574-
request->super.req_type = OMPI_REQUEST_COLL;
575-
request->super.req_status._cancelled = 0;
576-
request->super.req_start = request_start;
577-
request->super.req_free = request_free;
578-
request->super.req_cancel = request_cancel;
574+
request->super.super.req_type = OMPI_REQUEST_COLL;
575+
request->super.super.req_status._cancelled = 0;
576+
request->super.super.req_start = request_start;
577+
request->super.super.req_free = request_free;
578+
request->super.super.req_cancel = request_cancel;
579579
}
580580

581581

0 commit comments

Comments
 (0)