Skip to content

Commit 6db6d31

Browse files
Mamzi Bayatpour <mbayatpour@nvidia.com>
authored and committed
Enhancing the osc finalize for resource utilization design
1 parent 4afa6c7 commit 6db6d31

File tree

7 files changed

+68
-37
lines changed

7 files changed

+68
-37
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ typedef struct ompi_osc_ucx_component {
3535
opal_free_list_t requests; /* request free list for the r* communication variants */
3636
bool env_initialized; /* UCX environment is initialized or not */
3737
int num_incomplete_req_ops;
38+
int comm_world_size;
39+
ucp_ep_h *endpoints;
3840
int num_modules;
3941
bool no_locks; /* Default value of the no_locks info key for new windows */
4042
bool acc_single_intrinsic;
@@ -113,6 +115,7 @@ typedef struct ompi_osc_ucx_module {
113115
size_t size;
114116
uint64_t *addrs;
115117
uint64_t *state_addrs;
118+
uint64_t *comm_world_ranks;
116119
int disp_unit; /* if disp_unit >= 0, then everyone has the same
117120
* disp unit size; if disp_unit == -1, then we
118121
* need to look at disp_units */
@@ -159,16 +162,16 @@ typedef struct ompi_osc_ucx_lock {
159162
bool is_nocheck;
160163
} ompi_osc_ucx_lock_t;
161164

162-
#define OSC_UCX_GET_EP(comm_, rank_) (ompi_comm_peer_lookup(comm_, rank_)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_UCX])
165+
/*
 * Look up the UCX endpoint used to reach `rank_` of `_module`.
 *
 * `rank_` indexes the module's comm_world_ranks[] table, and the value stored
 * there indexes the component-wide mca_osc_ucx_component.endpoints[] array.
 * The expansion is an lvalue, so callers may take its address.
 *
 * Both arguments are parenthesized so that expression arguments such as
 * `mods + 1` or `&mods[0]` bind correctly to `->`; each argument is
 * evaluated exactly once.
 */
#define OSC_UCX_GET_EP(_module, rank_) (mca_osc_ucx_component.endpoints[(_module)->comm_world_ranks[(rank_)]])
163166
/*
 * Displacement unit to apply for `rank_` of `module_`.
 *
 * When the module's disp_unit is negative, the per-rank value
 * disp_units[rank_] is used; otherwise the single disp_unit applies.
 *
 * Arguments are parenthesized so that expression arguments such as
 * `mods + 1` bind correctly to `->`. Note that `module_` appears more than
 * once in the expansion, so it must not have side effects.
 */
#define OSC_UCX_GET_DISP(module_, rank_) (((module_)->disp_unit < 0) ? (module_)->disp_units[(rank_)] : (module_)->disp_unit)
164167

165168
extern bool mpi_thread_multiple_enabled;
166169

167-
#define OSC_UCX_GET_DEFAULT_EP(_ep_ptr, _comm, _target) \
170+
#define OSC_UCX_GET_DEFAULT_EP(_ep_ptr, _module, _target) \
168171
if (mpi_thread_multiple_enabled) { \
169172
_ep_ptr = NULL; \
170173
} else { \
171-
_ep_ptr = (ucp_ep_h *)&(OSC_UCX_GET_EP(_comm, _target)); \
174+
_ep_ptr = (ucp_ep_h *)&(OSC_UCX_GET_EP(_module, _target)); \
172175
}
173176

174177
extern int outstanding_ops_flush_threshold;

ompi/mca/osc/ucx/osc_ucx_active_target.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ int ompi_osc_ucx_complete(struct ompi_win_t *win) {
184184
for (i = 0; i < size; i++) {
185185
uint64_t remote_addr = module->state_addrs[module->start_grp_ranks[i]] + OSC_UCX_STATE_COMPLETE_COUNT_OFFSET; // write to state.complete_count on remote side
186186

187-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, module->start_grp_ranks[i]);
187+
OSC_UCX_GET_DEFAULT_EP(ep, module, module->start_grp_ranks[i]);
188188

189189
ret = opal_common_ucx_wpmem_post(module->state_mem, UCP_ATOMIC_POST_OP_ADD,
190190
1, module->start_grp_ranks[i], sizeof(uint64_t),
@@ -250,7 +250,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int mpi_assert, struct ompi_wi
250250
uint64_t remote_addr = module->state_addrs[ranks_in_win_grp[i]] + OSC_UCX_STATE_POST_INDEX_OFFSET; // write to state.post_index on remote side
251251
uint64_t curr_idx = 0, result = 0;
252252

253-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, ranks_in_win_grp[i]);
253+
OSC_UCX_GET_DEFAULT_EP(ep, module, ranks_in_win_grp[i]);
254254

255255
/* do fop first to get an post index */
256256
ret = opal_common_ucx_wpmem_fetch(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
140140
int target_count, struct ompi_datatype_t *target_dt,
141141
bool is_target_contig, ptrdiff_t target_lb, bool is_get) {
142142
ucp_ep_h *ep;
143-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
143+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
144144
ucx_iovec_t *origin_ucx_iov = NULL, *target_ucx_iov = NULL;
145145
uint32_t origin_ucx_iov_count = 0, target_ucx_iov_count = 0;
146146
uint32_t origin_ucx_iov_idx = 0, target_ucx_iov_idx = 0;
@@ -260,7 +260,7 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
260260
static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module,
261261
int target, bool lock_required) {
262262
ucp_ep_h *ep;
263-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
263+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
264264
uint64_t remote_state_addr = (module->state_addrs)[target] + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET;
265265
size_t remote_state_len = sizeof(uint64_t) + sizeof(ompi_osc_dynamic_win_info_t) * OMPI_OSC_UCX_ATTACH_MAX;
266266
char *temp_buf = calloc(remote_state_len, 1);
@@ -322,7 +322,7 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
322322
}
323323

324324
if (mem_rec == NULL) {
325-
OSC_UCX_GET_DEFAULT_EP(ep, module->mem->comm, target);
325+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
326326
ret = opal_common_ucx_tlocal_fetch_spath(module->mem, target, ep);
327327
if (OPAL_SUCCESS != ret) {
328328
goto cleanup;
@@ -431,7 +431,7 @@ static int do_atomic_op_intrinsic(
431431
opal_common_ucx_wpmem_t *mem = module->mem;
432432
ompi_datatype_type_size(dt, &origin_dt_bytes);
433433
ucp_ep_h *ep;
434-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
434+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
435435

436436
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
437437

@@ -495,7 +495,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
495495
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
496496
opal_common_ucx_wpmem_t *mem = module->mem;
497497
ucp_ep_h *ep;
498-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
498+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
499499
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
500500
bool is_origin_contig = false, is_target_contig = false;
501501
ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
@@ -547,7 +547,7 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
547547
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
548548
opal_common_ucx_wpmem_t *mem = module->mem;
549549
ucp_ep_h *ep;
550-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
550+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
551551
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
552552
ptrdiff_t origin_lb, origin_extent, target_lb, target_extent;
553553
bool is_origin_contig = false, is_target_contig = false;
@@ -776,7 +776,7 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
776776
size_t dt_bytes;
777777
opal_common_ucx_wpmem_t *mem = module->mem;
778778
ucp_ep_h *ep;
779-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
779+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
780780
if (!module->acc_single_intrinsic) {
781781
/* Start atomicity by acquiring acc lock */
782782
ret = ompi_osc_ucx_state_lock(module, target, &lock_acquired, false);
@@ -808,7 +808,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
808808
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t *)win->w_osc_module;
809809
opal_common_ucx_wpmem_t *mem = module->mem;
810810
ucp_ep_h *ep;
811-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
811+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
812812
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
813813
size_t dt_bytes;
814814
int ret = OMPI_SUCCESS;
@@ -871,7 +871,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
871871
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
872872
opal_common_ucx_wpmem_t *mem = module->mem;
873873
ucp_ep_h *ep;
874-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
874+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
875875
int ret = OMPI_SUCCESS;
876876

877877
ret = check_sync_state(module, target, false);
@@ -1114,7 +1114,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11141114
struct ompi_win_t *win, struct ompi_request_t **request) {
11151115
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
11161116
ucp_ep_h *ep;
1117-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
1117+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
11181118
opal_common_ucx_wpmem_t *mem = module->mem;
11191119
uint64_t remote_addr = (module->state_addrs[target]) + OSC_UCX_STATE_REQ_FLAG_OFFSET;
11201120
ompi_osc_ucx_request_t *ucx_req = NULL;
@@ -1169,7 +1169,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11691169
struct ompi_request_t **request) {
11701170
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
11711171
ucp_ep_h *ep;
1172-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
1172+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
11731173
opal_common_ucx_wpmem_t *mem = module->mem;
11741174
uint64_t remote_addr = (module->state_addrs[target]) + OSC_UCX_STATE_REQ_FLAG_OFFSET;
11751175
ompi_osc_ucx_request_t *ucx_req = NULL;
@@ -1288,7 +1288,7 @@ static inline int ompi_osc_ucx_acc_rputget(void *stage_addr, int stage_count,
12881288
int phase, int acc_type) {
12891289
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
12901290
ucp_ep_h *ep;
1291-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
1291+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
12921292
opal_common_ucx_wpmem_t *mem = module->mem;
12931293
uint64_t remote_addr = (module->state_addrs[target]) + OSC_UCX_STATE_REQ_FLAG_OFFSET;
12941294
ompi_osc_ucx_request_t *ucx_req = NULL;

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = {
7878
.env_initialized = false,
7979
.num_incomplete_req_ops = 0,
8080
.num_modules = 0,
81-
.acc_single_intrinsic = false
81+
.acc_single_intrinsic = false,
82+
.comm_world_size = 0,
83+
.endpoints = NULL
8284
};
8385

8486
ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
@@ -296,6 +298,17 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
296298
}
297299

298300
static int component_finalize(void) {
301+
assert(mca_osc_ucx_component.num_incomplete_req_ops == 0);
302+
if (!mpi_thread_multiple_enabled) {
303+
int i;
304+
for (i = 0; i < mca_osc_ucx_component.comm_world_size; i++) {
305+
ucp_ep_h ep = mca_osc_ucx_component.endpoints[i];
306+
if (ep != NULL) {
307+
ucp_ep_destroy(ep);
308+
}
309+
}
310+
free(mca_osc_ucx_component.endpoints);
311+
}
299312
opal_common_ucx_mca_deregister();
300313
if (mca_osc_ucx_component.env_initialized) {
301314
opal_common_ucx_wpool_finalize(mca_osc_ucx_component.wpool);
@@ -451,7 +464,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
451464
opal_common_ucx_mem_type_t mem_type;
452465
char *my_mem_addr;
453466
int my_mem_addr_size;
454-
uint64_t my_info[2] = {0};
467+
uint64_t my_info[3] = {0};
455468
char *recv_buf = NULL;
456469
void *dynamic_base = NULL;
457470
unsigned long total, *rbuf;
@@ -485,7 +498,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
485498
OSC_UCX_VERBOSE(1, "opal_common_ucx_wpool_init failed: %d", ret);
486499
goto select_unlock;
487500
}
488-
501+
if (!mpi_thread_multiple_enabled) {
502+
mca_osc_ucx_component.comm_world_size = ompi_proc_world_size();
503+
mca_osc_ucx_component.endpoints = calloc(mca_osc_ucx_component.comm_world_size, sizeof(ucp_ep_h));
504+
}
489505
/* Make sure that all memory updates performed above are globally
490506
* observable before (mca_osc_ucx_component.env_initialized = true)
491507
*/
@@ -762,20 +778,23 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
762778
my_info[0] = (uint64_t)dynamic_base;
763779
}
764780
my_info[1] = (uint64_t)state_base;
781+
my_info[2] = ompi_comm_rank(&ompi_mpi_comm_world.comm);
765782

766-
recv_buf = (char *)calloc(comm_size, 2 * sizeof(uint64_t));
767-
ret = comm->c_coll->coll_allgather((void *)my_info, 2 * sizeof(uint64_t),
768-
MPI_BYTE, recv_buf, 2 * sizeof(uint64_t),
783+
recv_buf = (char *)calloc(comm_size, sizeof(my_info));
784+
ret = comm->c_coll->coll_allgather((void *)my_info, sizeof(my_info),
785+
MPI_BYTE, recv_buf, sizeof(my_info),
769786
MPI_BYTE, comm, comm->c_coll->coll_allgather_module);
770787
if (ret != OMPI_SUCCESS) {
771788
goto error;
772789
}
773790

774791
module->addrs = calloc(comm_size, sizeof(uint64_t));
775792
module->state_addrs = calloc(comm_size, sizeof(uint64_t));
793+
module->comm_world_ranks = calloc(comm_size, sizeof(uint64_t));
776794
for (i = 0; i < comm_size; i++) {
777-
memcpy(&(module->addrs[i]), recv_buf + i * 2 * sizeof(uint64_t), sizeof(uint64_t));
778-
memcpy(&(module->state_addrs[i]), recv_buf + i * 2 * sizeof(uint64_t) + sizeof(uint64_t), sizeof(uint64_t));
795+
memcpy(&(module->addrs[i]), recv_buf + i * 3 * sizeof(uint64_t), sizeof(uint64_t));
796+
memcpy(&(module->state_addrs[i]), recv_buf + i * 3 * sizeof(uint64_t) + sizeof(uint64_t), sizeof(uint64_t));
797+
memcpy(&(module->comm_world_ranks[i]), recv_buf + i * 3 * sizeof(uint64_t) + 2 * sizeof(uint64_t), sizeof(uint64_t));
779798
}
780799
free(recv_buf);
781800

@@ -885,7 +904,7 @@ inline int ompi_osc_ucx_state_lock(
885904
uint64_t result_value = -1;
886905
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
887906
ucp_ep_h *ep;
888-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
907+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
889908
int ret = OMPI_SUCCESS;
890909

891910
if (force_lock || ompi_osc_need_acc_lock(module, target)) {
@@ -920,7 +939,7 @@ inline int ompi_osc_ucx_state_unlock(
920939
void *free_ptr) {
921940
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
922941
ucp_ep_h *ep;
923-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
942+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
924943
int ret = OMPI_SUCCESS;
925944

926945
if (lock_acquired) {
@@ -958,7 +977,7 @@ inline int ompi_osc_ucx_nonblocking_ops_finalize(ompi_osc_ucx_module_t *module,
958977
target, bool lock_acquired, struct ompi_win_t *win, void *free_ptr) {
959978
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
960979
ucp_ep_h *ep;
961-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
980+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
962981
int ret = OMPI_SUCCESS;
963982
ompi_osc_ucx_request_t *ucx_req = NULL;
964983

@@ -1158,6 +1177,7 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
11581177

11591178
free(module->addrs);
11601179
free(module->state_addrs);
1180+
free(module->comm_world_ranks);
11611181

11621182
opal_common_ucx_wpmem_free(module->state_mem);
11631183
if (NULL != module->mem) {

ompi/mca/osc/ucx/osc_ucx_passive_target.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
2222
uint64_t result_value = -1;
2323
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
2424
ucp_ep_h *ep;
25-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
25+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
2626
int ret = OMPI_SUCCESS;
2727

2828
while (true) {
@@ -53,7 +53,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
5353
static inline int end_shared(ompi_osc_ucx_module_t *module, int target) {
5454
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
5555
ucp_ep_h *ep;
56-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
56+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
5757
return opal_common_ucx_wpmem_post(module->state_mem, UCP_ATOMIC_POST_OP_ADD,
5858
(-1), target, sizeof(uint64_t), remote_addr, ep);
5959
}
@@ -62,7 +62,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
6262
uint64_t result_value = -1;
6363
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
6464
ucp_ep_h *ep;
65-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
65+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
6666
int ret = OMPI_SUCCESS;
6767

6868
for (;;) {
@@ -83,7 +83,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
8383
static inline int end_exclusive(ompi_osc_ucx_module_t *module, int target) {
8484
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_LOCK_OFFSET;
8585
ucp_ep_h *ep;
86-
OSC_UCX_GET_DEFAULT_EP(ep, module->comm, target);
86+
OSC_UCX_GET_DEFAULT_EP(ep, module, target);
8787
return opal_common_ucx_wpmem_post(module->state_mem, UCP_ATOMIC_POST_OP_ADD,
8888
-((int64_t)TARGET_LOCK_EXCLUSIVE), target,
8989
sizeof(uint64_t), remote_addr, ep);

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ static opal_common_ucx_winfo_t *_winfo_create(opal_common_ucx_wpool_t *wpool)
7777
winfo->inflight_ops = NULL;
7878
winfo->global_inflight_ops = 0;
7979
winfo->inflight_req = UCS_OK;
80+
winfo->is_dflt_winfo = false;
8081

8182
return winfo;
8283

@@ -97,11 +98,13 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
9798

9899
if (winfo->comm_size != 0) {
99100
size_t i;
100-
for (i = 0; i < winfo->comm_size; i++) {
101-
if (NULL != winfo->endpoints[i]) {
102-
ucp_ep_destroy(winfo->endpoints[i]);
101+
if (mpi_thread_multiple_enabled) {
102+
for (i = 0; i < winfo->comm_size; i++) {
103+
if (NULL != winfo->endpoints[i]) {
104+
ucp_ep_destroy(winfo->endpoints[i]);
105+
}
106+
assert(winfo->inflight_ops[i] == 0);
103107
}
104-
assert(winfo->inflight_ops[i] == 0);
105108
}
106109
free(winfo->endpoints);
107110
free(winfo->inflight_ops);
@@ -110,7 +113,10 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
110113
winfo->comm_size = 0;
111114

112115
OBJ_DESTRUCT(&winfo->mutex);
113-
ucp_worker_destroy(winfo->worker);
116+
if (mpi_thread_multiple_enabled || winfo->is_dflt_winfo) {
117+
ucp_worker_destroy(winfo->worker);
118+
}
119+
114120
}
115121

116122
/* -----------------------------------------------------------------------------
@@ -160,6 +166,7 @@ OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool)
160166
rc = OPAL_ERROR;
161167
goto err_worker_create;
162168
}
169+
winfo->is_dflt_winfo = true;
163170
wpool->dflt_winfo = winfo;
164171
OBJ_RETAIN(wpool->dflt_winfo);
165172

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ struct opal_common_ucx_winfo {
129129
short *inflight_ops;
130130
short global_inflight_ops;
131131
ucs_status_ptr_t inflight_req;
132+
bool is_dflt_winfo;
132133
};
133134
OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);
134135

0 commit comments

Comments
 (0)