Skip to content

Commit 5d38283

Browse files
author
rhc54
committed
Merge pull request #647 from rhc54/topic/hangs
Ensure we properly commit suicide if/when we lose connection to the daemon.
2 parents 7068846 + cc9b416 commit 5d38283

File tree

4 files changed

+94
-47
lines changed

4 files changed

+94
-47
lines changed

opal/mca/pmix/native/pmix_native.c

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ const opal_pmix_base_module_t opal_pmix_native_module = {
102102

103103
// local variables
104104
static int init_cntr = 0;
105-
opal_process_name_t native_pname = {0};
106-
105+
static opal_process_name_t native_pname = {0};
107106

108107
/* callback for wait completion */
109108
static void wait_cbfunc(opal_buffer_t *buf, void *cbdata)
@@ -213,7 +212,7 @@ static int native_fini(void)
213212

214213
if (1 != init_cntr) {
215214
--init_cntr;
216-
return OPAL_SUCCESS;
215+
return OPAL_SUCCESS;
217216
}
218217
init_cntr = 0;
219218

@@ -275,12 +274,20 @@ static bool native_initialized(void)
275274
return false;
276275
}
277276

277+
static void timeout(int sd, short args, void *cbdata)
278+
{
279+
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
280+
cb->active = false;
281+
}
282+
278283
static int native_abort(int flag, const char msg[])
279284
{
280285
opal_buffer_t *bfr;
281286
pmix_cmd_t cmd = PMIX_ABORT_CMD;
282287
int rc;
283288
pmix_cb_t *cb;
289+
opal_event_t ev;
290+
struct timeval tv = {1, 0};
284291

285292
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
286293
"%s pmix:native abort called",
@@ -291,39 +298,48 @@ static int native_abort(int flag, const char msg[])
291298
return OPAL_SUCCESS;
292299
}
293300

294-
/* create a buffer to hold the message */
295-
bfr = OBJ_NEW(opal_buffer_t);
296-
/* pack the cmd */
297-
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
298-
OPAL_ERROR_LOG(rc);
299-
OBJ_RELEASE(bfr);
300-
return rc;
301-
}
302-
/* pack the status flag */
303-
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
304-
OPAL_ERROR_LOG(rc);
305-
OBJ_RELEASE(bfr);
306-
return rc;
307-
}
308-
/* pack the string message - a NULL is okay */
309-
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
310-
OPAL_ERROR_LOG(rc);
311-
OBJ_RELEASE(bfr);
312-
return rc;
313-
}
301+
if (PMIX_USOCK_CONNECTED == mca_pmix_native_component.state) {
302+
/* create a buffer to hold the message */
303+
bfr = OBJ_NEW(opal_buffer_t);
304+
/* pack the cmd */
305+
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &cmd, 1, PMIX_CMD_T))) {
306+
OPAL_ERROR_LOG(rc);
307+
OBJ_RELEASE(bfr);
308+
return rc;
309+
}
310+
/* pack the status flag */
311+
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &flag, 1, OPAL_INT))) {
312+
OPAL_ERROR_LOG(rc);
313+
OBJ_RELEASE(bfr);
314+
return rc;
315+
}
316+
/* pack the string message - a NULL is okay */
317+
if (OPAL_SUCCESS != (rc = opal_dss.pack(bfr, &msg, 1, OPAL_STRING))) {
318+
OPAL_ERROR_LOG(rc);
319+
OBJ_RELEASE(bfr);
320+
return rc;
321+
}
314322

315-
/* create a callback object as we need to pass it to the
316-
* recv routine so we know which callback to use when
317-
* the return message is recvd */
318-
cb = OBJ_NEW(pmix_cb_t);
319-
cb->active = true;
323+
/* create a callback object as we need to pass it to the
324+
* recv routine so we know which callback to use when
325+
* the return message is recvd */
326+
cb = OBJ_NEW(pmix_cb_t);
327+
cb->active = true;
320328

321-
/* push the message into our event base to send to the server */
322-
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
329+
/* push a timeout event to wake us up just in case this
330+
* message cannot get thru - e.g., someone else may have
331+
* detected the failure of the server and ordered an abort */
332+
opal_event_evtimer_set(mca_pmix_native_component.evbase,
333+
&ev, timeout, cb);
334+
opal_event_evtimer_add(&ev, &tv);
323335

324-
/* wait for the release */
325-
PMIX_WAIT_FOR_COMPLETION(cb->active);
326-
OBJ_RELEASE(cb);
336+
/* push the message into our event base to send to the server */
337+
PMIX_ACTIVATE_SEND_RECV(bfr, wait_cbfunc, cb);
338+
339+
/* wait for the release */
340+
PMIX_WAIT_FOR_COMPLETION(cb->active);
341+
OBJ_RELEASE(cb);
342+
}
327343
return OPAL_SUCCESS;
328344
}
329345

opal/mca/pmix/native/pmix_native.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ typedef enum {
4141
PMIX_USOCK_ACCEPTING
4242
} pmix_usock_state_t;
4343

44+
/* define a macro for abnormal termination */
45+
#define PMIX_NATIVE_ABNORMAL_TERM \
46+
do { \
47+
mca_pmix_native_component.state = PMIX_USOCK_FAILED; \
48+
opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE); \
49+
} while(0);
50+
4451
/* define a command type for communicating to the
4552
* pmix server */
4653
typedef uint8_t pmix_cmd_t;
@@ -202,12 +209,13 @@ OPAL_MODULE_DECLSPEC int usock_send_connect_ack(void);
202209
opal_event_active(&ms->ev, OPAL_EV_WRITE, 1); \
203210
} while(0);
204211

205-
#define CLOSE_THE_SOCKET(socket) \
206-
do { \
207-
shutdown(socket, 2); \
208-
close(socket); \
209-
/* notify the error handler */ \
210-
opal_pmix_base_errhandler(OPAL_ERR_COMM_FAILURE); \
212+
#define CLOSE_THE_SOCKET(socket) \
213+
do { \
214+
if (0 <= socket) { \
215+
shutdown(socket, 2); \
216+
close(socket); \
217+
socket = -1; \
218+
} \
211219
} while(0)
212220

213221

opal/mca/pmix/native/usock.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@
3535
#ifdef HAVE_SYS_TYPES_H
3636
#include <sys/types.h>
3737
#endif
38+
#ifdef HAVE_SYS_STAT_H
39+
#include <sys/stat.h>
40+
#endif
41+
#ifdef HAVE_FCNTL_H
42+
#include <fcntl.h>
43+
#endif
3844

3945
#include "opal_stdint.h"
4046
#include "opal/opal_socket_errno.h"
@@ -191,6 +197,7 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata)
191197
/* we get here if no matching recv was found - this is an error */
192198
opal_output(0, "%s UNEXPECTED MESSAGE",
193199
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
200+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
194201
OBJ_RELEASE(msg);
195202
}
196203

@@ -222,9 +229,10 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
222229
"usock_peer_try_connect: attempting to connect to server on socket %d",
223230
mca_pmix_native_component.sd);
224231
/* try to connect */
225-
if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) {
232+
if (connect(mca_pmix_native_component.sd, (struct sockaddr*)&mca_pmix_native_component.address, addrlen) < 0) {
226233
if (opal_socket_errno == ETIMEDOUT) {
227-
/* The server may be too busy to accept new connections */
234+
/* The server may be too busy to accept new connections,
235+
* so cycle around and let it try again */
228236
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
229237
"timeout connecting to server");
230238
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
@@ -235,7 +243,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
235243
abort a connection that was ECONNREFUSED on the last
236244
attempt, without even trying to establish the
237245
connection. Handle that case in a semi-rational
238-
way by trying twice before giving up */
246+
way by trying again before giving up */
239247
if (ECONNABORTED == opal_socket_errno) {
240248
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
241249
"connection to server aborted by OS - retrying");
@@ -255,6 +263,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
255263
if (0 <= mca_pmix_native_component.sd) {
256264
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
257265
}
266+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
258267
return;
259268
}
260269

@@ -282,7 +291,7 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
282291
opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI);
283292
mca_pmix_native_component.send_ev_active = false;
284293

285-
/* setup the socket as non-blocking */
294+
/* setup the socket as non-blocking */
286295
if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) {
287296
opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n",
288297
strerror(opal_socket_errno),
@@ -310,8 +319,8 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata)
310319
"usock_send_connect_ack to server failed: %s (%d)",
311320
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
312321
opal_strerror(rc), rc);
313-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
314322
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
323+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
315324
return;
316325
}
317326
}

opal/mca/pmix/native/usock_sendrecv.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
176176
mca_pmix_native_component.send_ev_active = false;
177177
OBJ_RELEASE(msg);
178178
mca_pmix_native_component.send_msg = NULL;
179+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
180+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
179181
return;
180182
}
181183
}
@@ -208,6 +210,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
208210
mca_pmix_native_component.send_ev_active = false;
209211
OBJ_RELEASE(msg);
210212
mca_pmix_native_component.send_msg = NULL;
213+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
214+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
211215
return;
212216
}
213217
}
@@ -239,6 +243,8 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
239243
opal_event_del(&mca_pmix_native_component.send_event);
240244
mca_pmix_native_component.send_ev_active = false;
241245
}
246+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
247+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
242248
break;
243249
}
244250
}
@@ -356,6 +362,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
356362
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
357363
opal_event_del(&mca_pmix_native_component.recv_event);
358364
mca_pmix_native_component.recv_ev_active = false;
365+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
366+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
359367
return;
360368
}
361369
break;
@@ -372,6 +380,8 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
372380
if (NULL == mca_pmix_native_component.recv_msg) {
373381
opal_output(0, "%s usock_recv_handler: unable to allocate recv message\n",
374382
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
383+
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
384+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
375385
return;
376386
}
377387
/* start by reading the header */
@@ -416,6 +426,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
416426
"%s usock:recv:handler error reading bytes - closing connection",
417427
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
418428
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
429+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
419430
return;
420431
}
421432
}
@@ -447,6 +458,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
447458
opal_event_del(&mca_pmix_native_component.recv_event);
448459
mca_pmix_native_component.recv_ev_active = false;
449460
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
461+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
450462
return;
451463
}
452464
}
@@ -456,6 +468,7 @@ void pmix_usock_recv_handler(int sd, short flags, void *cbdata)
456468
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
457469
mca_pmix_native_component.state);
458470
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
471+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
459472
break;
460473
}
461474
}
@@ -689,6 +702,7 @@ static void usock_complete_connect(void)
689702
opal_socket_errno);
690703
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
691704
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
705+
PMIX_NATIVE_ABNORMAL_TERM;
692706
return;
693707
}
694708

@@ -703,8 +717,8 @@ static void usock_complete_connect(void)
703717
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
704718
strerror(so_error),
705719
so_error);
706-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
707720
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
721+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
708722
return;
709723
} else if (so_error != 0) {
710724
/* No need to worry about the return code here - we return regardless
@@ -715,8 +729,8 @@ static void usock_complete_connect(void)
715729
"connection to server failed with error %d",
716730
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
717731
so_error);
718-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
719732
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
733+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
720734
return;
721735
}
722736

@@ -737,8 +751,8 @@ static void usock_complete_connect(void)
737751
} else {
738752
opal_output(0, "%s usock_complete_connect: unable to send connect ack to server",
739753
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
740-
mca_pmix_native_component.state = PMIX_USOCK_FAILED;
741754
CLOSE_THE_SOCKET(mca_pmix_native_component.sd);
755+
PMIX_NATIVE_ABNORMAL_TERM; // report the error upstream
742756
}
743757
}
744758

0 commit comments

Comments
 (0)