Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Commit 9db54b2

Browse files
authored
Merge pull request #1365 from hjelmn/osc_pt2pt_v2.x
osc/pt2pt threading fixes
2 parents 572957c + 43c7f8f commit 9db54b2

7 files changed

+43
-36
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

+13 -8
Original file line number | Diff line number | Diff line change
@@ -121,7 +121,7 @@ struct ompi_osc_pt2pt_peer_t {
121121
int32_t passive_incoming_frag_count;
122122

123123
/** peer flags */
124-
int32_t flags;
124+
volatile int32_t flags;
125125
};
126126
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
127127

@@ -144,11 +144,15 @@ static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer
144144

145145
static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value)
146146
{
147-
if (value) {
148-
peer->flags |= flag;
149-
} else {
150-
peer->flags &= ~flag;
151-
}
147+
int32_t peer_flags, new_flags;
148+
do {
149+
peer_flags = peer->flags;
150+
if (value) {
151+
new_flags = peer_flags | flag;
152+
} else {
153+
new_flags = peer_flags & ~flag;
154+
}
155+
} while (!OPAL_ATOMIC_CMPSET_32 (&peer->flags, peer_flags, new_flags));
152156
}
153157

154158
static inline void ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value)
@@ -937,13 +941,14 @@ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *
937941
static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank)
938942
{
939943
ompi_osc_pt2pt_sync_t *sync;
944+
ompi_osc_pt2pt_peer_t *peer;
940945

941-
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL);
946+
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer);
942947
if (!sync) {
943948
return false;
944949
}
945950

946-
return sync->eager_send_active;
951+
return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer);
947952
}
948953

949954
END_C_DECLS

ompi/mca/osc/pt2pt/osc_pt2pt_comm.c

+8 -8
Original file line number | Diff line number | Diff line change
@@ -116,7 +116,7 @@ static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
116116
int ret;
117117

118118
/* if we are in active target mode wait until all post messages arrive */
119-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
119+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
120120

121121
ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype,
122122
target, target_count, target_datatype);
@@ -140,7 +140,7 @@ static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, vo
140140
int ret;
141141

142142
/* if we are in active target mode wait until all post messages arrive */
143-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
143+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
144144

145145
ret = ompi_datatype_sndrcv (source, source_count, source_datatype,
146146
target, target_count, target_datatype);
@@ -162,7 +162,7 @@ static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
162162
((unsigned long) target_disp * module->disp_unit);
163163

164164
/* if we are in active target mode wait until all post messages arrive */
165-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
165+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
166166

167167
ompi_osc_pt2pt_accumulate_lock (module);
168168

@@ -186,7 +186,7 @@ static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
186186
int ret;
187187

188188
/* if we are in active target mode wait until all post messages arrive */
189-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
189+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
190190

191191
ompi_osc_pt2pt_accumulate_lock (module);
192192

@@ -336,7 +336,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
336336

337337
if (is_long_msg) {
338338
/* wait for eager sends to be active before starting a long put */
339-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
339+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
340340
}
341341

342342
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -495,7 +495,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
495495

496496
if (is_long_msg) {
497497
/* wait for synchronization before posting a long message */
498-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
498+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
499499
}
500500

501501
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
@@ -802,7 +802,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
802802

803803
if (!release_req) {
804804
/* wait for epoch to begin before starting rget operation */
805-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
805+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
806806
}
807807

808808
header = (ompi_osc_pt2pt_header_get_t*) ptr;
@@ -968,7 +968,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
968968

969969
if (!release_req) {
970970
/* wait for epoch to begin before starting operation */
971-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
971+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
972972
}
973973

974974
/* optimize the self case. TODO: optimize the local case */

ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c

+3 -6
Original file line number | Diff line number | Diff line change
@@ -1546,12 +1546,6 @@ static inline int process_frag (ompi_osc_pt2pt_module_t *module,
15461546
ret = process_acc_long (module, frag->source, &header->acc);
15471547
break;
15481548

1549-
case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ:
1550-
ret = ompi_osc_pt2pt_process_lock(module, frag->source, &header->lock);
1551-
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
1552-
ret = sizeof (header->lock);
1553-
}
1554-
break;
15551549
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
15561550
ret = process_unlock(module, frag->source, &header->unlock);
15571551
break;
@@ -1655,6 +1649,9 @@ int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv)
16551649
case OMPI_OSC_PT2PT_HDR_TYPE_POST:
16561650
osc_pt2pt_incoming_post (module, source);
16571651
break;
1652+
case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ:
1653+
ompi_osc_pt2pt_process_lock(module, source, (ompi_osc_pt2pt_header_lock_t *) base_header);
1654+
break;
16581655
case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK:
16591656
ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header);
16601657
break;

ompi/mca/osc/pt2pt/osc_pt2pt_frag.c

-1
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,6 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
151151
return ret;
152152
}
153153

154-
155154
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
156155
{
157156
int ret = OMPI_SUCCESS;

ompi/mca/osc/pt2pt/osc_pt2pt_frag.h

+3 -3
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,9 @@ struct ompi_osc_pt2pt_frag_t {
4141
typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t;
4242
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
4343

44-
extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45-
extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46-
extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
44+
int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45+
int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46+
int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
4747

4848
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
4949
ompi_osc_pt2pt_frag_t* buffer)

ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c

+13 -7
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,12 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
122122

123123
int ret;
124124

125+
OPAL_THREAD_LOCK(&peer->lock);
126+
if (ompi_osc_pt2pt_peer_locked (peer)) {
127+
OPAL_THREAD_UNLOCK(&peer->lock);
128+
return OMPI_SUCCESS;
129+
}
130+
125131
(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
126132

127133
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
@@ -137,17 +143,15 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
137143
lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
138144
OSC_PT2PT_HTON(&lock_req, module, target);
139145

140-
ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
146+
ret = ompi_osc_pt2pt_control_send_unbuffered (module, target, &lock_req, sizeof (lock_req));
141147
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
142-
return ret;
143-
}
144-
145-
/* make sure the request gets sent, so we can start eager sending... */
146-
ret = ompi_osc_pt2pt_frag_flush_target (module, target);
147-
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
148+
OPAL_THREAD_ADD32(&lock->sync_expected, -1);
149+
} else {
148150
ompi_osc_pt2pt_peer_set_locked (peer, true);
149151
}
150152

153+
OPAL_THREAD_UNLOCK(&peer->lock);
154+
151155
return ret;
152156
}
153157

@@ -316,6 +320,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
316320
if (OPAL_UNLIKELY(NULL == lock)) {
317321
return OMPI_ERR_OUT_OF_RESOURCE;
318322
}
323+
324+
lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target);
319325
} else {
320326
lock = &module->all_sync;
321327
}

ompi/mca/osc/pt2pt/osc_pt2pt_sync.h

+3 -3
Original file line number | Diff line number | Diff line change
@@ -74,10 +74,10 @@ struct ompi_osc_pt2pt_sync_t {
7474
int num_peers;
7575

7676
/** number of synchronization messages expected */
77-
int32_t sync_expected;
77+
volatile int32_t sync_expected;
7878

7979
/** eager sends are active to all peers in this access epoch */
80-
bool eager_send_active;
80+
volatile bool eager_send_active;
8181

8282
/** communication has started on this epoch */
8383
bool epoch_active;
@@ -175,7 +175,7 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync)
175175
static inline void ompi_osc_pt2pt_sync_reset (ompi_osc_pt2pt_sync_t *sync)
176176
{
177177
sync->type = OMPI_OSC_PT2PT_SYNC_TYPE_NONE;
178-
sync->eager_send_active = 0;
178+
sync->eager_send_active = false;
179179
sync->epoch_active = 0;
180180
sync->peer_list.peers = NULL;
181181
sync->sync.pscw.group = NULL;

0 commit comments

Comments
 (0)