Skip to content

Commit 49913c6

Browse files
committed
PML UCX: unite the code for all the sending modes.
Signed-off-by: Alina Sklarevich <[email protected]>
1 parent d93b672 commit 49913c6

File tree

1 file changed

+35
-56
lines changed

1 file changed

+35
-56
lines changed

ompi/mca/pml/ucx/pml_ucx.c

Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat
601601
return OMPI_SUCCESS;
602602
}
603603

604-
static int
604+
static ucs_status_ptr_t
605605
mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
606606
ompi_datatype_t *datatype, uint64_t pml_tag)
607607
{
@@ -623,21 +623,21 @@ mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
623623
if (OPAL_UNLIKELY(NULL == packed_data)) {
624624
OBJ_DESTRUCT(&opal_conv);
625625
PML_UCX_ERROR("bsend: failed to allocate buffer");
626-
return OMPI_ERR_OUT_OF_RESOURCE;
626+
return UCS_STATUS_PTR(OMPI_ERROR);
627627
}
628628

629629
iov_count = 1;
630630
iov.iov_base = packed_data;
631631
iov.iov_len = packed_length;
632632

633-
PML_UCX_VERBOSE(8, "bsend of packed buffer %p len %d", packed_data, packed_length);
633+
PML_UCX_VERBOSE(8, "bsend of packed buffer %p len %zu", packed_data, packed_length);
634634
offset = 0;
635635
opal_convertor_set_position(&opal_conv, &offset);
636636
if (0 > opal_convertor_pack(&opal_conv, &iov, &iov_count, &packed_length)) {
637637
mca_pml_base_bsend_request_free(packed_data);
638638
OBJ_DESTRUCT(&opal_conv);
639639
PML_UCX_ERROR("bsend: failed to pack user datatype");
640-
return OMPI_ERROR;
640+
return UCS_STATUS_PTR(OMPI_ERROR);
641641
}
642642

643643
OBJ_DESTRUCT(&opal_conv);
@@ -648,29 +648,33 @@ mca_pml_ucx_bsend(ucp_ep_h ep, const void *buf, size_t count,
648648
if (NULL == req) {
649649
/* request was completed in place */
650650
mca_pml_base_bsend_request_free(packed_data);
651-
return OMPI_SUCCESS;
651+
return NULL;
652652
}
653653

654654
if (OPAL_UNLIKELY(UCS_PTR_IS_ERR(req))) {
655655
mca_pml_base_bsend_request_free(packed_data);
656656
PML_UCX_ERROR("ucx bsend failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
657-
return OMPI_ERROR;
657+
return UCS_STATUS_PTR(OMPI_ERROR);
658658
}
659659

660660
req->req_complete_cb_data = packed_data;
661-
return OMPI_SUCCESS;
661+
return NULL;
662662
}
663663

664-
static ompi_request_t* mca_pml_ucx_tag_send_nb(ucp_ep_h ep, const void *buf,
665-
size_t count, ucp_datatype_t datatype,
666-
ucp_tag_t tag, mca_pml_base_send_mode_t mode)
664+
static inline ucs_status_ptr_t mca_pml_ucx_common_send(ucp_ep_h ep, const void *buf,
665+
size_t count,
666+
ompi_datatype_t *datatype,
667+
ucp_datatype_t ucx_datatype,
668+
ucp_tag_t tag,
669+
mca_pml_base_send_mode_t mode,
670+
ucp_send_callback_t cb)
667671
{
668-
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
669-
return (ompi_request_t*)ucp_tag_send_sync_nb(ep, buf, count, datatype,
670-
tag, mca_pml_ucx_send_completion);
672+
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == mode)) {
673+
return mca_pml_ucx_bsend(ep, buf, count, datatype, tag);
674+
} else if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
675+
return ucp_tag_send_sync_nb(ep, buf, count, ucx_datatype, tag, cb);
671676
} else {
672-
return (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, datatype,
673-
tag, mca_pml_ucx_send_completion);
677+
return ucp_tag_send_nb(ep, buf, count, ucx_datatype, tag, cb);
674678
}
675679
}
676680

@@ -693,15 +697,10 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
693697
return OMPI_ERROR;
694698
}
695699

696-
/* Special care to sync/buffered send */
697-
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == mode)) {
698-
*request = &ompi_pml_ucx.completed_send_req;
699-
return mca_pml_ucx_bsend(ep, buf, count, datatype,
700-
PML_UCX_MAKE_SEND_TAG(tag, comm));
701-
}
702-
703-
req = mca_pml_ucx_tag_send_nb(ep, buf, count, mca_pml_ucx_get_datatype(datatype),
704-
PML_UCX_MAKE_SEND_TAG(tag, comm), mode);
700+
req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype,
701+
mca_pml_ucx_get_datatype(datatype),
702+
PML_UCX_MAKE_SEND_TAG(tag, comm), mode,
703+
mca_pml_ucx_send_completion);
705704

706705
if (req == NULL) {
707706
PML_UCX_VERBOSE(8, "returning completed request");
@@ -733,14 +732,10 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i
733732
return OMPI_ERROR;
734733
}
735734

736-
/* Special care to buffered send */
737-
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == mode)) {
738-
return mca_pml_ucx_bsend(ep, buf, count, datatype,
739-
PML_UCX_MAKE_SEND_TAG(tag, comm));
740-
}
741-
742-
req = mca_pml_ucx_tag_send_nb(ep, buf, count, mca_pml_ucx_get_datatype(datatype),
743-
PML_UCX_MAKE_SEND_TAG(tag, comm), mode);
735+
req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype,
736+
mca_pml_ucx_get_datatype(datatype),
737+
PML_UCX_MAKE_SEND_TAG(tag, comm),
738+
mode, mca_pml_ucx_send_completion);
744739

745740
if (OPAL_LIKELY(req == NULL)) {
746741
return OMPI_SUCCESS;
@@ -900,7 +895,6 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
900895
mca_pml_ucx_persistent_request_t *preq;
901896
ompi_request_t *tmp_req;
902897
size_t i;
903-
int rc;
904898

905899
for (i = 0; i < count; ++i) {
906900
preq = (mca_pml_ucx_persistent_request_t *)requests[i];
@@ -915,29 +909,14 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
915909
mca_pml_ucx_request_reset(&preq->ompi);
916910

917911
if (preq->flags & MCA_PML_UCX_REQUEST_FLAG_SEND) {
918-
if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_BUFFERED == preq->send.mode)) {
919-
PML_UCX_VERBOSE(8, "start bsend request %p", (void*)preq);
920-
rc = mca_pml_ucx_bsend(preq->send.ep, preq->buffer, preq->count,
921-
preq->ompi_datatype, preq->tag);
922-
if (OMPI_SUCCESS != rc) {
923-
return rc;
924-
}
925-
/* pretend that we got immediate completion */
926-
tmp_req = NULL;
927-
} else if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == preq->send.mode)) {
928-
PML_UCX_VERBOSE(8, "start send sync request %p", (void*)preq);
929-
tmp_req = (ompi_request_t*)ucp_tag_send_sync_nb(preq->send.ep,
930-
preq->buffer,
931-
preq->count, preq->datatype,
932-
preq->tag,
933-
mca_pml_ucx_psend_completion);
934-
} else {
935-
PML_UCX_VERBOSE(8, "start send request %p", (void*)preq);
936-
tmp_req = (ompi_request_t*)ucp_tag_send_nb(preq->send.ep, preq->buffer,
937-
preq->count, preq->datatype,
938-
preq->tag,
939-
mca_pml_ucx_psend_completion);
940-
}
912+
tmp_req = (ompi_request_t*)mca_pml_ucx_common_send(preq->send.ep,
913+
preq->buffer,
914+
preq->count,
915+
preq->ompi_datatype,
916+
preq->datatype,
917+
preq->tag,
918+
preq->send.mode,
919+
mca_pml_ucx_psend_completion);
941920
} else {
942921
PML_UCX_VERBOSE(8, "start recv request %p", (void*)preq);
943922
tmp_req = (ompi_request_t*)ucp_tag_recv_nb(ompi_pml_ucx.ucp_worker,

0 commit comments

Comments
 (0)