Skip to content

Commit cc9b416

Browse files
author
Ralph Castain
committed
Ensure we properly commit suicide if/when we lose connection to the daemon. There are multiple paths by which a lost daemon can be reported, and so a race condition exists in the pmix support. Our MPI layer wants the ability to determine the response to the failure, and so it will call down to the RTE with any abort request. This comes down to the pmix layer as a "pmix_abort" command, which involves communicating the request to the daemon - who is gone. Sadly, the pmix component may not know that just yet, and so we hang.
So add a brief timer event to kick us out of the communication. The precise amount of time we should wait is somewhat TBD, but set something short for now and we can adjust.
1 parent 8ab2b11 commit cc9b416

File tree

4 files changed

+94
-47
lines changed

4 files changed

+94
-47
lines changed

opal/mca/pmix/native/pmix_native.c

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ const opal_pmix_base_module_t opal_pmix_native_module = {
102102

103103
// local variables
104104
static int init_cntr = 0;
105-
opal_process_name_t native_pname = {0};
106-
105+
static opal_process_name_t native_pname = {0};
107106

108107
/* callback for wait completion */
109108
static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
@@ -213,7 +212,7 @@ static int native_fini(void)
213212

214213
if (1 != init_cntr) {
215214
--init_cntr;
216-
return OPAL_SUCCESS;
215+
return OPAL_SUCCESS;
217216
}
218217
init_cntr = 0;
219218

@@ -275,12 +274,20 @@ static bool native_initialized(void)
275274
return false;
276275
}
277276

277+
static void timeout(int sd, short args, void *cbdata)
278+
{
279+
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
280+
cb->active = false;
281+
}
282+
278283
static int native_abort(int flag, const char msg[])
279284
{
280285
opal_buffer_t *bfr;
281286
pmix_cmd_t cmd = PMIX_ABORT_CMD;
282287
int rc;
283288
pmix_cb_t *cb;
289+
opal_event_t ev;
290+
struct timeval tv = {1, 0};
284291

285292
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
286293
"%s pmix:native abort called",
@@ -291,39 +298,48 @@ static int native_abort(int flag, const char msg[])
291298
return OPAL_SUCCESS;
292299
}
293300

294-
/* create a buffer to hold the message */
295-
bfr = OBJ_NEW(opal_buffer_t);
296-
/* pack the cmd */
297-
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
298-
OPAL_ERROR_LOG(rc);
299-
OBJ_RELEASE(bfr);
300-
return rc;
301-
}
302-
/* pack the status flag */
303-
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
304-
OPAL_ERROR_LOG(rc);
305-
OBJ_RELEASE(bfr);
306-
return rc;
307-
}
308-
/* pack the string message - a NULL is okay */
309-
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
310-
OPAL_ERROR_LOG(rc);
311-
OBJ_RELEASE(bfr);
312-
return rc;
313-
}
301+
if (PMIX_USOCK_CONNECTED == mca_pmix_native_component.state) {
302+
/* create a buffer to hold the message */
303+
bfr = OBJ_NEW(opal_buffer_t);
304+
/* pack the cmd */
305+
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
306+
OPAL_ERROR_LOG(rc);
307+
OBJ_RELEASE(bfr);
308+
return rc;
309+
}
310+
/* pack the status flag */
311+
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
312+
OPAL_ERROR_LOG(rc);
313+
OBJ_RELEASE(bfr);
314+
return rc;
315+
}
316+
/* pack the string message - a NULL is okay */
317+
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
318+
OPAL_ERROR_LOG(rc);
319+
OBJ_RELEASE(bfr);
320+
return rc;
321+
}
314322

315-
/* create a callback object as we need to pass it to the
316-
* recv routine so we know which callback to use when
317-
* the return message is recvd */
318-
cb = OBJ_NEW(pmix_cb_t);
319-
cb->active = true;
323+
/* create a callback object as we need to pass it to the
324+
* recv routine so we know which callback to use when
325+
* the return message is recvd */
326+
cb = OBJ_NEW(pmix_cb_t);
327+
cb->active = true;
320328

321-
/* push the message into our event base to send to the server */
322-
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
329+
/* push a timeout event to wake us up just in case this
330+
* message cannot get thru - e.g., someone else may have
331+
* detected the failure of the server and ordered an abort */
332+
opal_event_evtimer_set(mca_pmix_native_component.evbase,
333+
&ev, timeout, cb);
334+
opal_event_evtimer_add(&ev, &tv);
323335

324-
/* wait for the release */
325-
PMIX_WAIT_FOR_COMPLETION(cb->active);
326-
OBJ_RELEASE(cb);
336+
/* push the message into our event base to send to the server */
337+
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
338+
339+
/* wait for the release */
340+
PMIX_WAIT_FOR_COMPLETION(cb->active);
341+
OBJ_RELEASE(cb);
342+
}
327343
return OPAL_SUCCESS;
328344
}
329345

opal/mca/pmix/native/pmix_native.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ typedef enum {
4141
PMIX_USOCK_ACCEPTING
4242
} pmix_usock_state_t;
4343

44+
/* define a macro for abnormal termination */
45+
#define PMIX_NATIVE_ABNORMAL_TERM \
46+
do { \
47+
mca_pmix_native_component.state = PMIX_USOCK_FAILED; \
48+
opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE); \
49+
} while(0);
50+
4451
/* define a command type for communicating to the
4552
* pmix server */
4653
typedef uint8_t pmix_cmd_t;
@@ -202,12 +209,13 @@ OPAL_MODULE_DECLSPEC int usock_send_connect_ack(void);
202209
opal_event_active(&ms->ev, OPAL_EV_WRITE, 1); \
203210
} while(0);
204211

205-
#define CLOSE_THE_SOCKET(socket) \
206-
do { \
207-
shutdown(socket, 2); \
208-
close(socket); \
209-
/* notify the error handler */ \
210-
opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE); \
212+
#define CLOSE_THE_SOCKET(socket) \
213+
do { \
214+
if (0 <= socket) { \
215+
shutdown(socket, 2); \
216+
close(socket); \
217+
socket = -1; \
218+
} \
211219
} while(0)
212220

213221

opal/mca/pmix/native/usock.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@
3535
#ifdef HAVE_SYS_TYPES_H
3636
#include <sys/types.h>
3737
#endif
38+
#ifdef HAVE_SYS_STAT_H
39+
#include <sys/stat.h>
40+
#endif
41+
#ifdef HAVE_FCNTL_H
42+
#include <fcntl.h>
43+
#endif
3844

3945
#include "opal_stdint.h"
4046
#include "opal/opal_socket_errno.h"
@@ -191,6 +197,7 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata)
191197
/* we get here if no matching recv was found - this is an error */
192198
opal_output(0, "%s UNEXPECTED MESSAGE",
193199
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
200+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
194201
OBJ_RELEASE(msg);
195202
}
196203

@@ -222,9 +229,10 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
222229
"usock_peer_try_connect: attempting to connect to server on socket %d",
223230
mca_pmix_native_component.sd);
224231
/* try to connect */
225-
if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) {
232+
if (connect(mca_pmix_native_component.sd, (struct sockaddr*)&mca_pmix_native_component.address, addrlen) < 0) {
226233
if (opal_socket_errno == ETIMEDOUT) {
227-
/* The server may be too busy to accept new connections */
234+
/* The server may be too busy to accept new connections,
235+
* so cycle around and let it try again */
228236
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
229237
"timeout connecting to server");
230238
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
@@ -235,7 +243,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
235243
abort a connection that was ECONNREFUSED on the last
236244
attempt, without even trying to establish the
237245
connection. Handle that case in a semi-rational
238-
way by trying twice before giving up */
246+
way by trying again before giving up */
239247
if (ECONNABORTED == opal_socket_errno) {
240248
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
241249
"connection to server aborted by OS - retrying");
@@ -255,6 +263,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
255263
if (0 <= mca_pmix_native_component.sd) {
256264
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
257265
}
266+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
258267
return;
259268
}
260269

@@ -282,7 +291,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
282291
opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI);
283292
mca_pmix_native_component.send_ev_active = false;
284293

285-
/* setup the socket as non-blocking */
294+
/* setup the socket as non-blocking */
286295
if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) {
287296
opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
288297
strerror(opal_socket_errno),
@@ -310,8 +319,8 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
310319
"usock_send_connect_ack to server failed: %s (%d)",
311320
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
312321
opal_strerror(rc), rc);
313-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
314322
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
323+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
315324
return;
316325
}
317326
}

opal/mca/pmix/native/usock_sendrecv.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
176176
mca_pmix_native_component.send_ev_active = false;
177177
OBJ_RELEASE(msg);
178178
mca_pmix_native_component.send_msg = NULL;
179+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
180+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
179181
return;
180182
}
181183
}
@@ -208,6 +210,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
208210
mca_pmix_native_component.send_ev_active = false;
209211
OBJ_RELEASE(msg);
210212
mca_pmix_native_component.send_msg = NULL;
213+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
214+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
211215
return;
212216
}
213217
}
@@ -239,6 +243,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
239243
opal_event_del(&mca_pmix_native_component.send_event);
240244
mca_pmix_native_component.send_ev_active = false;
241245
}
246+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
247+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
242248
break;
243249
}
244250
}
@@ -356,6 +362,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
356362
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
357363
opal_event_del(&mca_pmix_native_component.recv_event);
358364
mca_pmix_native_component.recv_ev_active = false;
365+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
366+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
359367
return;
360368
}
361369
break;
@@ -372,6 +380,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
372380
if (NULL == mca_pmix_native_component.recv_msg) {
373381
opal_output(0, "%s usock_recv_handler: unable to allocate recv message\n",
374382
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
383+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
384+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
375385
return;
376386
}
377387
/* start by reading the header */
@@ -416,6 +426,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
416426
"%s usock:recv:handler error reading bytes - closing connection",
417427
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
418428
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
429+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
419430
return;
420431
}
421432
}
@@ -447,6 +458,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
447458
opal_event_del(&mca_pmix_native_component.recv_event);
448459
mca_pmix_native_component.recv_ev_active = false;
449460
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
461+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
450462
return;
451463
}
452464
}
@@ -456,6 +468,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
456468
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
457469
mca_pmix_native_component.state);
458470
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
471+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
459472
break;
460473
}
461474
}
@@ -689,6 +702,7 @@ static void usock_complete_connect(void)
689702
opal_socket_errno);
690703
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
691704
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
705+
PMIX_NATIVE_ABNORMAL_TERM;
692706
return;
693707
}
694708

@@ -703,8 +717,8 @@ static void usock_complete_connect(void)
703717
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
704718
strerror(so_error),
705719
so_error);
706-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
707720
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
721+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
708722
return;
709723
} else if (so_error != 0) {
710724
/* No need to worry about the return code here - we return regardless
@@ -715,8 +729,8 @@ static void usock_complete_connect(void)
715729
"connection to server failed with error %d",
716730
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
717731
so_error);
718-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
719732
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
733+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
720734
return;
721735
}
722736

@@ -737,8 +751,8 @@ static void usock_complete_connect(void)
737751
} else {
738752
opal_output(0, "%s usock_complete_connect: unable to send connect ack to server",
739753
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
740-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
741754
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
755+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
742756
}
743757
}
744758

0 commit comments

Comments
 (0)