Skip to content

mtl-portals4: in rendezvous, reissue PtlGet() if it fails #3528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ompi/mca/mtl/portals4/mtl_portals4.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ struct mca_mtl_portals4_module_t {
/* free list of message for matched probe */
opal_free_list_t fl_message;

/* free list of rendezvous get fragments */
opal_free_list_t fl_rndv_get_frag;

/** Network interface handle for matched interface */
ptl_handle_ni_t ni_h;
/** Limit given by portals after NIInit */
Expand Down
25 changes: 24 additions & 1 deletion ompi/mca/mtl/portals4/mtl_portals4_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ static mca_base_var_enum_value_t long_protocol_values[] = {
{0, NULL}
};

OBJ_CLASS_INSTANCE(ompi_mtl_portals4_rndv_get_frag_t,
opal_free_list_item_t,
NULL, NULL);

static int
ompi_mtl_portals4_component_register(void)
{
Expand Down Expand Up @@ -251,6 +255,13 @@ ompi_mtl_portals4_component_open(void)
OBJ_CLASS(ompi_mtl_portals4_message_t),
0, 0, 1, -1, 1, NULL, 0, NULL, NULL, NULL);

OBJ_CONSTRUCT(&ompi_mtl_portals4.fl_rndv_get_frag, opal_free_list_t);
opal_free_list_init(&ompi_mtl_portals4.fl_rndv_get_frag,
sizeof(ompi_mtl_portals4_rndv_get_frag_t),
opal_cache_line_size,
OBJ_CLASS(ompi_mtl_portals4_rndv_get_frag_t),
0, 0, 1, -1, 1, NULL, 0, NULL, NULL, NULL);

ompi_mtl_portals4.ni_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.send_eq_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.recv_eq_h = PTL_INVALID_HANDLE;
Expand Down Expand Up @@ -478,6 +489,7 @@ ompi_mtl_portals4_progress(void)
unsigned int which;
ptl_event_t ev;
ompi_mtl_portals4_base_request_t *ptl_request;
ompi_mtl_portals4_rndv_get_frag_t *rndv_get_frag;

while (true) {
ret = PtlEQPoll(ompi_mtl_portals4.eqs_h, 2, 0, &ev, &which);
Expand All @@ -489,7 +501,6 @@ ompi_mtl_portals4_progress(void)
case PTL_EVENT_GET:
case PTL_EVENT_PUT:
case PTL_EVENT_PUT_OVERFLOW:
case PTL_EVENT_REPLY:
case PTL_EVENT_SEND:
case PTL_EVENT_ACK:
case PTL_EVENT_AUTO_FREE:
Expand All @@ -507,6 +518,18 @@ ompi_mtl_portals4_progress(void)
}
break;

case PTL_EVENT_REPLY:
if (NULL != ev.user_ptr) {
rndv_get_frag = ev.user_ptr;
ret = rndv_get_frag->event_callback(&ev, rndv_get_frag);
if (OMPI_SUCCESS != ret) {
opal_output(ompi_mtl_base_framework.framework_output,
"Error returned from target event callback: %d", ret);
abort();
}
}
break;

case PTL_EVENT_PT_DISABLED:
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_framework.framework_output,
Expand Down
203 changes: 133 additions & 70 deletions ompi/mca/mtl/portals4/mtl_portals4_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,22 @@
#include "mtl_portals4_recv_short.h"
#include "mtl_portals4_message.h"


static int
ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t* ptl_base_request);
static int
ompi_mtl_portals4_rndv_get_frag_progress(ptl_event_t *ev,
ompi_mtl_portals4_rndv_get_frag_t* rndv_get_frag);

static int
read_msg(void *start, ptl_size_t length, ptl_process_t target,
ptl_match_bits_t match_bits, ptl_size_t remote_offset,
ompi_mtl_portals4_recv_request_t *request)
{
int ret, i;
ptl_size_t rest = length, asked = 0, frag_size;
int32_t pending_reply;
ptl_size_t rest = length, asked = 0;
int32_t frag_count;

#if OMPI_MTL_PORTALS4_FLOW_CONTROL
while (OPAL_UNLIKELY(OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, -1) < 0)) {
Expand All @@ -50,29 +58,49 @@ read_msg(void *start, ptl_size_t length, ptl_process_t target,
}
#endif

request->pending_reply = (length + ompi_mtl_portals4.max_msg_size_mtl - 1) / ompi_mtl_portals4.max_msg_size_mtl;
pending_reply = request->pending_reply;
frag_count = (length + ompi_mtl_portals4.max_msg_size_mtl - 1) / ompi_mtl_portals4.max_msg_size_mtl;
ret = OPAL_THREAD_ADD32(&(request->pending_reply), frag_count);

for (i = 0 ; i < frag_count ; i++) {
opal_free_list_item_t *tmp;
ompi_mtl_portals4_rndv_get_frag_t* frag;

tmp = opal_free_list_get (&ompi_mtl_portals4.fl_rndv_get_frag);
if (NULL == tmp) return OMPI_ERR_OUT_OF_RESOURCE;

frag = (ompi_mtl_portals4_rndv_get_frag_t*) tmp;

frag->request = request;
#if OPAL_ENABLE_DEBUG
frag->frag_num = i;
#endif
frag->frag_start = (char*)start + i * ompi_mtl_portals4.max_msg_size_mtl;
frag->frag_length = (OPAL_UNLIKELY(rest > ompi_mtl_portals4.max_msg_size_mtl)) ? ompi_mtl_portals4.max_msg_size_mtl : rest;
frag->frag_target = target;
frag->frag_match_bits = match_bits;
frag->frag_remote_offset = remote_offset + i * ompi_mtl_portals4.max_msg_size_mtl;

frag->event_callback = ompi_mtl_portals4_rndv_get_frag_progress;

OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "GET (fragment %d/%d, size %ld) send",
i + 1, frag_count, frag->frag_length));

for (i = 0 ; i < pending_reply ; i++) {
OPAL_OUTPUT_VERBOSE((90, ompi_mtl_base_framework.framework_output, "GET (fragment %d/%d) send",
i + 1, pending_reply));
frag_size = (OPAL_UNLIKELY(rest > ompi_mtl_portals4.max_msg_size_mtl)) ? ompi_mtl_portals4.max_msg_size_mtl : rest;
ret = PtlGet(ompi_mtl_portals4.send_md_h,
(ptl_size_t) start + i * ompi_mtl_portals4.max_msg_size_mtl,
frag_size,
target,
(ptl_size_t) frag->frag_start,
frag->frag_length,
frag->frag_target,
ompi_mtl_portals4.read_idx,
match_bits,
remote_offset + i * ompi_mtl_portals4.max_msg_size_mtl,
request);
frag->frag_match_bits,
frag->frag_remote_offset,
frag);
if (OPAL_UNLIKELY(PTL_OK != ret)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PtlGet failed: %d",
__FILE__, __LINE__, ret);
return OMPI_ERR_OUT_OF_RESOURCE;
}
rest -= frag_size;
asked += frag_size;
rest -= frag->frag_length;
asked += frag->frag_length;
}

return OMPI_SUCCESS;
Expand Down Expand Up @@ -134,9 +162,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
/* If it's not a short message and we're doing rndv and the message is not complete, we
only have the first part of the message. Issue the get
to pull the second part of the message. */
ret = read_msg((char*) ptl_request->delivery_ptr + ev->mlength,
((msg_length > ptl_request->delivery_len) ?
ptl_request->delivery_len : msg_length) - ev->mlength,
ret = read_msg((char*)ptl_request->delivery_ptr + ev->mlength,
((msg_length > ptl_request->delivery_len) ? ptl_request->delivery_len : msg_length) - ev->mlength,
ev->initiator,
ev->hdr_data,
ev->mlength,
Expand Down Expand Up @@ -165,54 +192,6 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}
break;

case PTL_EVENT_REPLY:
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu (0x%lx) got reply event",
ptl_request->opcount, ptl_request->hdr_data));

if (OPAL_UNLIKELY(ev->ni_fail_type != PTL_NI_OK)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d",
__FILE__, __LINE__, ev->ni_fail_type);
ret = PTL_FAIL;
goto callback_error;
}

/* set the received length in the status, now that we know
exactly how much data was sent. */
ptl_request->super.super.ompi_req->req_status._ucount += ev->mlength;

ret = OPAL_THREAD_ADD32(&(ptl_request->pending_reply), -1);
if (ret > 0) {
return OMPI_SUCCESS;
}
assert(ptl_request->pending_reply == 0);

#if OMPI_MTL_PORTALS4_FLOW_CONTROL
OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, 1);
#endif

/* make sure the data is in the right place. Use _ucount for
the total length because it will be set correctly for all
three protocols. mlength is only correct for eager, and
delivery_len is the length of the buffer, not the length of
the send. */
ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
ptl_request->delivery_ptr,
ptl_request->super.super.ompi_req->req_status._ucount);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: ompi_mtl_datatype_unpack failed: %d",
__FILE__, __LINE__, ret);
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = ret;
}

OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu (0x%lx) completed , reply (pending_reply: %d)",
ptl_request->opcount, ptl_request->hdr_data, ptl_request->pending_reply));
ptl_request->super.super.completion_callback(&ptl_request->super.super);
break;

case PTL_EVENT_PUT_OVERFLOW:
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu (0x%lx) got put_overflow event",
Expand Down Expand Up @@ -301,9 +280,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
/* For long messages in the overflow list, ev->mlength = 0 */
ptl_request->super.super.ompi_req->req_status._ucount = 0;

ret = read_msg((char*) ptl_request->delivery_ptr,
(msg_length > ptl_request->delivery_len) ?
ptl_request->delivery_len : msg_length,
ret = read_msg((char*)ptl_request->delivery_ptr,
(msg_length > ptl_request->delivery_len) ? ptl_request->delivery_len : msg_length,
ev->initiator,
ev->hdr_data,
0,
Expand Down Expand Up @@ -336,6 +314,91 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}


static int
ompi_mtl_portals4_rndv_get_frag_progress(ptl_event_t *ev,
ompi_mtl_portals4_rndv_get_frag_t* rndv_get_frag)
{
int ret;
ompi_mtl_portals4_recv_request_t* ptl_request =
(ompi_mtl_portals4_recv_request_t*) rndv_get_frag->request;

assert(ev->type==PTL_EVENT_REPLY);

OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu (0x%lx) got reply event",
ptl_request->opcount, ptl_request->hdr_data));

if (OPAL_UNLIKELY(ev->ni_fail_type != PTL_NI_OK)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d",
__FILE__, __LINE__, ev->ni_fail_type);

OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Rendezvous Get Failed: Reissuing frag #%u", rndv_get_frag->frag_num));

ret = PtlGet(ompi_mtl_portals4.send_md_h,
(ptl_size_t) rndv_get_frag->frag_start,
rndv_get_frag->frag_length,
rndv_get_frag->frag_target,
ompi_mtl_portals4.read_idx,
rndv_get_frag->frag_match_bits,
rndv_get_frag->frag_remote_offset,
rndv_get_frag);
if (OPAL_UNLIKELY(PTL_OK != ret)) {
if (NULL != ptl_request->buffer_ptr) free(ptl_request->buffer_ptr);
goto callback_error;
}
return OMPI_SUCCESS;
}

/* set the received length in the status, now that we know
exactly how much data was sent. */
ptl_request->super.super.ompi_req->req_status._ucount += ev->mlength;

/* this frag is complete. return to freelist. */
opal_free_list_return (&ompi_mtl_portals4.fl_rndv_get_frag,
&rndv_get_frag->super);

ret = OPAL_THREAD_ADD32(&(ptl_request->pending_reply), -1);
if (ret > 0) {
return OMPI_SUCCESS;
}
assert(ptl_request->pending_reply == 0);

#if OMPI_MTL_PORTALS4_FLOW_CONTROL
OPAL_THREAD_ADD32(&ompi_mtl_portals4.flowctl.send_slots, 1);
#endif

/* make sure the data is in the right place. Use _ucount for
the total length because it will be set correctly for all
three protocols. mlength is only correct for eager, and
delivery_len is the length of the buffer, not the length of
the send. */
ret = ompi_mtl_datatype_unpack(ptl_request->convertor,
ptl_request->delivery_ptr,
ptl_request->super.super.ompi_req->req_status._ucount);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: ompi_mtl_datatype_unpack failed: %d",
__FILE__, __LINE__, ret);
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = ret;
}

OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_framework.framework_output,
"Recv %lu (0x%lx) completed , reply (pending_reply: %d)",
ptl_request->opcount, ptl_request->hdr_data, ptl_request->pending_reply));
ptl_request->super.super.completion_callback(&ptl_request->super.super);

return OMPI_SUCCESS;

callback_error:
ptl_request->super.super.ompi_req->req_status.MPI_ERROR =
ompi_mtl_portals4_get_error(ret);
ptl_request->super.super.completion_callback(&ptl_request->super.super);
return OMPI_SUCCESS;
}


int
ompi_mtl_portals4_irecv(struct mca_mtl_base_module_t* mtl,
struct ompi_communicator_t *comm,
Expand Down
20 changes: 20 additions & 0 deletions ompi/mca/mtl/portals4/mtl_portals4_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ struct ompi_mtl_portals4_recv_request_t {
};
typedef struct ompi_mtl_portals4_recv_request_t ompi_mtl_portals4_recv_request_t;

struct ompi_mtl_portals4_rndv_get_frag_t {
opal_free_list_item_t super;
/* the recv request that's composed of these frags */
ompi_mtl_portals4_recv_request_t *request;
/* info extracted from the put_overflow event that is required to retry the rndv-get */
void *frag_start;
ptl_size_t frag_length;
ptl_process_t frag_target;
ptl_hdr_data_t frag_match_bits;
ptl_size_t frag_remote_offset;

int (*event_callback)(ptl_event_t *ev, struct ompi_mtl_portals4_rndv_get_frag_t*);

#if OPAL_ENABLE_DEBUG
uint32_t frag_num;
#endif
};
typedef struct ompi_mtl_portals4_rndv_get_frag_t ompi_mtl_portals4_rndv_get_frag_t;
OBJ_CLASS_DECLARATION(ompi_mtl_portals4_rndv_get_frag_t);


struct ompi_mtl_portals4_recv_short_request_t {
ompi_mtl_portals4_base_request_t super;
Expand Down