Skip to content

Commit b39015a

Browse files
Luke Robisonvdhyasani17
authored andcommitted
Create Connectivity.java and be able to compile/run java files
Signed-off-by: vdhyasani17 <[email protected]> Implement the code for Connectivity.java Add comments to explain the functionality Signed-off-by: vdhyasani17 <[email protected]> Make communication non-blocking by using iSend and iRecv Fix comment semantics and structure Add -v verbose check Signed-off-by: vdhyasani17 <[email protected]> Change nonblocking iSend/iRecv to sendRecv operation Signed-off-by: vdhyasani17 <[email protected]>
1 parent a099c87 commit b39015a

File tree

3 files changed

+102
-7
lines changed

3 files changed

+102
-7
lines changed

examples/Connectivity.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Test the connectivity between all processes
3+
*/
4+
5+
import mpi.*;
6+
import java.nio.IntBuffer;
7+
8+
class Connectivity {
9+
public static void main(String args[]) throws MPIException {
10+
MPI.Init(args);
11+
12+
/*
13+
* MPI.COMM_WORLD is the communicator provided when MPI is
14+
* initialized. It contains all the processes that are created
15+
* upon program execution.
16+
*/
17+
int myRank = MPI.COMM_WORLD.getRank();
18+
int numProcesses = MPI.COMM_WORLD.getSize();
19+
Status status;
20+
boolean verbose = false;
21+
int peerProcess;
22+
String processorName = MPI.getProcessorName();
23+
24+
for (String arg : args) {
25+
if (arg.equals("-v") || arg.equals("--verbose")) {
26+
verbose = true;
27+
break;
28+
}
29+
}
30+
31+
for (int i = 0; i < numProcesses; i++) {
32+
/* Find current process */
33+
if (myRank == i) {
34+
/* send to and receive from all higher ranked processes */
35+
for (int j = i + 1; j < numProcesses; j++) {
36+
if (verbose)
37+
System.out.printf("Checking connection between rank %d on %s and rank %d\n", i, processorName,
38+
j);
39+
40+
/*
41+
* rank is the Buffer passed into iSend to send to rank j.
42+
* rank is populated with myRank, which is the data to send off
43+
* peer is the Buffer received from rank j from iRecv
44+
*/
45+
IntBuffer rank = MPI.newIntBuffer(1);
46+
IntBuffer peer = MPI.newIntBuffer(1);
47+
rank.put(0, myRank);
48+
49+
/*
50+
* To avoid deadlocks, use non-blocking communication iSend and iRecv
51+
* This will allow the program to progress, in the event that
52+
* two ranks both send to each other at the same time and could
53+
* potentially cause a deadlock. The ranks can send their requests
54+
* without halting the program and immediately
55+
*/
56+
Request sendReq = MPI.COMM_WORLD.iSend(rank, 1, MPI.INT, j, myRank);
57+
Request recvReq = MPI.COMM_WORLD.iRecv(peer, 1, MPI.INT, j, j);
58+
sendReq.waitFor();
59+
recvReq.waitFor();
60+
}
61+
} else if (myRank > i) {
62+
IntBuffer rank = MPI.newIntBuffer(1);
63+
IntBuffer peer = MPI.newIntBuffer(1);
64+
rank.put(0, myRank);
65+
66+
/* receive from and reply to rank i */
67+
MPI.COMM_WORLD.iRecv(peer, 1, MPI.INT, i, i).waitFor();
68+
MPI.COMM_WORLD.iSend(rank, 1, MPI.INT, i, myRank).waitFor();
69+
}
70+
}
71+
72+
/* Wait for all processes to reach barrier before proceeding */
73+
MPI.COMM_WORLD.barrier();
74+
75+
/*
76+
* Once all ranks have reached the barrier,
77+
* have only one process print out the confirmation message.
78+
* In this case, we are having the "master" process print the message.
79+
*/
80+
if (myRank == 0) {
81+
System.out.printf("Connectivity test on %d processes PASSED.\n", numProcesses);
82+
}
83+
84+
MPI.Finalize();
85+
}
86+
}

ompi/mca/coll/han/coll_han_alltoallv.c

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,7 @@ int mca_coll_han_alltoallv_using_smsc(
804804
low_gather_out = malloc(sizeof(*low_gather_out) * low_size);
805805
struct peer_data *peers = malloc(sizeof(*peers) * low_size);
806806
opal_datatype_t *peer_send_types = malloc(sizeof(*peer_send_types) * low_size);
807+
bool have_bufs_and_types = false;
807808

808809
low_gather_in.serialization_buffer = serialization_buf;
809810
low_gather_in.sbuf = (void*)sbuf; // cast to discard the const
@@ -896,6 +897,7 @@ int mca_coll_han_alltoallv_using_smsc(
896897
peers[jrank].sendtype = &peer_send_types[jrank];
897898
}
898899

900+
have_bufs_and_types = true;
899901
send_from_addrs = malloc(sizeof(*send_from_addrs)*low_size);
900902
recv_to_addrs = malloc(sizeof(*recv_to_addrs)*low_size);
901903
send_counts = malloc(sizeof(*send_counts)*low_size);
@@ -964,14 +966,16 @@ int mca_coll_han_alltoallv_using_smsc(
964966
free(recv_types);
965967
}
966968

967-
for (int jlow=0; jlow<low_size; jlow++) {
968-
if (jlow != low_rank) {
969-
OBJ_DESTRUCT(&peer_send_types[jlow]);
970-
}
969+
if (have_bufs_and_types) {
970+
for (int jlow=0; jlow<low_size; jlow++) {
971+
if (jlow != low_rank) {
972+
OBJ_DESTRUCT(&peer_send_types[jlow]);
973+
}
971974

972-
for (int jbuf=0; jbuf<2; jbuf++) {
973-
if (peers[jlow].map_ctx[jbuf]) {
974-
mca_smsc->unmap_peer_region(peers[jlow].map_ctx[jbuf]);
975+
for (int jbuf=0; jbuf<2; jbuf++) {
976+
if (peers[jlow].map_ctx[jbuf]) {
977+
mca_smsc->unmap_peer_region(peers[jlow].map_ctx[jbuf]);
978+
}
975979
}
976980
}
977981
}

ompi/mca/coll/tuned/coll_tuned_dynamic_file.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,11 @@ static int ompi_coll_tuned_read_rules_config_file_classic (char *fname, ompi_col
494494
}
495495
opal_output_verbose(25, ompi_coll_tuned_stream,
496496
"Read communicator count %ld for dynamic rule for collective ID %ld\n", NCOMSIZES, COLID);
497+
if( NCOMSIZES > INT_MAX) {
498+
opal_output_verbose(25, ompi_coll_tuned_stream,
499+
"Refusing to proceed: suspiciously large number found for the number of communicator-based rules: %ld\n", NCOMSIZES);
500+
goto on_file_error;
501+
}
497502
alg_p->n_com_sizes = NCOMSIZES;
498503
alg_p->com_rules = ompi_coll_tuned_mk_com_rules (NCOMSIZES, COLID);
499504
if (NULL == alg_p->com_rules) {

0 commit comments

Comments
 (0)