From 94593ca20b2ef468fb4cc1e904014d912ffa2bb9 Mon Sep 17 00:00:00 2001 From: Anandhi S Jayakumar Date: Fri, 21 Oct 2016 13:14:25 -0700 Subject: [PATCH] Adding ofi plugin to allow for opening a conduit to use ethernet/fabric. modified: ../orte/mca/rml/base/rml_base_frame.c modified: ../orte/mca/rml/base/rml_base_stubs.c deleted: ../orte/mca/rml/ofi/.opal_ignore modified: ../orte/mca/rml/ofi/Makefile.am modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c modified: ../orte/test/system/ofi_conduit_stress.c Removed stale include directive modified: ../orte/mca/rml/ofi/Makefile.am The ofi plugin supports multiple providers, and identifies them by ofi_prov_id, changed the previous name conduit_id to ofi_prov_id modified: ../orte/mca/rml/base/base.h modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_request.h modified: ../orte/mca/rml/ofi/rml_ofi_send.c Adding ofi plugin to allow for opening a conduit to use ethernet/fabric. 
modified: ../orte/mca/rml/base/rml_base_frame.c modified: ../orte/mca/rml/base/rml_base_stubs.c deleted: ../orte/mca/rml/ofi/.opal_ignore modified: ../orte/mca/rml/ofi/Makefile.am modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c modified: ../orte/test/system/ofi_conduit_stress.c Removed stale include directive modified: ../orte/mca/rml/ofi/Makefile.am The ofi plugin supports multiple providers, and identifies them by ofi_prov_id, changed the previous name conduit_id to ofi_prov_id modified: ../orte/mca/rml/base/base.h modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_request.h modified: ../orte/mca/rml/ofi/rml_ofi_send.c Fixed merge issues, and minor pull-request comments modified: ../orte/mca/rml/base/base.h modified: ../orte/mca/rml/base/rml_base_frame.c modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c Adding ofi plugin to allow for opening a conduit to use ethernet/fabric. modified: ../orte/mca/rml/base/rml_base_frame.c modified: ../orte/mca/rml/base/rml_base_stubs.c deleted: ../orte/mca/rml/ofi/.opal_ignore modified: ../orte/mca/rml/ofi/Makefile.am modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c modified: ../orte/test/system/ofi_conduit_stress.c Removed stale include directive modified: ../orte/mca/rml/ofi/Makefile.am The ofi plugin supports multiple providers, and identifies them by ofi_prov_id, changed the previous name conduit_id to ofi_prov_id modified: ../orte/mca/rml/base/base.h modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_request.h modified: ../orte/mca/rml/ofi/rml_ofi_send.c Adding ofi plugin to allow for opening a conduit to use ethernet/fabric. 
modified: ../orte/mca/rml/base/rml_base_frame.c modified: ../orte/mca/rml/base/rml_base_stubs.c deleted: ../orte/mca/rml/ofi/.opal_ignore modified: ../orte/mca/rml/ofi/Makefile.am modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c modified: ../orte/test/system/ofi_conduit_stress.c Removed stale include directive modified: ../orte/mca/rml/ofi/Makefile.am Fixed merge issues, and minor pull-request comments modified: ../orte/mca/rml/base/base.h modified: ../orte/mca/rml/base/rml_base_frame.c modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c Removed trailing space modified: ../orte/mca/rml/ofi/rml_ofi_component.c Cleaned up test- ofi_conduit_stress.c modified: ../orte/test/system/ofi_conduit_stress.c cleaned up printing the provider info during initialisation modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c Signed-off-by: Anandhi S Jayakumar Fixing warnings modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c Signed-off-by: Anandhi S Jayakumar minor cleanup modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c Signed-off-by: Anandhi S Jayakumar more cleanup modified: ../orte/mca/rml/ofi/rml_ofi_component.c Signed-off-by: Anandhi S Jayakumar Sending the ethernet address only in the get_contact_info, rest will be sent through modex modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c Signed-off-by: Anandhi S Jayakumar Adding error logging on failures modified: ../orte/mca/rml/ofi/rml_ofi_component.c Signed-off-by: Anandhi S Jayakumar Handling the OPAL_MODEX_SEND/RECV generically for all ofi providers. 
modified: ../orte/mca/rml/ofi/rml_ofi.h modified: ../orte/mca/rml/ofi/rml_ofi_component.c modified: ../orte/mca/rml/ofi/rml_ofi_send.c Signed-off-by: Anandhi S Jayakumar Adding to build ofi for limited people new file: ../orte/mca/rml/ofi/.opal_ignore new file: ../orte/mca/rml/ofi/.opal_unignore Signed-off-by: Anandhi S Jayakumar Removing the error logging for now modified: ../orte/mca/rml/ofi/rml_ofi_component.c --- orte/mca/rml/base/base.h | 2 - orte/mca/rml/base/rml_base_stubs.c | 8 +- orte/mca/rml/ofi/.opal_unignore | 2 + orte/mca/rml/ofi/rml_ofi.h | 46 +- orte/mca/rml/ofi/rml_ofi_component.c | 1080 ++++++++++++++----------- orte/mca/rml/ofi/rml_ofi_request.h | 16 +- orte/mca/rml/ofi/rml_ofi_send.c | 180 +++-- orte/test/system/ofi_conduit_stress.c | 60 +- 8 files changed, 757 insertions(+), 637 deletions(-) create mode 100644 orte/mca/rml/ofi/.opal_unignore diff --git a/orte/mca/rml/base/base.h b/orte/mca/rml/base/base.h index 524a2a447ae..941cb43c27e 100644 --- a/orte/mca/rml/base/base.h +++ b/orte/mca/rml/base/base.h @@ -145,8 +145,6 @@ typedef struct { opal_object_t super; opal_event_t ev; orte_rml_send_t send; - /* conduit_id */ - orte_rml_conduit_t conduit_id; } orte_rml_send_request_t; OBJ_CLASS_DECLARATION(orte_rml_send_request_t); diff --git a/orte/mca/rml/base/rml_base_stubs.c b/orte/mca/rml/base/rml_base_stubs.c index 951e469ab63..ff25766099c 100644 --- a/orte/mca/rml/base/rml_base_stubs.c +++ b/orte/mca/rml/base/rml_base_stubs.c @@ -74,11 +74,8 @@ orte_rml_conduit_t orte_rml_API_open_conduit(opal_list_t *attributes) "%s rml:base:open_conduit Component %s provided a conduit", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), active->component->base.mca_component_name); - /* retain this answer */ - if (NULL != ourmod) { - free(ourmod); - } ourmod = mod; + break; } } } @@ -140,6 +137,9 @@ char* orte_rml_API_get_contact_info(void) } else { tmp = NULL; } + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s rml:base:get_contact_info() returning -> 
%s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),tmp); return tmp; } diff --git a/orte/mca/rml/ofi/.opal_unignore b/orte/mca/rml/ofi/.opal_unignore new file mode 100644 index 00000000000..335cd142ab7 --- /dev/null +++ b/orte/mca/rml/ofi/.opal_unignore @@ -0,0 +1,2 @@ +anandhis +rhc diff --git a/orte/mca/rml/ofi/rml_ofi.h b/orte/mca/rml/ofi/rml_ofi.h index beb97cc52da..32332e4f2bd 100644 --- a/orte/mca/rml/ofi/rml_ofi.h +++ b/orte/mca/rml/ofi/rml_ofi.h @@ -26,12 +26,15 @@ #include "rml_ofi_request.h" +/** the maximum open OFI ofi_prov - assuming system will have no more than 20 transports*/ +#define MAX_OFI_PROVIDERS 40 +#define RML_OFI_PROV_ID_INVALID 0xFF /** RML/OFI key values **/ /* (char*) ofi socket address (type IN) of the node process is running on */ -#define OPAL_RML_OFI_FI_SOCKADDR_IN "rml.ofi.fisockaddrin" +#define OPAL_RML_OFI_FI_SOCKADDR_IN "rml.ofi.fisockaddrin" /* (char*) ofi socket address (type PSM) of the node process is running on */ -#define OPAL_RML_OFI_FI_ADDR_PSMX "rml.ofi.fiaddrpsmx" +#define OPAL_RML_OFI_FI_ADDR_PSMX "rml.ofi.fiaddrpsmx" // MULTI_BUF_SIZE_FACTOR defines how large the multi recv buffer will be. 
// In order to use FI_MULTI_RECV feature efficiently, we need to have a @@ -40,6 +43,8 @@ #define MULTI_BUF_SIZE_FACTOR 128 #define MIN_MULTI_BUF_SIZE (1024 * 1024) +#define OFIADDR "ofiaddr" + #define CLOSE_FID(fd) \ do { \ int _ret = 0; \ @@ -72,8 +77,8 @@ and also the corresponding fi_info **/ typedef struct { - /** OFI conduit ID **/ - uint8_t conduit_id; + /** ofi provider ID **/ + uint8_t ofi_prov_id; /** fi_info for this transport */ struct fi_info *fabric_info; @@ -116,37 +121,36 @@ typedef struct { struct fi_context rx_ctx1; - /* module associated with this conduit_id returned to rml - from open_conduit call */ - struct orte_rml_ofi_module_t *ofi_module; - -} ofi_transport_conduit_t; +} ofi_transport_ofi_prov_t; struct orte_rml_ofi_module_t { orte_rml_base_module_t api; /** current ofi transport id the component is using, this will be initialised - ** in the open_conduit() call **/ + ** in the open_ofi_prov() call **/ int cur_transport_id; /** Fabric info structure of all supported transports in system **/ struct fi_info *fi_info_list; - /** OFI ep and corr fi_info for all the transports (conduit) **/ - ofi_transport_conduit_t ofi_conduits[MAX_CONDUIT]; + /** OFI ep and corr fi_info for all the transports (ofi_providers) **/ + ofi_transport_ofi_prov_t ofi_prov[MAX_OFI_PROVIDERS]; size_t min_ofi_recv_buf_sz; /** "Any source" address */ fi_addr_t any_addr; - /** number of conduits currently opened **/ - uint8_t conduit_open_num; + /** number of ofi providers currently opened **/ + uint8_t ofi_prov_open_num; /** Unique message id for every message that is fragmented to be sent over OFI **/ uint32_t cur_msgid; + /* hashtable stores the peer addresses */ + opal_hash_table_t peers; + opal_list_t recv_msg_queue_list; opal_list_t queued_routing_messages; opal_event_t *timer_event; @@ -154,8 +158,15 @@ typedef struct { } ; typedef struct orte_rml_ofi_module_t orte_rml_ofi_module_t; +typedef struct { + opal_object_t super; + void* ofi_ep; + size_t ofi_ep_len; +} 
orte_rml_ofi_peer_t; +OBJ_CLASS_DECLARATION(orte_rml_ofi_peer_t); ORTE_MODULE_DECLSPEC extern orte_rml_component_t mca_rml_ofi_component; +extern orte_rml_ofi_module_t orte_rml_ofi; int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod, orte_process_name_t* peer, @@ -172,8 +183,11 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t *mod, void* cbdata); /****************** INTERNAL OFI Functions*************/ -void free_conduit_resources( int conduit_id); +void free_ofi_prov_resources( int ofi_prov_id); void print_provider_list_info (struct fi_info *fi ); +void print_provider_info (struct fi_info *cur_fi ); +int cq_progress_handler(int sd, short flags, void *cbdata); +int get_ofi_prov_id( opal_list_t *attributes); /** Send callback */ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc, @@ -184,7 +198,7 @@ int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error, orte_rml_ofi_request_t*); /* OFI Recv handler */ -int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id); +int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id); END_C_DECLS diff --git a/orte/mca/rml/ofi/rml_ofi_component.c b/orte/mca/rml/ofi/rml_ofi_component.c index 6c3a7d5d0dc..1de1b56eca8 100644 --- a/orte/mca/rml/ofi/rml_ofi_component.c +++ b/orte/mca/rml/ofi/rml_ofi_component.c @@ -28,13 +28,17 @@ #include "rml_ofi.h" + static int rml_ofi_component_open(void); static int rml_ofi_component_close(void); +static int rml_ofi_component_init(void); static orte_rml_base_module_t* open_conduit(opal_list_t *attributes); static orte_rml_pathway_t* query_transports(void); -static char* get_contact_info(void); -static void set_contact_info(const char *uri); -static void close_conduit(orte_rml_base_module_t *mod); +static char* ofi_get_contact_info(void); +static void process_uri(char *uri); +static void ofi_set_contact_info (const char *uri); +void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr); + /** * component 
definition */ @@ -58,20 +62,25 @@ orte_rml_component_t mca_rml_ofi_component = { .priority = 10, .open_conduit = open_conduit, .query_transports = query_transports, - .get_contact_info = get_contact_info, - .set_contact_info = set_contact_info, - .close_conduit = close_conduit + .get_contact_info = ofi_get_contact_info, + .set_contact_info = ofi_set_contact_info, + .close_conduit = NULL }; /* Local variables */ -static orte_rml_base_module_t base_module = { +orte_rml_ofi_module_t orte_rml_ofi = { + .api = { .component = (struct orte_rml_component_t*)&mca_rml_ofi_component, .ping = NULL, .send_nb = orte_rml_ofi_send_nb, .send_buffer_nb = orte_rml_ofi_send_buffer_nb, .purge = NULL + } }; +/* Local variables */ +static bool init_done = false; + static int rml_ofi_component_open(void) { @@ -80,41 +89,43 @@ rml_ofi_component_open(void) orte_rml_ofi.fi_info_list = NULL; orte_rml_ofi.min_ofi_recv_buf_sz = MIN_MULTI_BUF_SIZE; orte_rml_ofi.cur_msgid = 1; - orte_rml_ofi.cur_transport_id = RML_OFI_CONDUIT_ID_INVALID; - - for( uint8_t conduit_id=0; conduit_id < MAX_CONDUIT ; conduit_id++) { - orte_rml_ofi.ofi_conduits[conduit_id].fabric = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].domain = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].av = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].cq = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].ep = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].ep_name[0] = 0; - orte_rml_ofi.ofi_conduits[conduit_id].epnamelen = 0; - orte_rml_ofi.ofi_conduits[conduit_id].mr_multi_recv = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].rxbuf = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].rxbuf_size = 0; - orte_rml_ofi.ofi_conduits[conduit_id].progress_ev_active = false; - orte_rml_ofi.ofi_conduits[conduit_id].conduit_id = RML_OFI_CONDUIT_ID_INVALID; - orte_rml_ofi.ofi_conduits[conduit_id].ofi_module = NULL; + orte_rml_ofi.cur_transport_id = RML_OFI_PROV_ID_INVALID; + orte_rml_ofi.ofi_prov_open_num = 0; + OBJ_CONSTRUCT(&orte_rml_ofi.peers, 
opal_hash_table_t); + opal_hash_table_init(&orte_rml_ofi.peers, 128); + + for( uint8_t ofi_prov_id=0; ofi_prov_id < MAX_OFI_PROVIDERS ; ofi_prov_id++) { + orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0; + orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0; + orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0; + orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active = false; + orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID; } - opal_output_verbose(1,orte_rml_base_framework.framework_output," from %s:%d rml_ofi_component_open()",__FILE__,__LINE__); + opal_output_verbose(10,orte_rml_base_framework.framework_output," from %s:%d rml_ofi_component_open()",__FILE__,__LINE__); return ORTE_SUCCESS; } -void free_conduit_resources( int conduit_id) +void free_ofi_prov_resources( int ofi_prov_id) { int ret=0; opal_output_verbose(10,orte_rml_base_framework.framework_output, - " %s - free_conduit_resources() begin. OFI conduit_id- %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),conduit_id); - if (orte_rml_ofi.ofi_conduits[conduit_id].ep) { + " %s - free_ofi_prov_resources() begin. 
OFI ofi_prov_id- %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id); + if (orte_rml_ofi.ofi_prov[ofi_prov_id].ep) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close ep",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - CLOSE_FID(orte_rml_ofi.ofi_conduits[conduit_id].ep); + CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].ep); if (ret) { opal_output_verbose(10,orte_rml_base_framework.framework_output, @@ -122,58 +133,53 @@ void free_conduit_resources( int conduit_id) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ret); } } - if (orte_rml_ofi.ofi_conduits[conduit_id].mr_multi_recv) { + if (orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close mr_multi_recv",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - CLOSE_FID(orte_rml_ofi.ofi_conduits[conduit_id].mr_multi_recv); + CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv); } - if (orte_rml_ofi.ofi_conduits[conduit_id].cq) { + if (orte_rml_ofi.ofi_prov[ofi_prov_id].cq) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close cq",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - CLOSE_FID(orte_rml_ofi.ofi_conduits[conduit_id].cq); + CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].cq); } - if (orte_rml_ofi.ofi_conduits[conduit_id].av) { - CLOSE_FID(orte_rml_ofi.ofi_conduits[conduit_id].av); + if (orte_rml_ofi.ofi_prov[ofi_prov_id].av) { + CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].av); } - if (orte_rml_ofi.ofi_conduits[conduit_id].domain) { + if (orte_rml_ofi.ofi_prov[ofi_prov_id].domain) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close domain",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - CLOSE_FID(orte_rml_ofi.ofi_conduits[conduit_id].domain); + CLOSE_FID(orte_rml_ofi.ofi_prov[ofi_prov_id].domain); } - if (orte_rml_ofi.ofi_conduits[conduit_id].fabric) { + if (orte_rml_ofi.ofi_prov[ofi_prov_id].fabric) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - close 
fabric",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - fi_close((fid_t)orte_rml_ofi.ofi_conduits[conduit_id].fabric); + fi_close((fid_t)orte_rml_ofi.ofi_prov[ofi_prov_id].fabric); } - if (orte_rml_ofi.ofi_conduits[conduit_id].rxbuf) { - free(orte_rml_ofi.ofi_conduits[conduit_id].rxbuf); + if (orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf) { + free(orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf); } - if (orte_rml_ofi.ofi_conduits[conduit_id].ofi_module) { - free(orte_rml_ofi.ofi_conduits[conduit_id].ofi_module); - orte_rml_ofi.ofi_conduits[conduit_id].ofi_module = NULL; - } - orte_rml_ofi.ofi_conduits[conduit_id].fabric = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].domain = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].av = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].cq = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].ep = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].ep_name[0] = 0; - orte_rml_ofi.ofi_conduits[conduit_id].epnamelen = 0; - orte_rml_ofi.ofi_conduits[conduit_id].rxbuf = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].rxbuf_size = 0; - orte_rml_ofi.ofi_conduits[conduit_id].fabric_info = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].mr_multi_recv = NULL; - orte_rml_ofi.ofi_conduits[conduit_id].conduit_id = RML_OFI_CONDUIT_ID_INVALID; - orte_rml_ofi.ofi_conduits[conduit_id].ofi_module = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].fabric = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].domain = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].av = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].cq = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].ep = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name[0] = 0; + orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen = 0; + orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].rxbuf_size = 0; + orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv = NULL; + orte_rml_ofi.ofi_prov[ofi_prov_id].ofi_prov_id = RML_OFI_PROV_ID_INVALID; 
OPAL_LIST_DESTRUCT(&orte_rml_ofi.recv_msg_queue_list); - if( orte_rml_ofi.ofi_conduits[conduit_id].progress_ev_active) { + if( orte_rml_ofi.ofi_prov[ofi_prov_id].progress_ev_active) { opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - deleting progress event", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - opal_event_del( &orte_rml_ofi.ofi_conduits[conduit_id].progress_event); + opal_event_del( &orte_rml_ofi.ofi_prov[ofi_prov_id].progress_event); } return; @@ -183,223 +189,107 @@ void free_conduit_resources( int conduit_id) static int rml_ofi_component_close(void) { + + int rc; + opal_object_t *value; + uint64_t key; + void *node; + opal_output_verbose(10,orte_rml_base_framework.framework_output, - " %s - rml_ofi_component_close() -begin, total open OFI conduits = %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),orte_rml_ofi.conduit_open_num); + " %s - rml_ofi_component_close() -begin, total open OFI providers = %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),orte_rml_ofi.ofi_prov_open_num); if(orte_rml_ofi.fi_info_list) { (void) fi_freeinfo(orte_rml_ofi.fi_info_list); } /* Close endpoint and all queues */ - for( uint8_t conduit_id=0;conduit_idfabric_attr->prov_name); + opal_output_verbose(10,orte_rml_base_framework.framework_output, + " Protocol : %s",fi_tostr(&cur_fi->ep_attr->protocol,FI_TYPE_PROTOCOL)); + opal_output_verbose(10,orte_rml_base_framework.framework_output, + " EP Type : %s",fi_tostr(&cur_fi->ep_attr->type,FI_TYPE_EP_TYPE)); + opal_output_verbose(10,orte_rml_base_framework.framework_output, + " address_format : %s",fi_tostr(&cur_fi->addr_format,FI_TYPE_ADDR_FORMAT)); +} + void print_provider_list_info (struct fi_info *fi ) { struct fi_info *cur_fi = fi; int fi_count = 0; //Display all the details in the fi_info structure - opal_output_verbose(1,orte_rml_base_framework.framework_output, + opal_output_verbose(10,orte_rml_base_framework.framework_output, " %s - Print_provider_list_info() ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - while(cur_fi != NULL) { 
+ while( NULL != cur_fi ) { fi_count++; opal_output_verbose(10,orte_rml_base_framework.framework_output, " %d.\n",fi_count); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " fi_info[]->caps : 0x%x \n",cur_fi->caps); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " fi_info[]->mode : 0x%x \n",cur_fi->mode); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " fi_info[]->address_format : 0x%x \n",cur_fi->addr_format); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " fi_info[]->fabric_attr->provname : %s \n",cur_fi->fabric_attr->prov_name); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " fi_info[]->src_address : 0x%x \n",cur_fi->src_addr); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " fi_info[]->dest_address : 0x%x \n",cur_fi->dest_addr); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " EndPoint Attributes (ep_attr) :"); - switch( cur_fi->ep_attr->type) - { - case FI_EP_UNSPEC: - opal_output_verbose(10,orte_rml_base_framework.framework_output," FI_EP_UNSPEC \n"); - break; - case FI_EP_MSG: - opal_output_verbose(10,orte_rml_base_framework.framework_output," FI_EP_MSG \n"); - break; - case FI_EP_DGRAM: - opal_output_verbose(10,orte_rml_base_framework.framework_output," FI_EP_DGRAM \n"); - break; - case FI_EP_RDM: - opal_output_verbose(10,orte_rml_base_framework.framework_output," FI_EP_RDM \n"); - break; - default: - opal_output_verbose(10,orte_rml_base_framework.framework_output," %d",cur_fi->ep_attr->type); - } - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " Protocol : 0x%x \n", cur_fi->ep_attr->protocol); - cur_fi = cur_fi->next; + print_provider_info( cur_fi); + cur_fi = cur_fi->next; } opal_output_verbose(10,orte_rml_base_framework.framework_output, "Total # of providers supported is %d\n",fi_count); - } /* * This returns all the supported transports in the system that support endpoint type 
RDM (reliable datagram) * The providers returned is a list of type opal_valut_t holding opal_list_t */ -static int orte_rml_ofi_query_transports(opal_value_t **providers) -{ - opal_list_t *ofi_prov = NULL; - opal_value_t *providers_list, *prev_provider=NULL, *next_provider=NULL; - struct fi_info *cur_fi = NULL; - int ret = 0, prov_num = 0; - - opal_output_verbose(10,orte_rml_base_framework.framework_output, - " %s -Begin of query_transports()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); - if ( NULL == *providers) - { - *providers = OBJ_NEW(opal_value_t); - } - - providers_list = *providers; - - //Create the opal_value_t list in which each item is an opal_list_t that holds the provider details - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "Starting to add the providers in a loop from orte_rml_ofi.ofi_conduits[] %s:%d",__FILE__,__LINE__); - - for ( prov_num = 0; prov_num < orte_rml_ofi.conduit_open_num ; prov_num++ ) { - cur_fi = orte_rml_ofi.ofi_conduits[prov_num].fabric_info; - if( NULL != prev_provider) - { - //if there is another provider in the array, then add another item to the providers_list - next_provider = OBJ_NEW(opal_value_t); - providers_list->super.opal_list_next = &next_provider->super; - providers_list->super.opal_list_prev = &prev_provider->super; - providers_list = (opal_value_t *)providers_list->super.opal_list_next; - } - - /* populate the opal_list_t *ofi_prov with provider details from the - * orte_rml_ofi.fi_info_list array populated in the rml_ofi_component_init() fn.*/ - ofi_prov = OBJ_NEW(opal_list_t); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n loading the attribute ORTE_CONDUIT_ID"); - if( ORTE_SUCCESS != - (ret = orte_set_attribute( ofi_prov, ORTE_CONDUIT_ID, ORTE_ATTR_GLOBAL, - (void *)&orte_rml_ofi.ofi_conduits[prov_num].conduit_id ,OPAL_UINT8))) { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d Not able to add provider conduit_id ",__FILE__,__LINE__); - return 
ORTE_ERROR; - } - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n provider conduit_id : %d",orte_rml_ofi.ofi_conduits[prov_num].conduit_id); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n loading the attribute ORTE_PROV_NAME"); - if( ORTE_SUCCESS == - (ret = orte_set_attribute( ofi_prov, ORTE_PROV_NAME, ORTE_ATTR_GLOBAL,cur_fi->fabric_attr->prov_name ,OPAL_STRING))) { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n loading the attribute ORTE_PROTOCOL %s",fi_tostr(&cur_fi->ep_attr->protocol,FI_TYPE_PROTOCOL)); - if( ORTE_SUCCESS == - (ret = orte_set_attribute( ofi_prov, ORTE_PROTOCOL, ORTE_ATTR_GLOBAL,(void *)&cur_fi->ep_attr->protocol ,OPAL_UINT32))) { - // insert the opal_list_t into opal_value_t list - opal_value_load(providers_list,ofi_prov,OPAL_PTR); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n loading the provider opal_list_t* prov=%x into opal_value_t list successful", - ofi_prov); - } else { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d Not able to add provider name ",__FILE__,__LINE__); - return ORTE_ERROR; - } - } else { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d Not able to add provider name ",__FILE__,__LINE__); - return ORTE_ERROR; - } - - prev_provider = providers_list; - cur_fi = cur_fi->next; - } - - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n%s:%d Completed Query Interface",__FILE__,__LINE__); - return ORTE_SUCCESS; -} - - -/*debug routine to print the opal_value_t returned by query interface */ -void print_transports_query() +static orte_rml_pathway_t* query_transports(void) { - opal_value_t *providers=NULL; - char* prov_name = NULL; - int ret; - int32_t *protocol_ptr, protocol; - int8_t* prov_num; - protocol_ptr = &protocol; - - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n print_transports_query() Begin- %s:%d",__FILE__,__LINE__); - 
opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n calling the orte_rml_ofi_query_transports() "); - if( ORTE_SUCCESS == orte_rml_ofi_query_transports(&providers)) - { - opal_output_verbose(20,orte_rml_base_framework.framework_output, - "\n query_transports() completed, printing details\n"); - while (providers) - { - //get the first opal_list_t; - opal_list_t *prov; - ret = opal_value_unload(providers,(void **)&prov,OPAL_PTR); - if (ret == OPAL_SUCCESS) { - if( orte_get_attribute( prov, ORTE_CONDUIT_ID, (void **)&prov_num,OPAL_UINT8)) { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n OFI Provider conduit_id : %d",*prov_num); - } - if( orte_get_attribute( prov, ORTE_PROTOCOL, (void **)&protocol_ptr,OPAL_UINT32)) { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n Protocol : %d", *protocol_ptr); - } - if( orte_get_attribute( prov, ORTE_PROV_NAME, (void **)&prov_name ,OPAL_STRING)) { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n Provider name : %s",prov_name); - } else { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n Error in getting Provider name"); - } - } else { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "\n %s:%d opal_value_unload() failed, opal_list* prov = %x", - __FILE__,__LINE__,prov); - } - providers = (opal_value_t *)providers->super.opal_list_next; - } - } else { - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n query_transports() returned Error "); - } opal_output_verbose(10,orte_rml_base_framework.framework_output, - "\n End of print_transports_query() \n"); + "%s:%d OFI Query Interface not implemented",__FILE__,__LINE__); + return NULL; } - - /** - conduit [in]: the ofi conduit_id that triggered the progress fn + ofi_prov [in]: the ofi ofi_prov_id that triggered the progress fn **/ __opal_attribute_always_inline__ static inline int -orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) 
+orte_rml_ofi_progress(ofi_transport_ofi_prov_t* prov) { ssize_t ret; int count=0; /* number of messages read and processed */ @@ -407,10 +297,10 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) struct fi_cq_err_entry error = { 0 }; orte_rml_ofi_request_t *ofi_req; - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s orte_rml_ofi_progress called for OFI conduitid %d", + opal_output_verbose(10, orte_rml_base_framework.framework_output, + "%s orte_rml_ofi_progress called for OFI ofi_provid %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id); + prov->ofi_prov_id); /** * Read the work completions from the CQ. * From the completion's op_context, we get the associated OFI request. @@ -418,20 +308,20 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) */ while (true) { /* Read the cq - that triggered the libevent to call this progress fn. */ - ret = fi_cq_read(conduit->cq, (void *)&wc, 1); + ret = fi_cq_read(prov->cq, (void *)&wc, 1); if (0 < ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s cq read for OFI conduitid %d - wc.flags = %x", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s cq read for OFI ofi_provid %d - wc.flags = %llx", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id, wc.flags); + prov->ofi_prov_id, wc.flags); count++; // check the flags to see if this is a send-completion or receive if ( wc.flags & FI_SEND ) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Send completion received on OFI conduitid %d", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s Send completion received on OFI provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id); + prov->ofi_prov_id); if (NULL != wc.op_context) { /* get the context from the wc and call the message handler */ ofi_req = TO_OFI_REQ(wc.op_context); @@ -439,59 +329,59 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) ret = 
orte_rml_ofi_send_callback(&wc, ofi_req); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, - "Error returned by OFI send callback handler when a send completion was received on OFI conduit: %zd", + "Error returned by OFI send callback handler when a send completion was received on OFI prov: %zd", ret); } } } else if ( (wc.flags & FI_RECV) && (wc.flags & FI_MULTI_RECV) ) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Received message on OFI conduitid %d - but buffer is consumed, need to repost", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s Received message on OFI ofi_prov_id %d - but buffer is consumed, need to repost", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id); + prov->ofi_prov_id); // reposting buffer - ret = fi_recv(orte_rml_ofi.ofi_conduits[conduit->conduit_id].ep, - orte_rml_ofi.ofi_conduits[conduit->conduit_id].rxbuf, - orte_rml_ofi.ofi_conduits[conduit->conduit_id].rxbuf_size, - fi_mr_desc(orte_rml_ofi.ofi_conduits[conduit->conduit_id].mr_multi_recv), - 0,&(conduit->rx_ctx1)); + ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep, + orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf, + orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size, + fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv), + 0,&(prov->rx_ctx1)); // call the receive message handler that will call the rml_base - ret = orte_rml_ofi_recv_handler(&wc, conduit->conduit_id); + ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, - "Error returned by OFI Recv handler when handling the received message on the conduit: %zd", + "Error returned by OFI Recv handler when handling the received message on the prov: %zd", ret); } } else if ( wc.flags & FI_RECV ) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Received message on OFI conduitid %d", + opal_output_verbose(15, 
orte_rml_base_framework.framework_output, + "%s Received message on OFI provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id); + prov->ofi_prov_id); // call the receive message handler that will call the rml_base - ret = orte_rml_ofi_recv_handler(&wc, conduit->conduit_id); + ret = orte_rml_ofi_recv_handler(&wc, prov->ofi_prov_id); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, - "Error returned by OFI Recv handler when handling the received message on the OFI conduit: %zd", + "Error returned by OFI Recv handler when handling the received message on the OFI prov: %zd", ret); } } else if ( wc.flags & FI_MULTI_RECV ) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Received buffer overrun message on OFI conduitid %d - need to repost", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s Received buffer overrun message on OFI provider id %d - need to repost", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id); + prov->ofi_prov_id); // reposting buffer - ret = fi_recv(orte_rml_ofi.ofi_conduits[conduit->conduit_id].ep, - orte_rml_ofi.ofi_conduits[conduit->conduit_id].rxbuf, - orte_rml_ofi.ofi_conduits[conduit->conduit_id].rxbuf_size, - fi_mr_desc(orte_rml_ofi.ofi_conduits[conduit->conduit_id].mr_multi_recv), - 0,&(conduit->rx_ctx1)); + ret = fi_recv(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].ep, + orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf, + orte_rml_ofi.ofi_prov[prov->ofi_prov_id].rxbuf_size, + fi_mr_desc(orte_rml_ofi.ofi_prov[prov->ofi_prov_id].mr_multi_recv), + 0,&(prov->rx_ctx1)); if (ORTE_SUCCESS != ret) { opal_output(orte_rml_base_framework.framework_output, - "Error returned by OFI when reposting buffer on the OFI conduit: %zd", + "Error returned by OFI when reposting buffer on the OFI prov: %zd", ret); } }else { opal_output_verbose(1,orte_rml_base_framework.framework_output, - "CQ has unhandled completion event with FLAG wc.flags = 0x%x", + "CQ has 
unhandled completion event with FLAG wc.flags = 0x%llx", wc.flags); } } else if (ret == -FI_EAVAIL) { @@ -500,13 +390,11 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) * Read the error and forward it to the upper layer. */ opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s cq_read for OFI conduitid %d returned error 0x%x <%s>", + "%s cq_read for OFI provider id %d returned error 0x%zx <%s>", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id, ret, + prov->ofi_prov_id, ret, fi_strerror((int) -ret) ); - ret = fi_cq_readerr(conduit->cq, - &error, - 0); + ret = fi_cq_readerr(prov->cq,&error,0); if (0 > ret) { opal_output_verbose(1,orte_rml_base_framework.framework_output, "Error returned from fi_cq_readerr: %zd", ret); @@ -527,15 +415,15 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) * The CQ is empty. Return. */ opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Empty cq for OFI conduitid %d,exiting from ofi_progress()", + "%s Empty cq for OFI provider id %d,exiting from ofi_progress()", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id ); + prov->ofi_prov_id ); break; } else { opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s cq_read for OFI conduitid %d returned error 0x%x <%s>", + "%s cq_read for OFI provider id %d returned error 0x%zx <%s>", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id, ret, + prov->ofi_prov_id, ret, fi_strerror((int) -ret) ); break; } @@ -550,52 +438,42 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit) */ int cq_progress_handler(int sd, short flags, void *cbdata) { - ofi_transport_conduit_t* conduit = (ofi_transport_conduit_t*)cbdata; + ofi_transport_ofi_prov_t* prov = (ofi_transport_ofi_prov_t*)cbdata; int count; - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s cq_progress_handler called for OFI conduitid %d", + opal_output_verbose(10, orte_rml_base_framework.framework_output, + "%s cq_progress_handler 
called for OFI Provider id %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - conduit->conduit_id); + prov->ofi_prov_id); /* call the progress fn to read the cq and process the message - * for the conduit */ - count = orte_rml_ofi_progress(conduit); + * for the ofi provider */ + count = orte_rml_ofi_progress(prov); return count; } -static orte_rml_base_module_t* -rml_ofi_component_init(int* priority) +/* + * Returns the number of ofi-providers available + */ +static int rml_ofi_component_init(void) { int ret, fi_version; struct fi_info *hints, *fabric_info; struct fi_cq_attr cq_attr = {0}; struct fi_av_attr av_attr = {0}; - size_t namelen; - char ep_name[FI_NAME_MAX]= {0}; - uint8_t fabric_id = 0, cur_conduit; + char *pmix_key; + uint8_t cur_ofi_prov; - opal_output_verbose(20,orte_rml_base_framework.framework_output, + opal_output_verbose(10,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_component_init()",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /*[TODO] Limiting current implementation to APP PROCs. The PMIX component does not get initialised for - * ORTE_DAEMON_PROC so the OPAL_MODEX_SEND_STRING (opal_pmix.put) fails during initialisation, - * this needs to be fixed to support DAEMON PROC to use ofi - */ - if (!ORTE_PROC_IS_APP) { - *priority = 0; - return NULL; - } if (init_done) { - *priority = 2; - return &orte_rml_ofi.api; + return orte_rml_ofi.ofi_prov_open_num; } - *priority = 2; - - + /** * Hints to filter providers * See man fi_getinfo for a list of all filters @@ -612,7 +490,7 @@ rml_ofi_component_init(int* priority) opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: Could not allocate fi_info\n", __FILE__, __LINE__); - return NULL; + return orte_rml_ofi.ofi_prov_open_num; } /** @@ -634,7 +512,7 @@ rml_ofi_component_init(int* priority) * Specify the version of OFI is coded to, the provider will select struct * layouts that are compatible with this version. 
*/ - fi_version = FI_VERSION(1, 1); + fi_version = FI_VERSION(1, 3); /** * fi_getinfo: returns information about fabric services for reaching a @@ -659,16 +537,17 @@ rml_ofi_component_init(int* priority) */ /** create the OFI objects for each transport in the system - * (fi_info_list) and store it in the ofi_conduits array **/ - orte_rml_ofi.conduit_open_num = 0; // start the conduit_id from 0 + * (fi_info_list) and store it in the ofi_prov array **/ + orte_rml_ofi.ofi_prov_open_num = 0; // start the ofi_prov_id from 0 for( fabric_info = orte_rml_ofi.fi_info_list ; - NULL != fabric_info && orte_rml_ofi.conduit_open_num < MAX_CONDUIT ; fabric_info = fabric_info->next) + NULL != fabric_info && orte_rml_ofi.ofi_prov_open_num < MAX_OFI_PROVIDERS ; fabric_info = fabric_info->next) { - opal_output_verbose(100,orte_rml_base_framework.framework_output, - "%s:%d beginning to add endpoint for OFIconduit_id=%d ",__FILE__,__LINE__,orte_rml_ofi.conduit_open_num); - cur_conduit = orte_rml_ofi.conduit_open_num; - orte_rml_ofi.ofi_conduits[cur_conduit].conduit_id = orte_rml_ofi.conduit_open_num ; - orte_rml_ofi.ofi_conduits[cur_conduit].fabric_info = fabric_info; + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d beginning to add endpoint for OFI_provider_id=%d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); + print_provider_info(fabric_info); + cur_ofi_prov = orte_rml_ofi.ofi_prov_open_num; + orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id = orte_rml_ofi.ofi_prov_open_num ; + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info = fabric_info; // set FI_MULTI_RECV flag for all recv operations fabric_info->rx_attr->op_flags = FI_MULTI_RECV; @@ -680,13 +559,13 @@ rml_ofi_component_init(int* priority) */ ret = fi_fabric(fabric_info->fabric_attr, /* In: Fabric attributes */ - &orte_rml_ofi.ofi_conduits[cur_conduit].fabric, /* Out: Fabric handle */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* Out: Fabric handle */ NULL); /* Optional context for fabric 
events */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_fabric failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - orte_rml_ofi.ofi_conduits[cur_conduit].fabric = NULL; + orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric = NULL; /* abort this current transport, but check if next transport can be opened */ continue; } @@ -697,16 +576,15 @@ rml_ofi_component_init(int* priority) * hardware port/collection of ports. Returns a domain object that can be * used to create endpoints. See man fi_domain for details. */ - ret = fi_domain(orte_rml_ofi.ofi_conduits[cur_conduit].fabric, /* In: Fabric object */ + ret = fi_domain(orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric, /* In: Fabric object */ fabric_info, /* In: Provider */ - &orte_rml_ofi.ofi_conduits[cur_conduit].domain, /* Out: Domain oject */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* Out: Domain oject */ NULL); /* Optional context for domain events */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_domain failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - orte_rml_ofi.ofi_conduits[cur_conduit].domain = NULL; - free_conduit_resources(cur_conduit); + orte_rml_ofi.ofi_prov[cur_ofi_prov].domain = NULL; /* abort this current transport, but check if next transport can be opened */ continue; } @@ -718,15 +596,15 @@ rml_ofi_component_init(int* priority) * completion queues, etc. * see man fi_endpoint for more details. 
*/ - ret = fi_endpoint(orte_rml_ofi.ofi_conduits[cur_conduit].domain, /* In: Domain object */ + ret = fi_endpoint(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, /* In: Domain object */ fabric_info, /* In: Provider */ - &orte_rml_ofi.ofi_conduits[cur_conduit].ep, /* Out: Endpoint object */ + &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, /* Out: Endpoint object */ NULL); /* Optional context */ if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_endpoint failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } @@ -746,13 +624,13 @@ rml_ofi_component_init(int* priority) cq_attr.format = FI_CQ_FORMAT_DATA; cq_attr.wait_obj = FI_WAIT_FD; cq_attr.wait_cond = FI_CQ_COND_NONE; - ret = fi_cq_open(orte_rml_ofi.ofi_conduits[cur_conduit].domain, - &cq_attr, &orte_rml_ofi.ofi_conduits[cur_conduit].cq, NULL); + ret = fi_cq_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + &cq_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, NULL); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_cq_open failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } @@ -762,13 +640,13 @@ rml_ofi_component_init(int* priority) * So, we use the AV in "map" mode. 
*/ av_attr.type = FI_AV_MAP; - ret = fi_av_open(orte_rml_ofi.ofi_conduits[cur_conduit].domain, - &av_attr, &orte_rml_ofi.ofi_conduits[cur_conduit].av, NULL); + ret = fi_av_open(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + &av_attr, &orte_rml_ofi.ofi_prov[cur_ofi_prov].av, NULL); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_av_open failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } @@ -776,26 +654,26 @@ rml_ofi_component_init(int* priority) /** * Bind the CQ and AV to the endpoint object. */ - ret = fi_ep_bind(orte_rml_ofi.ofi_conduits[cur_conduit].ep, - (fid_t)orte_rml_ofi.ofi_conduits[cur_conduit].cq, + ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].cq, FI_SEND | FI_RECV); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_bind CQ-EP failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } - ret = fi_ep_bind(orte_rml_ofi.ofi_conduits[cur_conduit].ep, - (fid_t)orte_rml_ofi.ofi_conduits[cur_conduit].av, + ret = fi_ep_bind(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + (fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].av, 0); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_bind AV-EP failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } @@ -804,89 +682,55 @@ rml_ofi_component_init(int* priority) * Enable the endpoint for communication * This commits the bind operations. 
*/ - ret = fi_enable(orte_rml_ofi.ofi_conduits[cur_conduit].ep); + ret = fi_enable(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_enable failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s:%d ep enabled for conduit_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_conduits[cur_conduit].conduit_id); + "%s:%d ep enabled for ofi_prov_id - %d ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov[cur_ofi_prov].ofi_prov_id); /** * Get our address and publish it with modex. **/ - orte_rml_ofi.ofi_conduits[cur_conduit].epnamelen = sizeof (orte_rml_ofi.ofi_conduits[cur_conduit].ep_name); - ret = fi_getname((fid_t)orte_rml_ofi.ofi_conduits[cur_conduit].ep, - &orte_rml_ofi.ofi_conduits[cur_conduit].ep_name[0], - &orte_rml_ofi.ofi_conduits[cur_conduit].epnamelen); + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen = sizeof (orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name); + ret = fi_getname((fid_t)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name[0], + &orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_getname failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } - switch ( orte_rml_ofi.ofi_conduits[cur_conduit].fabric_info->addr_format) - { - case FI_SOCKADDR_IN : - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d In FI_SOCKADDR_IN. 
",__FILE__,__LINE__); - /* Address is of type sockaddr_in (IPv4) */ - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s sending Opal modex string for conduit_it %d, epnamelen = %d ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_conduit,orte_rml_ofi.ofi_conduits[cur_conduit].epnamelen); - /*[debug] - print the sockaddr - port and s_addr */ - struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_conduits[cur_conduit].ep_name; - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s port = 0x%x, InternetAddr = %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); - /*[end debug]*/ - OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL, - OPAL_RML_OFI_FI_SOCKADDR_IN, - orte_rml_ofi.ofi_conduits[cur_conduit].ep_name, - orte_rml_ofi.ofi_conduits[cur_conduit].epnamelen); - if (ORTE_SUCCESS != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, + /* Register the ofi address of this peer with PMIX server only if it is a user process / + * for daemons the set/get_contact_info is used to exchange this information */ + if (ORTE_PROC_IS_APP) { + asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov); + opal_output_verbose(25, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_SEND_STRING key - %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); + OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL, + pmix_key, + orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, + orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + free(pmix_key); + if (ORTE_SUCCESS != ret) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: OPAL_MODEX_SEND failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); - /*abort this current transport, but check if next transport can be opened*/ - continue; - } - break; - case FI_ADDR_PSMX : - 
opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d In FI_ADDR_PSMX. ",__FILE__,__LINE__); - /* Address is of type Intel proprietery PSMX */ - OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL, - OPAL_RML_OFI_FI_ADDR_PSMX,orte_rml_ofi.ofi_conduits[cur_conduit].ep_name, - orte_rml_ofi.ofi_conduits[cur_conduit].epnamelen); - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d Opal modex send completed for FI_ADDR_PSMX. ",__FILE__,__LINE__); - if (ORTE_SUCCESS != ret) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s:%d: OPAL_MODEX_SEND failed: %s\n", - __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); - /*abort this current transport, but check if next transport can be opened*/ - continue; - } - break; - default: - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d ERROR: Cannot register address, Unhandled addr_format - %d, ep_name - %s ", - __FILE__,__LINE__,orte_rml_ofi.ofi_conduits[cur_conduit].fabric_info->addr_format, - orte_rml_ofi.ofi_conduits[cur_conduit].ep_name); - free_conduit_resources(cur_conduit); - /*abort this current transport, but check if next transport can be opened*/ - continue; + free_ofi_prov_resources(cur_ofi_prov); + /*abort this current transport, but check if next transport can be opened*/ + continue; + } } /** @@ -898,57 +742,57 @@ rml_ofi_component_init(int* priority) * Allocate tx,rx buffers and Post a multi-RECV buffer for each endpoint **/ //[TODO later] For now not considering ep_attr prefix_size (add this later) - orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; - orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf = malloc(orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf_size); - - ret = fi_mr_reg(orte_rml_ofi.ofi_conduits[cur_conduit].domain, - orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf, - orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf_size, - FI_RECV, 0, 0, 0, 
&orte_rml_ofi.ofi_conduits[cur_conduit].mr_multi_recv, - &orte_rml_ofi.ofi_conduits[cur_conduit].rx_ctx1); + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size = MIN_MULTI_BUF_SIZE * MULTI_BUF_SIZE_FACTOR; + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf = malloc(orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size); + + ret = fi_mr_reg(orte_rml_ofi.ofi_prov[cur_ofi_prov].domain, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, + FI_RECV, 0, 0, 0, &orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_mr_reg failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } - ret = fi_setopt(&orte_rml_ofi.ofi_conduits[cur_conduit].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, + ret = fi_setopt(&orte_rml_ofi.ofi_prov[cur_ofi_prov].ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV, &orte_rml_ofi.min_ofi_recv_buf_sz, sizeof(orte_rml_ofi.min_ofi_recv_buf_sz) ); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_setopt failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } - ret = fi_recv(orte_rml_ofi.ofi_conduits[cur_conduit].ep, - orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf, - orte_rml_ofi.ofi_conduits[cur_conduit].rxbuf_size, - fi_mr_desc(orte_rml_ofi.ofi_conduits[cur_conduit].mr_multi_recv), - 0,&orte_rml_ofi.ofi_conduits[cur_conduit].rx_ctx1); + ret = fi_recv(orte_rml_ofi.ofi_prov[cur_ofi_prov].ep, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf, + orte_rml_ofi.ofi_prov[cur_ofi_prov].rxbuf_size, + 
fi_mr_desc(orte_rml_ofi.ofi_prov[cur_ofi_prov].mr_multi_recv), + 0,&orte_rml_ofi.ofi_prov[cur_ofi_prov].rx_ctx1); if (ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_recv failed: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } /** * get the fd and register the progress fn **/ - ret = fi_control(&orte_rml_ofi.ofi_conduits[cur_conduit].cq->fid, FI_GETWAIT, - (void *) &orte_rml_ofi.ofi_conduits[cur_conduit].fd); + ret = fi_control(&orte_rml_ofi.ofi_prov[cur_ofi_prov].cq->fid, FI_GETWAIT, + (void *) &orte_rml_ofi.ofi_prov[cur_ofi_prov].fd); if (0 != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s:%d: fi_control failed to get fd: %s\n", __FILE__, __LINE__, fi_strerror(-ret)); - free_conduit_resources(cur_conduit); + free_ofi_prov_resources(cur_ofi_prov); /* abort this current transport, but check if next transport can be opened */ continue; } @@ -958,26 +802,22 @@ rml_ofi_component_init(int* priority) * so when something is available to read, the cq_porgress_handler * will be called */ opal_event_set(orte_event_base, - &orte_rml_ofi.ofi_conduits[cur_conduit].progress_event, - orte_rml_ofi.ofi_conduits[cur_conduit].fd, + &orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, + orte_rml_ofi.ofi_prov[cur_ofi_prov].fd, OPAL_EV_READ|OPAL_EV_PERSIST, cq_progress_handler, - &orte_rml_ofi.ofi_conduits[cur_conduit]); - opal_event_add(&orte_rml_ofi.ofi_conduits[cur_conduit].progress_event, 0); - orte_rml_ofi.ofi_conduits[cur_conduit].progress_ev_active = true; - - /** allocate space for module to be returned if this ofi_conduit transport is requested by rml */ - orte_rml_ofi.ofi_conduits[cur_conduit].ofi_module = (orte_rml_ofi_module_t*)calloc(1, sizeof(orte_rml_ofi_module_t)); - /* copy the function pointers across */ - 
memcpy(orte_rml_ofi.ofi_conduits[cur_conduit].ofi_module, &orte_rml_ofi, sizeof(orte_rml_ofi_module_t)); - /** update the number of conduits in the ofi_conduits[] array **/ - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d Conduit id - %d created ",__FILE__,__LINE__,orte_rml_ofi.conduit_open_num); - orte_rml_ofi.conduit_open_num++; + &orte_rml_ofi.ofi_prov[cur_ofi_prov]); + opal_event_add(&orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_event, 0); + orte_rml_ofi.ofi_prov[cur_ofi_prov].progress_ev_active = true; + + /** update the number of ofi_provs in the ofi_prov[] array **/ + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d ofi_prov id - %d created ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); + orte_rml_ofi.ofi_prov_open_num++; } - if (fabric_info != NULL && orte_rml_ofi.conduit_open_num >= MAX_CONDUIT ) { + if (fabric_info != NULL && orte_rml_ofi.ofi_prov_open_num >= MAX_OFI_PROVIDERS ) { opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d fi_getinfo list not fully parsed as MAX_CONDUIT - %d reached ",__FILE__,__LINE__,orte_rml_ofi.conduit_open_num); + "%s:%d fi_getinfo list not fully parsed as MAX_OFI_PROVIDERS - %d reached ",__FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); } @@ -987,34 +827,31 @@ rml_ofi_component_init(int* priority) */ fi_freeinfo(hints); hints = NULL; - /* only if atleast one conduit was successfully opened then return the module */ - if (0 < orte_rml_ofi.conduit_open_num ) { - opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d OFIconduits openened=%d returning orte_rml_ofi.api", - __FILE__,__LINE__,orte_rml_ofi.conduit_open_num); + /* check if atleast one ofi_prov was successfully opened */ + if (0 < orte_rml_ofi.ofi_prov_open_num ) { + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s:%d ofi providers openened=%d returning orte_rml_ofi.api", + __FILE__,__LINE__,orte_rml_ofi.ofi_prov_open_num); 
OBJ_CONSTRUCT(&orte_rml_ofi.recv_msg_queue_list,opal_list_t); - init_done = true; - orte_rml_ofi.api.tot_num_transports = orte_rml_ofi.conduit_open_num; - return &orte_rml_ofi.api; } else { opal_output_verbose(1,orte_rml_base_framework.framework_output, - "%s:%d Failed to open any OFIconduits",__FILE__,__LINE__); - return NULL; + "%s:%d Failed to open any OFI Providers",__FILE__,__LINE__); } + return orte_rml_ofi.ofi_prov_open_num; } -/* return : the ofi_conduit that corresponds to the transport requested by the attributes - if transport is not found RML_OFI_CONDUIT_ID_INVALID is returned. +/* return : the ofi_prov_id that corresponds to the transport requested by the attributes + if transport is not found RML_OFI_PROV_ID_INVALID is returned. @[in]attributes : the attributes passed in to open_conduit reg the transport requested */ -int get_ofi_conduit_id( opal_list_t *attributes) +int get_ofi_prov_id( opal_list_t *attributes) { - int ofi_conduit_id = RML_OFI_CONDUIT_ID_INVALID, prov_num=0; + int ofi_prov_id = RML_OFI_PROV_ID_INVALID, prov_num=0; char *provider = NULL, *transport = NULL; - char *ethernet="sockets", *fabric="fabric"; /*[TODO] - replace the string for fabric with right value for OPA*/ + char *ethernet="sockets", *fabric="psm2"; struct fi_info *cur_fi; /* check the list of attributes to see if we should respond @@ -1032,57 +869,350 @@ int get_ofi_conduit_id( opal_list_t *attributes) } /* if from the transport we don't know which provider we want, then check for the ORTE_RML_OFI_PROV_NAME_ATTRIB */ if ( NULL == provider) { - orte_get_attribute(attributes, ORTE_RML_OFI_PROV_NAME_ATTRIB, (void**)&provider, OPAL_STRING); + orte_get_attribute(attributes, ORTE_RML_PROVIDER_ATTRIB, (void**)&provider, OPAL_STRING); } - if (provider != NULL) + if (NULL != provider) { - // loop the orte_rml_ofi.conduits[] and find the provider name that matches - for ( prov_num = 0; prov_num < orte_rml_ofi.conduit_open_num && ofi_conduit_id == RML_OFI_CONDUIT_ID_INVALID ; 
prov_num++ ) { - cur_fi = orte_rml_ofi.ofi_conduits[prov_num].fabric_info; + // loop the orte_rml_ofi.ofi_provs[] and find the provider name that matches + for ( prov_num = 0; prov_num < orte_rml_ofi.ofi_prov_open_num && ofi_prov_id == RML_OFI_PROV_ID_INVALID ; prov_num++ ) { + cur_fi = orte_rml_ofi.ofi_prov[prov_num].fabric_info; + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - get_ofi_prov_id() -> comparing %s = %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),provider,cur_fi->fabric_attr->prov_name); if ( strcmp(provider,cur_fi->fabric_attr->prov_name) == 0) { - ofi_conduit_id = prov_num; + ofi_prov_id = prov_num; } } } - return ofi_conduit_id; + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - get_ofi_prov_id(), returning ofi_prov_id=%d ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ofi_prov_id); + return ofi_prov_id; } -static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) +/* + * Allocate a new module and initialise ofi_prov information + * for the requested provider and return the module * + */ +static orte_rml_base_module_t* make_module( int ofi_prov_id) { orte_rml_ofi_module_t *mod = NULL; - int ofi_conduit_id = RML_OFI_CONDUIT_ID_INVALID; - ofi_conduit_id = get_ofi_conduit_id(attributes); - if ( ofi_conduit_id == RML_OFI_CONDUIT_ID_INVALID) { - return mod; + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - rml_ofi make_module() begin ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + if ( RML_OFI_PROV_ID_INVALID == ofi_prov_id) { + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - open_conduit did not select any ofi provider, returning NULL ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + return NULL; } - /* we will provide this module - to make sure we don't open multiple modules we return * - * the one allocated at init associated with this ofi_conduit(transport) */ - mod = orte_rml_ofi.ofi_conduits[ofi_conduit_id].ofi_module; - /* setup the remaining data locations 
in mod to reflect the current conduit*/ - mod->cur_transport_id = ofi_conduit_id; - return mod; + /* create a new module */ + mod = (orte_rml_ofi_module_t*)calloc(1,sizeof(orte_rml_ofi_module_t)); + if (NULL == mod) { + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - Module allocation failed, returning NULL ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + return NULL; + } + /* copy the APIs over to it and the OFI provider information */ + memcpy(mod, &orte_rml_ofi, sizeof(orte_rml_ofi_module_t)); + /* setup the remaining data locations in mod, associate conduit with ofi provider selected*/ + mod->cur_transport_id = ofi_prov_id; + + return (orte_rml_base_module_t*)mod; } -static void orte_rml_ofi_fini(void *mod) +/* Order of attributes honoring * +* ORTE_RML_INCLUDE_COMP_ATTRIB * +* ORTE_RML_EXCLUDE_COMP_ATTRIB * +* ORTE_RML_TRANSPORT_ATTRIB * +* ORTE_RML_PROVIDER_ATTRIB */ +static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) { - opal_list_item_t *item; - uint8_t conduit_id; - orte_rml_ofi_module_t* ofi_mod = (orte_rml_ofi_module_t *)mod; + char *comp_attrib = NULL; + char **comps; + int i; + orte_attribute_t *attr; + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - Entering rml_ofi_open_conduit()", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - opal_output_verbose(1,orte_rml_base_framework.framework_output, - " %s - orte_rml_ofi_fini() begin ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - //freeing module* created by open_conduit() is done in rml_ofi_component_close() + /* Open all ofi endpoints */ + if (!init_done) { + rml_ofi_component_init(); + init_done = true; + } + + /* check if atleast 1 ofi provider is initialised */ + if ( 0 >= orte_rml_ofi.ofi_prov_open_num) { + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - Init did not open any Ofi endpoints, returning NULL", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + return NULL; + } + + /* someone may require this specific component, so look 
for "ofi" */ + if (orte_get_attribute(attributes, ORTE_RML_INCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && + NULL != comp_attrib) { + /* they specified specific components - could be multiple */ + comps = opal_argv_split(comp_attrib, ','); + for (i=0; NULL != comps[i]; i++) { + if (0 == strcmp(comps[i], "ofi")) { + /* we are a candidate, */ + opal_argv_free(comps); + return make_module(get_ofi_prov_id(attributes)); + } + } + /* we are not a candidate */ + opal_argv_free(comps); + return NULL; + } else if (orte_get_attribute(attributes, ORTE_RML_EXCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && + NULL != comp_attrib) { + /* see if we are on the list */ + comps = opal_argv_split(comp_attrib, ','); + for (i=0; NULL != comps[i]; i++) { + if (0 == strcmp(comps[i], "ofi")) { + /* we cannot be a candidate */ + opal_argv_free(comps); + return NULL; + } + } + } + + /* Alternatively, check the attributes to see if we qualify - we only handle + * "pt2pt" */ + OPAL_LIST_FOREACH(attr, attributes, orte_attribute_t) { + /* [TODO] add any additional attributes check here */ - opal_output_verbose(1,orte_rml_base_framework.framework_output, - " %s - orte_rml_ofi_fini() completed ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - ofi is not a candidate as per attributes, returning NULL", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + /* if we get here, we cannot handle it */ + return NULL; +} + +static void pr_cons(orte_rml_ofi_peer_t *ptr) +{ + ptr->ofi_ep = NULL; + ptr->ofi_ep_len = 0; +} + +static void pr_des(orte_rml_ofi_peer_t *ptr) +{ + if ( 0 < ptr->ofi_ep_len) + free( ptr->ofi_ep); +} + +OBJ_CLASS_INSTANCE(orte_rml_ofi_peer_t, + opal_object_t, + pr_cons, pr_des); + + +/* The returned string will be of format - */ +/* ";ofi-socket:;ofi-:" */ +/* caller will take care of string length check to not exceed limit */ +static char* ofi_get_contact_info(void) +{ + char *turi, *final=NULL, *tmp, 
*addrtype; + int rc=ORTE_SUCCESS, cur_ofi_prov=0; + struct sockaddr_in* ep_sockaddr; + + /* start with our process name */ + if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string(&final, ORTE_PROC_MY_NAME))) { + /* [TODO] ORTE_ERROR_LOG(rc); */ + return final; + } + + /* The returned string will be of format - ";ofi-addr:;" */ + /* we are sending only the ethernet address */ + for( cur_ofi_prov=0; cur_ofi_prov < orte_rml_ofi.ofi_prov_open_num ; cur_ofi_prov++ ) { + if ( FI_SOCKADDR_IN == orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) { + ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; + asprintf(&addrtype, OFIADDR); + asprintf(&turi,"%d,%s,%d",ep_sockaddr->sin_family,inet_ntoa(ep_sockaddr->sin_addr),ntohs(ep_sockaddr->sin_port)); + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - cur_ofi_prov = %d, addrtype = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,addrtype); + /* Add to the final string - the ofi addrtype and the epname */ + asprintf(&tmp, "%s;%s:%s", final,addrtype, turi); + + free(addrtype); + free(turi); + free(final); + final = tmp; + } + } + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "[%s] get_contact_info returns string - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),final); + return final; +} + + +static void ofi_set_contact_info (const char *uri) +{ + char *uris; + + opal_output_verbose(5, orte_rml_base_framework.framework_output, + "%s: OFI set_contact_info to uri %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == uri) ? 
"NULL" : uri); + + /* if the request doesn't contain a URI, then we + * have an error + */ + if (NULL == uri) { + opal_output(0, "%s: NULL URI", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + /* [TODO] ORTE_FORCED_TERMINATE(1);*/ + return; + } + + uris = strdup(uri); + process_uri(uris); + free(uris); + return; +} + +static void process_uri( char *uri) +{ + orte_process_name_t peer; + char *cptr, *ofiuri; + char **uris=NULL; + int rc, i=0, tot_reqd = 1, tot_found = 0; + uint64_t ui64; + orte_rml_ofi_peer_t *pr; + void *ep_name; + struct sockaddr_in* ep_sockaddr; + /* find the first semi-colon in the string */ + cptr = strchr(uri, ';'); + if (NULL == cptr) { + /* got a problem - there must be at least two fields, + * the first containing the process name of our peer + * and all others containing the OOB contact info + */ + ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + return; + } + *cptr = '\0'; + cptr++; + + /* the first field is the process name, so convert it */ + orte_util_convert_string_to_process_name(&peer, uri); + + /* if the peer is us, no need to go further as we already + * know our own contact info + */ + if (peer.jobid == ORTE_PROC_MY_NAME->jobid && + peer.vpid == ORTE_PROC_MY_NAME->vpid) { + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s:OFI set_contact_info peer %s is me", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer)); + //skip adding to hashtable for HNP + if (!ORTE_PROC_IS_HNP) { + return; + } else { + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s:OFI set_contact_info - HNP process so proceeding to add to hashtable", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); + } + } + + /* split the rest of the uri into component parts */ + uris = opal_argv_split(cptr, ';'); + + /* get the peer object for this process */ + memcpy(&ui64, (char*)&peer, sizeof(uint64_t)); + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, + ui64, (void**)&pr) || + NULL == pr) { + pr = 
OBJ_NEW(orte_rml_ofi_peer_t); + /* populate the peer object with the ofi addresses */ + for(i=0; NULL != uris[i] && tot_found < tot_reqd; i++) { + ofiuri = strdup(uris[i]); + if (NULL == ofiuri) { + opal_output_verbose(2, orte_rml_base_framework.framework_output, + "%s rml:ofi: out of memory", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + continue; + } + /* Handle the OFI address types in the uri - OFIADDR(ofiaddr) */ + if (0 == strncmp(ofiuri, OFIADDR, strlen(OFIADDR)) ) { + /* allocate and initialise the peer object to be inserted in hashtable */ + pr->ofi_ep_len = sizeof(struct sockaddr_in); + ep_sockaddr = malloc( sizeof ( struct sockaddr_in) ); + /* ofiuri for socket provider is of format - ofi-socket: */ + convert_to_sockaddr(ofiuri, ep_sockaddr); + pr->ofi_ep = (void *)ep_sockaddr; + tot_found++; + } + free( ofiuri); + } + /* if atleast one OFI address is known for peer insert it */ + if( 1 <= tot_found ) { + if (OPAL_SUCCESS != + (rc = opal_hash_table_set_value_uint64(&orte_rml_ofi.peers, ui64, (void*)pr))) { + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s: ofi peer address insertion failed for peer %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer)); + ORTE_ERROR_LOG(rc); + } + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s: ofi peer address inserted for peer %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer)); + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s: ofi sock address length = %zd ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + pr->ofi_ep_len); + struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)pr->ofi_ep; + opal_output_verbose(15,orte_rml_base_framework.framework_output, + "%s OFI set_name() port = 0x%x, InternetAddr = %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); + } + } + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s OFI end of set_contact_info()", + 
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + opal_argv_free(uris); + return; } + + +/* converts the socket uri returned by get_contact_info into sockaddr_in */ +void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr) +{ + char *tmp, *sin_fly, *sin_port, *sin_addr; + short port; + int res; + + tmp = strchr(ofiuri,':'); + sin_fly = tmp+1; + tmp = strchr(sin_fly,','); + sin_addr = tmp+1; + *tmp = '\0'; + tmp = strchr(sin_addr,','); + sin_port = tmp + 1; + *tmp = '\0'; + + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port); + ep_sockaddr->sin_family = atoi( sin_fly ); + port = atoi( sin_port); + ep_sockaddr->sin_port = htons(port); + res = inet_aton(sin_addr,(struct in_addr *)&ep_sockaddr->sin_addr); + opal_output_verbose(10,orte_rml_base_framework.framework_output, + "%s OFI convert_to_sockaddr() port = 0x%x, InternetAddr = %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port), + inet_ntoa(ep_sockaddr->sin_addr)); +} diff --git a/orte/mca/rml/ofi/rml_ofi_request.h b/orte/mca/rml/ofi/rml_ofi_request.h index 191a3f8e9f5..54b8203ae84 100644 --- a/orte/mca/rml/ofi/rml_ofi_request.h +++ b/orte/mca/rml/ofi/rml_ofi_request.h @@ -79,9 +79,9 @@ typedef struct { orte_rml_send_t *send; - /** OFI conduit_id the request will use - this is - * the reference to element into the orte_rml_ofi.ofi_conduits[] **/ - uint8_t conduit_id; + /** OFI provider_id the request will use - this is + * the reference to element into the orte_rml_ofi.ofi_prov[] **/ + uint8_t ofi_prov_id; /** OFI Request type */ orte_rml_ofi_request_type_t type; @@ -124,4 +124,14 @@ typedef struct { } ofi_recv_msg_queue_t; OBJ_CLASS_DECLARATION( ofi_recv_msg_queue_t); +/* define an object for transferring send requests to the event lib */ +typedef struct { + opal_object_t super; + opal_event_t ev; + orte_rml_send_t 
send; + /* ofi provider id */ + int ofi_prov_id; +} ofi_send_request_t; +OBJ_CLASS_DECLARATION(ofi_send_request_t); + #endif diff --git a/orte/mca/rml/ofi/rml_ofi_send.c b/orte/mca/rml/ofi/rml_ofi_send.c index c6674ecb055..c833dc5ecfd 100644 --- a/orte/mca/rml/ofi/rml_ofi_send.c +++ b/orte/mca/rml/ofi/rml_ofi_send.c @@ -40,6 +40,14 @@ OBJ_CLASS_INSTANCE(orte_rml_ofi_request_t, ofi_req_cons, ofi_req_des); +static void ofi_send_req_cons(ofi_send_request_t *ptr) +{ + OBJ_CONSTRUCT(&ptr->send, orte_rml_send_t); +} +OBJ_CLASS_INSTANCE(ofi_send_request_t, + opal_object_t, + ofi_send_req_cons, NULL); + OBJ_CLASS_INSTANCE(orte_rml_ofi_send_pkt_t, opal_list_item_t, NULL, NULL); @@ -48,6 +56,7 @@ OBJ_CLASS_INSTANCE(orte_rml_ofi_recv_pkt_t, opal_list_item_t, NULL, NULL); + static void ofi_recv_msg_queue_cons(ofi_recv_msg_queue_t *ptr) { ptr->msgid = 0; @@ -73,7 +82,7 @@ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc, orte_rml_ofi_request_t* ofi_req) { orte_rml_ofi_send_pkt_t *ofi_send_pkt, *next; - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s orte_rml_ofi_send_callback called, completion count = %d, msgid = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_req->completion_count, ofi_req->hdr.msgid); assert(ofi_req->completion_count > 0); @@ -81,7 +90,7 @@ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc, if ( 0 == ofi_req->completion_count ) { // call the callback fn of the sender ofi_req->send->status = ORTE_SUCCESS; - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s calling ORTE_RML_SEND_COMPLETE macro for msgid = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_req->hdr.msgid); ORTE_RML_SEND_COMPLETE(ofi_req->send); @@ -112,7 +121,7 @@ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc, int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error, orte_rml_ofi_request_t* 
ofi_req) { - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s orte_rml_ofi_error_callback called ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); switch(error->err) { @@ -128,7 +137,7 @@ int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error, /* [Desc] This is called from the progress fn when a recv completion ** is received in the cq ** wc [in] : the completion queue data entry */ -int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) +int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id) { orte_rml_ofi_msg_header_t msg_hdr; uint32_t msglen, datalen = 0; @@ -137,22 +146,22 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) orte_rml_ofi_recv_pkt_t *ofi_recv_pkt, *new_pkt, *next; bool msg_in_queue = false; - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s orte_rml_ofi_recv_handler called ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); /*copy the header and data from buffer and pass it on - ** since this is the conduit recv buffer don't want it to be released as + ** since this is the ofi_prov recv buffer don't want it to be released as ** considering re-using it, so for now copying to newly allocated *data ** the *data will be released by orte_rml_base functions */ memcpy(&msg_hdr,wc->buf,sizeof(orte_rml_ofi_msg_header_t)); msglen = wc->len - sizeof(orte_rml_ofi_msg_header_t); - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Received packet -> msg id = %d wc->len = %d, msglen = %d", + opal_output_verbose(10, orte_rml_base_framework.framework_output, + "%s Received packet -> msg id = %d wc->len = %lu, msglen = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr.msgid, wc->len, msglen ); data = (char *)malloc(msglen); - memcpy(data,(wc->buf+sizeof(orte_rml_ofi_msg_header_t)),msglen); - 
opal_output_verbose(10, orte_rml_base_framework.framework_output, + memcpy(data,((char *)wc->buf+sizeof(orte_rml_ofi_msg_header_t)),msglen); + opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s header info of received packet -> cur_pkt_num = %d, tot_pkts = %d ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr.cur_pkt_num, msg_hdr.tot_pkts ); /* To accomodate message bigger than recv buffer size, @@ -186,25 +195,25 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) msg_in_queue = true; opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s msgid %d, tot_pkts=%d, opal_list_get_size()=%d,total pkt_recd=%d", + "%s msgid %d, tot_pkts=%d, opal_list_get_size()=%lu,total pkt_recd=%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->tot_pkts, opal_list_get_size(&recv_msg_queue->pkt_list), recv_msg_queue->pkt_recd ); if( recv_msg_queue->tot_pkts == (recv_msg_queue->pkt_recd +1) ) { /* all packets received for this message - post message to rml and remove this from queue */ opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s All packets recd for msgid %d, tot_pkts=%d, opal_list_get_size()=%d,total pkt_recd=%d", + "%s All packets recd for msgid %d, tot_pkts=%d, opal_list_get_size()=%lu,total pkt_recd=%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->tot_pkts, opal_list_get_size(&recv_msg_queue->pkt_list), recv_msg_queue->pkt_recd ); totdata = NULL; datalen = 0; OPAL_LIST_FOREACH(ofi_recv_pkt, &recv_msg_queue->pkt_list, orte_rml_ofi_recv_pkt_t) { opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s Adding data for packet %d, pktlength = %d, cumulative datalen so far = %d", + "%s Adding data for packet %d, pktlength = %lu, cumulative datalen so far = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_recv_pkt->cur_pkt_num, ofi_recv_pkt->pkt_size, datalen ); if (0 == datalen) { totdata = (char *)malloc(ofi_recv_pkt->pkt_size); 
if( totdata == NULL) { - opal_output_verbose(10, orte_rml_base_framework.framework_output, + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s Error: malloc failed for msgid %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),recv_msg_queue->msgid ); return 1; //[TODO: error-handling needs to be implemented @@ -216,7 +225,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) if (NULL != totdata ) { memcpy((totdata+datalen),ofi_recv_pkt->data,ofi_recv_pkt->pkt_size); } else { - opal_output_verbose(10, orte_rml_base_framework.framework_output, + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s Error: realloc failed for msgid %d, from sender jobid=%d, sender vpid=%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->sender.jobid, recv_msg_queue->sender.vpid); @@ -229,13 +238,13 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_recv_pkt->cur_pkt_num,datalen); } opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s Adding leftover data recd, datalen = %d, new_pkt->pkt_size = %d", + "%s Adding leftover data recd, datalen = %d, new_pkt->pkt_size = %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen, new_pkt->pkt_size); //add the last packet totdata =realloc(totdata,datalen+new_pkt->pkt_size); if( NULL != totdata ) { opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s Realloc completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %d", + "%s Realloc completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen, new_pkt->pkt_size); nextpkt = totdata+datalen; opal_output_verbose(10, orte_rml_base_framework.framework_output, @@ -243,7 +252,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), totdata, nextpkt); 
memcpy(nextpkt,new_pkt->data,new_pkt->pkt_size); opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s memcpy completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %d", + "%s memcpy completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen, new_pkt->pkt_size); datalen += new_pkt->pkt_size; opal_output_verbose(10, orte_rml_base_framework.framework_output, @@ -276,7 +285,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); OBJ_RELEASE(recv_msg_queue); } else { - opal_output_verbose(10, orte_rml_base_framework.framework_output, + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s Error: realloc failed for msgid %d, from sender jobid=%d, sender vpid=%d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->sender.jobid, recv_msg_queue->sender.vpid); @@ -326,20 +335,21 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id) static void send_msg(int fd, short args, void *cbdata) { - orte_rml_send_request_t *req = (orte_rml_send_request_t*)cbdata; + ofi_send_request_t *req = (ofi_send_request_t*)cbdata; orte_process_name_t *peer = &(req->send.dst); orte_rml_tag_t tag = req->send.tag; - char *dest_ep_name; + char *dest_ep_name, *pmix_key; size_t dest_ep_namelen = 0; int ret = OPAL_ERROR; uint32_t total_packets; fi_addr_t dest_fi_addr; orte_rml_send_t *snd; orte_rml_ofi_request_t* ofi_send_req = OBJ_NEW( orte_rml_ofi_request_t ); - uint8_t conduit_id = req->conduit_id; + uint8_t ofi_prov_id = req->ofi_prov_id; orte_rml_ofi_send_pkt_t* ofi_msg_pkt; size_t datalen_per_pkt, hdrsize, data_in_pkt; // the length of data in per packet excluding the header size - + orte_rml_ofi_peer_t* pr; + uint64_t ui64; snd = OBJ_NEW(orte_rml_send_t); snd->dst = *peer; @@ -364,40 +374,49 @@ static void send_msg(int fd, short args, void *cbdata) /* get the peer address by 
doing modex_receive */ opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s calling OPAL_MODEX_RECV_STRING ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); - switch ( orte_rml_ofi.ofi_conduits[conduit_id].fabric_info->addr_format) - { - case FI_SOCKADDR_IN : - OPAL_MODEX_RECV_STRING(ret, OPAL_RML_OFI_FI_SOCKADDR_IN, peer , (char **) &dest_ep_name, &dest_ep_namelen); - /*print the sockaddr - port and s_addr */ - struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*) dest_ep_name; - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s obtained for peer %s port = 0x%printinx, InternetAddr = %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ORTE_NAME_PRINT(peer),ntohs(ep_sockaddr->sin_port), - inet_ntoa(ep_sockaddr->sin_addr)); - break; - case FI_ADDR_PSMX : - OPAL_MODEX_RECV_STRING(ret, OPAL_RML_OFI_FI_ADDR_PSMX, peer , (char **) &dest_ep_name, &dest_ep_namelen); - break; - default: - /* we shouldn't be getting here as only above are supported and address sent - * to PMIX (OPAL_MODEX_SEND) in orte_component_init() */ - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Error: Unhandled address format type in ofi_send_msg", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN; - ORTE_RML_SEND_COMPLETE(snd); - return; + // if dest is same as me then instead of doing lookup just populate the dest_ep_name + if (peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) { + dest_ep_namelen = orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen; + dest_ep_name = (char *)calloc(dest_ep_namelen,sizeof(char)); + memcpy( dest_ep_name, orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name,dest_ep_namelen); + opal_output_verbose(10, orte_rml_base_framework.framework_output, + "%s rml:ofi: send and dest are same so proceeding with cur provider ep_name ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ret = OPAL_SUCCESS; + } else { + if (ORTE_PROC_IS_APP ) { + 
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,ofi_prov_id); + opal_output_verbose(10, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_RECV_STRING peer - %s, key - %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer),pmix_key ); + OPAL_MODEX_RECV_STRING(ret, pmix_key, peer , (char **) &dest_ep_name, &dest_ep_namelen); + opal_output_verbose(10, orte_rml_base_framework.framework_output, "Returned from MODEX_RECV"); + free(pmix_key); + } else { + memcpy(&ui64, (char*)peer, sizeof(uint64_t)); + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, + ui64, (void**)&pr) || NULL == pr) { + opal_output_verbose(2, orte_rml_base_framework.framework_output, + "%s rml:ofi: Send failed to get peer OFI contact info ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + return; + } + dest_ep_name = pr->ofi_ep; + dest_ep_namelen = pr->ofi_ep_len; + ret = OPAL_SUCCESS; + } } opal_output_verbose(50, orte_rml_base_framework.framework_output, - "%s Return value from OPAL_MODEX_RECV_STRING - %d, length returned - %d", + "%s Return value from OPAL_MODEX_RECV_STRING - %d, length returned - %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret, dest_ep_namelen); if ( OPAL_SUCCESS == ret) { opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. length=%d", + "%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. 
length=%lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), dest_ep_namelen); - ret = fi_av_insert(orte_rml_ofi.ofi_conduits[conduit_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL); + ret = fi_av_insert(orte_rml_ofi.ofi_prov[ofi_prov_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL); if( ret != 1) { opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s fi_av_insert failed in send_msg() returned %d", @@ -456,7 +475,7 @@ static void send_msg(int fd, short args, void *cbdata) ofi_send_req->data_blob = (char *)malloc(ofi_send_req->length); int iovlen=0; for (int i=0; i < ofi_send_req->send->count; i++) { - memcpy((ofi_send_req->data_blob + iovlen ), + memcpy(((char *)ofi_send_req->data_blob + iovlen ), ofi_send_req->send->iov[i].iov_base, ofi_send_req->send->iov[i].iov_len); iovlen += ofi_send_req->send->iov[i].iov_len; @@ -471,8 +490,8 @@ static void send_msg(int fd, short args, void *cbdata) } - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Completed copying all data into ofi_send_req->data_blob, total data - %d bytes", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s Completed copying all data into ofi_send_req->data_blob, total data - %lu bytes", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_send_req->length ); /* Each packet will have header information, so the data length in each packet is datalen_per_packet. 
@@ -486,8 +505,8 @@ static void send_msg(int fd, short args, void *cbdata) } ofi_send_req->hdr.tot_pkts = total_packets; - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s datalen_per_pkt = %d, ofi_send_req->length= %d, total packets = %d", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s datalen_per_pkt = %lu, ofi_send_req->length= %d, total packets = %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen_per_pkt, ofi_send_req->length, total_packets ); /* in a loop send create and send the packets */ @@ -498,25 +517,22 @@ static void send_msg(int fd, short args, void *cbdata) data_in_pkt = ((ofi_send_req->length - sent_data) >= datalen_per_pkt) ? datalen_per_pkt : (ofi_send_req->length - sent_data); ofi_msg_pkt->pkt_size = hdrsize + data_in_pkt; - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Packet %d -> data_in_pkt= %d, header_size= %d, pkt_size=%d", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s Packet %lu -> data_in_pkt= %lu, header_size= %lu, pkt_size=%lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pkt_num,data_in_pkt,hdrsize,ofi_msg_pkt->pkt_size ); /* copy the header and data for this pkt */ ofi_msg_pkt->data = malloc( ofi_msg_pkt->pkt_size); memcpy(ofi_msg_pkt->data, &ofi_send_req->hdr, hdrsize ); - memcpy( (ofi_msg_pkt->data + hdrsize ), + memcpy( ( (char *)ofi_msg_pkt->data + hdrsize ), (ofi_send_req->data_blob + sent_data), data_in_pkt); - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(15, orte_rml_base_framework.framework_output, "%s Copying header, data into packets completed", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); /* add it to list */ - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s Before adding packet %d to list. 
List addr -> 0x%x, ofi_msg_pkt->super is 0x%x", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),pkt_num,&(ofi_send_req->pkt_list), &ofi_msg_pkt->super ); opal_list_append(&(ofi_send_req->pkt_list), &ofi_msg_pkt->super); - opal_output_verbose(1, orte_rml_base_framework.framework_output, - "%s adding packet %d to list done successful", + opal_output_verbose(15, orte_rml_base_framework.framework_output, + "%s adding packet %lu to list done successful", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),pkt_num ); sent_data += data_in_pkt; } @@ -533,21 +549,21 @@ static void send_msg(int fd, short args, void *cbdata) /* debug purpose - copying the header from packet to verify if it is correct */ struct orte_rml_ofi_msg_header_t *cur_hdr; cur_hdr = (struct orte_rml_ofi_msg_header_t* ) ofi_msg_pkt->data; - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s Sending Pkt[%d] of total %d pkts for msgid:%d to peer %s with tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), cur_hdr->cur_pkt_num, ofi_send_req->completion_count, cur_hdr->msgid, ORTE_NAME_PRINT(peer), tag); /* end debug*/ - RML_OFI_RETRY_UNTIL_DONE(fi_send(orte_rml_ofi.ofi_conduits[conduit_id].ep, + RML_OFI_RETRY_UNTIL_DONE(fi_send(orte_rml_ofi.ofi_prov[ofi_prov_id].ep, ofi_msg_pkt->data, ofi_msg_pkt->pkt_size, - fi_mr_desc(orte_rml_ofi.ofi_conduits[conduit_id].mr_multi_recv), + fi_mr_desc(orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv), dest_fi_addr, (void *)&ofi_send_req->ctx)); } - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s End of send_msg_transport. 
fi_send completed to peer %s with tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), tag); @@ -556,7 +572,7 @@ static void send_msg(int fd, short args, void *cbdata) OBJ_RELEASE(req); } -int orte_rml_ofi_send_nb(void* mod, +int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod, orte_process_name_t* peer, struct iovec* iov, int count, @@ -564,19 +580,19 @@ int orte_rml_ofi_send_nb(void* mod, orte_rml_callback_fn_t cbfunc, void* cbdata) { - orte_rml_send_request_t *req; + ofi_send_request_t *req; orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod; - int conduit_id = ofi_mod->cur_transport_id; + int ofi_prov_id = ofi_mod->cur_transport_id; - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s rml_ofi_send_transport to peer %s at tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), tag); - if( (0 > conduit_id) || ( conduit_id >= orte_rml_ofi.conduit_open_num ) ) { - /* Invalid conduit ID provided */ + if( (0 > ofi_prov_id) || ( ofi_prov_id >= orte_rml_ofi.ofi_prov_open_num ) ) { + /* Invalid ofi_prov ID provided */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return ORTE_ERR_BAD_PARAM; } @@ -594,8 +610,8 @@ int orte_rml_ofi_send_nb(void* mod, /* get ourselves into an event to protect against * race conditions and threads */ - req = OBJ_NEW(orte_rml_send_request_t); - req->conduit_id = conduit_id; + req = OBJ_NEW(ofi_send_request_t); + req->ofi_prov_id = ofi_prov_id; req->send.dst = *peer; req->send.iov = iov; req->send.count = count; @@ -612,25 +628,25 @@ int orte_rml_ofi_send_nb(void* mod, } -int orte_rml_ofi_send_buffer_nb(void* mod, +int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod, orte_process_name_t* peer, struct opal_buffer_t* buffer, orte_rml_tag_t tag, orte_rml_buffer_callback_fn_t cbfunc, void* cbdata) { - orte_rml_send_request_t *req; + ofi_send_request_t *req; orte_rml_ofi_module_t *ofi_mod = 
(orte_rml_ofi_module_t*)mod; - int conduit_id = ofi_mod->cur_transport_id; + int ofi_prov_id = ofi_mod->cur_transport_id; - opal_output_verbose(1, orte_rml_base_framework.framework_output, + opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s rml_ofi_send_buffer_transport to peer %s at tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), tag); - if( (0 > conduit_id) || ( conduit_id >= orte_rml_ofi.conduit_open_num ) ) { - /* Invalid conduit ID provided */ + if( (0 > ofi_prov_id) || ( ofi_prov_id >= orte_rml_ofi.ofi_prov_open_num ) ) { + /* Invalid ofi_prov ID provided */ ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); return ORTE_ERR_BAD_PARAM; } @@ -648,8 +664,8 @@ int orte_rml_ofi_send_buffer_nb(void* mod, /* get ourselves into an event to protect against * race conditions and threads */ - req = OBJ_NEW(orte_rml_send_request_t); - req->conduit_id = conduit_id; + req = OBJ_NEW(ofi_send_request_t); + req->ofi_prov_id = ofi_prov_id; req->send.dst = *peer; req->send.buffer = buffer; req->send.tag = tag; diff --git a/orte/test/system/ofi_conduit_stress.c b/orte/test/system/ofi_conduit_stress.c index 2696856b9d6..52f1ff6f2bc 100644 --- a/orte/test/system/ofi_conduit_stress.c +++ b/orte/test/system/ofi_conduit_stress.c @@ -35,56 +35,6 @@ static void send_callback(int status, orte_process_name_t *peer, msg_active = false; } -//debug routine to print the opal_value_t returned by query interface -void print_transports_query() -{ - opal_value_t *providers=NULL; - char* prov_name = NULL; - int ret; - int32_t *protocol_ptr, protocol; - int8_t conduit_id; - int8_t *prov_num=&conduit_id; - - protocol_ptr = &protocol; - opal_output(0, "\n Current conduits loaded in rml-ofi ==>"); - /*opal_output(0,"\n print_transports_query() Begin- %s:%d",__FILE__,__LINE__); - opal_output(0,"\n calling the orte_rml_ofi_query_transports() ");*/ - if( ORTE_SUCCESS == orte_rml.query_transports(&providers)) { - //opal_output(0,"\n query_transports() completed, printing 
details\n"); - while (providers) { - //get the first opal_list_t; - opal_list_t temp; - opal_list_t *prov = &temp; - - ret = opal_value_unload(providers,(void **)&prov,OPAL_PTR); - if (ret == OPAL_SUCCESS) { - //opal_output(0,"\n %s:%d opal_value_unload() succeeded, opal_list* prov = %x",__FILE__,__LINE__,prov); - if( orte_get_attribute( prov, ORTE_CONDUIT_ID, (void **)&prov_num,OPAL_UINT8)) { - opal_output(0," Provider conduit_id : %d",*prov_num); - } - if( orte_get_attribute( prov, ORTE_PROTOCOL, (void **)&protocol_ptr,OPAL_UINT32)) { - opal_output(0," Protocol : %d",*protocol_ptr); - } - if( orte_get_attribute( prov, ORTE_PROV_NAME, (void **)&prov_name ,OPAL_STRING)) { - opal_output(0," Provider name : %s",prov_name); - } else { - opal_output(0," Error in getting Provider name"); - } - } else { - opal_output(0," %s:%d opal_value_unload() failed, opal_list* prov = %x",__FILE__,__LINE__,prov); - } - providers = (opal_value_t *)providers->super.opal_list_next; - // opal_output_verbose(1,orte_rml_base_framework.framework_output,"\n %s:%d - - // Moving on to next provider provders=%x",__FILE__,__LINE__,providers); - } - } else { - opal_output(0,"\n query_transports() returned Error "); - } - //opal_output(0,"\n End of print_transports_query() from ofi_query_test.c \n"); - - //need to free all the providers here -} - int main(int argc, char *argv[]){ @@ -99,18 +49,18 @@ main(int argc, char *argv[]){ int conduit_id = 0; //use the first available conduit struct timeval start, end; opal_list_t *conduit_attr; - + /* * Init */ orte_init(&argc, &argv, ORTE_PROC_NON_MPI); - print_transports_query(); + conduit_attr = OBJ_NEW(opal_list_t); - if( ORTE_SUCCESS == - ( orte_set_attribute( conduit_attr, ORTE_RML_OFI_PROV_NAME_ATTRIB, ORTE_ATTR_GLOBAL,"sockets",OPAL_STRING))) { - if( ORTE_SUCCESS == + if( ORTE_SUCCESS == + ( orte_set_attribute( conduit_attr, ORTE_RML_PROVIDER_ATTRIB, ORTE_ATTR_GLOBAL,"sockets",OPAL_STRING))) { + if( ORTE_SUCCESS == ( orte_set_attribute( 
conduit_attr, ORTE_RML_INCLUDE_COMP_ATTRIB, ORTE_ATTR_GLOBAL,"ofi",OPAL_STRING))) { opal_output(0, "%s calling open_conduit with ORTE_RML_INCLUDE_COMP_ATTRIB and ORTE_RML_OFI_PROV_NAME_ATTRIB", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));