Skip to content

Commit f627608

Browse files
committed
OSHMEM/UCX: implements atomic support
The ucx atomic component now has a real implementation. Fixes a bug in spml ucx add_procs. Removes redundant parameter checks from the atomic components.
1 parent bd04192 commit f627608

File tree

8 files changed

+137
-87
lines changed

8 files changed

+137
-87
lines changed

oshmem/mca/atomic/basic/atomic_basic_fadd.c

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,25 @@ int mca_atomic_basic_fadd(void *target,
2727
struct oshmem_op_t *op)
2828
{
2929
int rc = OSHMEM_SUCCESS;
30+
long long temp_value = 0;
3031

31-
if (!target || !value) {
32-
rc = OSHMEM_ERROR;
33-
}
34-
35-
if (rc == OSHMEM_SUCCESS) {
36-
long long temp_value = 0;
37-
38-
atomic_basic_lock(pe);
32+
atomic_basic_lock(pe);
3933

40-
rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));
34+
rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));
4135

42-
if (prev)
43-
memcpy(prev, (void*) &temp_value, nlong);
36+
if (prev)
37+
memcpy(prev, (void*) &temp_value, nlong);
4438

45-
op->o_func.c_fn((void*) value,
46-
(void*) &temp_value,
47-
nlong / op->dt_size);
39+
op->o_func.c_fn((void*) value,
40+
(void*) &temp_value,
41+
nlong / op->dt_size);
4842

49-
if (rc == OSHMEM_SUCCESS) {
50-
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
51-
shmem_quiet();
52-
}
53-
54-
atomic_basic_unlock(pe);
43+
if (rc == OSHMEM_SUCCESS) {
44+
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
45+
shmem_quiet();
5546
}
5647

48+
atomic_basic_unlock(pe);
49+
5750
return rc;
5851
}

oshmem/mca/atomic/mxm/atomic_mxm_cswap.c

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,6 @@ int mca_atomic_mxm_cswap(void *target,
4141
ptl_id = -1;
4242
mxm_err = MXM_OK;
4343

44-
if (!prev || !target || !value) {
45-
ATOMIC_ERROR("[#%d] Whether target, value or prev are not defined",
46-
my_pe);
47-
oshmem_shmem_abort(-1);
48-
return OSHMEM_ERR_BAD_PARAM;
49-
}
50-
if ((pe < 0) || (pe >= oshmem_num_procs())) {
51-
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
52-
oshmem_shmem_abort(-1);
53-
return OSHMEM_ERR_BAD_PARAM;
54-
}
55-
5644
switch (nlong) {
5745
case 1:
5846
nlong_order = 0;

oshmem/mca/atomic/mxm/atomic_mxm_fadd.c

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,6 @@ int mca_atomic_mxm_fadd(void *target,
4343
ptl_id = -1;
4444
mxm_err = MXM_OK;
4545

46-
if (!target || !value) {
47-
ATOMIC_ERROR("[#%d] target or value are not defined", my_pe);
48-
oshmem_shmem_abort(-1);
49-
return OSHMEM_ERR_BAD_PARAM;
50-
}
51-
52-
if ((pe < 0) || (pe >= oshmem_num_procs())) {
53-
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
54-
oshmem_shmem_abort(-1);
55-
return OSHMEM_ERR_BAD_PARAM;
56-
}
57-
5846
switch (nlong) {
5947
case 1:
6048
nlong_order = 0;

oshmem/mca/atomic/ucx/atomic_ucx_component.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static int ucx_open(void)
9797
*/
9898
if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) {
9999
ATOMIC_VERBOSE(5,
100-
"Can not use atomic/ucx because spml ikrit component disabled");
100+
"Can not use atomic/ucx because spml ucx component disabled");
101101
return OSHMEM_ERR_NOT_AVAILABLE;
102102
}
103103
mca_spml_self = (mca_spml_ucx_t *)mca_spml.self;

oshmem/mca/atomic/ucx/atomic_ucx_cswap.c

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@
1313
#include <stdlib.h>
1414

1515
#include "oshmem/constants.h"
16-
#include "oshmem/mca/spml/spml.h"
1716
#include "oshmem/mca/atomic/atomic.h"
1817
#include "oshmem/mca/atomic/base/base.h"
19-
#include "oshmem/mca/memheap/memheap.h"
20-
#include "oshmem/mca/memheap/base/base.h"
2118
#include "oshmem/runtime/runtime.h"
2219

2320
#include "atomic_ucx.h"
@@ -29,6 +26,45 @@ int mca_atomic_ucx_cswap(void *target,
2926
size_t nlong,
3027
int pe)
3128
{
32-
return OSHMEM_SUCCESS;
29+
ucs_status_t status;
30+
spml_ucx_mkey_t *ucx_mkey;
31+
uint64_t rva;
32+
33+
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);
34+
if (NULL == cond) {
35+
switch (nlong) {
36+
case 4:
37+
status = ucp_atomic_swap32(mca_spml_self->ucp_peers[pe].ucp_conn,
38+
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
39+
break;
40+
case 8:
41+
status = ucp_atomic_swap64(mca_spml_self->ucp_peers[pe].ucp_conn,
42+
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
43+
break;
44+
default:
45+
goto err_size;
46+
}
47+
}
48+
else {
49+
switch (nlong) {
50+
case 4:
51+
status = ucp_atomic_cswap32(mca_spml_self->ucp_peers[pe].ucp_conn,
52+
*(uint32_t *)cond, *(uint32_t *)value, rva, ucx_mkey->rkey, prev);
53+
break;
54+
case 8:
55+
status = ucp_atomic_cswap64(mca_spml_self->ucp_peers[pe].ucp_conn,
56+
*(uint64_t *)cond, *(uint64_t *)value, rva, ucx_mkey->rkey, prev);
57+
break;
58+
default:
59+
goto err_size;
60+
}
61+
}
62+
63+
return ucx_status_to_oshmem(status);
64+
65+
err_size:
66+
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
67+
return OSHMEM_ERROR;
3368
}
3469

70+

oshmem/mca/atomic/ucx/atomic_ucx_fadd.c

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,8 @@
1313
#include <stdlib.h>
1414

1515
#include "oshmem/constants.h"
16-
#include "oshmem/op/op.h"
17-
#include "oshmem/mca/spml/spml.h"
1816
#include "oshmem/mca/atomic/atomic.h"
1917
#include "oshmem/mca/atomic/base/base.h"
20-
#include "oshmem/mca/memheap/memheap.h"
21-
#include "oshmem/mca/memheap/base/base.h"
22-
#include "oshmem/runtime/runtime.h"
2318

2419
#include "atomic_ucx.h"
2520

@@ -30,6 +25,44 @@ int mca_atomic_ucx_fadd(void *target,
3025
int pe,
3126
struct oshmem_op_t *op)
3227
{
33-
/* TODO: actual code */
34-
return OSHMEM_SUCCESS;
28+
ucs_status_t status;
29+
spml_ucx_mkey_t *ucx_mkey;
30+
uint64_t rva;
31+
32+
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);
33+
34+
if (NULL == prev) {
35+
switch (nlong) {
36+
case 4:
37+
status = ucp_atomic_add32(mca_spml_self->ucp_peers[pe].ucp_conn,
38+
*(uint32_t *)value, rva, ucx_mkey->rkey);
39+
break;
40+
case 8:
41+
status = ucp_atomic_add64(mca_spml_self->ucp_peers[pe].ucp_conn,
42+
*(uint64_t *)value, rva, ucx_mkey->rkey);
43+
break;
44+
default:
45+
goto err_size;
46+
}
47+
}
48+
else {
49+
switch (nlong) {
50+
case 4:
51+
status = ucp_atomic_fadd32(mca_spml_self->ucp_peers[pe].ucp_conn,
52+
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
53+
break;
54+
case 8:
55+
status = ucp_atomic_fadd64(mca_spml_self->ucp_peers[pe].ucp_conn,
56+
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
57+
break;
58+
default:
59+
goto err_size;
60+
}
61+
}
62+
63+
return ucx_status_to_oshmem(status);
64+
65+
err_size:
66+
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
67+
return OSHMEM_ERROR;
3568
}

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ mca_spml_ucx_t mca_spml_ucx = {
5353
mca_spml_ucx_deregister,
5454
mca_spml_base_oob_get_mkeys,
5555
mca_spml_ucx_put,
56-
NULL, //mca_spml_ucx_put_nb,
56+
NULL, /* todo: mca_spml_ucx_put_nb, */
5757
mca_spml_ucx_get,
5858
mca_spml_ucx_recv,
5959
mca_spml_ucx_send,
@@ -174,6 +174,8 @@ static void dump_address(int pe, char *addr, size_t len)
174174
#endif
175175
}
176176

177+
static char spml_ucx_transport_ids[1] = { 0 };
178+
177179
int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
178180
{
179181
size_t i, n;
@@ -208,7 +210,6 @@ int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
208210
/* Get the EP connection requests for all the processes from modex */
209211
for (n = 0; n < nprocs; ++n) {
210212
i = (my_rank + n) % nprocs;
211-
//if (i == my_rank) continue;
212213
dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]);
213214
err = ucp_ep_create(mca_spml_ucx.ucp_worker,
214215
(ucp_address_t *)(wk_raddrs + wk_roffs[i]),
@@ -217,6 +218,8 @@ int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
217218
SPML_ERROR("ucp_ep_create failed!!!\n");
218219
goto error2;
219220
}
221+
procs[i]->num_transports = 1;
222+
procs[i]->transport_ids = spml_ucx_transport_ids;
220223
}
221224

222225
ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
@@ -377,46 +380,27 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
377380
int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
378381
{
379382
void *rva;
380-
sshmem_mkey_t *r_mkey;
381-
ucs_status_t err;
383+
ucs_status_t status;
382384
spml_ucx_mkey_t *ucx_mkey;
383385

384-
r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva);
385-
if (OPAL_UNLIKELY(!r_mkey)) {
386-
SPML_ERROR("pe=%d: %p is not address of shared variable",
387-
src, src_addr);
388-
oshmem_shmem_abort(-1);
389-
return OSHMEM_ERROR;
390-
}
391-
392-
ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
393-
err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
394-
(uint64_t)rva, ucx_mkey->rkey);
386+
ucx_mkey = mca_spml_ucx_get_mkey(src, src_addr, &rva);
387+
status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
388+
(uint64_t)rva, ucx_mkey->rkey);
395389

396-
return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
390+
return ucx_status_to_oshmem(status);
397391
}
398392

399393
int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
400394
{
401395
void *rva;
402-
sshmem_mkey_t *r_mkey;
403-
ucs_status_t err;
396+
ucs_status_t status;
404397
spml_ucx_mkey_t *ucx_mkey;
405398

406-
r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva);
407-
if (OPAL_UNLIKELY(!r_mkey)) {
408-
SPML_ERROR("pe=%d: %p is not address of shared variable",
409-
dst, dst_addr);
410-
oshmem_shmem_abort(-1);
411-
return OSHMEM_ERROR;
412-
}
413-
414-
ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
415-
416-
err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
417-
(uint64_t)rva, ucx_mkey->rkey);
399+
ucx_mkey = mca_spml_ucx_get_mkey(dst, dst_addr, &rva);
400+
status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
401+
(uint64_t)rva, ucx_mkey->rkey);
418402

419-
return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
403+
return ucx_status_to_oshmem(status);
420404
}
421405

422406
int mca_spml_ucx_fence(void)

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616

1717
#include "oshmem_config.h"
1818
#include "oshmem/request/request.h"
19+
#include "oshmem/mca/spml/base/base.h"
1920
#include "oshmem/mca/spml/spml.h"
2021
#include "oshmem/util/oshmem_util.h"
2122
#include "oshmem/mca/spml/base/spml_base_putreq.h"
2223
#include "oshmem/proc/proc.h"
2324
#include "oshmem/mca/spml/base/spml_base_request.h"
2425
#include "oshmem/mca/spml/base/spml_base_getreq.h"
26+
#include "oshmem/runtime/runtime.h"
27+
28+
#include "oshmem/mca/memheap/memheap.h"
29+
#include "oshmem/mca/memheap/base/base.h"
2530

2631
#include "orte/runtime/orte_globals.h"
2732

@@ -98,6 +103,29 @@ extern int mca_spml_ucx_fence(void);
98103
extern int mca_spml_ucx_quiet(void);
99104
extern int spml_ucx_progress(void);
100105

106+
107+
108+
static inline spml_ucx_mkey_t *
109+
mca_spml_ucx_get_mkey(int pe, void *va, void **rva)
110+
{
111+
sshmem_mkey_t *r_mkey;
112+
113+
r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva);
114+
if (OPAL_UNLIKELY(!r_mkey)) {
115+
SPML_ERROR("pe=%d: %p is not address of symmetric variable",
116+
pe, va);
117+
oshmem_shmem_abort(-1);
118+
return NULL;
119+
}
120+
return (spml_ucx_mkey_t *)(r_mkey->spml_context);
121+
}
122+
123+
/**
 * Translate a UCX status code into an OSHMEM return code.
 *
 * The return type is spelled out explicitly: the previous definition
 * omitted it and relied on implicit int, which was removed from the
 * language in C99 and is rejected or warned about by modern compilers.
 *
 * @param status  UCX status to translate.
 *
 * @return OSHMEM_SUCCESS when status equals UCS_OK, OSHMEM_ERROR for
 *         any other value.
 */
static inline int ucx_status_to_oshmem(ucs_status_t status)
{
    return OPAL_LIKELY(UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}
127+
128+
101129
END_C_DECLS
102130

103131
#endif

0 commit comments

Comments
 (0)