Skip to content

Commit 28e07b6

Browse files
authored
Merge pull request #6446 from rhc54/cmr40x/cnct
v4.0.x: Fix cross-mpirun connect/accept operations
2 parents 1bff180 + b5d4649 commit 28e07b6

File tree

4 files changed

+151
-10
lines changed

4 files changed

+151
-10
lines changed

orte/mca/state/base/state_base_fns.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -953,8 +953,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
953953
one_still_alive = false;
954954
j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
955955
while (OPAL_SUCCESS == j) {
956-
/* skip the daemon job */
957-
if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
956+
/* skip the daemon job and all jobs from other families */
957+
if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
958+
ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
958959
goto next;
959960
}
960961
/* if this is the job we are checking AND it normally terminated,

orte/orted/pmix/pmix_server_dyn.c

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
#include "orte/mca/errmgr/errmgr.h"
4444
#include "orte/mca/rmaps/base/base.h"
45+
#include "orte/mca/rml/base/rml_contact.h"
4546
#include "orte/mca/state/state.h"
4647
#include "orte/util/name_fns.h"
4748
#include "orte/util/show_help.h"
@@ -537,7 +538,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
537538
int rc, cnt;
538539
opal_pmix_pdata_t *pdat;
539540
orte_job_t *jdata;
540-
opal_buffer_t buf;
541+
orte_node_t *node;
542+
orte_proc_t *proc;
543+
opal_buffer_t buf, bucket;
544+
opal_byte_object_t *bo;
545+
orte_process_name_t dmn, pname;
546+
char *uri;
547+
opal_value_t val;
548+
opal_list_t nodes;
541549

542550
ORTE_ACQUIRE_OBJECT(cd);
543551

@@ -554,6 +562,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
554562
pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
555563
if (OPAL_BYTE_OBJECT != pdat->value.type) {
556564
rc = ORTE_ERR_BAD_PARAM;
565+
ORTE_ERROR_LOG(rc);
557566
goto release;
558567
}
559568
/* the data will consist of a packed buffer with the job data in it */
@@ -563,15 +572,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
563572
pdat->value.data.bo.size = 0;
564573
cnt = 1;
565574
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
575+
ORTE_ERROR_LOG(rc);
576+
OBJ_DESTRUCT(&buf);
577+
goto release;
578+
}
579+
580+
/* unpack the byte object containing the daemon URIs */
581+
cnt=1;
582+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
583+
ORTE_ERROR_LOG(rc);
566584
OBJ_DESTRUCT(&buf);
567585
goto release;
568586
}
587+
/* load it into a buffer */
588+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
589+
opal_dss.load(&bucket, bo->bytes, bo->size);
590+
bo->bytes = NULL;
591+
free(bo);
592+
/* prep a list to save the nodes */
593+
OBJ_CONSTRUCT(&nodes, opal_list_t);
594+
/* unpack and store the URIs */
595+
cnt = 1;
596+
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
597+
rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
598+
if (ORTE_SUCCESS != rc) {
599+
OBJ_DESTRUCT(&buf);
600+
OBJ_DESTRUCT(&bucket);
601+
goto release;
602+
}
603+
/* save a node object for this daemon */
604+
node = OBJ_NEW(orte_node_t);
605+
node->daemon = OBJ_NEW(orte_proc_t);
606+
memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
607+
opal_list_append(&nodes, &node->super);
608+
/* register the URI */
609+
OBJ_CONSTRUCT(&val, opal_value_t);
610+
val.key = OPAL_PMIX_PROC_URI;
611+
val.type = OPAL_STRING;
612+
val.data.string = uri;
613+
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
614+
ORTE_ERROR_LOG(rc);
615+
val.key = NULL;
616+
val.data.string = NULL;
617+
OBJ_DESTRUCT(&val);
618+
OBJ_DESTRUCT(&buf);
619+
OBJ_DESTRUCT(&bucket);
620+
goto release;
621+
}
622+
val.key = NULL;
623+
val.data.string = NULL;
624+
OBJ_DESTRUCT(&val);
625+
cnt = 1;
626+
}
627+
OBJ_DESTRUCT(&bucket);
628+
629+
/* unpack the proc-to-daemon map */
630+
cnt=1;
631+
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
632+
ORTE_ERROR_LOG(rc);
633+
OBJ_DESTRUCT(&buf);
634+
goto release;
635+
}
636+
/* load it into a buffer */
637+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
638+
opal_dss.load(&bucket, bo->bytes, bo->size);
639+
bo->bytes = NULL;
640+
free(bo);
641+
/* unpack and store the map */
642+
cnt = 1;
643+
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
644+
/* get the name of the daemon hosting it */
645+
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
646+
OBJ_DESTRUCT(&buf);
647+
OBJ_DESTRUCT(&bucket);
648+
goto release;
649+
}
650+
/* create the proc object */
651+
proc = OBJ_NEW(orte_proc_t);
652+
memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
653+
opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
654+
/* find the daemon */
655+
OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
656+
if (node->daemon->name.vpid == dmn.vpid) {
657+
OBJ_RETAIN(node);
658+
proc->node = node;
659+
break;
660+
}
661+
}
662+
}
663+
OBJ_DESTRUCT(&bucket);
664+
OPAL_LIST_DESTRUCT(&nodes);
569665
OBJ_DESTRUCT(&buf);
666+
667+
/* register the nspace */
570668
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
669+
ORTE_ERROR_LOG(rc);
571670
OBJ_RELEASE(jdata);
572671
goto release;
573672
}
574-
OBJ_RELEASE(jdata); // no reason to keep this around
673+
674+
/* save the job object so we don't endlessly cycle */
675+
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
575676

576677
/* restart the cnct processor */
577678
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
@@ -617,6 +718,7 @@ static void _cnct(int sd, short args, void *cbdata)
617718
* out about it, and all we can do is return an error */
618719
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
619720
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
721+
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
620722
rc = ORTE_ERR_NOT_SUPPORTED;
621723
goto release;
622724
}
@@ -632,6 +734,7 @@ static void _cnct(int sd, short args, void *cbdata)
632734
kv->data.uint32 = geteuid();
633735
opal_list_append(cd->info, &kv->super);
634736
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
737+
ORTE_ERROR_LOG(rc);
635738
opal_argv_free(keys);
636739
goto release;
637740
}
@@ -645,6 +748,7 @@ static void _cnct(int sd, short args, void *cbdata)
645748
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
646749
/* it hasn't been registered yet, so register it now */
647750
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
751+
ORTE_ERROR_LOG(rc);
648752
goto release;
649753
}
650754
}

orte/orted/pmix/pmix_server_fence.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata)
227227
rc = ORTE_ERR_NOT_FOUND;
228228
goto callback;
229229
}
230+
230231
/* point the request to the daemon that is hosting the
231232
* target process */
232233
req->proxy.vpid = dmn->name.vpid;
@@ -240,7 +241,8 @@ static void dmodex_req(int sd, short args, void *cbdata)
240241

241242
/* if we are the host daemon, then this is a local request, so
242243
* just wait for the data to come in */
243-
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
244+
if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
245+
ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
244246
return;
245247
}
246248

orte/orted/pmix/pmix_server_register_fns.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* All rights reserved.
1414
* Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved
1515
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
16-
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
16+
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2014 Mellanox Technologies, Inc.
1818
* All rights reserved.
1919
* Copyright (c) 2014-2016 Research Organization for Information Science
@@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
7171
gid_t gid;
7272
opal_list_t *cache;
7373
hwloc_obj_t machine;
74+
opal_buffer_t buf, bucket;
75+
opal_byte_object_t bo, *boptr;
76+
orte_proc_t *proc;
7477

7578
opal_output_verbose(2, orte_pmix_server_globals.output,
7679
"%s register nspace for %s",
@@ -472,21 +475,52 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
472475
jdata->num_local_procs,
473476
info, NULL, NULL);
474477
OPAL_LIST_RELEASE(info);
478+
if (OPAL_SUCCESS != rc) {
479+
return rc;
480+
}
475481

476-
/* if the user has connected us to an external server, then we must
477-
* assume there is going to be some cross-mpirun exchange, and so
482+
/* if I am the HNP and this job is a member of my family, then we must
483+
* assume there could be some cross-mpirun exchange, and so
478484
* we protect against that situation by publishing the job info
479485
* for this job - this allows any subsequent "connect" to retrieve
480486
* the job info */
481-
if (NULL != orte_data_server_uri) {
482-
opal_buffer_t buf;
483487

488+
if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) {
489+
/* pack the job - note that this doesn't include the procs
490+
* or their locations */
484491
OBJ_CONSTRUCT(&buf, opal_buffer_t);
485492
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
486493
ORTE_ERROR_LOG(rc);
487494
OBJ_DESTRUCT(&buf);
488495
return rc;
489496
}
497+
498+
/* pack the contact URI of the daemon on each involved node */
499+
map = jdata->map;
500+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
501+
for (i=0; i < map->nodes->size; i++) {
502+
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
503+
continue;
504+
}
505+
opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING);
506+
}
507+
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
508+
boptr = &bo;
509+
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
510+
511+
/* pack the proc name and the name of its host daemon for each proc */
512+
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
513+
for (i=0; i < jdata->procs->size; i++) {
514+
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
515+
continue;
516+
}
517+
opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME);
518+
opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME);
519+
}
520+
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
521+
boptr = &bo;
522+
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
523+
490524
info = OBJ_NEW(opal_list_t);
491525
/* create a key-value with the key being the string jobid
492526
* and the value being the byte object */

0 commit comments

Comments
 (0)