Skip to content

Update ORTE and PMIx-related components to release status #3723

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions ompi/dpm/dpm.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
sizeof(ompi_proc_t *));
for (i=0 ; i<group->grp_proc_count ; i++) {
if (NULL == (proc_list[i] = ompi_group_peer_lookup(group,i))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
OMPI_ERROR_LOG(OMPI_ERR_NOT_FOUND);
rc = OMPI_ERR_NOT_FOUND;
free(proc_list);
goto exit;
}
Expand Down Expand Up @@ -672,10 +672,10 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
for (i = 0; i < count; ++i) {
app = OBJ_NEW(opal_pmix_app_t);
if (NULL == app) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OMPI_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
OPAL_LIST_DESTRUCT(&apps);
opal_progress_event_users_decrement();
return ORTE_ERR_OUT_OF_RESOURCE;
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* add the app to the job data */
opal_list_append(&apps, &app->super);
Expand Down Expand Up @@ -900,9 +900,9 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
ompi_info_get (array_of_info[i], "ompi_stdin_target", sizeof(stdin_target) - 1, stdin_target, &flag);
if ( flag ) {
if (0 == strcmp(stdin_target, "all")) {
ui32 = ORTE_VPID_WILDCARD;
ui32 = OPAL_VPID_WILDCARD;
} else if (0 == strcmp(stdin_target, "none")) {
ui32 = ORTE_VPID_INVALID;
ui32 = OPAL_VPID_INVALID;
} else {
ui32 = strtoul(stdin_target, NULL, 10);
}
Expand All @@ -918,7 +918,7 @@ int ompi_dpm_spawn(int count, const char *array_of_commands[],
*/
if ( !have_wdir ) {
if (OMPI_SUCCESS != (rc = opal_getcwd(cwd, OPAL_PATH_MAX))) {
ORTE_ERROR_LOG(rc);
OMPI_ERROR_LOG(rc);
OPAL_LIST_DESTRUCT(&apps);
opal_progress_event_users_decrement();
return rc;
Expand Down
3 changes: 2 additions & 1 deletion opal/include/opal/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ enum {
OPAL_ERR_PROC_MIGRATE = (OPAL_ERR_BASE - 65),
OPAL_ERR_EVENT_REGISTRATION = (OPAL_ERR_BASE - 66),
OPAL_ERR_HEARTBEAT_ALERT = (OPAL_ERR_BASE - 67),
OPAL_ERR_FILE_ALERT = (OPAL_ERR_BASE - 68)
OPAL_ERR_FILE_ALERT = (OPAL_ERR_BASE - 68),
OPAL_ERR_MODEL_DECLARED = (OPAL_ERR_BASE - 69)
};

#define OPAL_ERR_MAX (OPAL_ERR_BASE - 100)
Expand Down
122 changes: 121 additions & 1 deletion opal/mca/pmix/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include "opal_config.h"
#include "opal/types.h"

#include "opal/threads/threads.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_framework.h"

Expand Down Expand Up @@ -55,13 +55,133 @@ OPAL_DECLSPEC int opal_pmix_base_exchange(opal_value_t *info,

OPAL_DECLSPEC void opal_pmix_base_set_evbase(opal_event_base_t *evbase);

#define opal_pmix_condition_wait(a,b) pthread_cond_wait(a, &(b)->m_lock_pthread)
typedef pthread_cond_t opal_pmix_condition_t;
#define opal_pmix_condition_broadcast(a) pthread_cond_broadcast(a)
#define opal_pmix_condition_signal(a) pthread_cond_signal(a)
#define OPAL_PMIX_CONDITION_STATIC_INIT PTHREAD_COND_INITIALIZER

typedef struct {
opal_mutex_t mutex;
opal_pmix_condition_t cond;
volatile bool active;
} opal_pmix_lock_t;


typedef struct {
opal_event_base_t *evbase;
int timeout;
int initialized;
opal_pmix_lock_t lock;
} opal_pmix_base_t;

extern opal_pmix_base_t opal_pmix_base;

#define OPAL_PMIX_CONSTRUCT_LOCK(l) \
do { \
OBJ_CONSTRUCT(&(l)->mutex, opal_mutex_t); \
pthread_cond_init(&(l)->cond, NULL); \
(l)->active = true; \
} while(0)

#define OPAL_PMIX_DESTRUCT_LOCK(l) \
do { \
OBJ_DESTRUCT(&(l)->mutex); \
pthread_cond_destroy(&(l)->cond); \
} while(0)


#if OPAL_ENABLE_DEBUG
#define OPAL_PMIX_ACQUIRE_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
if (opal_debug_threads) { \
opal_output(0, "Waiting for thread %s:%d", \
__FILE__, __LINE__); \
} \
while ((lck)->active) { \
opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex); \
} \
if (opal_debug_threads) { \
opal_output(0, "Thread obtained %s:%d", \
__FILE__, __LINE__); \
} \
(lck)->active = true; \
} while(0)
#else
#define OPAL_PMIX_ACQUIRE_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
while ((lck)->active) { \
opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex); \
} \
(lck)->active = true; \
} while(0)
#endif


#if OPAL_ENABLE_DEBUG
#define OPAL_PMIX_WAIT_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
if (opal_debug_threads) { \
opal_output(0, "Waiting for thread %s:%d", \
__FILE__, __LINE__); \
} \
while ((lck)->active) { \
opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex); \
} \
if (opal_debug_threads) { \
opal_output(0, "Thread obtained %s:%d", \
__FILE__, __LINE__); \
} \
OPAL_ACQUIRE_OBJECT(&lck); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)
#else
#define OPAL_PMIX_WAIT_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
while ((lck)->active) { \
opal_pmix_condition_wait(&(lck)->cond, &(lck)->mutex); \
} \
OPAL_ACQUIRE_OBJECT(lck); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)
#endif


#if OPAL_ENABLE_DEBUG
#define OPAL_PMIX_RELEASE_THREAD(lck) \
do { \
if (opal_debug_threads) { \
opal_output(0, "Releasing thread %s:%d", \
__FILE__, __LINE__); \
} \
(lck)->active = false; \
opal_pmix_condition_broadcast(&(lck)->cond); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)
#else
#define OPAL_PMIX_RELEASE_THREAD(lck) \
do { \
assert(0 != opal_mutex_trylock(&(lck)->mutex)); \
(lck)->active = false; \
opal_pmix_condition_broadcast(&(lck)->cond); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)
#endif


#define OPAL_PMIX_WAKEUP_THREAD(lck) \
do { \
opal_mutex_lock(&(lck)->mutex); \
(lck)->active = false; \
OPAL_POST_OBJECT(lck); \
opal_pmix_condition_broadcast(&(lck)->cond); \
opal_mutex_unlock(&(lck)->mutex); \
} while(0)

END_C_DECLS

#endif
103 changes: 11 additions & 92 deletions opal/mca/pmix/base/pmix_base_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,39 +92,6 @@ int opal_pmix_base_notify_event(int status,
return OPAL_SUCCESS;
}

struct lookup_caddy_t {
volatile bool active;
int status;
opal_pmix_pdata_t *pdat;
};

/******** DATA EXCHANGE ********/
static void lookup_cbfunc(int status, opal_list_t *data, void *cbdata)
{
struct lookup_caddy_t *cd = (struct lookup_caddy_t*)cbdata;
cd->status = status;
if (OPAL_SUCCESS == status && NULL != data) {
opal_pmix_pdata_t *p = (opal_pmix_pdata_t*)opal_list_get_first(data);
if (NULL != p) {
cd->pdat->proc = p->proc;
if (p->value.type == cd->pdat->value.type) {
if (NULL != cd->pdat->value.key) {
free(cd->pdat->value.key);
}
(void)opal_value_xfer(&cd->pdat->value, &p->value);
}
}
}
cd->active = false;
}

static void opcbfunc(int status, void *cbdata)
{
struct lookup_caddy_t *cd = (struct lookup_caddy_t*)cbdata;
cd->status = status;
cd->active = false;
}

int opal_pmix_base_exchange(opal_value_t *indat,
opal_pmix_pdata_t *outdat,
int timeout)
Expand All @@ -133,8 +100,6 @@ int opal_pmix_base_exchange(opal_value_t *indat,
opal_list_t ilist, mlist;
opal_value_t *info;
opal_pmix_pdata_t *pdat;
struct lookup_caddy_t caddy;
char **keys;

/* protect the incoming value */
opal_dss.copy((void**)&info, indat, OPAL_VALUE);
Expand All @@ -148,31 +113,10 @@ int opal_pmix_base_exchange(opal_value_t *indat,
opal_list_append(&ilist, &info->super);

/* publish it with "session" scope */
if (NULL == opal_pmix.publish_nb) {
rc = opal_pmix.publish(&ilist);
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
return rc;
}
} else {
caddy.status = -1;
caddy.active = true;
caddy.pdat = NULL;
rc = opal_pmix.publish_nb(&ilist, opcbfunc, &caddy);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OPAL_LIST_DESTRUCT(&ilist);
return rc;
}
while (caddy.active) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != caddy.status) {
OPAL_ERROR_LOG(caddy.status);
return caddy.status;
}
rc = opal_pmix.publish(&ilist);
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
return rc;
}

/* lookup the other side's info - if a non-blocking form
Expand Down Expand Up @@ -206,45 +150,20 @@ int opal_pmix_base_exchange(opal_value_t *indat,

/* if a non-blocking version of lookup isn't
* available, then use the blocking version */
if (NULL == opal_pmix.lookup_nb) {
OBJ_CONSTRUCT(&ilist, opal_list_t);
opal_list_append(&ilist, &pdat->super);
rc = opal_pmix.lookup(&ilist, &mlist);
OPAL_LIST_DESTRUCT(&mlist);
OBJ_CONSTRUCT(&ilist, opal_list_t);
opal_list_append(&ilist, &pdat->super);
rc = opal_pmix.lookup(&ilist, &mlist);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_DESTRUCT(&ilist);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
return rc;
}
} else {
caddy.status = -1;
caddy.active = true;
caddy.pdat = pdat;
keys = NULL;
opal_argv_append_nosize(&keys, pdat->value.key);
rc = opal_pmix.lookup_nb(keys, &mlist, lookup_cbfunc, &caddy);
if (OPAL_SUCCESS != rc) {
OPAL_ERROR_LOG(rc);
OPAL_LIST_DESTRUCT(&mlist);
opal_argv_free(keys);
return rc;
}
while (caddy.active) {
usleep(10);
}
opal_argv_free(keys);
OPAL_LIST_DESTRUCT(&mlist);
if (OPAL_SUCCESS != caddy.status) {
OPAL_ERROR_LOG(caddy.status);
return caddy.status;
}
return rc;
}

/* pass back the result */
outdat->proc = pdat->proc;
free(outdat->value.key);
rc = opal_value_xfer(&outdat->value, &pdat->value);
OBJ_RELEASE(pdat);
OPAL_LIST_DESTRUCT(&ilist);
return rc;
}

Expand Down
12 changes: 11 additions & 1 deletion opal/mca/pmix/base/pmix_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "opal/constants.h"

#include "opal/mca/mca.h"
#include "opal/threads/thread_usage.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/mca/base/base.h"
Expand All @@ -35,7 +36,16 @@ opal_pmix_base_module_t opal_pmix = { 0 };
bool opal_pmix_collect_all_data = true;
int opal_pmix_verbose_output = -1;
bool opal_pmix_base_async_modex = false;
opal_pmix_base_t opal_pmix_base = {0};
opal_pmix_base_t opal_pmix_base = {
.evbase = NULL,
.timeout = 0,
.initialized = 0,
.lock = {
.mutex = OPAL_MUTEX_STATIC_INIT,
.cond = OPAL_PMIX_CONDITION_STATIC_INIT,
.active = false
}
};

static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
{
Expand Down
4 changes: 2 additions & 2 deletions opal/mca/pmix/cray/pmix_cray.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

static char cray_pmi_version[128];

static int cray_init(void);
static int cray_init(opal_list_t *ilist);
static int cray_fini(void);
static int cray_initialized(void);
static int cray_abort(int flat, const char *msg,
Expand Down Expand Up @@ -282,7 +282,7 @@ static void cray_get_more_info(void)
return;
}

static int cray_init(void)
static int cray_init(opal_list_t *ilist)
{
int i, spawned, size, rank, appnum, my_node;
int rc, ret = OPAL_ERROR;
Expand Down
Loading