From 7f003fe57f5de59b953524fa73daf9ab7a065da6 Mon Sep 17 00:00:00 2001 From: Taufique Hussain Date: Thu, 7 Aug 2025 21:15:52 -0700 Subject: [PATCH 1/2] Comm-comp overlapped version of SUMMA enhanced - Used OpenMP tasking to drive two parallel streams of progress in each SUMMA stage - one to drive the asynchronous broadcast with repeated polling, the second to drive the local SpGEMM on previously received data - Fixed the outdated function call used to read triples files in MultTiming --- ReleaseTests/MultTiming.cpp | 29 ++++++- include/CombBLAS/ParFriends.h | 157 +++++++++++++++++++++++----------- 2 files changed, 132 insertions(+), 54 deletions(-) diff --git a/ReleaseTests/MultTiming.cpp b/ReleaseTests/MultTiming.cpp index 047159f4..00219452 100644 --- a/ReleaseTests/MultTiming.cpp +++ b/ReleaseTests/MultTiming.cpp @@ -48,12 +48,14 @@ int main(int argc, char* argv[]) typedef PlusTimesSRing PTDOUBLEDOUBLE; PSpMat::MPI_DCCols A, B; // construct objects - A.ReadDistribute(Aname, 0); + //A.ReadDistribute(Aname, 0); + A.ReadGeneralizedTuples(Aname, maximum()); A.PrintInfo(); - B.ReadDistribute(Bname, 0); + //B.ReadDistribute(Bname, 0); + B.ReadGeneralizedTuples(Bname, maximum()); B.PrintInfo(); SpParHelper::Print("Data read\n"); - + { // force the calling of C's destructor PSpMat::MPI_DCCols C = Mult_AnXBn_DoubleBuff::DCCols >(A, B); int64_t cnnz = C.getnnz(); @@ -100,6 +102,27 @@ int main(int argc, char* argv[]) printf("%.6lf seconds elapsed per iteration\n", (t2-t1)/(double)ITERATIONS); } + {// force the calling of C's destructor + PSpMat::MPI_DCCols C = Mult_AnXBn_Overlap::DCCols >(A, B); + C.PrintInfo(); + } + SpParHelper::Print("Warmed up for Overlap\n"); + MPI_Barrier(MPI_COMM_WORLD); + MPI_Pcontrol(1,"SpGEMM_Overlap"); + t1 = MPI_Wtime(); // initialize (wall-clock) timer + for(int i=0; i::MPI_DCCols C = Mult_AnXBn_Overlap::DCCols >(A, B); + } + MPI_Barrier(MPI_COMM_WORLD); + MPI_Pcontrol(-1,"SpGEMM_Overlap"); + t2 = MPI_Wtime(); + if(myrank == 0) + { + cout<<"Comm-Comp overlapped multiplications finished"<::DCCols >(A, B); diff --git a/include/CombBLAS/ParFriends.h b/include/CombBLAS/ParFriends.h index 4eb5d973..0d58ecd0 100644 --- a/include/CombBLAS/ParFriends.h +++ b/include/CombBLAS/ParFriends.h @@ -1557,7 +1557,7 @@ SpParMat Mult_AnXBn_Synch /* * Experimental SUMMA implementation with communication and computation overlap. - * Not stable. 
+ * Written by: Taufique * */ template SpParMat Mult_AnXBn_Overlap @@ -1577,8 +1577,6 @@ SpParMat Mult_AnXBn_Overlap IU C_m = A.spSeq->getnrow(); IU C_n = B.spSeq->getncol(); - //const_cast< UDERB* >(B.spSeq)->Transpose(); // do not transpose for colum-by-column multiplication - LIA ** ARecvSizes = SpHelper::allocate2D(UDERA::esscount, stages); LIB ** BRecvSizes = SpHelper::allocate2D(UDERB::esscount, stages); @@ -1591,10 +1589,12 @@ SpParMat Mult_AnXBn_Overlap Arr Aarrinfo = A.seqptr()->GetArrays(); Arr Barrinfo = B.seqptr()->GetArrays(); + std::vector< std::vector > ABCastIndarrayReq; std::vector< std::vector > ABCastNumarrayReq; std::vector< std::vector > BBCastIndarrayReq; std::vector< std::vector > BBCastNumarrayReq; + for(int i = 0; i < stages; i++){ ABCastIndarrayReq.push_back( std::vector(Aarrinfo.indarrs.size(), MPI_REQUEST_NULL) ); ABCastNumarrayReq.push_back( std::vector(Aarrinfo.numarrs.size(), MPI_REQUEST_NULL) ); @@ -1607,58 +1607,116 @@ SpParMat Mult_AnXBn_Overlap std::vector< SpTuples *> tomerge; - for(int i = 0; i < stages; ++i){ - std::vector ess; - if(i == Aself) ARecv[i] = A.spSeq; // shallow-copy - else{ - ess.resize(UDERA::esscount); - for(int j=0; j< UDERA::esscount; ++j) ess[j] = ARecvSizes[j][i]; // essentials of the ith matrix in this row - ARecv[i] = new UDERA(); // first, create the object - } - SpParHelper::IBCastMatrix(GridC->GetRowWorld(), *(ARecv[i]), ess, i, ABCastIndarrayReq[i], ABCastNumarrayReq[i]); // then, receive its elements + #pragma omp parallel + { + int T = omp_get_num_threads(); - ess.clear(); - - if(i == Bself) BRecv[i] = B.spSeq; // shallow-copy - else{ - ess.resize(UDERB::esscount); - for(int j=0; j< UDERB::esscount; ++j) ess[j] = BRecvSizes[j][i]; - BRecv[i] = new UDERB(); - } - SpParHelper::IBCastMatrix(GridC->GetColWorld(), *(BRecv[i]), ess, i, BBCastIndarrayReq[i], BBCastNumarrayReq[i]); // then, receive its elements - - if(i > 0){ - MPI_Waitall(ABCastIndarrayReq[i-1].size(), ABCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - MPI_Waitall(ABCastNumarrayReq[i-1].size(), ABCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - MPI_Waitall(BBCastIndarrayReq[i-1].size(), BBCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - MPI_Waitall(BBCastNumarrayReq[i-1].size(), BBCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - - SpTuples * C_cont = LocalHybridSpGEMM - (*(ARecv[i-1]), *(BRecv[i-1]), // parameters themselves - i-1 != Aself, // 'delete A' condition - i-1 != Bself); // 'delete B' condition - if(!C_cont->isZero()) tomerge.push_back(C_cont); + #pragma omp single + { + for(int i = 0; i < stages; ++i){ + double t_stage = MPI_Wtime(); + std::vector ess; + if(i == Aself) ARecv[i] = A.spSeq; // shallow-copy + else{ + ess.resize(UDERA::esscount); + for(int j=0; j< UDERA::esscount; ++j) ess[j] = ARecvSizes[j][i]; // essentials of the ith matrix in this row + ARecv[i] = new UDERA(); // first, create the object + } + SpParHelper::IBCastMatrix(GridC->GetRowWorld(), *(ARecv[i]), ess, i, ABCastIndarrayReq[i], ABCastNumarrayReq[i]); // then, receive its elements + + ess.clear(); + + if(i == Bself) BRecv[i] = B.spSeq; // shallow-copy + else{ + ess.resize(UDERB::esscount); + for(int j=0; j< UDERB::esscount; ++j) ess[j] = BRecvSizes[j][i]; + BRecv[i] = new UDERB(); + } + SpParHelper::IBCastMatrix(GridC->GetColWorld(), *(BRecv[i]), ess, i, BBCastIndarrayReq[i], BBCastNumarrayReq[i]); // then, receive its elements + int comm_complete = false; + + // Communication task + // Continuously probe with MPI_Test to progress asynchronous broadcast + #pragma omp 
task + { + omp_set_num_threads(1); + double t_comm = omp_get_wtime(); + while(!comm_complete){ + int flag, flag1, flag2, flag3, flag4; + MPI_Test(ABCastIndarrayReq[i].data(), &flag1, MPI_STATUS_IGNORE); + MPI_Test(ABCastNumarrayReq[i].data(), &flag2, MPI_STATUS_IGNORE); + MPI_Test(BBCastIndarrayReq[i].data(), &flag3, MPI_STATUS_IGNORE); + MPI_Test(BBCastNumarrayReq[i].data(), &flag4, MPI_STATUS_IGNORE); + flag = flag1 && flag2 && flag3 && flag4; + if(flag){ + comm_complete = true; + } + else{ + usleep(100); + } + } + t_comm = omp_get_wtime() - t_comm; + //if(myrank == 0) fprintf(stdout, "Comm time for stage %d: %lf\n", i, t_comm); + } + + // Computation task + // Performs local SpGEMM and then merge on the data received for previous stage + #pragma omp task + { + omp_set_num_threads(T - 1); + double t_comp = omp_get_wtime(); + if(i > 0){ + //MPI_Waitall(ABCastIndarrayReq[i-1].size(), ABCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(ABCastNumarrayReq[i-1].size(), ABCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(BBCastIndarrayReq[i-1].size(), BBCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(BBCastNumarrayReq[i-1].size(), BBCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE); + + SpTuples * C_cont = LocalHybridSpGEMM + (*(ARecv[i-1]), *(BRecv[i-1]), // parameters themselves + false, // 'delete A' condition + false); // 'delete B' condition + + if(i-1 != Bself && (!BRecv[i-1]->isZero())) delete BRecv[i-1]; + if(i-1 != Aself && (!ARecv[i-1]->isZero())) delete ARecv[i-1]; + + if(!C_cont->isZero()) tomerge.push_back(C_cont); + + //SpTuples * C_tuples = MultiwayMerge(tomerge, C_m, C_n,true); + //std::vector< SpTuples *>().swap(tomerge); + //tomerge.push_back(C_tuples); + } + t_comp = omp_get_wtime() - t_comp; + //if(myrank == 0) fprintf(stdout, "Comp time for stage %d: %lf\n", i, t_comp); + } - SpTuples * C_tuples = MultiwayMerge(tomerge, C_m, C_n,true); - std::vector< SpTuples *>().swap(tomerge); - tomerge.push_back(C_tuples); + // Wait for the communication and computation tasks to finish + #pragma omp taskwait + t_stage = MPI_Wtime() - t_stage; + //if(myrank == 0) fprintf(stdout, "Total time for stage %d: %lf\n", i, t_stage); + + #ifdef COMBBLAS_DEBUG + std::ostringstream outs; + outs << i << "th SUMMA iteration"<< std::endl; + SpParHelper::Print(outs.str()); + #endif + } + } - #ifdef COMBBLAS_DEBUG - std::ostringstream outs; - outs << i << "th SUMMA iteration"<< std::endl; - SpParHelper::Print(outs.str()); - #endif - } + } - MPI_Waitall(ABCastIndarrayReq[stages-1].size(), ABCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - MPI_Waitall(ABCastNumarrayReq[stages-1].size(), ABCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - MPI_Waitall(BBCastIndarrayReq[stages-1].size(), BBCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - MPI_Waitall(BBCastNumarrayReq[stages-1].size(), BBCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(ABCastIndarrayReq[stages-1].size(), ABCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(ABCastNumarrayReq[stages-1].size(), ABCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(BBCastIndarrayReq[stages-1].size(), BBCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); + //MPI_Waitall(BBCastNumarrayReq[stages-1].size(), BBCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); SpTuples * C_cont = LocalHybridSpGEMM (*(ARecv[stages-1]), *(BRecv[stages-1]), // parameters themselves - stages-1 != Aself, // 'delete A' condition - stages-1 
!= Bself); // 'delete B' condition + false, // 'delete A' condition + false); // 'delete B' condition + + if(stages-1 != Bself && (!BRecv[stages-1]->isZero())) delete BRecv[stages-1]; + if(stages-1 != Aself && (!ARecv[stages-1]->isZero())) delete ARecv[stages-1]; + if(!C_cont->isZero()) tomerge.push_back(C_cont); if(clearA && A.spSeq != NULL) { @@ -1683,9 +1741,6 @@ SpParMat Mult_AnXBn_Overlap UDERO * C = new UDERO(*C_tuples, false); delete C_tuples; - //if(!clearB) - // const_cast< UDERB* >(B.spSeq)->Transpose(); // transpose back to original - return SpParMat (C, GridC); // return the result object } From da6ca22bca8bef1e8e14e3dcbd71fb6e096125b5 Mon Sep 17 00:00:00 2001 From: Taufique Hussain Date: Fri, 8 Aug 2025 09:30:01 -0700 Subject: [PATCH 2/2] Test and comments added for Mult_AnXBn_Overlap --- ReleaseTests/MultTest.cpp | 10 ++++++++++ include/CombBLAS/ParFriends.h | 22 +++++++++++----------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/ReleaseTests/MultTest.cpp b/ReleaseTests/MultTest.cpp index f8131b70..cf9ec2e6 100644 --- a/ReleaseTests/MultTest.cpp +++ b/ReleaseTests/MultTest.cpp @@ -179,6 +179,16 @@ int main(int argc, char* argv[]) { SpParHelper::Print("ERROR in double buffered multiplication, go fix it!\n"); } + + C = Mult_AnXBn_Overlap::DCCols >(A,B); + if (CControl == C) + { + SpParHelper::Print("Overlapped multiplication working correctly\n"); + } + else + { + SpParHelper::Print("ERROR in overlapped multiplication, go fix it!\n"); + } #endif OptBuf optbuf; PSpMat::MPI_DCCols ABool(A); diff --git a/include/CombBLAS/ParFriends.h b/include/CombBLAS/ParFriends.h index 0d58ecd0..344f86f0 100644 --- a/include/CombBLAS/ParFriends.h +++ b/include/CombBLAS/ParFriends.h @@ -1639,6 +1639,7 @@ SpParMat Mult_AnXBn_Overlap // Continuously probe with MPI_Test to progress asynchronous broadcast #pragma omp task { + // Use only one thread for continuous probing omp_set_num_threads(1); double t_comm = omp_get_wtime(); while(!comm_complete){ @@ -1661,21 +1662,23 @@ SpParMat Mult_AnXBn_Overlap // Computation task // Performs local SpGEMM and then merge on the data received for previous stage + // It can be safely assumed that the broadcasts of the previous stage have finished because tasks are synchronized at the end of every stage #pragma omp task { + // Use one less thread for computation omp_set_num_threads(T - 1); double t_comp = omp_get_wtime(); if(i > 0){ - //MPI_Waitall(ABCastIndarrayReq[i-1].size(), ABCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - //MPI_Waitall(ABCastNumarrayReq[i-1].size(), ABCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - //MPI_Waitall(BBCastIndarrayReq[i-1].size(), BBCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE); - //MPI_Waitall(BBCastNumarrayReq[i-1].size(), BBCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE); + // MTH: Don't ask the SpGEMM routine to delete the ARecv and BRecv pieces + // Because that delete does not take effect due to a C++ issue + // TODO: Figure out why SpTuples * C_cont = LocalHybridSpGEMM (*(ARecv[i-1]), *(BRecv[i-1]), // parameters themselves false, // 'delete A' condition false); // 'delete B' condition + // MTH: Explicitly delete the respective ARecv and BRecv pieces if(i-1 != Bself && (!BRecv[i-1]->isZero())) delete BRecv[i-1]; if(i-1 != Aself && (!ARecv[i-1]->isZero())) delete ARecv[i-1]; @@ -1703,17 +1706,14 @@ SpParMat Mult_AnXBn_Overlap } } - - //MPI_Waitall(ABCastIndarrayReq[stages-1].size(), ABCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - //MPI_Waitall(ABCastNumarrayReq[stages-1].size(), 
ABCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - //MPI_Waitall(BBCastIndarrayReq[stages-1].size(), BBCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - //MPI_Waitall(BBCastNumarrayReq[stages-1].size(), BBCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE); - + + // MTH: Same reason as above: don't ask the SpGEMM routine to delete the ARecv and BRecv pieces SpTuples * C_cont = LocalHybridSpGEMM (*(ARecv[stages-1]), *(BRecv[stages-1]), // parameters themselves false, // 'delete A' condition false); // 'delete B' condition + // MTH: Same reason as above: explicitly delete the respective ARecv and BRecv pieces if(stages-1 != Bself && (!BRecv[stages-1]->isZero())) delete BRecv[stages-1]; if(stages-1 != Aself && (!ARecv[stages-1]->isZero())) delete ARecv[stages-1]; @@ -1734,7 +1734,7 @@ SpParMat Mult_AnXBn_Overlap SpHelper::deallocate2D(ARecvSizes, UDERA::esscount); SpHelper::deallocate2D(BRecvSizes, UDERB::esscount); - // the last parameter to MergeAll deletes tomerge arrays + // the last parameter to the merge function deletes the tomerge arrays SpTuples * C_tuples = MultiwayMerge(tomerge, C_m, C_n,true); std::vector< SpTuples *>().swap(tomerge);
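
For reference, below is a minimal, self-contained sketch of the overlap pattern these commits rely on: inside an OpenMP single region, each stage posts a nonblocking broadcast, spawns one task that polls MPI_Test to progress it, spawns a second task that computes on the data received in the previous stage, and synchronizes both with a taskwait. It is not part of the patch series; the identifiers (stages, recvbuf, workbuf, acc) and the dummy reduction are illustrative placeholders, not CombBLAS symbols.

// overlap_sketch.cpp: hedged illustration of comm-comp overlap with OpenMP tasks.
// Build (assumption): mpicxx -fopenmp overlap_sketch.cpp -o overlap_sketch
#include <mpi.h>
#include <omp.h>
#include <unistd.h>     // usleep
#include <algorithm>    // std::fill
#include <vector>
#include <cstdio>

int main(int argc, char* argv[])
{
    // MPI_Test may be called from a task running on any thread,
    // so request full thread support.
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    int myrank, nprocs;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    const int stages = 4;                 // stands in for the SUMMA stage count
    const int chunk  = 1 << 20;
    std::vector<double> recvbuf(chunk);   // data arriving in the current stage
    std::vector<double> workbuf(chunk);   // data received in the previous stage
    double acc = 0.0;                     // stands in for the accumulated local product

    #pragma omp parallel
    #pragma omp single
    {
        for (int i = 0; i < stages; ++i)
        {
            int root = i % nprocs;
            if (myrank == root) std::fill(recvbuf.begin(), recvbuf.end(), double(i + 1));

            MPI_Request req;
            MPI_Ibcast(recvbuf.data(), chunk, MPI_DOUBLE, root, MPI_COMM_WORLD, &req);

            // Communication task: poll until this stage's broadcast completes.
            #pragma omp task shared(req)
            {
                int flag = 0;
                while (!flag)
                {
                    MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
                    if (!flag) usleep(100);   // back off instead of spinning hard
                }
            }

            // Computation task: consume the data broadcast in the previous stage.
            #pragma omp task shared(workbuf, acc)
            {
                if (i > 0)
                    for (int k = 0; k < chunk; ++k) acc += workbuf[k];
            }

            // Both tasks must finish before the buffers are reused in the next stage.
            #pragma omp taskwait

            workbuf.swap(recvbuf);   // stage i's broadcast becomes stage i+1's input
        }
    }

    // Tail computation on the data from the last stage, mirroring the final
    // local multiply that follows the stage loop in the patch.
    for (int k = 0; k < chunk; ++k) acc += workbuf[k];

    if (myrank == 0) std::printf("acc = %f\n", acc);
    MPI_Finalize();
    return 0;
}

As in the patch, the polling task sleeps briefly between MPI_Test calls so it yields the core instead of busy-waiting, and the taskwait at the end of each stage is what lets the next stage's computation safely assume the previous broadcasts have completed.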