10 changes: 10 additions & 0 deletions ReleaseTests/MultTest.cpp
@@ -179,6 +179,16 @@ int main(int argc, char* argv[])
{
SpParHelper::Print("ERROR in double buffered multiplication, go fix it!\n");
}

C = Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, double, PSpMat<double>::DCCols >(A,B);
if (CControl == C)
{
SpParHelper::Print("Overlapped multiplication working correctly\n");
}
else
{
SpParHelper::Print("ERROR in overlapped multiplication, go fix it!\n");
}
#endif
OptBuf<int32_t, int64_t> optbuf;
PSpMat<bool>::MPI_DCCols ABool(A);
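The new block repeats the compare-and-report pattern this test already uses for the other multiplication variants. If more variants are added later, it could be factored into a small helper; a sketch, assuming the CombBLAS headers MultTest already includes (checkAgainst is a hypothetical name, not part of the PR):

    #include <string>

    // Hypothetical helper (not in the PR) wrapping the compare-and-report
    // pattern that MultTest repeats for each multiplication variant.
    template <typename MAT>
    void checkAgainst(const MAT& control, const MAT& result, const std::string& what)
    {
        if (control == result)
            SpParHelper::Print(what + " working correctly\n");
        else
            SpParHelper::Print("ERROR in " + what + ", go fix it!\n");
    }

    // usage: checkAgainst(CControl, C, "overlapped multiplication");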
29 changes: 26 additions & 3 deletions ReleaseTests/MultTiming.cpp
@@ -48,12 +48,14 @@ int main(int argc, char* argv[])
typedef PlusTimesSRing<ElementType, ElementType> PTDOUBLEDOUBLE;
PSpMat<ElementType>::MPI_DCCols A, B; // construct objects

A.ReadDistribute(Aname, 0);
//A.ReadDistribute(Aname, 0);
A.ReadGeneralizedTuples(Aname, maximum<ElementType>());
A.PrintInfo();
B.ReadDistribute(Bname, 0);
//B.ReadDistribute(Bname, 0);
B.ReadGeneralizedTuples(Bname, maximum<ElementType>());
B.PrintInfo();
SpParHelper::Print("Data read\n");

{ // force the calling of C's destructor
PSpMat<ElementType>::MPI_DCCols C = Mult_AnXBn_DoubleBuff<PTDOUBLEDOUBLE, ElementType, PSpMat<ElementType>::DCCols >(A, B);
int64_t cnnz = C.getnnz();
@@ -100,6 +102,27 @@ int main(int argc, char* argv[])
printf("%.6lf seconds elapsed per iteration\n", (t2-t1)/(double)ITERATIONS);
}

{// force the calling of C's destructor
PSpMat<ElementType>::MPI_DCCols C = Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, ElementType, PSpMat<ElementType>::DCCols >(A, B);
C.PrintInfo();
}
SpParHelper::Print("Warmed up for Overlap\n");
MPI_Barrier(MPI_COMM_WORLD);
MPI_Pcontrol(1,"SpGEMM_Overlap");
t1 = MPI_Wtime(); // initialize (wall-clock) timer
for(int i=0; i<ITERATIONS; i++)
{
PSpMat<ElementType>::MPI_DCCols C = Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, ElementType, PSpMat<ElementType>::DCCols >(A, B);
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Pcontrol(-1,"SpGEMM_Overlap");
t2 = MPI_Wtime();
if(myrank == 0)
{
cout<<"Comm-Comp overlapped multiplications finished"<<endl;
printf("%.6lf seconds elapsed per iteration\n", (t2-t1)/(double)ITERATIONS);
}


/*
C = Mult_AnXBn_ActiveTarget<PTDOUBLEDOUBLE, ElementType, PSpMat<ElementType>::DCCols >(A, B);
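All the timing hunks in this file follow the same shape: one untimed warm-up run inside its own scope (so C's destructor fires), a barrier, a named MPI_Pcontrol region, and a wall-clock average over ITERATIONS runs. A generic sketch of that harness (time_kernel is an illustrative helper, not CombBLAS API; the string passed to MPI_Pcontrol only has an effect under profilers that interpret it):

    #include <mpi.h>
    #include <cstdio>

    // Illustrative harness, not part of the PR: run `kernel` once untimed to
    // warm caches and allocators, then time `iterations` runs between barriers.
    template <typename F>
    void time_kernel(const char* name, int iterations, int myrank, F kernel)
    {
        kernel();                              // warm-up run, excluded from timing
        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Pcontrol(1, name);                 // open a named profiling region
        double t1 = MPI_Wtime();
        for (int i = 0; i < iterations; ++i)
            kernel();
        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Pcontrol(-1, name);                // close the profiling region
        double t2 = MPI_Wtime();
        if (myrank == 0)
            std::printf("%s: %.6lf seconds per iteration\n", name, (t2 - t1) / iterations);
    }

    // usage: time_kernel("SpGEMM_Overlap", ITERATIONS, myrank,
    //                    [&]{ Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, ElementType,
    //                         PSpMat<ElementType>::DCCols>(A, B); });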
161 changes: 108 additions & 53 deletions include/CombBLAS/ParFriends.h
@@ -1557,7 +1557,7 @@ SpParMat<IU, NUO, UDERO> Mult_AnXBn_Synch

/*
* Experimental SUMMA implementation with communication and computation overlap.
* Not stable.
* Written by: Taufique
* */
template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
SpParMat<IU, NUO, UDERO> Mult_AnXBn_Overlap
@@ -1577,8 +1577,6 @@ SpParMat<IU, NUO, UDERO> Mult_AnXBn_Overlap
IU C_m = A.spSeq->getnrow();
IU C_n = B.spSeq->getncol();

//const_cast< UDERB* >(B.spSeq)->Transpose(); // do not transpose for column-by-column multiplication

LIA ** ARecvSizes = SpHelper::allocate2D<LIA>(UDERA::esscount, stages);
LIB ** BRecvSizes = SpHelper::allocate2D<LIB>(UDERB::esscount, stages);

@@ -1591,10 +1589,12 @@

Arr<IU,NU1> Aarrinfo = A.seqptr()->GetArrays();
Arr<IU,NU2> Barrinfo = B.seqptr()->GetArrays();

std::vector< std::vector<MPI_Request> > ABCastIndarrayReq;
std::vector< std::vector<MPI_Request> > ABCastNumarrayReq;
std::vector< std::vector<MPI_Request> > BBCastIndarrayReq;
std::vector< std::vector<MPI_Request> > BBCastNumarrayReq;

for(int i = 0; i < stages; i++){
ABCastIndarrayReq.push_back( std::vector<MPI_Request>(Aarrinfo.indarrs.size(), MPI_REQUEST_NULL) );
ABCastNumarrayReq.push_back( std::vector<MPI_Request>(Aarrinfo.numarrs.size(), MPI_REQUEST_NULL) );
@@ -1607,58 +1607,116 @@

std::vector< SpTuples<IU,NUO> *> tomerge;

for(int i = 0; i < stages; ++i){
std::vector<IU> ess;
if(i == Aself) ARecv[i] = A.spSeq; // shallow-copy
else{
ess.resize(UDERA::esscount);
for(int j=0; j< UDERA::esscount; ++j) ess[j] = ARecvSizes[j][i]; // essentials of the ith matrix in this row
ARecv[i] = new UDERA(); // first, create the object
}
SpParHelper::IBCastMatrix(GridC->GetRowWorld(), *(ARecv[i]), ess, i, ABCastIndarrayReq[i], ABCastNumarrayReq[i]); // then, receive its elements
#pragma omp parallel
{
int T = omp_get_num_threads();

ess.clear();

if(i == Bself) BRecv[i] = B.spSeq; // shallow-copy
else{
ess.resize(UDERB::esscount);
for(int j=0; j< UDERB::esscount; ++j) ess[j] = BRecvSizes[j][i];
BRecv[i] = new UDERB();
}
SpParHelper::IBCastMatrix(GridC->GetColWorld(), *(BRecv[i]), ess, i, BBCastIndarrayReq[i], BBCastNumarrayReq[i]); // then, receive its elements

if(i > 0){
MPI_Waitall(ABCastIndarrayReq[i-1].size(), ABCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
MPI_Waitall(ABCastNumarrayReq[i-1].size(), ABCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
MPI_Waitall(BBCastIndarrayReq[i-1].size(), BBCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
MPI_Waitall(BBCastNumarrayReq[i-1].size(), BBCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE);

SpTuples<IU,NUO> * C_cont = LocalHybridSpGEMM<SR, NUO>
(*(ARecv[i-1]), *(BRecv[i-1]), // parameters themselves
i-1 != Aself, // 'delete A' condition
i-1 != Bself); // 'delete B' condition
if(!C_cont->isZero()) tomerge.push_back(C_cont);
#pragma omp single
{
for(int i = 0; i < stages; ++i){
double t_stage = MPI_Wtime();
std::vector<IU> ess;
if(i == Aself) ARecv[i] = A.spSeq; // shallow-copy
else{
ess.resize(UDERA::esscount);
for(int j=0; j< UDERA::esscount; ++j) ess[j] = ARecvSizes[j][i]; // essentials of the ith matrix in this row
ARecv[i] = new UDERA(); // first, create the object
}
SpParHelper::IBCastMatrix(GridC->GetRowWorld(), *(ARecv[i]), ess, i, ABCastIndarrayReq[i], ABCastNumarrayReq[i]); // then, receive its elements

ess.clear();

if(i == Bself) BRecv[i] = B.spSeq; // shallow-copy
else{
ess.resize(UDERB::esscount);
for(int j=0; j< UDERB::esscount; ++j) ess[j] = BRecvSizes[j][i];
BRecv[i] = new UDERB();
}
SpParHelper::IBCastMatrix(GridC->GetColWorld(), *(BRecv[i]), ess, i, BBCastIndarrayReq[i], BBCastNumarrayReq[i]); // then, receive its elements
bool comm_complete = false;

SpTuples<IU,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
std::vector< SpTuples<IU,NUO> *>().swap(tomerge);
tomerge.push_back(C_tuples);
}
#ifdef COMBBLAS_DEBUG
std::ostringstream outs;
outs << i << "th SUMMA iteration"<< std::endl;
SpParHelper::Print(outs.str());
#endif
}
// Communication task
// Continuously probe with MPI_Testall to progress the asynchronous broadcasts
#pragma omp task
{
// This task already runs on a single thread; also limit any nested parallel regions to one
omp_set_num_threads(1);
double t_comm = omp_get_wtime();
while(!comm_complete){
int flag = 0, flag1 = 0, flag2 = 0, flag3 = 0, flag4 = 0;
MPI_Testall(ABCastIndarrayReq[i].size(), ABCastIndarrayReq[i].data(), &flag1, MPI_STATUSES_IGNORE);
MPI_Testall(ABCastNumarrayReq[i].size(), ABCastNumarrayReq[i].data(), &flag2, MPI_STATUSES_IGNORE);
MPI_Testall(BBCastIndarrayReq[i].size(), BBCastIndarrayReq[i].data(), &flag3, MPI_STATUSES_IGNORE);
MPI_Testall(BBCastNumarrayReq[i].size(), BBCastNumarrayReq[i].data(), &flag4, MPI_STATUSES_IGNORE);
flag = flag1 && flag2 && flag3 && flag4;
if(flag){
comm_complete = true;
}
else{
usleep(100);
}
}
t_comm = omp_get_wtime() - t_comm;
//if(myrank == 0) fprintf(stdout, "Comm time for stage %d: %lf\n", i, t_comm);
}

MPI_Waitall(ABCastIndarrayReq[stages-1].size(), ABCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
MPI_Waitall(ABCastNumarrayReq[stages-1].size(), ABCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
MPI_Waitall(BBCastIndarrayReq[stages-1].size(), BBCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
MPI_Waitall(BBCastNumarrayReq[stages-1].size(), BBCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
// Computation task
// Performs the local SpGEMM and then merges the data received in the previous stage
// It can safely be assumed that the previous stage's broadcasts have finished, because tasks are synchronized at the end of every stage
#pragma omp task
{
// Leave one thread for the probing task; nested parallel regions (e.g. inside the SpGEMM) use T-1 threads
omp_set_num_threads(T - 1);
double t_comp = omp_get_wtime();
if(i > 0){

// MTH: Don't ask the SpGEMM routine to delete the ARecv and BRecv pieces,
// because that delete does not succeed due to some C++ issue
// TODO: Figure out why
SpTuples<IU,NUO> * C_cont = LocalHybridSpGEMM<SR, NUO>
(*(ARecv[i-1]), *(BRecv[i-1]), // parameters themselves
false, // 'delete A' condition
false); // 'delete B' condition

// MTH: Explicitly delete the respective ARecv and BRecv pieces
if(i-1 != Bself && (!BRecv[i-1]->isZero())) delete BRecv[i-1];
if(i-1 != Aself && (!ARecv[i-1]->isZero())) delete ARecv[i-1];

if(!C_cont->isZero()) tomerge.push_back(C_cont);

//SpTuples<IU,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
//std::vector< SpTuples<IU,NUO> *>().swap(tomerge);
//tomerge.push_back(C_tuples);
}
t_comp = omp_get_wtime() - t_comp;
//if(myrank == 0) fprintf(stdout, "Comp time for stage %d: %lf\n", i, t_comp);
}

// Wait for the communication and computation tasks to finish
#pragma omp taskwait
t_stage = MPI_Wtime() - t_stage;
//if(myrank == 0) fprintf(stdout, "Total time for stage %d: %lf\n", i, t_stage);

#ifdef COMBBLAS_DEBUG
std::ostringstream outs;
outs << i << "th SUMMA iteration"<< std::endl;
SpParHelper::Print(outs.str());
#endif
}

}
}

// MTH: Same reason as above
SpTuples<IU,NUO> * C_cont = LocalHybridSpGEMM<SR, NUO>
(*(ARecv[stages-1]), *(BRecv[stages-1]), // parameters themselves
stages-1 != Aself, // 'delete A' condition
stages-1 != Bself); // 'delete B' condition
false, // 'delete A' condition
false); // 'delete B' condition

// MTH: Same reason as above
if(stages-1 != Bself && (!BRecv[stages-1]->isZero())) delete BRecv[stages-1];
if(stages-1 != Aself && (!ARecv[stages-1]->isZero())) delete ARecv[stages-1];

if(!C_cont->isZero()) tomerge.push_back(C_cont);

if(clearA && A.spSeq != NULL) {
@@ -1676,16 +1734,13 @@ SpParMat<IU, NUO, UDERO> Mult_AnXBn_Overlap
SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);

// the last parameter to MergeAll deletes tomerge arrays
// the last parameter to merge function deletes tomerge arrays
SpTuples<IU,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
std::vector< SpTuples<IU,NUO> *>().swap(tomerge);

UDERO * C = new UDERO(*C_tuples, false);
delete C_tuples;

//if(!clearB)
// const_cast< UDERB* >(B.spSeq)->Transpose(); // transpose back to original

return SpParMat<IU,NUO,UDERO> (C, GridC); // return the result object
}

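Two sketches of the techniques this hunk introduces, with the CombBLAS specifics stripped out. First, the communication task's polling loop: nonblocking broadcasts typically make progress only while some thread is inside the MPI library, so one thread repeatedly tests the outstanding requests and backs off between probes. This assumes MPI was initialized with MPI_THREAD_MULTIPLE; progress_until_done is an illustrative name, not CombBLAS API:

    #include <mpi.h>
    #include <unistd.h>   // usleep
    #include <vector>

    // Test all outstanding requests together and back off briefly while any
    // is still pending, so the probing thread stays light.
    void progress_until_done(std::vector<MPI_Request>& reqs)
    {
        int done = 0;
        while (!done)
        {
            MPI_Testall((int)reqs.size(), reqs.data(), &done, MPI_STATUSES_IGNORE);
            if (!done)
                usleep(100);  // same back-off interval the PR uses
        }
    }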
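Second, the per-stage task structure: a single generator thread posts stage i's broadcasts, spawns one task to progress them and another to multiply the pieces received in stage i-1, and synchronizes both with taskwait before moving on. The helpers below are hypothetical stand-ins for the IBCastMatrix / LocalHybridSpGEMM calls:

    #include <omp.h>

    void post_broadcasts(int stage);     // stand-in for the IBCastMatrix calls
    void progress_stage(int stage);      // MPI_Testall/usleep loop shown above
    void multiply_and_merge(int stage);  // stand-in for LocalHybridSpGEMM + merge list

    void overlapped_summa(int stages)
    {
        #pragma omp parallel
        {
            #pragma omp single
            {
                for (int i = 0; i < stages; ++i)
                {
                    post_broadcasts(i);          // nonblocking broadcasts for stage i
                    #pragma omp task             // communication task
                    progress_stage(i);
                    if (i > 0)
                    {
                        #pragma omp task         // computation task for stage i-1
                        multiply_and_merge(i - 1);
                    }
                    #pragma omp taskwait         // both tasks end before the next stage
                }
                multiply_and_merge(stages - 1);  // last stage, after the loop
            }
        }
    }

As in the hunk, the last stage's pieces are consumed after the loop, once the final broadcasts have been waited on; that is why the trailing LocalHybridSpGEMM call sits outside the parallel region.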