11/*
2- * Copyright (c) 2013-2017 Intel, Inc. All rights reserved
2+ * Copyright (c) 2013-2018 Intel, Inc. All rights reserved
33 * Copyright (c) 2017 Los Alamos National Security, LLC. All rights
44 * reserved.
55 *
@@ -244,6 +244,7 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
244244 ompi_proc_t * ompi_proc = NULL ;
245245 mca_mtl_ofi_endpoint_t * endpoint = NULL ;
246246 ompi_mtl_ofi_request_t * ack_req = NULL ; /* For synchronous send */
247+ fi_addr_t src_addr = 0 ;
247248
248249 ompi_proc = ompi_comm_peer_lookup (comm , dest );
249250 endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
@@ -255,6 +256,15 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
255256 ofi_req -> length = length ;
256257 ofi_req -> status .MPI_ERROR = OMPI_SUCCESS ;
257258
259+ if (ompi_mtl_ofi .fi_cq_data ) {
260+ match_bits = mtl_ofi_create_send_tag_CQD (comm -> c_contextid , tag );
261+ src_addr = endpoint -> peer_fiaddr ;
262+ } else {
263+ match_bits = mtl_ofi_create_send_tag (comm -> c_contextid ,
264+ comm -> c_my_rank , tag );
265+ /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
266+ }
267+
258268 if (OPAL_UNLIKELY (MCA_PML_BASE_SEND_SYNCHRONOUS == mode )) {
259269 ack_req = malloc (sizeof (ompi_mtl_ofi_request_t ));
260270 assert (ack_req );
@@ -263,14 +273,15 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
263273 ack_req -> error_callback = ompi_mtl_ofi_send_ack_error_callback ;
264274
265275 ofi_req -> completion_count = 2 ;
266- MTL_OFI_SET_SEND_BITS (match_bits , comm -> c_contextid ,
267- comm -> c_my_rank , tag , MTL_OFI_SYNC_SEND );
276+
277+ MTL_OFI_SET_SYNC_SEND (match_bits );
278+
268279 MTL_OFI_RETRY_UNTIL_DONE (fi_trecv (ompi_mtl_ofi .ep ,
269280 NULL ,
270281 0 ,
271282 NULL ,
272- endpoint -> peer_fiaddr ,
273- match_bits | MTL_OFI_SYNC_SEND_ACK ,
283+ src_addr ,
284+ match_bits | ompi_mtl_ofi . sync_send_ack ,
274285 0 , /* Exact match, no ignore bits */
275286 (void * ) & ack_req -> ctx ));
276287 if (OPAL_UNLIKELY (0 > ret )) {
@@ -282,20 +293,30 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
282293 }
283294 } else {
284295 ofi_req -> completion_count = 1 ;
285- MTL_OFI_SET_SEND_BITS (match_bits , comm -> c_contextid ,
286- comm -> c_my_rank , tag , 0 );
287296 }
288297
289298 if (ompi_mtl_ofi .max_inject_size >= length ) {
290- MTL_OFI_RETRY_UNTIL_DONE (fi_tinject (ompi_mtl_ofi .ep ,
299+ if (ompi_mtl_ofi .fi_cq_data ) {
300+ MTL_OFI_RETRY_UNTIL_DONE (fi_tinjectdata (ompi_mtl_ofi .ep ,
301+ start ,
302+ length ,
303+ comm -> c_my_rank ,
304+ endpoint -> peer_fiaddr ,
305+ match_bits ));
306+ } else {
307+ MTL_OFI_RETRY_UNTIL_DONE (fi_tinject (ompi_mtl_ofi .ep ,
291308 start ,
292309 length ,
293310 endpoint -> peer_fiaddr ,
294311 match_bits ));
312+ }
313+
295314 if (OPAL_UNLIKELY (0 > ret )) {
315+ char * fi_api = ompi_mtl_ofi .fi_cq_data ? "fi_tinjectddata" : "fi_tinject" ;
296316 opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
297- "%s:%d: fi_tinject failed: %s(%zd)" ,
298- __FILE__ , __LINE__ , fi_strerror (- ret ), ret );
317+ "%s:%d: %s failed: %s(%zd)" ,
318+ __FILE__ , __LINE__ ,fi_api , fi_strerror (- ret ), ret );
319+
299320 if (ack_req ) {
300321 fi_cancel ((fid_t )ompi_mtl_ofi .ep , & ack_req -> ctx );
301322 free (ack_req );
@@ -305,17 +326,29 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
305326
306327 ofi_req -> event_callback (NULL ,ofi_req );
307328 } else {
308- MTL_OFI_RETRY_UNTIL_DONE (fi_tsend (ompi_mtl_ofi .ep ,
329+ if (ompi_mtl_ofi .fi_cq_data ) {
330+ MTL_OFI_RETRY_UNTIL_DONE (fi_tsenddata (ompi_mtl_ofi .ep ,
309331 start ,
310332 length ,
311333 NULL ,
334+ comm -> c_my_rank ,
312335 endpoint -> peer_fiaddr ,
313336 match_bits ,
314337 (void * ) & ofi_req -> ctx ));
338+ } else {
339+ MTL_OFI_RETRY_UNTIL_DONE (fi_tsend (ompi_mtl_ofi .ep ,
340+ start ,
341+ length ,
342+ NULL ,
343+ endpoint -> peer_fiaddr ,
344+ match_bits ,
345+ (void * ) & ofi_req -> ctx ));
346+ }
315347 if (OPAL_UNLIKELY (0 > ret )) {
348+ char * fi_api = ompi_mtl_ofi .fi_cq_data ? "fi_tsendddata" : "fi_send" ;
316349 opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
317- "%s:%d: fi_tsend failed: %s(%zd)" ,
318- __FILE__ , __LINE__ , fi_strerror (- ret ), ret );
350+ "%s:%d: %s failed: %s(%zd)" ,
351+ __FILE__ , __LINE__ ,fi_api , fi_strerror (- ret ), ret );
319352 return ompi_mtl_ofi_get_error (ret );
320353 }
321354 }
@@ -415,7 +448,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
415448 ssize_t ret ;
416449 ompi_proc_t * ompi_proc = NULL ;
417450 mca_mtl_ofi_endpoint_t * endpoint = NULL ;
418- int src ;
451+ int src = mtl_ofi_get_source ( wc ) ;
419452 ompi_status_public_t * status = NULL ;
420453
421454 assert (ofi_req -> super .ompi_req );
@@ -427,7 +460,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
427460 */
428461 ofi_req -> req_started = true;
429462
430- status -> MPI_SOURCE = MTL_OFI_GET_SOURCE ( wc -> tag ) ;
463+ status -> MPI_SOURCE = src ;
431464 status -> MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
432465 status -> _ucount = wc -> len ;
433466
@@ -474,7 +507,6 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
474507 * we need to extract the source's actual address.
475508 */
476509 if (ompi_mtl_ofi .any_addr == ofi_req -> remote_addr ) {
477- src = MTL_OFI_GET_SOURCE (wc -> tag );
478510 ompi_proc = ompi_comm_peer_lookup (ofi_req -> comm , src );
479511 endpoint = ompi_mtl_ofi_get_endpoint (ofi_req -> mtl , ompi_proc );
480512 ofi_req -> remote_addr = endpoint -> peer_fiaddr ;
@@ -484,7 +516,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
484516 0 ,
485517 NULL ,
486518 ofi_req -> remote_addr ,
487- wc -> tag | MTL_OFI_SYNC_SEND_ACK ,
519+ wc -> tag | ompi_mtl_ofi . sync_send_ack ,
488520 (void * ) & ofi_req -> ctx ));
489521 if (OPAL_UNLIKELY (0 > ret )) {
490522 opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
@@ -510,7 +542,7 @@ ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry *error,
510542 assert (ofi_req -> super .ompi_req );
511543 status = & ofi_req -> super .ompi_req -> req_status ;
512544 status -> MPI_TAG = MTL_OFI_GET_TAG (ofi_req -> match_bits );
513- status -> MPI_SOURCE = MTL_OFI_GET_SOURCE ( ofi_req -> match_bits );
545+ status -> MPI_SOURCE = mtl_ofi_get_source (( struct fi_cq_tagged_entry * ) error );
514546
515547 switch (error -> err ) {
516548 case FI_ETRUNC :
@@ -538,24 +570,30 @@ ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
538570 int ompi_ret = OMPI_SUCCESS ;
539571 ssize_t ret ;
540572 uint64_t match_bits , mask_bits ;
541- fi_addr_t remote_addr ;
573+ fi_addr_t remote_addr = ompi_mtl_ofi . any_addr ;
542574 ompi_proc_t * ompi_proc = NULL ;
543575 mca_mtl_ofi_endpoint_t * endpoint = NULL ;
544576 ompi_mtl_ofi_request_t * ofi_req = (ompi_mtl_ofi_request_t * ) mtl_request ;
545577 void * start ;
546578 size_t length ;
547579 bool free_after ;
548580
549- if (MPI_ANY_SOURCE != src ) {
550- ompi_proc = ompi_comm_peer_lookup (comm , src );
551- endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
552- remote_addr = endpoint -> peer_fiaddr ;
581+
582+ if (ompi_mtl_ofi .fi_cq_data ) {
583+ if (MPI_ANY_SOURCE != src ) {
584+ ompi_proc = ompi_comm_peer_lookup (comm , src );
585+ endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
586+ remote_addr = endpoint -> peer_fiaddr ;
587+ }
588+
589+ mtl_ofi_create_recv_tag_CQD (& match_bits , & mask_bits , comm -> c_contextid ,
590+ tag );
553591 } else {
554- remote_addr = ompi_mtl_ofi .any_addr ;
592+ mtl_ofi_create_recv_tag (& match_bits , & mask_bits , comm -> c_contextid , src ,
593+ tag );
594+ /* src_addr is ignored when FI_DIRECTED_RECV is not used */
555595 }
556596
557- MTL_OFI_SET_RECV_BITS (match_bits , mask_bits , comm -> c_contextid , src , tag );
558-
559597 ompi_ret = ompi_mtl_datatype_recv_buf (convertor ,
560598 & start ,
561599 & length ,
@@ -606,7 +644,7 @@ ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
606644{
607645 struct mca_mtl_request_t * mrecv_req = ofi_req -> mrecv_req ;
608646 ompi_status_public_t * status = & mrecv_req -> ompi_req -> req_status ;
609- status -> MPI_SOURCE = MTL_OFI_GET_SOURCE (wc -> tag );
647+ status -> MPI_SOURCE = mtl_ofi_get_source (wc );
610648 status -> MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
611649 status -> MPI_ERROR = MPI_SUCCESS ;
612650 status -> _ucount = wc -> len ;
@@ -628,7 +666,7 @@ ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry *error,
628666 struct mca_mtl_request_t * mrecv_req = ofi_req -> mrecv_req ;
629667 ompi_status_public_t * status = & mrecv_req -> ompi_req -> req_status ;
630668 status -> MPI_TAG = MTL_OFI_GET_TAG (ofi_req -> match_bits );
631- status -> MPI_SOURCE = MTL_OFI_GET_SOURCE ( ofi_req -> match_bits );
669+ status -> MPI_SOURCE = mtl_ofi_get_source (( struct fi_cq_tagged_entry * ) error );
632670
633671 switch (error -> err ) {
634672 case FI_ETRUNC :
@@ -716,7 +754,7 @@ ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry *wc,
716754{
717755 ofi_req -> match_state = 1 ;
718756 ofi_req -> match_bits = wc -> tag ;
719- ofi_req -> status .MPI_SOURCE = MTL_OFI_GET_SOURCE (wc -> tag );
757+ ofi_req -> status .MPI_SOURCE = mtl_ofi_get_source (wc );
720758 ofi_req -> status .MPI_TAG = MTL_OFI_GET_TAG (wc -> tag );
721759 ofi_req -> status .MPI_ERROR = MPI_SUCCESS ;
722760 ofi_req -> status ._ucount = wc -> len ;
@@ -749,22 +787,28 @@ ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
749787 struct ompi_mtl_ofi_request_t ofi_req ;
750788 ompi_proc_t * ompi_proc = NULL ;
751789 mca_mtl_ofi_endpoint_t * endpoint = NULL ;
752- fi_addr_t remote_proc = 0 ;
790+ fi_addr_t remote_proc = ompi_mtl_ofi . any_addr ;
753791 uint64_t match_bits , mask_bits ;
754792 ssize_t ret ;
755793 struct fi_msg_tagged msg ;
756794 uint64_t msgflags = FI_PEEK ;
757795
758- /**
759- * If the source is known, use its peer_fiaddr.
760- */
761- if (MPI_ANY_SOURCE != src ) {
762- ompi_proc = ompi_comm_peer_lookup ( comm , src );
763- endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
764- remote_proc = endpoint -> peer_fiaddr ;
765- }
796+ if (ompi_mtl_ofi .fi_cq_data ) {
797+ /* If the source is known, use its peer_fiaddr. */
798+ if (MPI_ANY_SOURCE != src ) {
799+ ompi_proc = ompi_comm_peer_lookup ( comm , src );
800+ endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
801+ remote_proc = endpoint -> peer_fiaddr ;
802+ }
766803
767- MTL_OFI_SET_RECV_BITS (match_bits , mask_bits , comm -> c_contextid , src , tag );
804+ mtl_ofi_create_recv_tag_CQD (& match_bits , & mask_bits , comm -> c_contextid ,
805+ tag );
806+ }
807+ else {
808+ mtl_ofi_create_recv_tag (& match_bits , & mask_bits , comm -> c_contextid , src ,
809+ tag );
810+ /* src_addr is ignored when FI_DIRECTED_RECV is not used */
811+ }
768812
769813 /**
770814 * fi_trecvmsg with FI_PEEK:
@@ -829,7 +873,7 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
829873 struct ompi_mtl_ofi_request_t * ofi_req ;
830874 ompi_proc_t * ompi_proc = NULL ;
831875 mca_mtl_ofi_endpoint_t * endpoint = NULL ;
832- fi_addr_t remote_proc = 0 ;
876+ fi_addr_t remote_proc = ompi_mtl_ofi . any_addr ;
833877 uint64_t match_bits , mask_bits ;
834878 ssize_t ret ;
835879 struct fi_msg_tagged msg ;
@@ -843,13 +887,22 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
843887 /**
844888 * If the source is known, use its peer_fiaddr.
845889 */
846- if (MPI_ANY_SOURCE != src ) {
847- ompi_proc = ompi_comm_peer_lookup ( comm , src );
848- endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
849- remote_proc = endpoint -> peer_fiaddr ;
850- }
851890
852- MTL_OFI_SET_RECV_BITS (match_bits , mask_bits , comm -> c_contextid , src , tag );
891+ if (ompi_mtl_ofi .fi_cq_data ) {
892+ if (MPI_ANY_SOURCE != src ) {
893+ ompi_proc = ompi_comm_peer_lookup ( comm , src );
894+ endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
895+ remote_proc = endpoint -> peer_fiaddr ;
896+ }
897+
898+ mtl_ofi_create_recv_tag_CQD (& match_bits , & mask_bits , comm -> c_contextid ,
899+ tag );
900+ }
901+ else {
902+ /* src_addr is ignored when FI_DIRECTED_RECV is not used */
903+ mtl_ofi_create_recv_tag (& match_bits , & mask_bits , comm -> c_contextid , src ,
904+ tag );
905+ }
853906
854907 /**
855908 * fi_trecvmsg with FI_PEEK and FI_CLAIM:
0 commit comments