@@ -54,6 +54,7 @@ static int xcast(orte_jobid_t job,
54
54
orte_rml_tag_t tag );
55
55
static int bad_allgather (orte_grpcomm_collective_t * coll );
56
56
static int bad_barrier (orte_grpcomm_collective_t * coll );
57
+ static int bad_modex (orte_grpcomm_collective_t * modex );
57
58
58
59
/* Module def */
59
60
orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
@@ -62,7 +63,7 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
62
63
xcast ,
63
64
bad_allgather ,
64
65
bad_barrier ,
65
- orte_grpcomm_base_modex
66
+ bad_modex
66
67
};
67
68
68
69
/**
@@ -133,29 +134,14 @@ static int xcast(orte_jobid_t job,
133
134
return rc ;
134
135
}
135
136
136
-
137
- static int bad_barrier (orte_grpcomm_collective_t * coll )
137
+ static void process_barrier (int fd , short args , void * cbdata )
138
138
{
139
+ orte_grpcomm_caddy_t * caddy = (orte_grpcomm_caddy_t * )cbdata ;
140
+ orte_grpcomm_collective_t * coll = caddy -> op ;
139
141
int rc ;
140
142
opal_buffer_t * buf ;
141
143
orte_namelist_t * nm ;
142
144
143
- OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base .output ,
144
- "%s grpcomm:bad entering barrier" ,
145
- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
146
-
147
- /* if I am alone, just execute the callback */
148
- if (1 == orte_process_info .num_procs ) {
149
- coll -> active = false;
150
- if (NULL != coll -> cbfunc ) {
151
- coll -> cbfunc (NULL , coll -> cbdata );
152
- }
153
- return ORTE_SUCCESS ;
154
- }
155
-
156
- /* mark the collective as active */
157
- coll -> active = true;
158
-
159
145
/* setup the collective */
160
146
opal_list_append (& orte_grpcomm_base .active_colls , & coll -> super );
161
147
@@ -183,41 +169,47 @@ static int bad_barrier(orte_grpcomm_collective_t *coll)
183
169
ORTE_ERROR_LOG (rc );
184
170
OBJ_RELEASE (buf );
185
171
opal_list_remove_item (& orte_grpcomm_base .active_colls , & coll -> super );
186
- return rc ;
172
+ return ;
187
173
}
188
174
189
175
OPAL_OUTPUT_VERBOSE ((2 , orte_grpcomm_base .output ,
190
176
"%s grpcomm:bad barrier underway" ,
191
177
ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
192
-
193
- return rc ;
194
178
}
195
179
196
- static int bad_allgather (orte_grpcomm_collective_t * gather )
180
+ static int bad_barrier (orte_grpcomm_collective_t * coll )
197
181
{
198
- int rc ;
199
- opal_buffer_t * buf ;
200
- orte_namelist_t * nm ;
201
- opal_list_item_t * item ;
202
-
203
182
OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base .output ,
204
- "%s grpcomm:bad entering allgather " ,
183
+ "%s grpcomm:bad entering barrier " ,
205
184
ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
206
185
207
- /* if I am alone and nobody else is participating, then
208
- * nothing really to do
209
- */
210
- if (1 == orte_process_info .num_procs &&
211
- 0 == opal_list_get_size (& gather -> participants )) {
212
- gather -> active = false;
213
- if (NULL != gather -> cbfunc ) {
214
- gather -> cbfunc (& gather -> buffer , gather -> cbdata );
186
+ /* if I am alone, just execute the callback */
187
+ if (1 == orte_process_info .num_procs ) {
188
+ coll -> active = false;
189
+ if (NULL != coll -> cbfunc ) {
190
+ coll -> cbfunc (NULL , coll -> cbdata );
215
191
}
216
192
return ORTE_SUCCESS ;
217
193
}
218
194
219
195
/* mark the collective as active */
220
- gather -> active = true;
196
+ coll -> active = true;
197
+
198
+ /* push it into the event library for processing as
199
+ * we will be accessing global lists
200
+ */
201
+ ORTE_GRPCOMM_ACTIVATE (coll , process_barrier );
202
+ return ORTE_SUCCESS ;
203
+ }
204
+
205
+ static void process_allgather (int fd , short args , void * cbdata )
206
+ {
207
+ orte_grpcomm_caddy_t * caddy = (orte_grpcomm_caddy_t * )cbdata ;
208
+ orte_grpcomm_collective_t * gather = caddy -> op ;
209
+ int rc ;
210
+ opal_buffer_t * buf ;
211
+ orte_namelist_t * nm ;
212
+ opal_list_item_t * item ;
221
213
222
214
/* if this is an original request, then record the collective */
223
215
if (NULL == gather -> next_cb ) {
@@ -250,7 +242,7 @@ static int bad_allgather(orte_grpcomm_collective_t *gather)
250
242
ORTE_ERROR_LOG (rc );
251
243
OBJ_RELEASE (buf );
252
244
opal_list_remove_item (& orte_grpcomm_base .active_colls , & gather -> super );
253
- return rc ;
245
+ return ;
254
246
}
255
247
} else {
256
248
/* send directly to each participant - note that this will
@@ -274,15 +266,54 @@ static int bad_allgather(orte_grpcomm_collective_t *gather)
274
266
ORTE_ERROR_LOG (rc );
275
267
OBJ_RELEASE (buf );
276
268
opal_list_remove_item (& orte_grpcomm_base .active_colls , & gather -> super );
277
- return rc ;
269
+ return ;
278
270
}
279
271
}
280
- return ORTE_SUCCESS ;
272
+ return ;
281
273
}
282
274
283
275
OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base .output ,
284
276
"%s grpcomm:bad allgather underway" ,
285
277
ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
278
+ }
279
+
280
+ static int bad_allgather (orte_grpcomm_collective_t * gather )
281
+ {
282
+ OPAL_OUTPUT_VERBOSE ((1 , orte_grpcomm_base .output ,
283
+ "%s grpcomm:bad entering allgather" ,
284
+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
285
+
286
+ /* if I am alone and nobody else is participating, then
287
+ * nothing really to do
288
+ */
289
+ if (1 == orte_process_info .num_procs &&
290
+ 0 == opal_list_get_size (& gather -> participants )) {
291
+ gather -> active = false;
292
+ if (NULL != gather -> cbfunc ) {
293
+ gather -> cbfunc (& gather -> buffer , gather -> cbdata );
294
+ }
295
+ return ORTE_SUCCESS ;
296
+ }
286
297
298
+ /* mark the collective as active */
299
+ gather -> active = true;
300
+
301
+ /* push it into the event library for processing as
302
+ * we will be accessing global lists
303
+ */
304
+ ORTE_GRPCOMM_ACTIVATE (gather , process_allgather );
305
+ return ORTE_SUCCESS ;
306
+ }
307
+
308
+ static int bad_modex (orte_grpcomm_collective_t * modex )
309
+ {
310
+ /* mark the collective as active */
311
+ modex -> active = true;
312
+
313
+ /* we need to get this into the event library
314
+ * to avoid race conditions with modex data arriving
315
+ * from other sources via the RML
316
+ */
317
+ ORTE_GRPCOMM_ACTIVATE (modex , orte_grpcomm_base_modex );
287
318
return ORTE_SUCCESS ;
288
319
}
0 commit comments