From 1f21f54748e0d06d1884fadae32381fa3460b417 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Tue, 5 Jul 2016 18:30:11 +0200 Subject: [PATCH 01/13] Remove an apparently useless function. (cherry picked from commit 73972768f834505757bb43b1974ac237d4cf56a5) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_init.c | 8 -------- ompi/communicator/communicator.h | 1 - 2 files changed, 9 deletions(-) diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index b2200bdb71e..f2acab5bbec 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -341,14 +341,6 @@ int ompi_comm_finalize(void) return OMPI_SUCCESS; } -/* - * For linking only. To be checked. - */ -int ompi_comm_link_function(void) -{ - return OMPI_SUCCESS; -} - /********************************************************************************/ /********************************************************************************/ /********************************************************************************/ diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 57ef9977fce..455defb9cc5 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -351,7 +351,6 @@ static inline bool ompi_comm_peer_invalid(ompi_communicator_t* comm, int peer_id * Initialise MPI_COMM_WORLD and MPI_COMM_SELF */ int ompi_comm_init(void); -OMPI_DECLSPEC int ompi_comm_link_function(void); /** * extract the local group from a communicator From d52a2d081e9598a9ac9a50fb4b013a6d2a72375b Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Fri, 8 Jul 2016 10:46:05 -0600 Subject: [PATCH 02/13] ompi/comm: refactor communicator cid code This commit simplifies the communicator context ID generation by removing the blocking code. The high level calls: ompi_comm_nextcid and ompi_comm_activate remain but now call the non-blocking variants and wait on the resulting request. This was done to remove the parallel paths for context ID generation in preperation for further improvements of the CID generation code. Signed-off-by: Nathan Hjelm (cherry picked from commit 035c2e2e2aefbd36a200a0a1c4262fde959de462) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm.c | 158 +-- ompi/communicator/comm_cid.c | 1594 ++++++++++++------------------ ompi/communicator/comm_init.c | 8 - ompi/communicator/comm_request.c | 5 +- ompi/communicator/comm_request.h | 2 +- ompi/communicator/communicator.h | 71 +- ompi/dpm/dpm.c | 30 +- ompi/mpi/c/intercomm_create.c | 22 +- ompi/mpi/c/intercomm_merge.c | 23 +- 9 files changed, 718 insertions(+), 1195 deletions(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index c1997d3841f..342e5028afe 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -358,13 +358,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -374,13 +368,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, newcomp->c_contextid, comm->c_contextid ); /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -609,13 +597,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -634,36 +616,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != sorted ) { - free ( sorted ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != rsorted ) { - free ( rsorted ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); - } + free ( results ); + free ( sorted ); + free ( rresults ); + free ( rsorted ); + free ( lranks ); + free ( rranks ); /* Step 4: if we are not part of the comm, free the struct */ /* --------------------------------------------------------- */ @@ -675,7 +636,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } *newcomm = newcomp; - return ( rc ); + return rc; } @@ -925,13 +886,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -950,13 +905,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -1031,13 +980,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1047,13 +990,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1062,11 +999,15 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp return MPI_SUCCESS; } -struct ompi_comm_idup_with_info_context { +struct ompi_comm_idup_with_info_context_t { + opal_object_t super; ompi_communicator_t *comm; ompi_communicator_t *newcomp; }; +typedef struct ompi_comm_idup_with_info_context_t ompi_comm_idup_with_info_context_t; +OBJ_CLASS_INSTANCE(ompi_comm_idup_with_info_context_t, opal_object_t, NULL, NULL); + static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request); static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request); static int ompi_comm_idup_getcid (ompi_comm_request_t *request); @@ -1085,7 +1026,7 @@ int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_group_t *remote_group, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req) { - struct ompi_comm_idup_with_info_context *context; + ompi_comm_idup_with_info_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq[1]; int rc; @@ -1101,7 +1042,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); + context = OBJ_NEW(ompi_comm_idup_with_info_context_t); if (NULL == context) { ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; @@ -1109,7 +1050,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro context->comm = comm; - request->context = context; + request->context = &context->super; rc = ompi_comm_set_nb (&context->newcomp, /* new comm */ comm, /* old comm */ @@ -1142,8 +1083,8 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro static int ompi_comm_idup_getcid (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1154,11 +1095,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */ - context->comm, /* old comm */ - NULL, /* bridge comm */ - mode, /* mode */ - subreq); /* new subrequest */ + rc = ompi_comm_nextcid_nb (context->newcomp, context->comm, NULL, NULL, + NULL, false, mode, subreq); if (OMPI_SUCCESS != rc) { ompi_comm_request_return (request); return rc; @@ -1171,8 +1109,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1187,7 +1125,7 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) context->newcomp->c_contextid, context->comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, subreq); + rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, NULL, NULL, false, mode, subreq); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1233,13 +1171,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - newcomp, /* bridge comm (used to pass the group into the group allreduce) */ - &tag, /* user defined tag */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1249,13 +1181,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - newcomp, - &tag, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1924,13 +1850,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, int ret = OMPI_SUCCESS; /* Determine context id. It is identical to f_2_c_handle */ - ret = ompi_comm_nextcid ( new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ + ret = ompi_comm_nextcid (new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; @@ -1953,15 +1874,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, goto complete_and_return; } - ret = ompi_comm_activate( &new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ - - + ret = ompi_comm_activate (&new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f0c332b5dc1..ab9c9961198 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -14,7 +14,7 @@ * Copyright (c) 2007 Voltaire All rights reserved. * Copyright (c) 2006-2010 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. @@ -44,7 +44,90 @@ #include "ompi/request/request.h" #include "ompi/runtime/mpiruntime.h" -BEGIN_C_DECLS +struct ompi_comm_cid_context_t; + +typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + struct ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); + + +struct ompi_comm_cid_context_t { + opal_object_t super; + + ompi_communicator_t *newcomm; + ompi_communicator_t **newcommp; + ompi_communicator_t *comm; + ompi_communicator_t *bridgecomm; + + ompi_comm_allreduce_impl_fn_t allreduce_fn; + + int nextcid; + int nextlocal_cid; + int start; + int flag, rflag; + int local_leader; + int remote_leader; + int iter; + /** storage for activate barrier */ + int ok; + char *port_string; + bool send_first; + int pml_tag; + char *pmix_tag; +}; + +typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t; + +static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context) +{ + free (context->port_string); + free (context->pmix_tag); +} + +OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t, + mca_comm_cid_context_construct, + mca_comm_cid_context_destruct); + +struct ompi_comm_allreduce_context_t { + opal_object_t super; + + int *inbuf; + int *outbuf; + int count; + struct ompi_op_t *op; + ompi_comm_cid_context_t *cid_context; + int *tmpbuf; + + /* for intercomm allreduce */ + int *rcounts; + int *rdisps; + + /* for group allreduce */ + int peers_comm[3]; +}; + +typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t; + +static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) +{ + free (context->tmpbuf); + free (context->rcounts); + free (context->rdisps); +} + +OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, + ompi_comm_allreduce_context_construct, + ompi_comm_allreduce_context_destruct); /** * These functions make sure, that we determine the global result over @@ -53,90 +136,29 @@ BEGIN_C_DECLS * and a bridge-comm (intercomm-create scenario). */ - -typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_ledaer, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - /* non-blocking intracommunicator allreduce */ -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); /* non-blocking intercommunicator allreduce */ -static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static int ompi_comm_register_cid (uint32_t contextid); -static int ompi_comm_unregister_cid (uint32_t contextid); -static uint32_t ompi_comm_lowest_cid ( void ); - -struct ompi_comm_reg_t{ - opal_list_item_t super; - uint32_t cid; -}; -typedef struct ompi_comm_reg_t ompi_comm_reg_t; -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_comm_reg_t); - -static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom); -static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom); +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -OBJ_CLASS_INSTANCE (ompi_comm_reg_t, - opal_list_item_t, - ompi_comm_reg_constructor, - ompi_comm_reg_destructor ); +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static opal_mutex_t ompi_cid_lock; -static opal_list_t ompi_registered_comms; +static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT; int ompi_comm_cid_init (void) @@ -144,239 +166,164 @@ int ompi_comm_cid_init (void) return OMPI_SUCCESS; } -int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, int send_first ) +static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, const char *pmix_tag, bool send_first, + int mode) { + ompi_comm_cid_context_t *context; + ompi_comm_request_t *request; int ret; - int nextcid; - bool flag; - int nextlocal_cid; - int done=0; - int response, glresponse=0; - int start; - unsigned int i; - int iter=0; - ompi_comm_cid_allredfct* allredfnct; - - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; + context = OBJ_NEW(ompi_comm_cid_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - start = ompi_mpi_communicators.lowest_free; - - while (!done) { - /** - * This is the real algorithm described in the doc - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - continue; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - nextlocal_cid = mca_pml.pml_max_contextid; - flag = false; - for (i=start; i < mca_pml.pml_max_contextid ; i++) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, comm); - if (true == flag) { - nextlocal_cid = i; - break; - } - } - - ret = (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - goto release_and_return; - } + context->newcomm = newcomm; + context->comm = comm; + context->bridgecomm = bridgecomm; - if (mca_pml.pml_max_contextid == (unsigned int) nextcid) { - /* at least one peer ran out of CIDs */ - if (flag) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto release_and_return; - } + /* Determine which implementation of allreduce we have to use + * for the current mode. */ + switch (mode) { + case OMPI_COMM_CID_INTRA: + context->allreduce_fn = ompi_comm_allreduce_intra_nb; + break; + case OMPI_COMM_CID_INTER: + context->allreduce_fn = ompi_comm_allreduce_inter_nb; + break; + case OMPI_COMM_CID_GROUP: + context->allreduce_fn = ompi_comm_allreduce_group_nb; + context->pml_tag = ((int *) arg0)[0]; + break; + case OMPI_COMM_CID_INTRA_PMIX: + context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb; + context->local_leader = ((int *) arg0)[0]; + if (arg1) { + context->port_string = strdup ((char *) arg1); } + context->pmix_tag = strdup ((char *) pmix_tag); + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb; + context->local_leader = ((int *) arg0)[0]; + context->remote_leader = ((int *) arg1)[0]; + break; + default: + OBJ_RELEASE(context); + return NULL; + } + + context->send_first = send_first; + context->iter = 0; + + return context; +} - if (nextcid == nextlocal_cid) { - response = 1; /* fine with me */ - } - else { - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextlocal_cid, NULL); - - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - nextcid, comm ); - if (true == flag) { - response = 1; /* works as well */ - } - else { - response = 0; /* nope, not acceptable */ - } - } +static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context) +{ + ompi_comm_allreduce_context_t *context; - ret = (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL); - goto release_and_return; - } - if (1 == glresponse) { - done = 1; /* we are done */ - break; - } - else if ( 0 == glresponse ) { - if ( 1 == response ) { - /* we could use that, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextcid, NULL); - } - start = nextcid+1; /* that's where we can start the next round */ - } + context = OBJ_NEW(ompi_comm_allreduce_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - /* set the according values to the newcomm */ - newcomm->c_contextid = nextcid; - opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); - - release_and_return: - ompi_comm_unregister_cid (comm->c_contextid); + context->inbuf = inbuf; + context->outbuf = outbuf; + context->count = count; + context->op = op; + context->cid_context = cid_context; - return ret; + return context; } -/* Non-blocking version of ompi_comm_nextcid */ -struct mca_comm_nextcid_context { - ompi_communicator_t* newcomm; - ompi_communicator_t* comm; - ompi_communicator_t* bridgecomm; - int mode; - int nextcid; - int nextlocal_cid; - int start; - int flag, rflag; -}; - /* find the next available local cid and start an allreduce */ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); /* verify that the maximum cid is locally available and start an allreduce */ static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); +/* lock the cid generator */ +static int ompi_comm_cid_lock (ompi_comm_request_t *request); -int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req) { - struct mca_comm_nextcid_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; int ret; - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; - } - - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, + "nextcid", send_first, mode); if (NULL == context) { - ompi_comm_unregister_cid (comm->c_contextid); return OMPI_ERR_OUT_OF_RESOURCE; } + context->start = ompi_mpi_communicators.lowest_free; + request = ompi_comm_request_get (); if (NULL == request) { - ompi_comm_unregister_cid (comm->c_contextid); - free (context); + OBJ_RELEASE(context); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; - context->bridgecomm = bridgecomm; - context->mode = mode; - context->start = ompi_mpi_communicators.lowest_free; - - request->context = context; + request->context = &context->super; - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); ompi_comm_request_start (request); *req = &request->super; + return OMPI_SUCCESS; } +int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + +static int ompi_comm_cid_lock (ompi_comm_request_t *request) +{ + if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); +} + static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; - unsigned int i; bool flag; int ret; /** * This is the real algorithm described in the doc */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (context->comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - - return OMPI_SUCCESS; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; - for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { + for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, context->comm); if (true == flag) { @@ -385,15 +332,10 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) } } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context, &subreq); if (OMPI_SUCCESS != ret) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -403,18 +345,17 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* next we want to verify that the resulting commid is ok */ - ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); - - return OMPI_SUCCESS; + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); } static int ompi_comm_checkcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; int ret; @@ -423,37 +364,31 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) if (!context->flag) { opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); - context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - context->nextcid, context->comm); + context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, + context->nextcid, context->comm); } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } + ++context->iter; - if (OMPI_SUCCESS != ret) { - return ret; + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MAX, context, &subreq); + if (OMPI_SUCCESS == ret) { + ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } - ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); - - return OMPI_SUCCESS; + return ret; } static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); - ompi_comm_unregister_cid (context->comm->c_contextid); + /* unlock the cid generator */ + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ return OMPI_SUCCESS; @@ -461,118 +396,16 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) if (1 == context->flag) { /* we could use this cid, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL); + opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL); context->start = context->nextcid + 1; /* that's where we can start the next round */ } + ++context->iter; + /* try again */ return ompi_comm_allreduce_getnextcid (request); } -/**************************************************************************/ -/**************************************************************************/ -/**************************************************************************/ -static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom) -{ - regcom->cid=MPI_UNDEFINED; -} - -static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom) -{ -} - -void ompi_comm_reg_init (void) -{ - OBJ_CONSTRUCT(&ompi_registered_comms, opal_list_t); - OBJ_CONSTRUCT(&ompi_cid_lock, opal_mutex_t); -} - -void ompi_comm_reg_finalize (void) -{ - OBJ_DESTRUCT(&ompi_registered_comms); - OBJ_DESTRUCT(&ompi_cid_lock); -} - - -static int ompi_comm_register_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); - bool registered = false; - - do { - /* Only one communicator function allowed in same time on the - * same communicator. - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - - newentry->cid = cid; - if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { - bool ok = true; - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if ( regcom->cid > cid ) { - break; - } -#if OMPI_ENABLE_THREAD_MULTIPLE - if( regcom->cid == cid ) { - /** - * The MPI standard state that is the user responsability to - * schedule the global communications in order to avoid any - * kind of troubles. As, managing communicators involve several - * collective communications, we should enforce a sequential - * execution order. This test only allow one communicator - * creation function based on the same communicator. - */ - ok = false; - break; - } -#endif /* OMPI_ENABLE_THREAD_MULTIPLE */ - } - if (ok) { - opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom, - (opal_list_item_t *)newentry); - registered = true; - } - } else { - opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); - registered = true; - } - - /* drop the lock before trying again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - } while (!registered); - - return OMPI_SUCCESS; -} - -static int ompi_comm_unregister_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - - OPAL_THREAD_LOCK(&ompi_cid_lock); - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if(regcom->cid == cid) { - opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom); - OBJ_RELEASE(regcom); - break; - } - } - - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - - return OMPI_SUCCESS; -} - -static uint32_t ompi_comm_lowest_cid (void) -{ - ompi_comm_reg_t *regcom=NULL; - opal_list_item_t *item=opal_list_get_first (&ompi_registered_comms); - - regcom = (ompi_comm_reg_t *)item; - return regcom->cid; -} /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -590,174 +423,43 @@ static uint32_t ompi_comm_lowest_cid (void) * comm.c is, that this file contains the allreduce implementations * which are required, and thus we avoid having duplicate code... */ -int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ) -{ - int ret = 0; - - int ok=0, gok=0; - ompi_comm_cid_allredfct* allredfnct; - - /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - - if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { - /* Initialize the PML stuff in the newcomm */ - if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { - goto bail_on_error; - } - - OMPI_COMM_SET_PML_ADDED(*newcomm); - } - - - ret = (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "activate", 0 ); - if( OMPI_SUCCESS != ret ) { - goto bail_on_error; - } - - - - /** - * Check to see if this process is in the new communicator. - * - * Specifically, this function is invoked by all proceses in the - * old communicator, regardless of whether they are in the new - * communicator or not. This is because it is far simpler to use - * MPI collective functions on the old communicator to determine - * some data for the new communicator (e.g., remote_leader) than - * to kludge up our own pseudo-collective routines over just the - * processes in the new communicator. Hence, *all* processes in - * the old communicator need to invoke this function. - * - * That being said, only processes in the new communicator need to - * select a coll module for the new communicator. More - * specifically, proceses who are not in the new communicator - * should *not* select a coll module -- for example, - * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who - * are not in the new communicator. This can cause errors in the - * selection / initialization of a coll module. Plus, it's - * wasteful -- processes in the new communicator will end up - * freeing the new communicator anyway, so we might as well leave - * the coll selection as NULL (the coll base comm unselect code - * handles that case properly). - */ - if (MPI_UNDEFINED == (*newcomm)->c_local_group->grp_my_rank) { - return OMPI_SUCCESS; - } - - /* Let the collectives components fight over who will do - collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*newcomm))) { - goto bail_on_error; - } - - /* For an inter communicator, we have to deal with the potential - * problem of what is happening if the local_comm that we created - * has a lower CID than the parent comm. This is not a problem - * as long as the user calls MPI_Comm_free on the inter communicator. - * However, if the communicators are not freed by the user but released - * by Open MPI in MPI_Finalize, we walk through the list of still available - * communicators and free them one by one. Thus, local_comm is freed before - * the actual inter-communicator. However, the local_comm pointer in the - * inter communicator will still contain the 'previous' address of the local_comm - * and thus this will lead to a segmentation violation. In order to prevent - * that from happening, we increase the reference counter local_comm - * by one if its CID is lower than the parent. We cannot increase however - * its reference counter if the CID of local_comm is larger than - * the CID of the inter communicators, since a regular MPI_Comm_free would - * leave in that the case the local_comm hanging around and thus we would not - * recycle CID's properly, which was the reason and the cause for this trouble. - */ - if ( OMPI_COMM_IS_INTER(*newcomm)) { - if ( OMPI_COMM_CID_IS_LOWER(*newcomm, comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*newcomm); - OBJ_RETAIN (*newcomm); - } - } - - - return OMPI_SUCCESS; - - bail_on_error: - OBJ_RELEASE(*newcomm); - *newcomm = MPI_COMM_NULL; - return ret; -} /* Non-blocking version of ompi_comm_activate */ -struct ompi_comm_activate_nb_context { - ompi_communicator_t **newcomm; - ompi_communicator_t *comm; - - /* storage for activate barrier */ - int ok; -}; - static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); -int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req) { - struct ompi_comm_activate_nb_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; + int local_leader; + int remote_leader; int ret = 0; - request = ompi_comm_request_get (); - if (NULL == request) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", + send_first, mode); if (NULL == context) { - ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; + /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */ + context->newcommp = newcomm; - request->context = context; - - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { OBJ_RELEASE(newcomm); + OBJ_RELEASE(context); *newcomm = MPI_COMM_NULL; return ret; } @@ -765,16 +467,10 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, } /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - if (mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } - + * send messages over the new communicator + */ + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MAX, context, + &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); return ret; @@ -788,10 +484,28 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, return OMPI_SUCCESS; } +int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { - struct ompi_comm_activate_nb_context *context = - (struct ompi_comm_activate_nb_context *) request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; int ret; /** @@ -818,15 +532,15 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * the coll selection as NULL (the coll base comm unselect code * handles that case properly). */ - if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) { + if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) { return OMPI_SUCCESS; } /* Let the collectives components fight over who will do collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) { - OBJ_RELEASE(*context->newcomm); - *context->newcomm = MPI_COMM_NULL; + if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) { + OBJ_RELEASE(context->newcomm); + *context->newcommp = MPI_COMM_NULL; return ret; } @@ -847,10 +561,10 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * leave in that the case the local_comm hanging around and thus we would not * recycle CID's properly, which was the reason and the cause for this trouble. */ - if (OMPI_COMM_IS_INTER(*context->newcomm)) { - if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm); - OBJ_RETAIN (*context->newcomm); + if (OMPI_COMM_IS_INTER(context->newcomm)) { + if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) { + OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm); + OBJ_RETAIN (context->newcomm); } } @@ -861,207 +575,47 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *context, ompi_request_t **req) { - return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm, - comm->c_coll.coll_allreduce_module ); -} + ompi_communicator_t *comm = context->comm; -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - ompi_request_t **req) -{ return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm, req, comm->c_coll.coll_iallreduce_module); } - -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) -{ - int local_rank, rsize; - int rc; - int *sbuf; - int *tmpbuf=NULL; - int *rcounts=NULL, scount=0; - int *rdisps=NULL; - - if ( !OMPI_COMM_IS_INTER (intercomm)) { - return MPI_ERR_COMM; - } - - /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); - local_rank = ompi_comm_rank ( intercomm ); - - tmpbuf = (int *) malloc ( count * sizeof(int)); - rdisps = (int *) calloc ( rsize, sizeof(int)); - rcounts = (int *) calloc ( rsize, sizeof(int) ); - if ( OPAL_UNLIKELY (NULL == tmpbuf || NULL == rdisps || NULL == rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - /* Execute the inter-allreduce: the result of our group will - be in the buffer of the remote group */ - rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, intercomm, - intercomm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if ( 0 == local_rank ) { - MPI_Request req; - - /* for the allgatherv later */ - scount = count; - - /* local leader exchange their data and determine the overall result - for both groups */ - rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - intercomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, - intercomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait ( &req, MPI_STATUS_IGNORE ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); - } - - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ - rcounts[0] = count; - sbuf = outbuf; - rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, - rcounts, rdisps, MPI_INT, - intercomm, - intercomm->c_coll.coll_allgatherv_module); - - exit: - if ( NULL != tmpbuf ) { - free ( tmpbuf ); - } - if ( NULL != rcounts ) { - free ( rcounts ); - } - if ( NULL != rdisps ) { - free ( rdisps ); - } - - return (rc); -} - /* Non-blocking version of ompi_comm_allreduce_inter */ -struct ompi_comm_allreduce_inter_context { - int *inbuf; - int *outbuf; - int count; - struct ompi_op_t *op; - ompi_communicator_t *intercomm; - ompi_communicator_t *bridgecomm; - int *tmpbuf; - int *rcounts; - int *rdisps; -}; - -static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context) -{ - if (context->tmpbuf) { - free (context->tmpbuf); - } - - if (context->rdisps) { - free (context->rdisps); - } - - if (context->rcounts) { - free (context->rcounts); - } - - free (context); -} - static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request); -/* Arguments not used in this implementation: - * - bridgecomm - */ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, + ompi_comm_cid_context_t *cid_context, ompi_request_t **req) { - struct ompi_comm_allreduce_inter_context *context = NULL; - ompi_comm_request_t *request = NULL; + ompi_communicator_t *intercomm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; ompi_request_t *subreq; int local_rank, rsize, rc; - if (!OMPI_COMM_IS_INTER (intercomm)) { + if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; } request = ompi_comm_request_get (); - if (NULL == request) { + if (OPAL_UNLIKELY(NULL == request)) { return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); - if (NULL == context) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - context->inbuf = inbuf; - context->outbuf = outbuf; - context->count = count; - context->op = op; - context->intercomm = intercomm; - context->bridgecomm = bridgecomm; + request->context = &context->super; /* Allocate temporary arrays */ rsize = ompi_comm_remote_size (intercomm); @@ -1071,18 +625,17 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, context->rdisps = (int *) calloc (rsize, sizeof(int)); context->rcounts = (int *) calloc (rsize, sizeof(int)); if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - request->context = context; - /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. */ rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, &subreq, intercomm->c_coll.coll_iallreduce_module); - if (OMPI_SUCCESS != rc) { - goto exit; + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + ompi_comm_request_return (request); + return rc; } if (0 == local_rank) { @@ -1094,58 +647,37 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, ompi_comm_request_start (request); *req = &request->super; -exit: - if (OMPI_SUCCESS != rc) { - if (context) { - ompi_comm_allreduce_inter_context_free (context); - } - - if (request) { - request->context = NULL; - ompi_comm_request_return (request); - } - } - - return rc; + return OMPI_SUCCESS; } static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreqs[2]; int rc; /* local leader exchange their data and determine the overall result for both groups */ rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - context->intercomm, subreqs)); - if ( OMPI_SUCCESS != rc ) { - goto exit; + intercomm, subreqs)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 1)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); - -exit: - if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; + MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - return rc; + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); } static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); @@ -1155,8 +687,8 @@ static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreq; int scount = 0, rc; @@ -1167,245 +699,369 @@ static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) the result first. */ - if (0 != ompi_comm_rank (context->intercomm)) { + if (0 != ompi_comm_rank (intercomm)) { context->rcounts[0] = context->count; } else { scount = context->count; } - rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, - context->intercomm, &subreq, - context->intercomm->c_coll.coll_iallgatherv_module); + rc = intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, + context->rcounts, context->rdisps, MPI_INT, intercomm, + &subreq, intercomm->c_coll.coll_iallgatherv_module); if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; return rc; } - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1); + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); +} - return OMPI_SUCCESS; +static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *comm = context->cid_context->comm; + ompi_request_t *subreq; + int rc; + + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, + context->cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ibcast_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); } -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request) +static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request) { - /* free this request's context */ - ompi_comm_allreduce_inter_context_free (request->context); - /* prevent a double-free from the progress engine */ - request->context = NULL; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; - /* done */ - return OMPI_SUCCESS; + /* step 3: reduce leader data */ + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); + + /* schedule the broadcast to local peers */ + return ompi_comm_allreduce_bridged_schedule_bcast (request); } -/* Arguments not used in this implementation: - * - send_first - */ -static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bcomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm; + ompi_request_t *subreq[2]; + int rc; + + /* step 2: leader exchange */ + rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm, + subreq)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2); +} + +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - int *tmpbuf=NULL; - int local_rank; + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; int i; int rc; - int local_leader, remote_leader; - local_leader = (*((int*)lleader)); - remote_leader = (*((int*)rleader)); + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (local_rank == cid_context->local_leader) { + context->tmpbuf = (int *) calloc (count, sizeof (int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; + request = ompi_comm_request_get (); + if (OPAL_UNLIKELY(NULL == request)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + request->context = &context->super; + + if (cid_context->local_leader == local_rank) { + memcpy (context->tmpbuf, inbuf, count * sizeof (int)); } - /* Intercomm_create */ - rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, comm, comm->c_coll.coll_allreduce_module ); + /* step 1: reduce to the local leader */ + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, &subreq, + comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - MPI_Request req; + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - bcomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, bcomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait( &req, MPI_STATUS_IGNORE); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; } - rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, - comm, comm->c_coll.coll_bcast_module ); + ompi_comm_request_start (request); - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); - } + *req = &request->super; - return (rc); + return OMPI_SUCCESS; } -/* Arguments not used in this implementation: - * - bridgecomm - * - * lleader is the local rank of root in comm - * rleader is the port_string - */ -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request) { - int *tmpbuf=NULL; - int rc; - int local_leader, local_rank; - char *port_string; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int32_t size_count = context->count; opal_value_t info; opal_pmix_pdata_t pdat; opal_buffer_t sbuf; - int32_t size_count; + int rc; + + fprintf (stderr, "reduce complete\n"); + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + + if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { + OBJ_DESTRUCT(&sbuf); + fprintf (stderr, "pack failed. rc %d\n", rc); + return rc; + } + + OBJ_CONSTRUCT(&info, opal_value_t); + OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + + info.type = OPAL_BYTE_OBJECT; + pdat.value.type = OPAL_BYTE_OBJECT; + + opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); + OBJ_DESTRUCT(&sbuf); + + if (cid_context->send_first) { + (void)asprintf(&info.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } else { + (void)asprintf(&info.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } + + fprintf (stderr, "%s, %s\n", info.key, pdat.value.key); + + /* this macro is not actually non-blocking. if a non-blocking version becomes available this function + * needs to be reworked to take advantage of it. */ + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OBJ_DESTRUCT(&info); + fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc); + if (OPAL_SUCCESS != rc) { + OBJ_DESTRUCT(&pdat); + return rc; + } + + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); + pdat.value.data.bo.bytes = NULL; + pdat.value.data.bo.size = 0; + OBJ_DESTRUCT(&pdat); + + rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT); + OBJ_DESTRUCT(&sbuf); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + return rc; + } - local_leader = (*((int*)lleader)); - port_string = (char*)rleader; - size_count = count; + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT); - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + return ompi_comm_allreduce_bridged_schedule_bcast (request); +} + +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) +{ + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; + int rc; + + fprintf (stderr, "Calling ompi_comm_allreduce_intra_pmix_nb. rank %d\n", local_rank); + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (cid_context->local_leader == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; } + request->context = &context->super; + /* comm is an intra-communicator */ - rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm, - comm->c_coll.coll_allreduce_module); + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, tmpbuf, (int32_t)count, OPAL_INT))) { - goto exit; - } - OBJ_CONSTRUCT(&info, opal_value_t); - OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } - info.type = OPAL_BYTE_OBJECT; - pdat.value.type = OPAL_BYTE_OBJECT; + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; + } - opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); - OBJ_DESTRUCT(&sbuf); + ompi_comm_request_start (request); + *req = (ompi_request_t *) request; - if (send_first) { - (void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter); - } else { - (void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter); - } + /* use the same function as bridged to schedule the broadcast */ + return OMPI_SUCCESS; +} - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); - OBJ_DESTRUCT(&info); +static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + ompi_request_t *subreq[2]; + int subreq_count = 0; + int rc; - if (OPAL_SUCCESS != rc) { - OBJ_DESTRUCT(&pdat); - goto exit; + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq + subreq_count++)); + if (OMPI_SUCCESS != rc) { + return rc; + } } - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); - pdat.value.data.bo.bytes = NULL; - pdat.value.data.bo.size = 0; - OBJ_DESTRUCT(&pdat); + } + + return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count); +} - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&sbuf, outbuf, &size_count, OPAL_INT))) { - OBJ_DESTRUCT(&sbuf); - goto exit; +static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int *tmp = context->tmpbuf; + ompi_request_t *subreq[2]; + int rc; + + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT); + tmp += context->count; } - OBJ_DESTRUCT(&sbuf); - count = (int)size_count; - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); } - rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, - local_leader, comm, - comm->c_coll.coll_bcast_module); + if (MPI_PROC_NULL != context->peers_comm[0]) { + /* interior node */ + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq)); + if (OMPI_SUCCESS != rc) { + return rc; + } + + rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, cid_context->comm, subreq + 1)); + if (OMPI_SUCCESS != rc) { + return rc; + } - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2); } - return (rc); + /* root */ + return ompi_comm_allreduce_group_broadcast (request); } -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *newcomm, - void* local_leader, - void* remote_leader, - int send_first, char *intag, int iter) +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - ompi_group_t *group = newcomm->c_local_group; - int peers_group[3], peers_comm[3]; + ompi_group_t *group = cid_context->newcomm->c_local_group; const int group_size = ompi_group_size (group); const int group_rank = ompi_group_rank (group); - int tag = *((int *) local_leader); - int *tmp1; - int i, rc=OMPI_SUCCESS; + ompi_communicator_t *comm = cid_context->comm; + int peers_group[3], *tmp, subreq_count = 0; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; + ompi_request_t *subreq[3]; + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (NULL == context) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + tmp = context->tmpbuf = calloc (sizeof (int), count * 3); + if (NULL == context->tmpbuf) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; /* basic recursive doubling allreduce on the group */ peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL; @@ -1413,54 +1069,28 @@ static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL; /* translate the ranks into the ranks of the parent communicator */ - ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm); - - tmp1 = malloc (sizeof (int) * count); + ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm); /* reduce */ memmove (outbuf, inbuf, sizeof (int) * count); - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm, - MPI_STATUS_IGNORE)); + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1], + cid_context->pml_tag, comm, subreq + subreq_count++)); if (OMPI_SUCCESS != rc) { - goto out; + ompi_comm_request_return (request); + return rc; } - /* this is integer reduction so we do not care about ordering */ - ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT); - } - } - - if (MPI_PROC_NULL != peers_comm[0]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[0], - tag, MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0], - tag, comm, MPI_STATUS_IGNORE)); - if (OMPI_SUCCESS != rc) { - goto out; + tmp += count; } } - /* broadcast */ - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag, - MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - } - } + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count); - out: - free (tmp1); + ompi_comm_request_start (request); + *req = &request->super; - return rc; + return OMPI_SUCCESS; } - -END_C_DECLS diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index f2acab5bbec..f453ca1e8e1 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -206,10 +206,6 @@ int ompi_comm_init(void) OBJ_RETAIN(&ompi_mpi_group_null.group); OBJ_RETAIN(&ompi_mpi_errors_are_fatal.eh); - /* initialize the comm_reg stuff for multi-threaded comm_cid - allocation */ - ompi_comm_reg_init(); - /* initialize communicator requests (for ompi_comm_idup) */ ompi_comm_request_init (); @@ -328,13 +324,9 @@ int ompi_comm_finalize(void) } } - OBJ_DESTRUCT (&ompi_mpi_communicators); OBJ_DESTRUCT (&ompi_comm_f_to_c_table); - /* finalize the comm_reg stuff */ - ompi_comm_reg_finalize(); - /* finalize communicator requests */ ompi_comm_request_fini (); diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c index 496de0319aa..f7372e27452 100644 --- a/ompi/communicator/comm_request.c +++ b/ompi/communicator/comm_request.c @@ -234,6 +234,7 @@ static void ompi_comm_request_destruct (ompi_comm_request_t *request) { OBJ_DESTRUCT(&request->schedule); } + OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t, ompi_comm_request_construct, ompi_comm_request_destruct); @@ -257,10 +258,10 @@ ompi_comm_request_t *ompi_comm_request_get (void) void ompi_comm_request_return (ompi_comm_request_t *request) { if (request->context) { - free (request->context); - request->context = NULL; + OBJ_RELEASE (request->context); } + OMPI_REQUEST_FINI(&request->super); opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request); } diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h index 246a3010b0e..65af613f95f 100644 --- a/ompi/communicator/comm_request.h +++ b/ompi/communicator/comm_request.h @@ -22,7 +22,7 @@ typedef struct ompi_comm_request_t { ompi_request_t super; - void *context; + opal_object_t *context; opal_list_t schedule; } ompi_comm_request_t; OBJ_CLASS_DECLARATION(ompi_comm_request_t); diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 455defb9cc5..a8d4756cffa 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -492,24 +492,27 @@ ompi_communicator_t* ompi_comm_allocate (int local_group_size, * @param mode: combination of input * OMPI_COMM_CID_INTRA: intra-comm * OMPI_COMM_CID_INTER: inter-comm + * OMPI_COMM_CID_GROUP: only decide CID within the ompi_group_t + * associated with the communicator. arg0 + * must point to an int which will be used + * as the pml tag for communication. * OMPI_COMM_CID_INTRA_BRIDGE: 2 intracomms connected by - * a bridge comm. local_leader - * and remote leader are in this - * case an int (rank in bridge-comm). + * a bridge comm. arg0 and arg1 must point + * to integers representing the local and + * remote leader ranks. the remote leader rank + * is a rank in the bridgecomm. * OMPI_COMM_CID_INTRA_PMIX: 2 intracomms, leaders talk - * through PMIx. lleader and rleader - * are the required contact information. + * through PMIx. arg0 must point to an integer + * representing the local leader rank. arg1 + * must point to a string representing the + * port of the remote leader. * @param send_first: to avoid a potential deadlock for * the OOB version. * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* oldcomm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first); +OMPI_DECLSPEC int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode); /** * allocate new communicator ID (non-blocking) @@ -521,10 +524,9 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, * OMPI_COMM_CID_INTER: inter-comm * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req); +OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req); /** * shut down the communicator infrastructure. @@ -617,18 +619,25 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high ); -OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ); +OMPI_DECLSPEC int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode); -OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req); +/** + * Non-blocking variant of comm_activate. + * + * @param[inout] newcomm New communicator + * @param[in] comm Parent communicator + * @param[in] bridgecomm Bridge communicator (used for PMIX and bridge modes) + * @param[in] arg0 Mode argument 0 + * @param[in] arg1 Mode argument 1 + * @param[in] send_first Send first from this process (PMIX mode only) + * @param[in] mode Collective mode + * @param[out] req New request object to track this operation + */ +OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req); /** * a simple function to dump the structure @@ -638,14 +647,6 @@ int ompi_comm_dump ( ompi_communicator_t *comm ); /* setting name */ int ompi_comm_set_name (ompi_communicator_t *comm, const char *name ); -/* - * these are the init and finalize functions for the comm_reg - * stuff. These routines are necessary for handling multi-threading - * scenarious in the communicator_cid allocation - */ -void ompi_comm_reg_init(void); -void ompi_comm_reg_finalize(void); - /* global variable to save the number od dynamic communicators */ extern int ompi_comm_num_dyncomm; diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index 2da39d920a4..7619e8a219f 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -469,25 +469,25 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, new_group_pointer = MPI_GROUP_NULL; /* allocate comm_cid */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } /* activate comm and init coll-component */ - rc = ompi_comm_activate ( &newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_activate ( &newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } @@ -500,7 +500,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, exit: if (OMPI_SUCCESS != rc) { if (MPI_COMM_NULL != newcomp && NULL != newcomp) { - OBJ_RETAIN(newcomp); + OBJ_RELEASE(newcomp); newcomp = MPI_COMM_NULL; } } diff --git a/ompi/mpi/c/intercomm_create.c b/ompi/mpi/c/intercomm_create.c index b7e948cd624..10f0e8ad2ca 100644 --- a/ompi/mpi/c/intercomm_create.c +++ b/ompi/mpi/c/intercomm_create.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -196,26 +199,15 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader, new_group_pointer = MPI_GROUP_NULL; /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ - + rc = ompi_comm_nextcid (newcomp, local_comm, bridge_comm, &lleader, + &rleader, false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } /* activate comm and init coll-module */ - rc = ompi_comm_activate ( &newcomp, - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, local_comm, bridge_comm, &lleader, &rleader, + false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } diff --git a/ompi/mpi/c/intercomm_merge.c b/ompi/mpi/c/intercomm_merge.c index bcf8f9fec73..55258b637ef 100644 --- a/ompi/mpi/c/intercomm_merge.c +++ b/ompi/mpi/c/intercomm_merge.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -115,26 +118,16 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high, OBJ_RELEASE(new_group_pointer); new_group_pointer = MPI_GROUP_NULL; - /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + /* Determine context id */ + rc = ompi_comm_nextcid (newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } From 91337bf5c818b752f8ab7e63705b4fea8f4c8dd7 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 13 Jul 2016 15:38:54 -0600 Subject: [PATCH 03/13] ompi/comm: improve comm_split_type scalability This commit introduces a new algorithm for MPI_Comm_split_type. The old algorithm performed an allgather on the communicator to decide which processes were part of the new communicators. This does not scale well in either time or memory. The new algorithm performs a couple of all reductions to determine the global parameters of the MPI_Comm_split_type call. If any rank gives an inconsistent split_type (as defined by the standard) an error is returned without proceeding further. The algorithm then creates a communicator with all the ranks that match the split_type (no communication required) in the same order as the original communicator. It then does an allgather on the new communicator (which should be much smaller) to determine 1) if the new communicator is in the correct order, and 2) if any ranks in the new communicator supplied MPI_UNDEFINED as the split_type. If either of these conditions are detected the new communicator is split using ompi_comm_split and the intermediate communicator is freed. Signed-off-by: Nathan Hjelm (cherry picked from commit 4c49c42dd0747d0699aa4e5d6fad72860ddecc26) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm.c | 341 +++++++++++++++++++++------------------ 1 file changed, 183 insertions(+), 158 deletions(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index 342e5028afe..e60295989d8 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -16,7 +16,7 @@ * Copyright (c) 2011-2013 Inria. All rights reserved. * Copyright (c) 2011-2013 Universite Bordeaux 1 * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2012-2015 Los Alamos National Security, LLC. + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. @@ -647,21 +647,32 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, * Produces an array of ranks that will be part of the local/remote group in the * new communicator. The results array will be modified by this call. */ -static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int **ranks_out, int *rank_size) { +static int ompi_comm_split_type_get_part (ompi_group_t *group, const int split_type, int **ranks_out, int *rank_size) { int size = ompi_group_size (group); int my_size = 0; int *ranks; int ret; + ranks = malloc (size * sizeof (int)); + if (OPAL_UNLIKELY(NULL == ranks)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + for (int i = 0 ; i < size ; ++i) { ompi_proc_t *proc = ompi_group_get_proc_ptr_raw (group, i); uint16_t locality, *u16ptr; - int split_type = results[i * 2]; int include = false; if (ompi_proc_is_sentinel (proc)) { opal_process_name_t proc_name = ompi_proc_sentinel_to_name ((uintptr_t) proc); + if (split_type <= OMPI_COMM_TYPE_HOST) { + /* local ranks should never be represented by sentinel procs. ideally we + * should be able to use OPAL_MODEX_RECV_VALUE_OPTIONAL but it does have + * some overhead. update this to use the optional recv if that is ever fixed. */ + continue; + } + u16ptr = &locality; OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCALITY, &proc_name, &u16ptr, OPAL_UINT16); @@ -712,68 +723,145 @@ static int ompi_comm_split_type_get_part (ompi_group_t *group, int *results, int } if (include) { - /* copy data in place */ - results[2*my_size] = i; /* copy rank to break ties */ - results[2*my_size+1] = results[2*i+1]; /* copy key */ - ++my_size; + ranks[my_size++] = i; } } *rank_size = my_size; - /* silence a clang warning about a 0-byte malloc. my_size can not be 0 here */ + /* silence a clang warning about a 0-byte malloc. my_size will never be 0 here */ if (OPAL_UNLIKELY(0 == my_size)) { + free (ranks); return OMPI_SUCCESS; } - /* the new array needs to be sorted so that it is in 'key' order - * if two keys are equal then it is sorted in original rank order! */ - qsort (results, my_size, sizeof(int) * 2, rankkeycompare); + /* shrink the rank array */ + int *tmp = realloc (ranks, my_size * sizeof (int)); + if (OPAL_LIKELY(NULL != tmp)) { + ranks = tmp; + } - /* put group elements in a list */ - ranks = (int *) malloc ( my_size * sizeof(int)); - if (NULL == ranks) { + *ranks_out = ranks; + + return OMPI_SUCCESS; +} + +static int ompi_comm_split_verify (ompi_communicator_t *comm, int split_type, int key, bool *need_split) +{ + int rank = ompi_comm_rank (comm); + int size = ompi_comm_size (comm); + int *results; + int rc; + + if (*need_split) { + return OMPI_SUCCESS; + } + + results = malloc (2 * sizeof (int) * size); + if (OPAL_UNLIKELY(NULL == results)) { return OMPI_ERR_OUT_OF_RESOURCE; } - for (int i = 0 ; i < my_size ; ++i) { - ranks[i] = results[i*2]; + *need_split = false; + + results[rank * 2] = split_type; + results[rank * 2 + 1] = key; + + rc = comm->c_coll.coll_allgather (MPI_IN_PLACE, 2, MPI_INT, results, 2, MPI_INT, comm, + comm->c_coll.coll_allgather_module); + if (OMPI_SUCCESS != rc) { + free (results); + return rc; } - *ranks_out = ranks; + for (int i = 0 ; i < size ; ++i) { + if (MPI_UNDEFINED == results[i * 2] || (i > 1 && results[i * 2 + 1] < results[i * 2 - 1])) { + *need_split = true; + break; + } + } + + free (results); return OMPI_SUCCESS; } -int -ompi_comm_split_type(ompi_communicator_t *comm, - int split_type, int key, - ompi_info_t *info, - ompi_communicator_t** newcomm) +int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key, + ompi_info_t *info, ompi_communicator_t **newcomm) { - int myinfo[2]; - int size, my_size; - int my_rsize; - int mode; - int rsize; - int inter; - int *results=NULL; - int *rresults=NULL; - int rc=OMPI_SUCCESS; - ompi_communicator_t *newcomp = NULL; - int *lranks=NULL, *rranks=NULL; - - ompi_comm_allgatherfct *allgatherfct=NULL; + bool need_split = false, no_reorder = false, no_undefined = false; + ompi_communicator_t *newcomp = MPI_COMM_NULL; + int my_size, my_rsize = 0, mode, inter; + int *lranks = NULL, *rranks = NULL; + int global_split_type, ok, tmp[4]; + int rc; /* silence clang warning. newcomm should never be NULL */ if (OPAL_UNLIKELY(NULL == newcomm)) { return OMPI_ERR_BAD_PARAM; } - /* Step 1: determine all the information for the local group */ + inter = OMPI_COMM_IS_INTER(comm); + + /* Step 1: verify all ranks have supplied the same value for split type. All split types + * must be the same or MPI_UNDEFINED (which is negative). */ + tmp[0] = split_type; + tmp[1] = -split_type; + tmp[2] = key; + tmp[3] = -key; + + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &tmp, 4, MPI_INT, MPI_MAX, comm, + comm->c_coll.coll_allgather_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + global_split_type = tmp[0]; + + if (tmp[0] != -tmp[1] || inter) { + /* at least one rank supplied a different split type check if our split_type is ok */ + ok = (MPI_UNDEFINED == split_type) || global_split_type == split_type; + + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, + comm->c_coll.coll_allgather_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + if (inter) { + /* need an extra allreduce to ensure that all ranks have the same result */ + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, + comm->c_coll.coll_allgather_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + } + + if (OPAL_UNLIKELY(!ok)) { + return OMPI_ERR_BAD_PARAM; + } + + need_split = tmp[0] == -tmp[1]; + } else { + /* intracommunicator and all ranks specified the same split type */ + no_undefined = true; + /* check if all ranks specified the same key */ + no_reorder = tmp[2] == -tmp[3]; + } + + if (MPI_UNDEFINED == global_split_type) { + /* short-circut. every rank provided MPI_UNDEFINED */ + *newcomm = MPI_COMM_NULL; + return OMPI_SUCCESS; + } + + /* Step 2: Build potential communicator groups. If any ranks will not be part of + * the ultimate communicator we will drop them later. This saves doing an extra + * allgather on the whole communicator. By using ompi_comm_split() later only + * if needed we 1) optimized the common case (no MPI_UNDEFINED and no reorder), + * and 2) limit the allgather to a smaller set of peers in the uncommon case. */ /* --------------------------------------------------------- */ - /* sort according to participation and rank. Gather information from everyone */ /* allowed splitting types: CLUSTER CU @@ -791,151 +879,88 @@ ompi_comm_split_type(ompi_communicator_t *comm, They will most likely return a communicator which is equal to MPI_COMM_SELF Unless oversubscribing. */ - myinfo[0] = split_type; - myinfo[1] = key; - size = ompi_comm_size ( comm ); - inter = OMPI_COMM_IS_INTER(comm); - if ( inter ) { - allgatherfct = (ompi_comm_allgatherfct *)ompi_comm_allgather_emulate_intra; - } else { - allgatherfct = (ompi_comm_allgatherfct *)comm->c_coll.coll_allgather; - } - - results = (int*) malloc ( 2 * size * sizeof(int)); - if ( NULL == results ) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - rc = allgatherfct( myinfo, 2, MPI_INT, results, 2, MPI_INT, comm, comm->c_coll.coll_allgather_module ); - if ( OMPI_SUCCESS != rc ) { - goto exit; + /* how many ranks are potentially participating and on my node? */ + rc = ompi_comm_split_type_get_part (comm->c_local_group, global_split_type, &lranks, &my_size); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - /* check that all processors have been called with the same value */ - for (int i = 0 ; i < size ; ++i) { - if ( results[2*i] != split_type && MPI_UNDEFINED != results[2*i] && MPI_UNDEFINED != split_type) { - rc = OMPI_ERR_BAD_PARAM; - goto exit; + /* Step 3: determine all the information for the remote group */ + /* --------------------------------------------------------- */ + if (inter) { + rc = ompi_comm_split_type_get_part (comm->c_remote_group, global_split_type, &rranks, &my_rsize); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + free (lranks); + return rc; } } - /* how many are participating and on my node? */ - rc = ompi_comm_split_type_get_part (comm->c_local_group, results, &lranks, &my_size); - if (0 == my_size && MPI_UNDEFINED != split_type) { - /* should never happen */ - rc = OMPI_ERR_BAD_PARAM; - } - if (OMPI_SUCCESS != rc) { - goto exit; - } - + /* set the CID allgather mode to the appropriate one for the communicator */ + mode = inter ? OMPI_COMM_CID_INTER : OMPI_COMM_CID_INTRA; - /* Step 2: determine all the information for the remote group */ + /* Step 4: set up the communicator */ /* --------------------------------------------------------- */ - if ( inter ) { - rsize = ompi_group_size (comm->c_remote_group); - rresults = (int *) malloc ( rsize * 2 * sizeof(int)); - if ( NULL == rresults ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } + /* Create the communicator finally */ - /* this is an allgather on an inter-communicator */ - rc = comm->c_coll.coll_allgather( myinfo, 2, MPI_INT, rresults, 2, - MPI_INT, comm, - comm->c_coll.coll_allgather_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; + do { + rc = ompi_comm_set (&newcomp, comm, my_size, lranks, my_rsize, + rranks, NULL, comm->error_handler, false, + NULL, NULL); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + break; } - rc = ompi_comm_split_type_get_part (comm->c_remote_group, rresults, &rranks, &my_rsize); - if (OMPI_SUCCESS != rc) { - goto exit; + /* Determine context id. It is identical to f_2_c_handle */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + break; } - mode = OMPI_COMM_CID_INTER; - } else { - my_rsize = 0; - mode = OMPI_COMM_CID_INTRA; - } - - - /* Step 3: set up the communicator */ - /* --------------------------------------------------------- */ - /* Create the communicator finally */ - - rc = ompi_comm_set ( &newcomp, /* new comm */ - comm, /* old comm */ - my_size, /* local_size */ - lranks, /* local_ranks */ - my_rsize, /* remote_size */ - rranks, /* remote_ranks */ - NULL, /* attrs */ - comm->error_handler,/* error handler */ - false, /* don't copy the topo */ - NULL, /* local group */ - NULL ); /* remote group */ - - if ( NULL == newcomp ) { - rc = MPI_ERR_INTERN; - goto exit; - } - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + /* Activate the communicator and init coll-component */ + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + break; + } - /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + /* Step 5: Check if we need to remove or reorder ranks in the communicator */ + if (!(no_reorder && no_undefined)) { + rc = ompi_comm_split_verify (newcomp, split_type, key, &need_split); - /* Set name for debugging purposes */ - snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d", - newcomp->c_contextid, comm->c_contextid ); + if (inter) { + /* verify that no local ranks need to be removed or reordered */ + rc = ompi_comm_split_verify (newcomp->c_local_comm, split_type, key, &need_split); + } + } - /* set the rank to MPI_UNDEFINED. This prevents in comm_activate - * the collective module selection for a communicator that will - * be freed anyway. - */ - if ( MPI_UNDEFINED == split_type ) { - newcomp->c_local_group->grp_my_rank = MPI_UNDEFINED; - } + if (!need_split) { + /* common case. no reordering and no MPI_UNDEFINED */ + *newcomm = newcomp; + /* Set name for debugging purposes */ + snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT_TYPE FROM %d", + newcomp->c_contextid, comm->c_contextid ); + break; + } - /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + /* TODO: there probably is better way to handle this case without throwing away the + * intermediate communicator. */ + rc = ompi_comm_split (newcomp, split_type, key, newcomm, false); + /* get rid of the intermediate communicator */ + ompi_comm_free (&newcomp); + } while (0); - exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc && MPI_COMM_NULL != newcomp)) { + ompi_comm_free (&newcomp); + *newcomm = MPI_COMM_NULL; } - /* Step 4: if we are not part of the comm, free the struct */ - /* --------------------------------------------------------- */ - if ( NULL != newcomp && MPI_UNDEFINED == split_type ) { - ompi_comm_free ( &newcomp ); - } + free (lranks); + free (rranks); - *newcomm = newcomp; - return ( rc ); + return rc; } - - /**********************************************************************/ /**********************************************************************/ /**********************************************************************/ From 3b06c89bb28cb47db0f4cf0c13608d6e0f24a7f7 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 19 Jul 2016 17:35:34 -0700 Subject: [PATCH 04/13] Silence warnings (cherry picked from commit 36a9063466c2ac71ad768cb2f2424e07288844ed) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) mode change 100644 => 100755 ompi/communicator/comm_cid.c diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c old mode 100644 new mode 100755 index ab9c9961198..b4478b120ed --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -17,7 +17,7 @@ * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -172,8 +172,6 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t int mode) { ompi_comm_cid_context_t *context; - ompi_comm_request_t *request; - int ret; context = OBJ_NEW(ompi_comm_cid_context_t); if (OPAL_UNLIKELY(NULL == context)) { @@ -256,7 +254,6 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com { ompi_comm_cid_context_t *context; ompi_comm_request_t *request; - int ret; context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, "nextcid", send_first, mode); @@ -434,8 +431,6 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; - int local_leader; - int remote_leader; int ret = 0; context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", @@ -777,7 +772,6 @@ static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int local_rank = ompi_comm_rank (comm); ompi_comm_request_t *request; ompi_request_t *subreq; - int i; int rc; context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); From b8c9f13c2762c459e13e569aaed23b59b0f0fff2 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 12 Oct 2016 11:53:10 -0600 Subject: [PATCH 05/13] Remove a debug print in comm_cid.c Back-ported from 01a653d50a996f044d8daff54261ac8de3a51de7 Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 1 - 1 file changed, 1 deletion(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index b4478b120ed..0f2415ab94d 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -877,7 +877,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques * needs to be reworked to take advantage of it. */ OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); OBJ_DESTRUCT(&info); - fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc); if (OPAL_SUCCESS != rc) { OBJ_DESTRUCT(&pdat); return rc; From c7d2e4784f1faec034bee0bfb278b2959fe42f3c Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Fri, 22 Jul 2016 15:42:56 +0900 Subject: [PATCH 06/13] ompi/communicator: remove an other debug print statement in ompi_comm_allreduce_intra_pmix_nb() (cherry picked from commit bbc6d4b3d44476af49adfa582900f57c4e2323ec) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 0f2415ab94d..f71c9da27da 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -18,7 +18,7 @@ * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. - * Copyright (c) 2014 Research Organization for Information Science + * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * @@ -911,8 +911,6 @@ static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, ompi_request_t *subreq; int rc; - fprintf (stderr, "Calling ompi_comm_allreduce_intra_pmix_nb. rank %d\n", local_rank); - context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); if (OPAL_UNLIKELY(NULL == context)) { return OMPI_ERR_OUT_OF_RESOURCE; From 63a8c22ecb68d59779ae2e20403da231d2393f99 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Mon, 8 Aug 2016 13:20:24 -0700 Subject: [PATCH 07/13] Remove forced debugs (cherry picked from commit ba77d9beff8a5c425926baa7621dc43332f1e973) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f71c9da27da..8769b27c6a4 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -840,8 +840,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques opal_buffer_t sbuf; int rc; - fprintf (stderr, "reduce complete\n"); - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { @@ -871,8 +869,6 @@ static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *reques cid_context->iter); } - fprintf (stderr, "%s, %s\n", info.key, pdat.value.key); - /* this macro is not actually non-blocking. if a non-blocking version becomes available this function * needs to be reworked to take advantage of it. */ OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); From b56417beeed7d3d007f9a9eee8e273c519380cf9 Mon Sep 17 00:00:00 2001 From: Sylvain Jeaugey Date: Wed, 17 Aug 2016 17:05:13 -0700 Subject: [PATCH 08/13] Fix typo calling allreduce with the allgather module. That was causing CUDA collective to crash. (cherry picked from commit 61e900eea599c31efabaf6b8782f89df6ba5e82e) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index e60295989d8..6a4ec204eed 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -811,7 +811,7 @@ int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key, tmp[3] = -key; rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &tmp, 4, MPI_INT, MPI_MAX, comm, - comm->c_coll.coll_allgather_module); + comm->c_coll.coll_allreduce_module); if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { return rc; } From a0d8715fab070163bf83b26bf550d19a79495185 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 18 Aug 2016 08:50:06 -0600 Subject: [PATCH 09/13] comm/cid: fix threaded CID allocation This commit should restore the pre-non-blocking behavior of the CID allocator when threads are used. There are two primary changes: 1) do not hold the cid allocator lock past the end of a request callback, and 2) if a lower id communicator is detected during CID allocation back off and let the lower id communicator finish before continuing. Signed-off-by: Nathan Hjelm (cherry picked from commit fbbf743c368cad6d2422c5712a04f92446880617) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 50 +++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 8769b27c6a4..60981e2ba1d 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t context->newcomm = newcomm; context->comm = comm; context->bridgecomm = bridgecomm; + context->pml_tag = 0; /* Determine which implementation of allreduce we have to use * for the current mode. */ @@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); -/* lock the cid generator */ -static int ompi_comm_cid_lock (ompi_comm_request_t *request); + +static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX; int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, @@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com request->context = &context->super; - ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); ompi_comm_request_start (request); *req = &request->super; @@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, return rc; } -static int ompi_comm_cid_lock (ompi_comm_request_t *request) -{ - if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { - return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - } - - return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); -} - static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag); ompi_request_t *subreq; bool flag; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + if (ompi_comm_cid_lowest_id < my_id) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + ompi_comm_cid_lowest_id = my_id; + /** * This is the real algorithm described in the doc */ flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, context->comm); + flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, + context->comm); if (true == flag) { context->nextlocal_cid = i; break; @@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, context, &subreq); if (OMPI_SUCCESS != ret) { + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -341,10 +346,12 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) if (flag) { opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } - + + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* next we want to verify that the resulting commid is ok */ return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); @@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ompi_request_t *subreq; int ret; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0); + } + context->flag = (context->nextcid == context->nextlocal_cid); if (!context->flag) { @@ -372,6 +383,8 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return ret; } @@ -379,12 +392,17 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; + if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0); + } + if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); /* unlock the cid generator */ + ompi_comm_cid_lowest_id = INT64_MAX; OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ @@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) ++context->iter; + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + /* try again */ return ompi_comm_allreduce_getnextcid (request); } From 59a327e4c9563965656b32c7bd2a67a773406388 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Wed, 7 Sep 2016 08:25:37 -0600 Subject: [PATCH 10/13] comm/cid: use ibcast to distribute result in intercomm case This commit updates the intercomm allgather to do a local comm bcast as the final step. This should resolve a hang seen in intercomm tests. Signed-off-by: Nathan Hjelm (cherry picked from commit 54cc829aabe7489843e90d47dfed98f52db89993) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 52 ++++++++++++------------------------ 1 file changed, 17 insertions(+), 35 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 60981e2ba1d..8ed1a465d49 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -103,10 +103,6 @@ struct ompi_comm_allreduce_context_t { ompi_comm_cid_context_t *cid_context; int *tmpbuf; - /* for intercomm allreduce */ - int *rcounts; - int *rdisps; - /* for group allreduce */ int peers_comm[3]; }; @@ -121,8 +117,6 @@ static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) { free (context->tmpbuf); - free (context->rcounts); - free (context->rdisps); } OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, @@ -602,7 +596,7 @@ static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, str /* Non-blocking version of ompi_comm_allreduce_inter */ static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); +static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, @@ -636,18 +630,19 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, rsize = ompi_comm_remote_size (intercomm); local_rank = ompi_comm_rank (intercomm); - context->tmpbuf = (int *) calloc (count, sizeof(int)); - context->rdisps = (int *) calloc (rsize, sizeof(int)); - context->rcounts = (int *) calloc (rsize, sizeof(int)); - if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - ompi_comm_request_return (request); - return OMPI_ERR_OUT_OF_RESOURCE; + if (0 == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY (NULL == context->tmpbuf)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; + } } /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. */ - rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, - &subreq, intercomm->c_coll.coll_iallreduce_module); + rc = intercomm->c_local_comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, 0, + intercomm->c_local_comm, &subreq, + intercomm->c_local_comm->c_coll.coll_ireduce_module); if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { ompi_comm_request_return (request); return rc; @@ -656,7 +651,7 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, if (0 == local_rank) { ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1); } else { - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather, &subreq, 1); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_bcast, &subreq, 1); } ompi_comm_request_start (request); @@ -696,33 +691,20 @@ static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); - return ompi_comm_allreduce_inter_allgather (request); + return ompi_comm_allreduce_inter_bcast (request); } -static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) +static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request) { ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; - ompi_communicator_t *intercomm = context->cid_context->comm; + ompi_communicator_t *comm = context->cid_context->comm->c_local_comm; ompi_request_t *subreq; int scount = 0, rc; - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ - - if (0 != ompi_comm_rank (intercomm)) { - context->rcounts[0] = context->count; - } else { - scount = context->count; - } - - rc = intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, intercomm, - &subreq, intercomm->c_coll.coll_iallgatherv_module); + /* both roots have the same result. broadcast to the local group */ + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm, + &subreq, comm->c_coll.coll_ibcast_module); if (OMPI_SUCCESS != rc) { return rc; } From b8b5c31f9a06bccdf8e96c5a7f565f05e2f12c96 Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Fri, 9 Sep 2016 10:10:35 +0900 Subject: [PATCH 11/13] ompi/communicator: fix typos in CID generation use MPI_MIN instead of MPI_MAX when appropriate, otherwise a currently used CID can be reused, and bad things will likely happen. Refs open-mpi/ompi#2061 (cherry picked from commit 3b968ec6bb693c64966d9e596786cc4d5cc7e231) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 8ed1a465d49..a6f44db891b 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -372,7 +372,7 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) ++context->iter; - ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MAX, context, &subreq); + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MIN, context, &subreq); if (OMPI_SUCCESS == ret) { ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } @@ -478,7 +478,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c /* Step 1: the barrier, after which it is allowed to * send messages over the new communicator */ - ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MAX, context, + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context, &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); From 085c8ae6b4039544bac679a187a23c0a3fd24308 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Wed, 21 Sep 2016 07:45:51 -0400 Subject: [PATCH 12/13] Correctly indent the code. (cherry picked from commit 803897a91503a25b9e9e4a7f72a4d786f38096b7) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm.c | 94 ++++++++++++++++++++-------------------- ompi/request/req_wait.c | 2 +- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index 6a4ec204eed..7126ca486b9 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -819,34 +819,34 @@ int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key, global_split_type = tmp[0]; if (tmp[0] != -tmp[1] || inter) { - /* at least one rank supplied a different split type check if our split_type is ok */ - ok = (MPI_UNDEFINED == split_type) || global_split_type == split_type; - - rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, - comm->c_coll.coll_allgather_module); - if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { - return rc; - } - - if (inter) { - /* need an extra allreduce to ensure that all ranks have the same result */ - rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, - comm->c_coll.coll_allgather_module); - if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { - return rc; - } - } - - if (OPAL_UNLIKELY(!ok)) { - return OMPI_ERR_BAD_PARAM; - } - - need_split = tmp[0] == -tmp[1]; + /* at least one rank supplied a different split type check if our split_type is ok */ + ok = (MPI_UNDEFINED == split_type) || global_split_type == split_type; + + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, + comm->c_coll.coll_allgather_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + if (inter) { + /* need an extra allreduce to ensure that all ranks have the same result */ + rc = comm->c_coll.coll_allreduce (MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_MIN, comm, + comm->c_coll.coll_allgather_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + } + + if (OPAL_UNLIKELY(!ok)) { + return OMPI_ERR_BAD_PARAM; + } + + need_split = tmp[0] == -tmp[1]; } else { - /* intracommunicator and all ranks specified the same split type */ - no_undefined = true; - /* check if all ranks specified the same key */ - no_reorder = tmp[2] == -tmp[3]; + /* intracommunicator and all ranks specified the same split type */ + no_undefined = true; + /* check if all ranks specified the same key */ + no_reorder = tmp[2] == -tmp[3]; } if (MPI_UNDEFINED == global_split_type) { @@ -863,18 +863,18 @@ int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key, /* --------------------------------------------------------- */ /* allowed splitting types: - CLUSTER - CU - HOST - BOARD - NODE - NUMA - SOCKET - L3CACHE - L2CACHE - L1CACHE - CORE - HWTHREAD + CLUSTER + CU + HOST + BOARD + NODE + NUMA + SOCKET + L3CACHE + L2CACHE + L1CACHE + CORE + HWTHREAD Even though HWTHREAD/CORE etc. is overkill they are here for consistency. They will most likely return a communicator which is equal to MPI_COMM_SELF Unless oversubscribing. @@ -924,14 +924,14 @@ int ompi_comm_split_type (ompi_communicator_t *comm, int split_type, int key, } /* Step 5: Check if we need to remove or reorder ranks in the communicator */ - if (!(no_reorder && no_undefined)) { - rc = ompi_comm_split_verify (newcomp, split_type, key, &need_split); - - if (inter) { - /* verify that no local ranks need to be removed or reordered */ - rc = ompi_comm_split_verify (newcomp->c_local_comm, split_type, key, &need_split); - } - } + if (!(no_reorder && no_undefined)) { + rc = ompi_comm_split_verify (newcomp, split_type, key, &need_split); + + if (inter) { + /* verify that no local ranks need to be removed or reordered */ + rc = ompi_comm_split_verify (newcomp->c_local_comm, split_type, key, &need_split); + } + } if (!need_split) { /* common case. no reordering and no MPI_UNDEFINED */ diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index 4a76bd8ab6d..21afa95f348 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -196,7 +196,7 @@ int ompi_request_default_wait_all( size_t count, rptr = requests; for (i = 0; i < count; i++) { request = *rptr++; - + if( request->req_state == OMPI_REQUEST_INACTIVE ) { completed++; continue; From a7b8d16fcbffb80dc6f47c70ca79dcb76ce0470b Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Thu, 6 Oct 2016 15:03:06 +0900 Subject: [PATCH 13/13] ompi/communicator: silence warnings (cherry picked from commit 6c6e35bb40ca600695077f3a3618e9195d2411c9) Signed-off-by: Nathan Hjelm --- ompi/communicator/comm_cid.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index a6f44db891b..d91a427c93b 100755 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -607,7 +607,7 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, ompi_comm_allreduce_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; - int local_rank, rsize, rc; + int local_rank, rc; if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; @@ -627,7 +627,6 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, request->context = &context->super; /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); local_rank = ompi_comm_rank (intercomm); if (0 == local_rank) { @@ -700,12 +699,12 @@ static int ompi_comm_allreduce_inter_bcast (ompi_comm_request_t *request) ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_communicator_t *comm = context->cid_context->comm->c_local_comm; ompi_request_t *subreq; - int scount = 0, rc; + int rc; /* both roots have the same result. broadcast to the local group */ rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, 0, comm, &subreq, comm->c_coll.coll_ibcast_module); - if (OMPI_SUCCESS != rc) { + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { return rc; }