@@ -31,6 +31,7 @@
 #include "mpi.h"
 #include "ompi/constants.h"
 #include "ompi/datatype/ompi_datatype.h"
+#include "opal/datatype/opal_convertor_internal.h"
 #include "ompi/mca/coll/coll.h"
 #include "ompi/mca/coll/base/coll_tags.h"
 #include "ompi/mca/pml/pml.h"
@@ -42,12 +43,11 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con
                                        struct ompi_communicator_t *comm,
                                        mca_coll_base_module_t *module)
 {
-    int i, j, size, rank, err = MPI_SUCCESS, max_size;
+    int i, j, size, rank, err = MPI_SUCCESS;
     ompi_request_t *req;
     char *save_buffer = NULL;
-    ptrdiff_t ext, gap = 0;
-
-    /* Initialize. */
+    size_t max_size = 0, packed_size;
+    opal_convertor_t convertor;
 
     size = ompi_comm_size(comm);
     rank = ompi_comm_rank(comm);
@@ -57,11 +57,14 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con
         return MPI_SUCCESS;
     }
 
-    /* Find the largest receive amount */
+    /* Find the largest amount of packed send/recv data */
     for (i = 0, max_size = 0; i < size; ++i) {
-        ext = opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap);
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
 
-        max_size = ext > max_size ? ext : max_size;
+        packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super,
+                                                        ompi_proc->super.proc_convertor->master->remote_sizes);
+        packed_size *= rcounts[i];
+        max_size = packed_size > max_size ? packed_size : max_size;
     }
 
     /* Allocate a temporary buffer */
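Editor's note: the sizing loop above now measures each receive block in the peer's wire representation (via opal_datatype_compute_remote_size) rather than its local extent, so the staging buffer also fits the packed bytes when peers are heterogeneous. A minimal self-contained sketch of the same computation, assuming an Open MPI source tree for the internal headers; the helper name max_packed_size is hypothetical, not part of the commit:

/* Hypothetical helper mirroring the sizing loop above: size block i as the
 * peer's packed size of rdtypes[i] times rcounts[i], and keep the maximum. */
#include "ompi/communicator/communicator.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/proc/proc.h"
#include "opal/datatype/opal_convertor_internal.h"

static size_t max_packed_size(struct ompi_communicator_t *comm, int size,
                              struct ompi_datatype_t * const *rdtypes,
                              const int *rcounts)
{
    size_t max_size = 0;
    for (int i = 0; i < size; ++i) {
        ompi_proc_t *proc = ompi_comm_peer_lookup(comm, i);
        /* remote_sizes holds the per-predefined-type sizes on the peer */
        size_t packed = opal_datatype_compute_remote_size(&rdtypes[i]->super,
                            proc->super.proc_convertor->master->remote_sizes);
        packed *= (size_t) rcounts[i];
        if (packed > max_size) {
            max_size = packed;
        }
    }
    return max_size;
}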
@@ -77,45 +80,45 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con
         msg_size_i *= rcounts[i];
         for (j = i + 1; j < size; ++j) {
             size_t msg_size_j;
+            struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size};
+            uint32_t iov_count = 1;
             ompi_datatype_type_size(rdtypes[j], &msg_size_j);
             msg_size_j *= rcounts[j];
 
             /* Initiate all send/recv to/from others. */
             if (i == rank && msg_size_j != 0) {
-                char *tmp_buffer;
-                /* Shift the temporary buffer according to the current datatype */
-                (void)opal_datatype_span(&rdtypes[j]->super, rcounts[j], &gap);
-                tmp_buffer = save_buffer - gap;
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt(rdtypes[j], rcounts[j],
-                                                          tmp_buffer, (char *) rbuf + rdisps[j]);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
+                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j);
+                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
+                opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j],
+                                                (char *) rbuf + rdisps[j]);
+                packed_size = max_size;
+                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+                if (1 != err) { goto error_hndl; }
 
                 /* Exchange data with the peer */
                 err = MCA_PML_CALL(irecv((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j],
                                          j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
                 if (MPI_SUCCESS != err) { goto error_hndl; }
 
-                err = MCA_PML_CALL(send((void *) tmp_buffer, rcounts[j], rdtypes[j],
+                err = MCA_PML_CALL(send((void *) save_buffer, packed_size, MPI_PACKED,
                                         j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
                                         comm));
                 if (MPI_SUCCESS != err) { goto error_hndl; }
             } else if (j == rank && msg_size_i != 0) {
-                char *tmp_buffer;
-                /* Shift the temporary buffer according to the current datatype */
-                (void)opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap);
-                tmp_buffer = save_buffer - gap;
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt(rdtypes[i], rcounts[i],
-                                                          tmp_buffer, (char *) rbuf + rdisps[i]);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
+                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
+                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
+                opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i],
+                                                (char *) rbuf + rdisps[i]);
+                packed_size = max_size;
+                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+                if (1 != err) { goto error_hndl; }
 
                 /* Exchange data with the peer */
                 err = MCA_PML_CALL(irecv((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i],
                                          i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
                 if (MPI_SUCCESS != err) { goto error_hndl; }
 
-                err = MCA_PML_CALL(send((void *) tmp_buffer, rcounts[i], rdtypes[i],
+                err = MCA_PML_CALL(send((void *) save_buffer, packed_size, MPI_PACKED,
                                         i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
                                         comm));
                 if (MPI_SUCCESS != err) { goto error_hndl; }
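Editor's note: the new exchange packs the local block with a convertor cloned from the peer, so the bytes leave in the peer's representation and can be shipped as MPI_PACKED, while the matching irecv still posts the typed datatype and lets the receiver's datatype engine place the bytes. A minimal sketch of that pack step, assuming Open MPI internal headers; the helper name pack_for_peer and its error handling are illustrative, not part of the commit:

/* Hypothetical helper showing the commit's pack pattern: clone the peer's
 * convertor, describe the local buffer, and pack it into `out`. On success
 * *inout_size is the number of bytes to send with datatype MPI_PACKED. */
#include <sys/uio.h>
#include "ompi/communicator/communicator.h"
#include "ompi/constants.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/proc/proc.h"
#include "opal/datatype/opal_convertor.h"

static int pack_for_peer(struct ompi_communicator_t *comm, int peer,
                         struct ompi_datatype_t *dtype, int count,
                         void *base, char *out, size_t *inout_size)
{
    ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, peer);
    opal_convertor_t convertor;
    struct iovec iov = { .iov_base = out, .iov_len = *inout_size };
    uint32_t iov_count = 1;

    /* Mirrors the commit's calls: the cloned convertor carries the peer's
     * architecture, prepare_for_send attaches the local data description. */
    opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
    opal_convertor_prepare_for_send(&convertor, &dtype->super, count, base);

    /* opal_convertor_pack returns 1 once the whole description fit in the
     * iovec; it updates *inout_size to the bytes actually produced. */
    if (1 != opal_convertor_pack(&convertor, &iov, &iov_count, inout_size)) {
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}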