
Ensure we correctly identify local vs non-local peers #13111


Draft · wants to merge 3 commits into main
2 changes: 1 addition & 1 deletion .gitmodules
@@ -1,6 +1,6 @@
 [submodule "prrte"]
     path = 3rd-party/prrte
-    url = ../../open-mpi/prrte
+    url = ../../openpmix/prrte.git
     branch = master
 [submodule "openpmix"]
     path = 3rd-party/openpmix
2 changes: 1 addition & 1 deletion 3rd-party/openpmix
Submodule openpmix updated 222 files
2 changes: 1 addition & 1 deletion 3rd-party/prrte
Submodule prrte updated 276 files
23 changes: 7 additions & 16 deletions ompi/communicator/comm.c
@@ -23,7 +23,7 @@
  * Copyright (c) 2014-2020 Intel, Inc.  All rights reserved.
  * Copyright (c) 2015      Mellanox Technologies.  All rights reserved.
  * Copyright (c) 2017-2022 IBM Corporation.  All rights reserved.
- * Copyright (c) 2021      Nanook Consulting.  All rights reserved.
+ * Copyright (c) 2021-2025 Nanook Consulting  All rights reserved.
  * Copyright (c) 2018-2024 Triad National Security, LLC. All rights
  *                         reserved.
  * Copyright (c) 2023-2025 Advanced Micro Devices, Inc. All rights reserved.
@@ -1137,7 +1137,7 @@ static int ompi_comm_split_unguided(ompi_communicator_t *comm, int split_type, i
     if (new_size < original_size) {
         /* If a valid info object was passed, set the selected topology */
         if (NULL != info) {
-            opal_info_set(info, "mpi_hw_resource_type", 
+            opal_info_set(info, "mpi_hw_resource_type",
                           ompi_comm_split_type_hw_guided_support[i].info_value);
         }
         ompi_comm_free(&unguided_comm);
@@ -1166,7 +1166,7 @@ static int ompi_comm_split_unguided(ompi_communicator_t *comm, int split_type, i
  * info(in/out)  : Info guiding the split operation
  * newcomm(out)  : Pointer to the newly created communicator, or pointer to MPI_COMM_NULL
  *                 if no communicator created.
- */ 
+ */
 int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key,
                           opal_info_t *info, ompi_communicator_t **newcomm)
 {
@@ -1632,7 +1632,7 @@ int ompi_comm_create_from_group (ompi_group_t *group, const char *tag, opal_info
     newcomp->instance = group->grp_instance;
 
     /*
-     * setup predefined keyvals - see MPI Standard for predefined keyvals cached on 
+     * setup predefined keyvals - see MPI Standard for predefined keyvals cached on
      * communicators created via MPI_Comm_create_from_group or MPI_Intercomm_create_from_groups
      */
     ompi_attr_hash_init(&newcomp->c_keyhash);
@@ -2392,18 +2392,9 @@ int ompi_comm_get_rprocs (ompi_communicator_t *local_comm, ompi_communicator_t *
         goto err_exit;
     }
 
-    /* set the locality of the remote procs */
-    for (i=0; i < rsize; i++) {
-        /* get the locality information - all RTEs are required
-         * to provide this information at startup */
-        uint16_t *u16ptr, u16;
-        u16ptr = &u16;
-        OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, PMIX_LOCALITY, &rprocs[i]->super.proc_name, &u16ptr, PMIX_UINT16);
-        if (OPAL_SUCCESS == rc) {
-            rprocs[i]->super.proc_flags = u16;
-        } else {
-            rprocs[i]->super.proc_flags = OPAL_PROC_NON_LOCAL;
-        }
+    rc = ompi_dpm_set_locality(rprocs, rsize);
+    if (OMPI_SUCCESS != rc) {
+        goto err_exit;
     }
 
     /* And now add the information into the database */
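Note on the comm.c hunk above: OPAL_MODEX_RECV_VALUE_OPTIONAL answers only from data already cached by the local PMIx client, so a miss means "nobody has computed this yet", not "the peer is remote". The old loop nonetheless fell back to OPAL_PROC_NON_LOCAL, so a same-node peer whose locality was never cached could be mislabeled. A minimal sketch of the distinction (illustration only, not part of the patch; assumes an ompi_proc_t pointer named proc in scope):

    int rc;
    uint16_t u16, *u16ptr = &u16;

    /* "optional" modex: consult only the locally cached PMIx data - the
     * request is never forwarded upstream, so a miss is cheap but proves
     * nothing about locality */
    OPAL_MODEX_RECV_VALUE_OPTIONAL(rc, PMIX_LOCALITY,
                                   &proc->super.proc_name, &u16ptr, PMIX_UINT16);
    if (OPAL_SUCCESS != rc) {
        /* old behavior: assume OPAL_PROC_NON_LOCAL - wrong for a same-node
         * peer whose locality simply was never computed.
         * new behavior: ompi_dpm_set_locality() resolves the node's actual
         * peer list (see dpm.c below) before deciding. */
    }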
173 changes: 109 additions & 64 deletions ompi/dpm/dpm.c
@@ -20,7 +20,7 @@
  * Copyright (c) 2014-2020 Research Organization for Information Science
  *                         and Technology (RIST).  All rights reserved.
  * Copyright (c) 2018      Amazon.com, Inc. or its affiliates.  All Rights reserved.
- * Copyright (c) 2021-2024 Nanook Consulting  All rights reserved.
+ * Copyright (c) 2021-2025 Nanook Consulting  All rights reserved.
  * Copyright (c) 2018-2022 Triad National Security, LLC. All rights
  *                         reserved.
  * Copyright (c) 2022      IBM Corporation.  All rights reserved.
@@ -110,7 +110,6 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
     opal_process_name_t pname;
     opal_list_t ilist, mlist, rlist;
     pmix_info_t info, tinfo;
-    pmix_value_t pval;
     pmix_pdata_t pdat;
     pmix_proc_t *procs, pxproc;
     size_t nprocs, n;
@@ -394,86 +393,45 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
         goto exit;
     }
     if (!opal_list_is_empty(&ilist)) {
-        int prn, nprn = 0;
-        char *val;
         opal_process_name_t wildcard_rank;
-        i = 0; /* start from the begining */
 
-        /* convert the list of new procs to a proc_t array */
-        new_proc_list = (ompi_proc_t**)calloc(opal_list_get_size(&ilist),
-                                              sizeof(ompi_proc_t *));
-        /* Extract the modex info for the first proc on the ilist, and then
-         * remove all processors in the same jobid from the list by getting
-         * their connection information and moving them into the proc array.
-         */
+        size = opal_list_get_size(&ilist);
+        new_proc_list = (ompi_proc_t**)calloc(size, sizeof(ompi_proc_t *));
+        // put the procs in the array, but order them by jobid so that
+        // all members of the same jobid are sequential
+        i = 0;
         do {
-            uint32_t *local_ranks_in_jobid = NULL;
             ompi_dpm_proct_caddy_t* next = NULL;
             cd = (ompi_dpm_proct_caddy_t*)opal_list_get_first(&ilist);
             proc = cd->p;
             wildcard_rank.jobid = proc->super.proc_name.jobid;
-            wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
-            /* retrieve the local peers for the specified jobid */
-            OPAL_MODEX_RECV_VALUE_IMMEDIATE(rc, PMIX_LOCAL_PEERS,
-                                            &wildcard_rank, &val, PMIX_STRING);
-            if (OPAL_SUCCESS == rc && NULL != val) {
-                char **peers = opal_argv_split(val, ',');
-                free(val);
-                nprn = opal_argv_count(peers);
-                local_ranks_in_jobid = (uint32_t*)calloc(nprn, sizeof(uint32_t));
-                for (prn = 0; NULL != peers[prn]; prn++) {
-                    local_ranks_in_jobid[prn] = strtoul(peers[prn], NULL, 10);
-                }
-                opal_argv_free(peers);
-            }
 
             OPAL_LIST_FOREACH_SAFE(cd, next, &ilist, ompi_dpm_proct_caddy_t) {
                 proc = cd->p;
-                if( proc->super.proc_name.jobid != wildcard_rank.jobid )
+                if (proc->super.proc_name.jobid != wildcard_rank.jobid) {
                     continue; /* not a proc from this jobid */
+                }
+                // check name setup and set arch
+                ompi_proc_complete_init_single(proc);
                 new_proc_list[i] = proc;
+                ++i;
                 opal_list_remove_item(&ilist, (opal_list_item_t*)cd); // TODO: do we need to release cd ?
                 OBJ_RELEASE(cd);
-                /* ompi_proc_complete_init_single() initializes and optionally retrieves
-                 * OPAL_PMIX_LOCALITY and OPAL_PMIX_HOSTNAME. since we can live without
-                 * them, we are just fine */
-                ompi_proc_complete_init_single(proc);
-                /* if this proc is local, then get its locality */
-                if (NULL != local_ranks_in_jobid) {
-                    uint16_t u16;
-                    for (prn=0; prn < nprn; prn++) {
-                        if (local_ranks_in_jobid[prn] == proc->super.proc_name.vpid) {
-                            /* get their locality string */
-                            val = NULL;
-                            OPAL_MODEX_RECV_VALUE_IMMEDIATE(rc, PMIX_LOCALITY_STRING,
-                                                            &proc->super.proc_name, &val, PMIX_STRING);
-                            if (OPAL_SUCCESS == rc && NULL != ompi_process_info.locality) {
-                                u16 = opal_hwloc_compute_relative_locality(ompi_process_info.locality, val);
-                                free(val);
-                            } else {
-                                /* all we can say is that it shares our node */
-                                u16 = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
-                            }
-                            proc->super.proc_flags = u16;
-                            /* save the locality for later */
-                            OPAL_PMIX_CONVERT_NAME(&pxproc, &proc->super.proc_name);
-                            pval.type = PMIX_UINT16;
-                            pval.data.uint16 = proc->super.proc_flags;
-                            PMIx_Store_internal(&pxproc, PMIX_LOCALITY, &pval);
-                            break;
-                        }
-                    }
-                }
-                ++i;
             }
-            if (NULL != local_ranks_in_jobid) {
-                free(local_ranks_in_jobid);
-            }
         } while (!opal_list_is_empty(&ilist));
 
+        // set locality for each proc
+        rc = ompi_dpm_set_locality(new_proc_list, size);
+        if (OPAL_SUCCESS != rc) {
+            OMPI_ERROR_LOG(rc);
+            free(new_proc_list);
+            new_proc_list = NULL;
+            OPAL_LIST_DESTRUCT(&ilist);
+            goto exit;
+        }
+
         /* call add_procs on the new ones */
-        rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
+        rc = MCA_PML_CALL(add_procs(new_proc_list, size));
         free(new_proc_list);
         new_proc_list = NULL;
         if (OMPI_SUCCESS != rc) {
@@ -561,6 +519,93 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
     return rc;
 }
 
+int ompi_dpm_set_locality(ompi_proc_t **procs, int rsize)
+{
+    pmix_nspace_t nspace;
+    pmix_proc_t *local_procs = NULL;
+    size_t nlocalprocs = 0;
+    pmix_status_t rc;
+    int i, ret;
+    pmix_proc_t pproc;
+    bool local;
+    size_t m;
+    uint16_t u16, *u16ptr = &u16;
+    char *val;
+    pmix_value_t pval;
+
+    // lazy-execute the resolve - we may not need to do it and
+    // it is an expensive operation since it must go to the
+    // local server if we aren't a singleton
+
+    /* set the locality of the remote procs */
+    for (i=0; i < rsize; i++) {
+        OPAL_PMIX_CONVERT_NAME(&pproc, &procs[i]->super.proc_name);
+
+        // first check to see if the locality is available - do
+        // this as an "optional" check. It could be we previously
+        // computed locality for this proc, and the check is fast
+        // since it only is done locally.
+        OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, PMIX_LOCALITY,
+                                       &procs[i]->super.proc_name, &u16ptr, PMIX_UINT16);
+        if (OPAL_SUCCESS == ret) {
+            procs[i]->super.proc_flags = u16;
+            continue;
+        }
+
+        // if we didn't find it, then we have to actually compute
+        // the locality for this proc. check to see if we have
+        // already resolved the peers - if not, then do so
+        if (NULL == local_procs) {
+            /* get the local procs - need all local procs since
+             * we may have multiple namespaces involved */
+            PMIx_Load_nspace(nspace, NULL);
+            rc = PMIx_Resolve_peers(NULL, nspace, &local_procs, &nlocalprocs);
+            if (PMIX_SUCCESS != rc) {
+                return OMPI_ERROR;
+            }
+        }
+
+        /* see if this process is local to this node */
+        local = false;
+        for (m=0; m < nlocalprocs; m++) {
+            if (PMIX_CHECK_PROCID(&local_procs[m], &pproc)) {
+                // this is a local process
+                local = true;
+                break;
+            }
+        }
+        if (!local) {
+            // this proc is not on the same node as us
+            procs[i]->super.proc_flags = OPAL_PROC_NON_LOCAL;
+            continue;
+        }
+
+        /* get the locality information - all RTEs are required
+         * to provide this information at startup. However, note
+         * that locality is ONLY defined for procs that are BOUND
+         * as it requires that a proc be in a known location! */
+        val = NULL;
+        OPAL_MODEX_RECV_VALUE_IMMEDIATE(rc, PMIX_LOCALITY_STRING,
+                                        &procs[i]->super.proc_name, &val, PMIX_STRING);
+        if (OPAL_SUCCESS == rc && NULL != ompi_process_info.locality) {
+            u16 = opal_hwloc_compute_relative_locality(ompi_process_info.locality, val);
+            free(val);
+        } else {
+            /* all we can say is that it shares our node */
+            u16 = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
+        }
+        procs[i]->super.proc_flags = u16;
+        /* save the locality for later */
+        pval.type = PMIX_UINT16;
+        pval.data.uint16 = procs[i]->super.proc_flags;
+        PMIx_Store_internal(&pproc, PMIX_LOCALITY, &pval);
+    }
+    if (NULL != local_procs) {
+        PMIX_PROC_FREE(local_procs, nlocalprocs);
+    }
+    return OMPI_SUCCESS;
+}
+
 static int construct_peers(ompi_group_t *group, opal_list_t *peers)
 {
     int i;
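The resolve-then-cache pattern at the heart of ompi_dpm_set_locality() can be exercised outside OMPI with a small standalone PMIx client. A sketch under stated assumptions: the program is invented for illustration, it must run under a PMIx-enabled runtime (e.g. prterun), and it queries only the caller's own namespace for simplicity, whereas the patch loads an empty nspace so peers from all jobs on the node are returned:

    #include <stdio.h>
    #include <pmix.h>

    int main(void)
    {
        pmix_proc_t me;
        pmix_proc_t *peers = NULL;
        size_t npeers = 0, n;

        if (PMIX_SUCCESS != PMIx_Init(&me, NULL, 0)) {
            return 1;
        }

        /* NULL nodename denotes the local node; the returned array is the
         * ground truth the patch consults instead of guessing from cache misses */
        if (PMIX_SUCCESS == PMIx_Resolve_peers(NULL, me.nspace, &peers, &npeers)) {
            for (n = 0; n < npeers; n++) {
                printf("[%s:%u] local peer: %s rank %u\n",
                       me.nspace, me.rank, peers[n].nspace, peers[n].rank);
            }
            PMIX_PROC_FREE(peers, npeers);
        }

        PMIx_Finalize(NULL, 0);
        return 0;
    }

Because the helper stores each computed result back with PMIx_Store_internal(), the next caller's "optional" lookup hits the cache, so the expensive resolve is paid at most once per peer.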
6 changes: 6 additions & 0 deletions ompi/dpm/dpm.h
@@ -15,6 +15,7 @@
  *                         reserved.
  * Copyright (c) 2018      Triad National Security, LLC. All rights
  *                         reserved.
+ * Copyright (c) 2025      Nanook Consulting  All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -98,6 +99,11 @@ int ompi_dpm_open_port(char *port_name);
  */
 int ompi_dpm_close_port(const char *port_name);
 
+/*
+ * Compute locality for array of procs
+ */
+int ompi_dpm_set_locality(ompi_proc_t **procs, int rsize);
+
 END_C_DECLS
 
 #endif /* OMPI_DPM_H */
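For context, a hedged sketch of how downstream code consumes the flags this helper fills in; the function below is hypothetical, while the OPAL_PROC_ON_LOCAL_* macros are the existing OPAL locality tests (header locations vary by branch):

    /* hypothetical consumer - not part of the patch */
    static const char *peer_location(ompi_proc_t *proc)
    {
        uint16_t flags = proc->super.proc_flags;

        if (!OPAL_PROC_ON_LOCAL_NODE(flags)) {
            return "remote";        /* helper stored OPAL_PROC_NON_LOCAL */
        }
        if (OPAL_PROC_ON_LOCAL_SOCKET(flags)) {
            return "same socket";   /* finer-grained bit was resolvable */
        }
        /* the unbound-proc fallback in ompi_dpm_set_locality() can only
         * assert OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE */
        return "same node";
    }

Shared-memory transports typically gate endpoint creation on exactly this node-level test, which is why the old false OPAL_PROC_NON_LOCAL answer for same-node peers mattered.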