Skip to content

Commit a6d390f

Browse files
authored
Merge pull request #2461 from artpol84/oob/msg_drop
orte/oob/tcp: Fix message dropping in case of concurrent connection.
2 parents a378271 + ada93e0 commit a6d390f

File tree

1 file changed

+130
-52
lines changed

1 file changed

+130
-52
lines changed

orte/mca/oob/tcp/oob_tcp_connection.c

Lines changed: 130 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2014-2015 Research Organization for Information Science
1818
* and Technology (RIST). All rights reserved.
19+
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
1920
* $COPYRIGHT$
2021
*
2122
* Additional copyrights may follow
@@ -78,6 +79,7 @@
7879

7980
static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
8081
static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer);
82+
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name);
8183
static int tcp_peer_send_blocking(int sd, void* data, size_t size);
8284
static bool tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, int sd,
8385
void* data, size_t size);
@@ -373,8 +375,9 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
373375
{
374376
char *msg;
375377
mca_oob_tcp_hdr_t hdr;
378+
uint16_t ack_flag = htons(1);
376379
int rc;
377-
size_t sdsize;
380+
size_t sdsize, offset = 0;
378381
char *cred;
379382
size_t credsize;
380383

@@ -401,21 +404,26 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
401404
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
402405
(unsigned long)credsize);
403406

404-
/* set the number of bytes to be read beyond the header */
405-
hdr.nbytes = strlen(orte_version_string) + 1 + credsize;
407+
/* payload size */
408+
sdsize = sizeof(ack_flag) + strlen(orte_version_string) + 1 + credsize;
409+
hdr.nbytes = sdsize;
406410
MCA_OOB_TCP_HDR_HTON(&hdr);
407411

408412
/* create a space for our message */
409-
sdsize = sizeof(hdr) + strlen(orte_version_string) + 1 + credsize;
413+
sdsize += sizeof(hdr);
410414
if (NULL == (msg = (char*)malloc(sdsize))) {
411415
return ORTE_ERR_OUT_OF_RESOURCE;
412416
}
413417
memset(msg, 0, sdsize);
414418

415419
/* load the message */
416-
memcpy(msg, &hdr, sizeof(hdr));
417-
memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string));
418-
memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize);
420+
memcpy(msg + offset, &hdr, sizeof(hdr));
421+
offset += sizeof(hdr);
422+
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
423+
offset += sizeof(ack_flag);
424+
memcpy(msg + offset, orte_version_string, strlen(orte_version_string));
425+
offset += strlen(orte_version_string)+1;
426+
memcpy(msg + offset, cred, credsize);
419427
/* clear the memory */
420428
if (NULL != cred) {
421429
free(cred);
@@ -433,6 +441,58 @@ static int tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
433441
return ORTE_SUCCESS;
434442
}
435443

444+
/* send a negative acknowledgement (NACK): just our header followed
445+
* by a zero ack flag, telling the remote side that we are rejecting
446+
* this connection attempt (no version string or security token)
447+
*/
448+
static int tcp_peer_send_connect_nack(int sd, orte_process_name_t name)
449+
{
450+
char *msg;
451+
mca_oob_tcp_hdr_t hdr;
452+
uint16_t ack_flag = htons(0);
453+
int rc = ORTE_SUCCESS;
454+
size_t sdsize, offset = 0;
455+
456+
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
457+
"%s SEND CONNECT NACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
458+
459+
/* load the header */
460+
hdr.origin = *ORTE_PROC_MY_NAME;
461+
hdr.dst = name;
462+
hdr.type = MCA_OOB_TCP_IDENT;
463+
hdr.tag = 0;
464+
hdr.seq_num = 0;
465+
memset(hdr.routed, 0, ORTE_MAX_RTD_SIZE+1);
466+
467+
/* payload size */
468+
sdsize = sizeof(ack_flag);
469+
hdr.nbytes = sdsize;
470+
MCA_OOB_TCP_HDR_HTON(&hdr);
471+
472+
/* create a space for our message */
473+
sdsize += sizeof(hdr);
474+
if (NULL == (msg = (char*)malloc(sdsize))) {
475+
return ORTE_ERR_OUT_OF_RESOURCE;
476+
}
477+
memset(msg, 0, sdsize);
478+
479+
/* load the message */
480+
memcpy(msg + offset, &hdr, sizeof(hdr));
481+
offset += sizeof(hdr);
482+
memcpy(msg + offset, &ack_flag, sizeof(ack_flag));
483+
offset += sizeof(ack_flag);
484+
485+
/* send it */
486+
if (ORTE_SUCCESS != tcp_peer_send_blocking(sd, msg, sdsize)) {
487+
/* it's ok if it fails - remote side may have already
488+
* identified the collision and closed the connection
489+
*/
490+
rc = ORTE_SUCCESS;
491+
}
492+
free(msg);
493+
return rc;
494+
}
495+
436496
/*
437497
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
438498
*/
@@ -636,6 +696,7 @@ static bool retry(mca_oob_tcp_peer_t* peer, int sd, bool fatal)
636696
return false;
637697
} else {
638698
/* The connection will be retried */
699+
tcp_peer_send_connect_nack(sd, peer->name);
639700
CLOSE_THE_SOCKET(sd);
640701
return true;
641702
}
@@ -649,10 +710,12 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
649710
char *version;
650711
int rc;
651712
char *cred;
652-
size_t credsize;
713+
size_t credsize, offset = 0;
653714
mca_oob_tcp_hdr_t hdr;
654715
mca_oob_tcp_peer_t *peer;
655716
uint64_t *ui64;
717+
uint16_t ack_flag;
718+
bool is_new = (NULL == pr);
656719

657720
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
658721
"%s RECV CONNECT ACK FROM %s ON SOCKET %d",
@@ -681,19 +744,6 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
681744
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
682745
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
683746
(NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd);
684-
/* check for a race condition - if I was in the process of
685-
* creating a connection to the peer, or have already established
686-
* such a connection, then we need to reject this connection. We will
687-
* let the higher ranked process retry - if I'm the lower ranked
688-
* process, I'll simply defer until I receive the request
689-
*/
690-
if (NULL != peer &&
691-
(MCA_OOB_TCP_CONNECTED == peer->state ||
692-
MCA_OOB_TCP_CONNECTING == peer->state ||
693-
MCA_OOB_TCP_CONNECT_ACK == peer->state ||
694-
MCA_OOB_TCP_CLOSED == peer->state)) {
695-
retry(peer, sd, false);
696-
}
697747
return ORTE_ERR_UNREACH;
698748
}
699749

@@ -748,23 +798,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
748798
CLOSE_THE_SOCKET(sd);
749799
return ORTE_ERR_OUT_OF_RESOURCE;
750800
}
751-
} else {
752-
/* check for a race condition - if I was in the process of
753-
* creating a connection to the peer, or have already established
754-
* such a connection, then we need to reject this connection. We will
755-
* let the higher ranked process retry - if I'm the lower ranked
756-
* process, I'll simply defer until I receive the request
757-
*/
758-
if (MCA_OOB_TCP_CONNECTED == peer->state ||
759-
MCA_OOB_TCP_CONNECTING == peer->state ||
760-
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
761-
if (retry(peer, sd, false)) {
762-
return ORTE_ERR_UNREACH;
763-
}
764-
}
765801
}
766802
} else {
767-
768803
/* compare the peers name to the expected value */
769804
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) {
770805
opal_output(0, "%s tcp_peer_recv_connect_ack: "
@@ -795,23 +830,66 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
795830
"%s unable to complete recv of connect-ack from %s ON SOCKET %d",
796831
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
797832
ORTE_NAME_PRINT(&peer->name), peer->sd);
798-
/* check for a race condition - if I was in the process of
799-
* creating a connection to the peer, or have already established
800-
* such a connection, then we need to reject this connection. We will
801-
* let the higher ranked process retry - if I'm the lower ranked
802-
* process, I'll simply defer until I receive the request
803-
*/
804-
if (MCA_OOB_TCP_CONNECTED == peer->state ||
805-
MCA_OOB_TCP_CONNECTING == peer->state ||
806-
MCA_OOB_TCP_CONNECT_ACK == peer->state) {
807-
retry(peer, sd, true);
808-
}
809833
free(msg);
810834
return ORTE_ERR_UNREACH;
811835
}
812836

837+
/* Check the type of acknowledgement */
838+
memcpy(&ack_flag, msg + offset, sizeof(ack_flag));
839+
offset += sizeof(ack_flag);
840+
841+
ack_flag = ntohs(ack_flag);
842+
if( !ack_flag ){
843+
if (MCA_OOB_TCP_CONNECT_ACK == peer->state) {
844+
/* We got nack from the remote side which means that
845+
* it will be the initiator of the connection.
846+
*/
847+
848+
/* release the socket */
849+
CLOSE_THE_SOCKET(peer->sd);
850+
peer->sd = -1;
851+
852+
/* unregister active events */
853+
if (peer->recv_ev_active) {
854+
opal_event_del(&peer->recv_event);
855+
peer->recv_ev_active = false;
856+
}
857+
if (peer->send_ev_active) {
858+
opal_event_del(&peer->send_event);
859+
peer->send_ev_active = false;
860+
}
861+
862+
/* change the state so we'll accept the remote
863+
* connection when it appears
864+
*/
865+
peer->state = MCA_OOB_TCP_UNCONNECTED;
866+
} else {
867+
/* FIXME: this shouldn't happen. We need to force next address
868+
* to be tried.
869+
*/
870+
mca_oob_tcp_peer_close(peer);
871+
}
872+
return ORTE_ERR_UNREACH;
873+
}
874+
875+
/* check for a race condition - if I was in the process of
876+
* creating a connection to the peer, or have already established
877+
* such a connection, then we need to reject this connection. We will
878+
* let the higher ranked process retry - if I'm the lower ranked
879+
* process, I'll simply defer until I receive the request
880+
*/
881+
if (is_new &&
882+
( MCA_OOB_TCP_CONNECTED == peer->state ||
883+
MCA_OOB_TCP_CONNECTING == peer->state ||
884+
MCA_OOB_TCP_CONNECT_ACK == peer->state ) ) {
885+
if (retry(peer, sd, false)) {
886+
return ORTE_ERR_UNREACH;
887+
}
888+
}
889+
813890
/* check that this is from a matching version */
814-
version = (char*)(msg);
891+
version = (char*)((void*)msg + offset);
892+
offset += strlen(version) + 1;
815893
if (0 != strcmp(version, orte_version_string)) {
816894
opal_output(0, "%s tcp_peer_recv_connect_ack: "
817895
"received different version from %s: %s instead of %s\n",
@@ -830,8 +908,8 @@ int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* pr,
830908
ORTE_NAME_PRINT(&peer->name));
831909

832910
/* check security token */
833-
cred = (char*)(msg + strlen(version) + 1);
834-
credsize = hdr.nbytes - strlen(version) - 1;
911+
cred = (char*)((void*)msg + offset);
912+
credsize = hdr.nbytes - offset;
835913
if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) {
836914
char *hostname;
837915
hostname = orte_get_proc_hostname(&peer->name);
@@ -911,8 +989,6 @@ static void tcp_peer_connected(mca_oob_tcp_peer_t* peer)
911989
*/
912990
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
913991
{
914-
mca_oob_tcp_send_t *snd;
915-
916992
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
917993
"%s tcp_peer_close for %s sd %d state %s",
918994
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@@ -964,10 +1040,12 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t *peer)
9641040
* handle these recycled messages. This prevents us from unintentionally
9651041
* attempting to send the message again across the now-failed interface
9661042
*/
1043+
/*
9671044
if (NULL != peer->send_msg) {
9681045
}
9691046
while (NULL != (snd = (mca_oob_tcp_send_t*)opal_list_remove_first(&peer->send_queue))) {
9701047
}
1048+
*/
9711049
}
9721050

9731051
/*

0 commit comments

Comments
 (0)