34
34
#include "mtl_portals4_recv_short.h"
35
35
#include "mtl_portals4_message.h"
36
36
37
+
38
+ static int
39
+ ompi_mtl_portals4_recv_progress (ptl_event_t * ev ,
40
+ ompi_mtl_portals4_base_request_t * ptl_base_request );
41
+ static int
42
+ ompi_mtl_portals4_rndv_get_frag_progress (ptl_event_t * ev ,
43
+ ompi_mtl_portals4_rndv_get_frag_t * rndv_get_frag );
44
+
37
45
static int
38
46
read_msg (void * start , ptl_size_t length , ptl_process_t target ,
39
47
ptl_match_bits_t match_bits , ptl_size_t remote_offset ,
40
48
ompi_mtl_portals4_recv_request_t * request )
41
49
{
42
50
int ret , i ;
43
- ptl_size_t rest = length , asked = 0 , frag_size ;
44
- int32_t pending_reply ;
51
+ ptl_size_t rest = length , asked = 0 ;
52
+ int32_t frag_count ;
45
53
46
54
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
47
55
while (OPAL_UNLIKELY (OPAL_THREAD_ADD32 (& ompi_mtl_portals4 .flowctl .send_slots , -1 ) < 0 )) {
@@ -50,29 +58,49 @@ read_msg(void *start, ptl_size_t length, ptl_process_t target,
50
58
}
51
59
#endif
52
60
53
- request -> pending_reply = (length + ompi_mtl_portals4 .max_msg_size_mtl - 1 ) / ompi_mtl_portals4 .max_msg_size_mtl ;
54
- pending_reply = request -> pending_reply ;
61
+ frag_count = (length + ompi_mtl_portals4 .max_msg_size_mtl - 1 ) / ompi_mtl_portals4 .max_msg_size_mtl ;
62
+ ret = OPAL_THREAD_ADD32 (& (request -> pending_reply ), frag_count );
63
+
64
+ for (i = 0 ; i < frag_count ; i ++ ) {
65
+ opal_free_list_item_t * tmp ;
66
+ ompi_mtl_portals4_rndv_get_frag_t * frag ;
67
+
68
+ tmp = opal_free_list_get (& ompi_mtl_portals4 .fl_rndv_get_frag );
69
+ if (NULL == tmp ) return OMPI_ERR_OUT_OF_RESOURCE ;
70
+
71
+ frag = (ompi_mtl_portals4_rndv_get_frag_t * ) tmp ;
72
+
73
+ frag -> request = request ;
74
+ #if OPAL_ENABLE_DEBUG
75
+ frag -> frag_num = i ;
76
+ #endif
77
+ frag -> frag_start = (char * )start + i * ompi_mtl_portals4 .max_msg_size_mtl ;
78
+ frag -> frag_length = (OPAL_UNLIKELY (rest > ompi_mtl_portals4 .max_msg_size_mtl )) ? ompi_mtl_portals4 .max_msg_size_mtl : rest ;
79
+ frag -> frag_target = target ;
80
+ frag -> frag_match_bits = match_bits ;
81
+ frag -> frag_remote_offset = remote_offset + i * ompi_mtl_portals4 .max_msg_size_mtl ;
82
+
83
+ frag -> event_callback = ompi_mtl_portals4_rndv_get_frag_progress ;
84
+
85
+ OPAL_OUTPUT_VERBOSE ((90 , ompi_mtl_base_framework .framework_output , "GET (fragment %d/%d, size %ld) send" ,
86
+ i + 1 , frag_count , frag -> frag_length ));
55
87
56
- for (i = 0 ; i < pending_reply ; i ++ ) {
57
- OPAL_OUTPUT_VERBOSE ((90 , ompi_mtl_base_framework .framework_output , "GET (fragment %d/%d) send" ,
58
- i + 1 , pending_reply ));
59
- frag_size = (OPAL_UNLIKELY (rest > ompi_mtl_portals4 .max_msg_size_mtl )) ? ompi_mtl_portals4 .max_msg_size_mtl : rest ;
60
88
ret = PtlGet (ompi_mtl_portals4 .send_md_h ,
61
- (ptl_size_t ) start + i * ompi_mtl_portals4 . max_msg_size_mtl ,
62
- frag_size ,
63
- target ,
89
+ (ptl_size_t ) frag -> frag_start ,
90
+ frag -> frag_length ,
91
+ frag -> frag_target ,
64
92
ompi_mtl_portals4 .read_idx ,
65
- match_bits ,
66
- remote_offset + i * ompi_mtl_portals4 . max_msg_size_mtl ,
67
- request );
93
+ frag -> frag_match_bits ,
94
+ frag -> frag_remote_offset ,
95
+ frag );
68
96
if (OPAL_UNLIKELY (PTL_OK != ret )) {
69
97
opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
70
98
"%s:%d: PtlGet failed: %d" ,
71
99
__FILE__ , __LINE__ , ret );
72
100
return OMPI_ERR_OUT_OF_RESOURCE ;
73
101
}
74
- rest -= frag_size ;
75
- asked += frag_size ;
102
+ rest -= frag -> frag_length ;
103
+ asked += frag -> frag_length ;
76
104
}
77
105
78
106
return OMPI_SUCCESS ;
@@ -134,9 +162,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
134
162
/* If it's not a short message and we're doing rndv and the message is not complete, we
135
163
only have the first part of the message. Issue the get
136
164
to pull the second part of the message. */
137
- ret = read_msg ((char * ) ptl_request -> delivery_ptr + ev -> mlength ,
138
- ((msg_length > ptl_request -> delivery_len ) ?
139
- ptl_request -> delivery_len : msg_length ) - ev -> mlength ,
165
+ ret = read_msg ((char * )ptl_request -> delivery_ptr + ev -> mlength ,
166
+ ((msg_length > ptl_request -> delivery_len ) ? ptl_request -> delivery_len : msg_length ) - ev -> mlength ,
140
167
ev -> initiator ,
141
168
ev -> hdr_data ,
142
169
ev -> mlength ,
@@ -165,54 +192,6 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
165
192
}
166
193
break ;
167
194
168
- case PTL_EVENT_REPLY :
169
- OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
170
- "Recv %lu (0x%lx) got reply event" ,
171
- ptl_request -> opcount , ptl_request -> hdr_data ));
172
-
173
- if (OPAL_UNLIKELY (ev -> ni_fail_type != PTL_NI_OK )) {
174
- opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
175
- "%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d" ,
176
- __FILE__ , __LINE__ , ev -> ni_fail_type );
177
- ret = PTL_FAIL ;
178
- goto callback_error ;
179
- }
180
-
181
- /* set the received length in the status, now that we know
182
- exactly how much data was sent. */
183
- ptl_request -> super .super .ompi_req -> req_status ._ucount += ev -> mlength ;
184
-
185
- ret = OPAL_THREAD_ADD32 (& (ptl_request -> pending_reply ), -1 );
186
- if (ret > 0 ) {
187
- return OMPI_SUCCESS ;
188
- }
189
- assert (ptl_request -> pending_reply == 0 );
190
-
191
- #if OMPI_MTL_PORTALS4_FLOW_CONTROL
192
- OPAL_THREAD_ADD32 (& ompi_mtl_portals4 .flowctl .send_slots , 1 );
193
- #endif
194
-
195
- /* make sure the data is in the right place. Use _ucount for
196
- the total length because it will be set correctly for all
197
- three protocols. mlength is only correct for eager, and
198
- delivery_len is the length of the buffer, not the length of
199
- the send. */
200
- ret = ompi_mtl_datatype_unpack (ptl_request -> convertor ,
201
- ptl_request -> delivery_ptr ,
202
- ptl_request -> super .super .ompi_req -> req_status ._ucount );
203
- if (OPAL_UNLIKELY (OMPI_SUCCESS != ret )) {
204
- opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
205
- "%s:%d: ompi_mtl_datatype_unpack failed: %d" ,
206
- __FILE__ , __LINE__ , ret );
207
- ptl_request -> super .super .ompi_req -> req_status .MPI_ERROR = ret ;
208
- }
209
-
210
- OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
211
- "Recv %lu (0x%lx) completed , reply (pending_reply: %d)" ,
212
- ptl_request -> opcount , ptl_request -> hdr_data , ptl_request -> pending_reply ));
213
- ptl_request -> super .super .completion_callback (& ptl_request -> super .super );
214
- break ;
215
-
216
195
case PTL_EVENT_PUT_OVERFLOW :
217
196
OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
218
197
"Recv %lu (0x%lx) got put_overflow event" ,
@@ -301,9 +280,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
301
280
/* For long messages in the overflow list, ev->mlength = 0 */
302
281
ptl_request -> super .super .ompi_req -> req_status ._ucount = 0 ;
303
282
304
- ret = read_msg ((char * ) ptl_request -> delivery_ptr ,
305
- (msg_length > ptl_request -> delivery_len ) ?
306
- ptl_request -> delivery_len : msg_length ,
283
+ ret = read_msg ((char * )ptl_request -> delivery_ptr ,
284
+ (msg_length > ptl_request -> delivery_len ) ? ptl_request -> delivery_len : msg_length ,
307
285
ev -> initiator ,
308
286
ev -> hdr_data ,
309
287
0 ,
@@ -336,6 +314,91 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
336
314
}
337
315
338
316
317
+ static int
318
+ ompi_mtl_portals4_rndv_get_frag_progress (ptl_event_t * ev ,
319
+ ompi_mtl_portals4_rndv_get_frag_t * rndv_get_frag )
320
+ {
321
+ int ret ;
322
+ ompi_mtl_portals4_recv_request_t * ptl_request =
323
+ (ompi_mtl_portals4_recv_request_t * ) rndv_get_frag -> request ;
324
+
325
+ assert (ev -> type == PTL_EVENT_REPLY );
326
+
327
+ OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
328
+ "Recv %lu (0x%lx) got reply event" ,
329
+ ptl_request -> opcount , ptl_request -> hdr_data ));
330
+
331
+ if (OPAL_UNLIKELY (ev -> ni_fail_type != PTL_NI_OK )) {
332
+ opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
333
+ "%s:%d: PTL_EVENT_REPLY with ni_fail_type: %d" ,
334
+ __FILE__ , __LINE__ , ev -> ni_fail_type );
335
+
336
+ OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
337
+ "Rendezvous Get Failed: Reissuing frag #%u" , rndv_get_frag -> frag_num ));
338
+
339
+ ret = PtlGet (ompi_mtl_portals4 .send_md_h ,
340
+ (ptl_size_t ) rndv_get_frag -> frag_start ,
341
+ rndv_get_frag -> frag_length ,
342
+ rndv_get_frag -> frag_target ,
343
+ ompi_mtl_portals4 .read_idx ,
344
+ rndv_get_frag -> frag_match_bits ,
345
+ rndv_get_frag -> frag_remote_offset ,
346
+ rndv_get_frag );
347
+ if (OPAL_UNLIKELY (PTL_OK != ret )) {
348
+ if (NULL != ptl_request -> buffer_ptr ) free (ptl_request -> buffer_ptr );
349
+ goto callback_error ;
350
+ }
351
+ return OMPI_SUCCESS ;
352
+ }
353
+
354
+ /* set the received length in the status, now that we know
355
+ exactly how much data was sent. */
356
+ ptl_request -> super .super .ompi_req -> req_status ._ucount += ev -> mlength ;
357
+
358
+ /* this frag is complete. return to freelist. */
359
+ opal_free_list_return (& ompi_mtl_portals4 .fl_rndv_get_frag ,
360
+ & rndv_get_frag -> super );
361
+
362
+ ret = OPAL_THREAD_ADD32 (& (ptl_request -> pending_reply ), -1 );
363
+ if (ret > 0 ) {
364
+ return OMPI_SUCCESS ;
365
+ }
366
+ assert (ptl_request -> pending_reply == 0 );
367
+
368
+ #if OMPI_MTL_PORTALS4_FLOW_CONTROL
369
+ OPAL_THREAD_ADD32 (& ompi_mtl_portals4 .flowctl .send_slots , 1 );
370
+ #endif
371
+
372
+ /* make sure the data is in the right place. Use _ucount for
373
+ the total length because it will be set correctly for all
374
+ three protocols. mlength is only correct for eager, and
375
+ delivery_len is the length of the buffer, not the length of
376
+ the send. */
377
+ ret = ompi_mtl_datatype_unpack (ptl_request -> convertor ,
378
+ ptl_request -> delivery_ptr ,
379
+ ptl_request -> super .super .ompi_req -> req_status ._ucount );
380
+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ret )) {
381
+ opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
382
+ "%s:%d: ompi_mtl_datatype_unpack failed: %d" ,
383
+ __FILE__ , __LINE__ , ret );
384
+ ptl_request -> super .super .ompi_req -> req_status .MPI_ERROR = ret ;
385
+ }
386
+
387
+ OPAL_OUTPUT_VERBOSE ((50 , ompi_mtl_base_framework .framework_output ,
388
+ "Recv %lu (0x%lx) completed , reply (pending_reply: %d)" ,
389
+ ptl_request -> opcount , ptl_request -> hdr_data , ptl_request -> pending_reply ));
390
+ ptl_request -> super .super .completion_callback (& ptl_request -> super .super );
391
+
392
+ return OMPI_SUCCESS ;
393
+
394
+ callback_error :
395
+ ptl_request -> super .super .ompi_req -> req_status .MPI_ERROR =
396
+ ompi_mtl_portals4_get_error (ret );
397
+ ptl_request -> super .super .completion_callback (& ptl_request -> super .super );
398
+ return OMPI_SUCCESS ;
399
+ }
400
+
401
+
339
402
int
340
403
ompi_mtl_portals4_irecv (struct mca_mtl_base_module_t * mtl ,
341
404
struct ompi_communicator_t * comm ,
0 commit comments