Skip to content

Commit ada93e0

Browse files
committed
orte/oob/tcp: Fix message dropping in case of concurrent connection.
The problem was observed for direct modex used with recursive doubling algorithm (used for collective ID calculation prior to d52a2d0) that has pairwise nature and counter-connections are highly likely. The following scenario was uncovering the issue: * ranks `x` and `y` want to communicate with each other, `x` < `y`; * rank `x` initiates the connection and sends the ack; * rank `y` starts to `connect()` and gets the ack from `x`; * `y` identifies that it already started connecting and `y` > `x` so it rejects incoming connection. * `x` sees that his connection was rejected in `mca_oob_tcp_peer_recv_connect_ack()` when trying to read the message header using `tcp_peer_recv_blocking()` which calls `mca_oob_tcp_peer_close()` that effectively flushes all the messages in the peer->send_queue. * `y` send the ack to `x` and the connection is established, however all the messages for the peer at `x` are vanished (except the front one in peer->send_msg). This commit introduces a "nack" function that will be used at `y` side to tell `x` that `y` has the priority and `x`'s connection should be closed. This allows to avoid "guessing" on the unexpectedly closed connection. Signed-off-by: Artem Polyakov <[email protected]>
1 parent 7ce3ca2 commit ada93e0

File tree

1 file changed

+130
-52
lines changed

1 file changed

+130
-52
lines changed

orte/mca/oob/tcp/oob_tcp_connection.c

Lines changed: 130 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2014-2015 Research Organization for Information Science
1818
* and Technology (RIST). All rights reserved.
19+
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
1920
* $COPYRIGHT$
2021
*
2122
* Additional copyrights may follow
@@ -78,6 +79,7 @@
7879

7980
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
8081
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
82+
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
8183
static int tcp_peer_send_blocking(int sd, void* data, size_t size);
8284
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
8385
void* data, size_t size);
@@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
373375
{
374376
char *msg;
375377
mca_oob_tcp_hdr_t hdr;
378+
uint16_t ack_flag = htons(1);
376379
int rc;
377-
size_t sdsize;
380+
size_t sdsize, offset = 0;
378381
char *cred;
379382
size_t credsize;
380383

@@ -401,21 +404,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
401404
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
402405
(unsigned long)credsize);
403406

404-
/* set the number of bytes to be read beyond the header */
405-
hdr.nbytes = strlen(orte_version_string) + 1 + credsize;
407+
/* payload size */
408+
sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize;
409+
hdr.nbytes = sdsize;
406410
MCA_OOB_TCP_HDR_HTON(&hdr);
407411

408412
/* create a space for our message */
409-
sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize;
413+
sdsize += sizeof(hdr);
410414
if (NULL == (msg = (char*)malloc(sdsize))) {
411415
return ORTE_ERR_OUT_OF_RESOURCE;
412416
}
413417
memset(msg, 0, sdsize);
414418

415419
/* load the message */
416-
memcpy(msg, &hdr, sizeof(hdr));
417-
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
418-
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize);
420+
memcpy(msg + offset, &hdr, sizeof(hdr));
421+
offset += sizeof(hdr);
422+
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
423+
offset += sizeof(ack_flag);
424+
memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
425+
offset += strlen(orte_version_string)+1;
426+
memcpy(msg + offset, cred, credsize);
419427
/* clear the memory */
420428
if (NULL != cred) {
421429
free(cred);
@@ -433,6 +441,58 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
433441
return ORTE_SUCCESS;
434442
}
435443

444+
/* send a handshake that includes our process identifier, our
445+
* version string, and a security token to ensure we are talking
446+
* to another OMPI process
447+
*/
448+
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
449+
{
450+
char *msg;
451+
mca_oob_tcp_hdr_t hdr;
452+
uint16_t ack_flag = htons(0);
453+
int rc = ORTE_SUCCESS;
454+
size_t sdsize, offset = 0;
455+
456+
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
457+
"%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
458+
459+
/* load the header */
460+
hdr.origin = *ORTE_PROC_MY_NAME;
461+
hdr.dst = name;
462+
hdr.type = MCA_OOB_TCP_IDENT;
463+
hdr.tag = 0;
464+
hdr.seq_num = 0;
465+
memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
466+
467+
/* payload size */
468+
sdsize = sizeof(ack_flag);
469+
hdr.nbytes = sdsize;
470+
MCA_OOB_TCP_HDR_HTON(&hdr);
471+
472+
/* create a space for our message */
473+
sdsize += sizeof(hdr);
474+
if (NULL == (msg = (char*)malloc(sdsize))) {
475+
return ORTE_ERR_OUT_OF_RESOURCE;
476+
}
477+
memset(msg, 0, sdsize);
478+
479+
/* load the message */
480+
memcpy(msg + offset, &hdr, sizeof(hdr));
481+
offset += sizeof(hdr);
482+
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
483+
offset += sizeof(ack_flag);
484+
485+
/* send it */
486+
if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) {
487+
/* it's ok if it fails - remote side may already
488+
* identifiet the collision and closed the connection
489+
*/
490+
rc = ORTE_SUCCESS;
491+
}
492+
free(msg);
493+
return rc;
494+
}
495+
436496
/*
437497
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
438498
*/
@@ -636,6 +696,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
636696
return false;
637697
} else {
638698
/* The connection will be retried */
699+
tcp_peer_send_connect_nack(sd, peer->name);
639700
CLOSE_THE_SOCKET(sd);
640701
return true;
641702
}
@@ -649,10 +710,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
649710
char *version;
650711
int rc;
651712
char *cred;
652-
size_t credsize;
713+
size_t credsize, offset = 0;
653714
mca_oob_tcp_hdr_t hdr;
654715
mca_oob_tcp_peer_t *peer;
655716
uint64_t *ui64;
717+
uint16_t ack_flag;
718+
bool is_new = (NULL == pr);
656719

657720
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
658721
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
@@ -681,19 +744,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
681744
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
682745
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
683746
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
684-
/* check for a race condition - if I was in the process of
685-
* creating a connection to the peer, or have already established
686-
* such a connection, then we need to reject this connection. We will
687-
* let the higher ranked process retry - if I'm the lower ranked
688-
* process, I'll simply defer until I receive the request
689-
*/
690-
if (NULL != peer &&
691-
(MCA_OOB_TCP_CONNECTED == peer->state ||
692-
MCA_OOB_TCP_CONNECTING == peer->state ||
693-
MCA_OOB_TCP_CONNECT_ACK == peer->state ||
694-
MCA_OOB_TCP_CLOSED == peer->state)) {
695-
retry(peer, sd, false);
696-
}
697747
return ORTE_ERR_UNREACH;
698748
}
699749

@@ -748,23 +798,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
748798
CLOSE_THE_SOCKET(sd);
749799
return ORTE_ERR_OUT_OF_RESOURCE;
750800
}
751-
} else {
752-
/* check for a race condition - if I was in the process of
753-
* creating a connection to the peer, or have already established
754-
* such a connection, then we need to reject this connection. We will
755-
* let the higher ranked process retry - if I'm the lower ranked
756-
* process, I'll simply defer until I receive the request
757-
*/
758-
if (MCA_OOB_TCP_CONNECTED == peer->state ||
759-
MCA_OOB_TCP_CONNECTING == peer->state ||
760-
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
761-
if (retry(peer, sd, false)) {
762-
return ORTE_ERR_UNREACH;
763-
}
764-
}
765801
}
766802
} else {
767-
768803
/* compare the peers name to the expected value */
769804
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
770805
opal_output(0, "%s tcp_peer_recv_connect_ack: "
@@ -795,23 +830,66 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
795830
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
796831
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
797832
ORTE_NAME_PRINT(&peer->name), peer->sd);
798-
/* check for a race condition - if I was in the process of
799-
* creating a connection to the peer, or have already established
800-
* such a connection, then we need to reject this connection. We will
801-
* let the higher ranked process retry - if I'm the lower ranked
802-
* process, I'll simply defer until I receive the request
803-
*/
804-
if (MCA_OOB_TCP_CONNECTED == peer->state ||
805-
MCA_OOB_TCP_CONNECTING == peer->state ||
806-
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
807-
retry(peer, sd, true);
808-
}
809833
free(msg);
810834
return ORTE_ERR_UNREACH;
811835
}
812836

837+
/* Check the type of acknowledgement */
838+
memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
839+
offset += sizeof(ack_flag);
840+
841+
ack_flag = ntohs(ack_flag);
842+
if( !ack_flag ){
843+
if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
844+
/* We got nack from the remote side which means that
845+
* it will be the initiator of the connection.
846+
*/
847+
848+
/* release the socket */
849+
CLOSE_THE_SOCKET(peer->sd);
850+
peer->sd = -1;
851+
852+
/* unregister active events */
853+
if (peer->recv_ev_active) {
854+
opal_event_del(&peer->recv_event);
855+
peer->recv_ev_active = false;
856+
}
857+
if (peer->send_ev_active) {
858+
opal_event_del(&peer->send_event);
859+
peer->send_ev_active = false;
860+
}
861+
862+
/* change the state so we'll accept the remote
863+
* connection when it'll apeear
864+
*/
865+
peer->state = MCA_OOB_TCP_UNCONNECTED;
866+
} else {
867+
/* FIXME: this shouldn't happen. We need to force next address
868+
* to be tried.
869+
*/
870+
mca_oob_tcp_peer_close(peer);
871+
}
872+
return ORTE_ERR_UNREACH;
873+
}
874+
875+
/* check for a race condition - if I was in the process of
876+
* creating a connection to the peer, or have already established
877+
* such a connection, then we need to reject this connection. We will
878+
* let the higher ranked process retry - if I'm the lower ranked
879+
* process, I'll simply defer until I receive the request
880+
*/
881+
if (is_new &&
882+
( MCA_OOB_TCP_CONNECTED == peer->state ||
883+
MCA_OOB_TCP_CONNECTING == peer->state ||
884+
MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
885+
if (retry(peer, sd, false)) {
886+
return ORTE_ERR_UNREACH;
887+
}
888+
}
889+
813890
/* check that this is from a matching version */
814-
version = (char*)(msg);
891+
version = (char*)((void*)msg + offset);
892+
offset += strlen(version) + 1;
815893
if (0 != strcmp(version, orte_version_string)) {
816894
opal_output(0, "%s tcp_peer_recv_connect_ack: "
817895
"received different version from %s: %s instead of %s\n",
@@ -830,8 +908,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
830908
ORTE_NAME_PRINT(&peer->name));
831909

832910
/* check security token */
833-
cred = (char*)(msg + strlen(version) + 1);
834-
credsize = hdr.nbytes - strlen(version) - 1;
911+
cred = (char*)((void*)msg + offset);
912+
credsize = hdr.nbytes - offset;
835913
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) {
836914
char *hostname;
837915
hostname = orte_get_proc_hostname(&peer->name);
@@ -911,8 +989,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
911989
*/
912990
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
913991
{
914-
mca_oob_tcp_send_t *snd;
915-
916992
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
917993
"%s tcp_peer_close for %s sd %d state %s",
918994
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@@ -964,10 +1040,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
9641040
* handle these recycled messages. This prevents us from unintentionally
9651041
* attempting to send the message again across the now-failed interface
9661042
*/
1043+
/*
9671044
if (NULL != peer->send_msg) {
9681045
}
9691046
while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
9701047
}
1048+
*/
9711049
}
9721050

9731051
/*

0 commit comments

Comments
 (0)