Implement Connectivity.java example using Java bindings #13201

Status: Open. Wants to merge 1 commit into base: main.
86 changes: 86 additions & 0 deletions examples/Connectivity.java
@@ -0,0 +1,86 @@
/*
 * Test the connectivity between all processes
 */

import mpi.*;
import java.nio.IntBuffer;

class Connectivity {
    public static void main(String args[]) throws MPIException {
        MPI.Init(args);

        /*
         * MPI.COMM_WORLD is the communicator provided when MPI is
         * initialized. It contains all the processes that are created
         * upon program execution.
         */
        int myRank = MPI.COMM_WORLD.getRank();
        int numProcesses = MPI.COMM_WORLD.getSize();
        Status status;
        boolean verbose = false;
        int peerProcess;
        String processorName = MPI.getProcessorName();

        for (String arg : args) {
            if (arg.equals("-v") || arg.equals("--verbose")) {
                verbose = true;
                break;
            }
        }

        for (int i = 0; i < numProcesses; i++) {
            /* Find current process */
            if (myRank == i) {
                /* send to and receive from all higher ranked processes */
                for (int j = i + 1; j < numProcesses; j++) {
                    if (verbose)
                        System.out.printf("Checking connection between rank %d on %s and rank %d\n",
                                i, processorName, j);

                    /*
                     * 'rank' is the buffer passed to iSend; it is filled with myRank,
                     * the value sent to rank j. 'peer' is the buffer that iRecv fills
                     * with the value received back from rank j.
                     */
                    IntBuffer rank = MPI.newIntBuffer(1);
                    IntBuffer peer = MPI.newIntBuffer(1);
                    rank.put(0, myRank);

                    /*
                     * To avoid deadlock, use the non-blocking calls iSend and iRecv.
                     * If two ranks posted blocking sends to each other at the same
                     * time, both could stall waiting for a matching receive. With
                     * non-blocking calls, each rank posts its send and receive
                     * immediately and then waits on both requests.
                     */
                    Request sendReq = MPI.COMM_WORLD.iSend(rank, 1, MPI.INT, j, myRank);
                    Request recvReq = MPI.COMM_WORLD.iRecv(peer, 1, MPI.INT, j, j);
                    sendReq.waitFor();
                    recvReq.waitFor();
                }
            } else if (myRank > i) {
                IntBuffer rank = MPI.newIntBuffer(1);
                IntBuffer peer = MPI.newIntBuffer(1);
                rank.put(0, myRank);

                /* receive from and reply to rank i */
                MPI.COMM_WORLD.iRecv(peer, 1, MPI.INT, i, i).waitFor();
                MPI.COMM_WORLD.iSend(rank, 1, MPI.INT, i, myRank).waitFor();
            }
        }

        /* Wait for all processes to reach the barrier before proceeding */
        MPI.COMM_WORLD.barrier();

        /*
         * Once all ranks have reached the barrier, have a single process
         * print the confirmation message. Here the "master" process,
         * rank 0, prints it.
         */
        if (myRank == 0) {
            System.out.printf("Connectivity test on %d processes PASSED.\n", numProcesses);
        }

        MPI.Finalize();
    }
}
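
For anyone trying the example: Open MPI only builds its Java bindings when configured with --enable-mpi-java. Assuming such a build (this workflow is not part of the patch itself), the file is typically compiled with the mpijavac wrapper, e.g. mpijavac Connectivity.java, and then launched through mpirun, e.g. mpirun -np 4 java Connectivity -v. The -v / --verbose flag enables the per-pair progress messages; rank 0 prints the final PASSED summary either way.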
18 changes: 11 additions & 7 deletions ompi/mca/coll/han/coll_han_alltoallv.c
@@ -804,6 +804,7 @@ int mca_coll_han_alltoallv_using_smsc(
     low_gather_out = malloc(sizeof(*low_gather_out) * low_size);
     struct peer_data *peers = malloc(sizeof(*peers) * low_size);
     opal_datatype_t *peer_send_types = malloc(sizeof(*peer_send_types) * low_size);
+    bool have_bufs_and_types = false;

     low_gather_in.serialization_buffer = serialization_buf;
     low_gather_in.sbuf = (void*)sbuf; // cast to discard the const
@@ -896,6 +897,7 @@
         peers[jrank].sendtype = &peer_send_types[jrank];
     }

+    have_bufs_and_types = true;
     send_from_addrs = malloc(sizeof(*send_from_addrs)*low_size);
     recv_to_addrs = malloc(sizeof(*recv_to_addrs)*low_size);
     send_counts = malloc(sizeof(*send_counts)*low_size);
@@ -964,14 +966,16 @@
         free(recv_types);
     }

-    for (int jlow=0; jlow<low_size; jlow++) {
-        if (jlow != low_rank) {
-            OBJ_DESTRUCT(&peer_send_types[jlow]);
-        }
+    if (have_bufs_and_types) {
+        for (int jlow=0; jlow<low_size; jlow++) {
+            if (jlow != low_rank) {
+                OBJ_DESTRUCT(&peer_send_types[jlow]);
+            }

-        for (int jbuf=0; jbuf<2; jbuf++) {
-            if (peers[jlow].map_ctx[jbuf]) {
-                mca_smsc->unmap_peer_region(peers[jlow].map_ctx[jbuf]);
+            for (int jbuf=0; jbuf<2; jbuf++) {
+                if (peers[jlow].map_ctx[jbuf]) {
+                    mca_smsc->unmap_peer_region(peers[jlow].map_ctx[jbuf]);
+                }
             }
         }
     }
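
The have_bufs_and_types flag introduced above only becomes true once the per-peer send datatypes have been constructed and the peer buffers mapped, so an early error exit no longer runs OBJ_DESTRUCT or unmap_peer_region on objects that were never set up. A minimal, self-contained C sketch of that cleanup pattern, using hypothetical names rather than the actual HAN data structures:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Sketch: resources created partway through a function are released on the
 * cleanup path only if the code that created them was actually reached. */
static int do_collective_setup(int nprocs)
{
    int rc = 0;
    bool have_peer_state = false;          /* nothing constructed yet */
    int *peer_state = malloc(sizeof(*peer_state) * nprocs);
    if (NULL == peer_state) {
        rc = -1;
        goto cleanup;                      /* early exit: flag is still false */
    }

    if (nprocs < 2) {
        rc = -1;
        goto cleanup;                      /* another early exit before setup */
    }

    for (int j = 0; j < nprocs; j++) {
        peer_state[j] = j;                 /* stands in for OBJ_CONSTRUCT / map_peer_region */
    }
    have_peer_state = true;                /* from here on, cleanup must undo the loop above */

    /* ... main work would go here ... */

cleanup:
    if (have_peer_state) {
        for (int j = 0; j < nprocs; j++) {
            peer_state[j] = 0;             /* stands in for OBJ_DESTRUCT / unmap_peer_region */
        }
    }
    free(peer_state);                      /* free(NULL) is safe, so no flag needed for this */
    return rc;
}

int main(void)
{
    printf("setup returned %d\n", do_collective_setup(4));
    return 0;
}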
5 changes: 5 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned_dynamic_file.c
@@ -494,6 +494,11 @@ static int ompi_coll_tuned_read_rules_config_file_classic (char *fname, ompi_col
     }
     opal_output_verbose(25, ompi_coll_tuned_stream,
                         "Read communicator count %ld for dynamic rule for collective ID %ld\n", NCOMSIZES, COLID);
+    if( NCOMSIZES > INT_MAX) {
+        opal_output_verbose(25, ompi_coll_tuned_stream,
+                            "Refusing to proceed: suspiciously large number found for the number of communicator-based rules: %ld\n", NCOMSIZES);
+        goto on_file_error;
+    }
     alg_p->n_com_sizes = NCOMSIZES;
     alg_p->com_rules = ompi_coll_tuned_mk_com_rules (NCOMSIZES, COLID);
     if (NULL == alg_p->com_rules) {
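
The added check above matters because NCOMSIZES is parsed from the rules file as a long (it is printed with %ld): anything above INT_MAX cannot be a plausible communicator-rule count, so the parser now rejects the file instead of carrying the value forward. A small, self-contained C sketch of that kind of bounds check when narrowing a parsed long (hypothetical helper, not the actual tuned-file parser):

#include <limits.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical helper: parse a non-negative count from a config token and
 * refuse anything that cannot be represented as an int. */
static int parse_count(const char *token, int *out)
{
    char *end = NULL;
    long value = strtol(token, &end, 10);
    if (end == token || value < 0 || value > INT_MAX) {
        fprintf(stderr, "refusing suspicious count: %s\n", token);
        return -1;                     /* caller would bail out, like goto on_file_error */
    }
    *out = (int) value;                /* safe narrowing: 0 <= value <= INT_MAX */
    return 0;
}

int main(void)
{
    int n = 0;
    if (0 == parse_count("12", &n)) {
        printf("parsed count: %d\n", n);
    }
    parse_count("99999999999", &n);    /* rejected where long is 64-bit */
    return 0;
}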