Skip to content

More dynamic op cleanups #871

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 122 additions & 144 deletions ompi/dpm/dpm.c

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions ompi/mpi/c/comm_join.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ int MPI_Comm_join(int fd, MPI_Comm *intercomm)
send_first = true;
}

/* ensure the port name is NULL terminated */
memset(port_name, 0, MPI_MAX_PORT_NAME);

/* Assumption: socket_send should not block, even if the socket
is not configured to be non-blocking, because the message lengths are
so short. */
Expand Down
6 changes: 3 additions & 3 deletions opal/mca/pmix/pmix1xx/pmix1_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ int pmix1_get(const opal_process_name_t *proc,
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"%s PMIx_client get on proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*proc), key);
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);

/* prep default response */
*val = NULL;
Expand Down Expand Up @@ -371,7 +371,7 @@ int pmix1_getnb(const opal_process_name_t *proc, const char *key,
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
"%s PMIx_client get_nb on proc %s key %s",
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
OPAL_NAME_PRINT(*proc), key);
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);

/* create the caddy */
op = OBJ_NEW(pmix1_opcaddy_t);
Expand Down Expand Up @@ -501,7 +501,7 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info)
++n;
}
} else {
pdata = NULL;
pinfo = NULL;
ninfo = 0;
}

Expand Down
9 changes: 9 additions & 0 deletions orte/orted/pmix/pmix_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,15 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
}
}

/*
 * Constructor for orte_pmix_server_op_caddy_t.
 *
 * Clears every pointer member that users of the caddy inspect so a
 * freshly created object never carries indeterminate values. In
 * particular cbfunc is tested against NULL before being invoked, so it
 * must start out as NULL as well.
 */
static void opcon(orte_pmix_server_op_caddy_t *p)
{
    p->procs = NULL;
    p->info = NULL;
    p->cbfunc = NULL;
    p->cbdata = NULL;
}
/* no destructor: the caddy only borrows the pointers it carries */
OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t,
                   opal_object_t,
                   opcon, NULL);

static void rqcon(pmix_server_req_t *p)
{
Expand Down
158 changes: 149 additions & 9 deletions orte/orted/pmix/pmix_server_dyn.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"

#include "pmix_server_internal.h"
#include "orte/orted/pmix/pmix_server.h"
#include "orte/orted/pmix/pmix_server_internal.h"

void pmix_server_launch_resp(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
Expand Down Expand Up @@ -327,6 +328,119 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
return OPAL_SUCCESS;
}

static void _cnct(int sd, short args, void *cbdata);

/*
 * Completion callback for the data-server lookup issued from _cnct.
 *
 * @param status  result of the lookup
 * @param data    list whose first item is expected to be an
 *                opal_pmix_pdata_t holding a byte object with a packed
 *                orte_job_t
 * @param cbdata  the orte_pmix_server_op_caddy_t that was handed to the
 *                lookup; this function always releases it
 *
 * On success the unpacked job is registered with the embedded PMIx
 * server and the connect operation is re-posted so _cnct can continue
 * with the remaining procs. On any failure the caller's callback (if
 * one was given) is invoked with the error code.
 */
static void _cnlk(int status, opal_list_t *data, void *cbdata)
{
    orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
    int rc, cnt;
    opal_pmix_pdata_t *pdat;
    orte_job_t *jdata;
    opal_buffer_t buf;

    /* if we failed to get the required data, then just inform
     * the embedded server that the connect cannot succeed. We must
     * bail out here whether or not a callback was provided, as the
     * code below dereferences the returned list */
    if (ORTE_SUCCESS != status) {
        rc = status;
        goto release;
    }
    if (NULL == data) {
        /* "success" without any data cannot be used to complete
         * the connect, so do not report it as success */
        rc = ORTE_ERR_BAD_PARAM;
        goto release;
    }

    /* register the returned data with the embedded PMIx server */
    pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
    if (OPAL_BYTE_OBJECT != pdat->value.type) {
        rc = ORTE_ERR_BAD_PARAM;
        goto release;
    }
    /* the data will consist of a packed buffer with the job data in it */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    opal_dss.load(&buf, pdat->value.data.bo.bytes, pdat->value.data.bo.size);
    /* the bytes now live in buf - clear the source so that it no
     * longer refers to them */
    pdat->value.data.bo.bytes = NULL;
    pdat->value.data.bo.size = 0;
    cnt = 1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
        OBJ_DESTRUCT(&buf);
        goto release;
    }
    OBJ_DESTRUCT(&buf);
    if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata))) {
        OBJ_RELEASE(jdata);
        goto release;
    }
    OBJ_RELEASE(jdata);  // no reason to keep this around

    /* restart the cnct processor - this creates a new caddy that
     * carries the procs, info and callback forward, so we are done
     * with ours. We must return here: falling into the release code
     * would fire the callback before the connect has completed and
     * release the caddy a second time */
    ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
    OBJ_RELEASE(cd);
    return;

  release:
    if (NULL != cd->cbfunc) {
        cd->cbfunc(rc, cd->cbdata);
    }
    OBJ_RELEASE(cd);
}

/*
 * Event handler that services a connect request.
 *
 * @param sd      unused
 * @param args    unused
 * @param cbdata  the orte_pmix_server_op_caddy_t describing the request
 *
 * Walks the list of participating procs and makes sure the nspace of
 * every job involved has been registered with the embedded PMIx
 * server. If a job is unknown locally, a lookup is issued to the
 * global data server and processing resumes from the lookup callback
 * (_cnlk). The caller's callback is invoked with the final status and
 * the caddy is released, except when a lookup was successfully posted,
 * in which case the caddy has been handed to _cnlk.
 */
static void _cnct(int sd, short args, void *cbdata)
{
    orte_pmix_server_op_caddy_t *caddy = (orte_pmix_server_op_caddy_t*)cbdata;
    orte_namelist_t *entry;
    orte_job_t *job;
    char **lookup_keys = NULL;
    char *jobid_str;
    int rc = ORTE_SUCCESS;

    /* Bookkeeping of which procs are "connected" (so we know who to
     * notify upon termination or failure) is still to be added. For
     * now we only guarantee that all participating nspaces are
     * registered, since otherwise the client would get an error when
     * it tries to resolve data for a missing nspace */

    OPAL_LIST_FOREACH(entry, caddy->procs, orte_namelist_t) {
        job = orte_get_job_data_object(entry->name.jobid);

        if (NULL != job) {
            /* known job - register it with the local PMIx server
             * unless that has already been done */
            if (!orte_get_attribute(&job->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
                rc = orte_pmix_server_register_nspace(job);
                if (ORTE_SUCCESS != rc) {
                    goto release;
                }
            }
            continue;
        }

        /* unknown job. If the "global" data server is simply our HNP
         * there is nobody to ask, so all we can do is fail */
        if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
            orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
            rc = ORTE_ERR_NOT_SUPPORTED;
            goto release;
        }

        /* ask the global data server, using the jobid string as key */
        jobid_str = opal_convert_jobid_to_string(entry->name.jobid);
        opal_argv_append_nosize(&lookup_keys, jobid_str);
        free(jobid_str);
        rc = pmix_server_lookup_fn(&entry->name, lookup_keys, caddy->info, _cnlk, caddy);
        opal_argv_free(lookup_keys);
        if (ORTE_SUCCESS != rc) {
            goto release;
        }
        /* the lookup callback will bring us back into this routine
         * so the remaining procs get processed */
        return;
    }

  release:
    if (NULL != caddy->cbfunc) {
        caddy->cbfunc(rc, caddy->cbdata);
    }
    OBJ_RELEASE(caddy);
}

int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
Expand All @@ -335,26 +449,52 @@ int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)opal_list_get_size(procs));

/* for now, just ack the call */
if (NULL != cbfunc) {
cbfunc(OPAL_SUCCESS, cbdata);
/* protect ourselves */
if (NULL == procs || 0 == opal_list_get_size(procs)) {
return ORTE_ERR_BAD_PARAM;
}
/* must thread shift this as we will be accessing global data */
ORTE_PMIX_OPERATION(procs, info, _cnct, cbfunc, cbdata);
return ORTE_SUCCESS;
}

return OPAL_SUCCESS;
/*
 * Completion callback passed to pmix_server_fencenb_fn by the
 * disconnect handler.
 *
 * @param status     result of the fence, forwarded to the requestor
 * @param data       unused
 * @param ndata      unused
 * @param cbdata     the orte_pmix_server_op_caddy_t created by the
 *                   disconnect handler; released here
 * @param relcbfunc  unused
 * @param relcbdata  unused
 */
static void mdxcbfunc(int status,
                      const char *data, size_t ndata, void *cbdata,
                      opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
{
    orte_pmix_server_op_caddy_t *caddy = cbdata;

    /* pass the fence result back to whoever asked for the disconnect */
    if (NULL != caddy->cbfunc) {
        caddy->cbfunc(status, caddy->cbdata);
    }

    /* the caddy has served its purpose */
    OBJ_RELEASE(caddy);
}

/*
 * Handle a disconnect request from the embedded PMIx server.
 *
 * @param procs   list of participating procs
 * @param info    list of directives, passed through to the fence
 * @param cbfunc  callback to invoke when the operation completes
 *                (may be NULL)
 * @param cbdata  opaque data handed back to cbfunc
 *
 * @return ORTE_SUCCESS if the fence was posted, in which case cbfunc is
 *         invoked from mdxcbfunc once the fence completes; otherwise
 *         the error returned by pmix_server_fencenb_fn, in which case
 *         cbfunc is NOT invoked.
 */
int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info,
                              opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    orte_pmix_server_op_caddy_t *cd;
    int rc;

    /* do not dereference a NULL list just to print its size */
    opal_output_verbose(2, orte_pmix_server_globals.output,
                        "%s disconnect called with %d procs",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        (NULL == procs) ? 0 : (int)opal_list_get_size(procs));

    /* at some point, we need to add bookkeeping to track which
     * procs are "connected" so we know who to notify upon
     * termination or failure. For now, just execute a fence.
     * Note that we do not need to thread-shift here as the
     * fence function will do it for us */
    cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
    cd->cbfunc = cbfunc;
    cd->cbdata = cbdata;

    /* the requestor is acked exactly once, from mdxcbfunc - acking
     * here as well would invoke the callback twice */
    if (ORTE_SUCCESS != (rc = pmix_server_fencenb_fn(procs, info, NULL, 0,
                                                     mdxcbfunc, cd))) {
        /* the fence never started, so mdxcbfunc will not run and
         * we must drop the caddy ourselves */
        OBJ_RELEASE(cd);
    }

    return rc;
}
33 changes: 14 additions & 19 deletions orte/orted/pmix/pmix_server_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ OBJ_CLASS_DECLARATION(pmix_server_req_t);
/*
 * Caddy used to carry the arguments of a PMIx server operation to the
 * function that eventually services it, and to carry the requestor's
 * completion callback along with them.
 */
typedef struct {
opal_object_t super; /* parent object */
opal_event_t ev; /* event used by ORTE_PMIX_OPERATION to schedule the handler */
/* NOTE(review): jdata, proc, status and object are not touched by the
 * constructor or by the five-argument ORTE_PMIX_OPERATION; they look
 * like leftovers from the older form of the macro - confirm whether
 * they are still needed */
orte_job_t *jdata;
orte_process_name_t proc;
int status;
orte_proc_t *object;
opal_list_t *procs; /* list of procs the operation applies to */
opal_list_t *info; /* list of directives that accompanied the request */
opal_pmix_op_cbfunc_t cbfunc; /* completion callback, may be NULL */
void *cbdata; /* opaque data handed back to cbfunc */
} orte_pmix_server_op_caddy_t;
Expand Down Expand Up @@ -115,21 +113,18 @@ do { \
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
} while(0);

#define ORTE_PMIX_OPERATION(n, r, ob, s, fn, cf, cb) \
do { \
orte_pmix_server_op_caddy_t *_cd; \
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
/* convert the namespace to jobid and create name */ \
orte_util_convert_string_to_jobid(&(_cd->proc.jobid), (n)); \
_cd->proc.vpid = (r); \
_cd->object = (ob); \
_cd->cbfunc = (cf); \
_cd->cbdata = (cb); \
_cd->status = (s); \
opal_event_set(orte_event_base, &(_cd->ev), -1, \
OPAL_EV_WRITE, (fn), _cd); \
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
/*
 * Schedule an operation through the ORTE event base.
 *
 * Creates an orte_pmix_server_op_caddy_t, stores the proc list (p),
 * the info list (i), the completion callback (cf) and its data (cb) in
 * it, then sets up and activates an event at ORTE_MSG_PRI priority with
 * (fn) as the handler and the caddy as the handler's argument.
 *
 * The macro does not release the caddy; the handlers in view (_cnct)
 * release it themselves - presumably every handler is expected to do
 * so, confirm for new handlers. Only the pointers are stored, the
 * lists are not copied.
 *
 * NOTE(review): the expansion ends in "while(0);" - the trailing
 * semicolon means an invocation followed by ';' yields an extra empty
 * statement, so the macro cannot be the sole statement of an un-braced
 * if that has an else.
 */
#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \
do { \
orte_pmix_server_op_caddy_t *_cd; \
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
_cd->procs = (p); \
_cd->info = (i); \
_cd->cbfunc = (cf); \
_cd->cbdata = (cb); \
opal_event_set(orte_event_base, &(_cd->ev), -1, \
OPAL_EV_WRITE, (fn), _cd); \
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
} while(0);


Expand Down
3 changes: 3 additions & 0 deletions orte/orted/pmix/pmix_server_register_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata)
opal_list_append(pmap, &kv->super);
}

/* mark the job as registered */
orte_set_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, ORTE_ATTR_LOCAL, NULL, OPAL_BOOL);

/* pass it down */
if (OPAL_SUCCESS != opal_pmix.server_register_nspace(jdata->jobid,
jdata->num_local_procs,
Expand Down
2 changes: 2 additions & 0 deletions orte/util/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
return "JOB-ROOM-NUM";
case ORTE_JOB_LAUNCH_PROXY:
return "JOB-LAUNCH-PROXY";
case ORTE_JOB_NSPACE_REGISTERED:
return "JOB-NSPACE-REGISTERED";

case ORTE_PROC_NOBARRIER:
return "PROC-NOBARRIER";
Expand Down
1 change: 1 addition & 0 deletions orte/util/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ typedef uint16_t orte_job_flags_t;
#define ORTE_JOB_NOTIFICATIONS (ORTE_JOB_START_KEY + 38) // string - comma-separated list of desired notifications+methods
#define ORTE_JOB_ROOM_NUM (ORTE_JOB_START_KEY + 39) // int - number of remote request's hotel room
#define ORTE_JOB_LAUNCH_PROXY (ORTE_JOB_START_KEY + 40) // opal_process_name_t - name of spawn requestor
#define ORTE_JOB_NSPACE_REGISTERED (ORTE_JOB_START_KEY + 41) // bool - job has been registered with embedded PMIx server

#define ORTE_JOB_MAX_KEY 300

Expand Down