diff --git a/ReleaseTests/MultTest.cpp b/ReleaseTests/MultTest.cpp
index f8131b70..cf9ec2e6 100644
--- a/ReleaseTests/MultTest.cpp
+++ b/ReleaseTests/MultTest.cpp
@@ -179,6 +179,16 @@ int main(int argc, char* argv[])
         {
             SpParHelper::Print("ERROR in double buffered multiplication, go fix it!\n");
         }
+
+        C = Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, double, PSpMat<double>::DCCols >(A,B);
+        if (CControl == C)
+        {
+            SpParHelper::Print("Overlapped multiplication working correctly\n");
+        }
+        else
+        {
+            SpParHelper::Print("ERROR in overlapped multiplication, go fix it!\n");
+        }
 #endif
         OptBuf<int32_t, double> optbuf;
         PSpMat<bool>::MPI_DCCols ABool(A);
diff --git a/ReleaseTests/MultTiming.cpp b/ReleaseTests/MultTiming.cpp
index 047159f4..00219452 100644
--- a/ReleaseTests/MultTiming.cpp
+++ b/ReleaseTests/MultTiming.cpp
@@ -48,12 +48,14 @@ int main(int argc, char* argv[])
         typedef PlusTimesSRing<double, double> PTDOUBLEDOUBLE;
         PSpMat<double>::MPI_DCCols A, B;    // construct objects
-        A.ReadDistribute(Aname, 0);
+        //A.ReadDistribute(Aname, 0);
+        A.ReadGeneralizedTuples(Aname, maximum<double>());
         A.PrintInfo();
-        B.ReadDistribute(Bname, 0);
+        //B.ReadDistribute(Bname, 0);
+        B.ReadGeneralizedTuples(Bname, maximum<double>());
         B.PrintInfo();
         SpParHelper::Print("Data read\n");
-        
+
         { // force the calling of C's destructor
             PSpMat<double>::MPI_DCCols C = Mult_AnXBn_DoubleBuff<PTDOUBLEDOUBLE, double, PSpMat<double>::DCCols >(A, B);
             int64_t cnnz = C.getnnz();
@@ -100,6 +102,27 @@ int main(int argc, char* argv[])
             printf("%.6lf seconds elapsed per iteration\n", (t2-t1)/(double)ITERATIONS);
         }
+        {// force the calling of C's destructor
+            PSpMat<double>::MPI_DCCols C = Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, double, PSpMat<double>::DCCols >(A, B);
+            C.PrintInfo();
+        }
+        SpParHelper::Print("Warmed up for Overlap\n");
+        MPI_Barrier(MPI_COMM_WORLD);
+        MPI_Pcontrol(1,"SpGEMM_Overlap");
+        t1 = MPI_Wtime();   // initialize (wall-clock) timer
+        for(int i=0; i<ITERATIONS; i++)
+        {
+            PSpMat<double>::MPI_DCCols C = Mult_AnXBn_Overlap<PTDOUBLEDOUBLE, double, PSpMat<double>::DCCols >(A, B);
+        }
+        MPI_Barrier(MPI_COMM_WORLD);
+        MPI_Pcontrol(-1,"SpGEMM_Overlap");
+        t2 = MPI_Wtime();
+        if(myrank == 0)
+        {
+            cout<<"Comm-Comp overlapped multiplications finished"<<endl;
+            printf("%.6lf seconds elapsed per iteration\n", (t2-t1)/(double)ITERATIONS);
+        }
+
         {// force the calling of C's destructor
             PSpMat<double>::MPI_DCCols C = Mult_AnXBn_Synch<PTDOUBLEDOUBLE, double, PSpMat<double>::DCCols >(A, B);
diff --git a/include/CombBLAS/ParFriends.h b/include/CombBLAS/ParFriends.h
index 4eb5d973..344f86f0 100644
--- a/include/CombBLAS/ParFriends.h
+++ b/include/CombBLAS/ParFriends.h
@@ -1557,7 +1557,7 @@ SpParMat<IU,NUO,UDERO> Mult_AnXBn_Synch
 /*
  * Experimental SUMMA implementation with communication and computation overlap.
- * Not stable. 
+ * Written by: Taufique
  *
  */
 template <typename SR, typename NUO, typename UDERO, typename IU, typename NU1, typename NU2, typename UDERA, typename UDERB>
 SpParMat<IU,NUO,UDERO> Mult_AnXBn_Overlap
@@ -1577,8 +1577,6 @@ SpParMat<IU,NUO,UDERO> Mult_AnXBn_Overlap
     IU C_m = A.spSeq->getnrow();
     IU C_n = B.spSeq->getncol();
 
-    //const_cast< UDERB* >(B.spSeq)->Transpose();    // do not transpose for colum-by-column multiplication
-
     LIA ** ARecvSizes = SpHelper::allocate2D<LIA>(UDERA::esscount, stages);
     LIB ** BRecvSizes = SpHelper::allocate2D<LIB>(UDERB::esscount, stages);
 
@@ -1591,10 +1589,12 @@ SpParMat<IU,NUO,UDERO> Mult_AnXBn_Overlap
     Arr<LIA,NU1> Aarrinfo = A.seqptr()->GetArrays();
     Arr<LIB,NU2> Barrinfo = B.seqptr()->GetArrays();
 
+
     std::vector< std::vector<MPI_Request> > ABCastIndarrayReq;
     std::vector< std::vector<MPI_Request> > ABCastNumarrayReq;
     std::vector< std::vector<MPI_Request> > BBCastIndarrayReq;
     std::vector< std::vector<MPI_Request> > BBCastNumarrayReq;
+
     for(int i = 0; i < stages; i++){
         ABCastIndarrayReq.push_back( std::vector<MPI_Request>(Aarrinfo.indarrs.size(), MPI_REQUEST_NULL) );
         ABCastNumarrayReq.push_back( std::vector<MPI_Request>(Aarrinfo.numarrs.size(), MPI_REQUEST_NULL) );
@@ -1607,58 +1607,116 @@ SpParMat<IU,NUO,UDERO> Mult_AnXBn_Overlap
 
     std::vector< SpTuples<LIC,NUO> *> tomerge;
 
-    for(int i = 0; i < stages; ++i){
-        std::vector<LIA> ess;
-        if(i == Aself)  ARecv[i] = A.spSeq;    // shallow-copy
-        else{
-            ess.resize(UDERA::esscount);
-            for(int j=0; j< UDERA::esscount; ++j) ess[j] = ARecvSizes[j][i];    // essentials of the ith matrix in this row
-            ARecv[i] = new UDERA();    // first, create the object
-        }
-        SpParHelper::IBCastMatrix(GridC->GetRowWorld(), *(ARecv[i]), ess, i, ABCastIndarrayReq[i], ABCastNumarrayReq[i]);    // then, receive its elements
+    #pragma omp parallel
+    {
+        int T = omp_get_num_threads();
-        ess.clear();
-
-        if(i == Bself)  BRecv[i] = B.spSeq;    // shallow-copy
-        else{
-            ess.resize(UDERB::esscount);
-            for(int j=0; j< UDERB::esscount; ++j) ess[j] = BRecvSizes[j][i];
-            BRecv[i] = new UDERB();
-        }
-        SpParHelper::IBCastMatrix(GridC->GetColWorld(), *(BRecv[i]), ess, i, BBCastIndarrayReq[i], BBCastNumarrayReq[i]);    // then, receive its elements
-
-        if(i > 0){
-            MPI_Waitall(ABCastIndarrayReq[i-1].size(), ABCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
-            MPI_Waitall(ABCastNumarrayReq[i-1].size(), ABCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
-            MPI_Waitall(BBCastIndarrayReq[i-1].size(), BBCastIndarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
-            MPI_Waitall(BBCastNumarrayReq[i-1].size(), BBCastNumarrayReq[i-1].data(), MPI_STATUSES_IGNORE);
-
-            SpTuples<LIC,NUO> * C_cont = LocalHybridSpGEMM<SR, NUO>
-                    (*(ARecv[i-1]), *(BRecv[i-1]),    // parameters themselves
-                    i-1 != Aself,    // 'delete A' condition
-                    i-1 != Bself);    // 'delete B' condition
-            if(!C_cont->isZero()) tomerge.push_back(C_cont);
+        #pragma omp single
+        {
+            for(int i = 0; i < stages; ++i){
+                double t_stage = MPI_Wtime();
+                std::vector<LIA> ess;
+                if(i == Aself)  ARecv[i] = A.spSeq;    // shallow-copy
+                else{
+                    ess.resize(UDERA::esscount);
+                    for(int j=0; j< UDERA::esscount; ++j) ess[j] = ARecvSizes[j][i];    // essentials of the ith matrix in this row
+                    ARecv[i] = new UDERA();    // first, create the object
+                }
+                SpParHelper::IBCastMatrix(GridC->GetRowWorld(), *(ARecv[i]), ess, i, ABCastIndarrayReq[i], ABCastNumarrayReq[i]);    // then, receive its elements
+
+                ess.clear();
+
+                if(i == Bself)  BRecv[i] = B.spSeq;    // shallow-copy
+                else{
+                    ess.resize(UDERB::esscount);
+                    for(int j=0; j< UDERB::esscount; ++j) ess[j] = BRecvSizes[j][i];
+                    BRecv[i] = new UDERB();
+                }
+                SpParHelper::IBCastMatrix(GridC->GetColWorld(), *(BRecv[i]), ess, i, BBCastIndarrayReq[i], BBCastNumarrayReq[i]);    // then, receive its elements
+                int comm_complete = false;
-            SpTuples<LIC,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
-            std::vector< SpTuples<LIC,NUO> *>().swap(tomerge);
-            tomerge.push_back(C_tuples);
-        }
-        #ifdef COMBBLAS_DEBUG
-        std::ostringstream outs;
-        outs << i << "th SUMMA iteration"<< std::endl;
-        SpParHelper::Print(outs.str());
-        #endif
-    }
+                // Communication task
+                // Continuously probe with MPI_Testall to progress the asynchronous broadcasts
+                #pragma omp task
+                {
+                    // Use only one thread for continuous probing
+                    omp_set_num_threads(1);
+                    double t_comm = omp_get_wtime();
+                    while(!comm_complete){
+                        int flag, flag1, flag2, flag3, flag4;
+                        MPI_Testall(ABCastIndarrayReq[i].size(), ABCastIndarrayReq[i].data(), &flag1, MPI_STATUSES_IGNORE);
+                        MPI_Testall(ABCastNumarrayReq[i].size(), ABCastNumarrayReq[i].data(), &flag2, MPI_STATUSES_IGNORE);
+                        MPI_Testall(BBCastIndarrayReq[i].size(), BBCastIndarrayReq[i].data(), &flag3, MPI_STATUSES_IGNORE);
+                        MPI_Testall(BBCastNumarrayReq[i].size(), BBCastNumarrayReq[i].data(), &flag4, MPI_STATUSES_IGNORE);
+                        flag = flag1 && flag2 && flag3 && flag4;
+                        if(flag){
+                            comm_complete = true;
+                        }
+                        else{
+                            usleep(100);
+                        }
+                    }
+                    t_comm = omp_get_wtime() - t_comm;
+                    //if(myrank == 0) fprintf(stdout, "Comm time for stage %d: %lf\n", i, t_comm);
+                }
-    MPI_Waitall(ABCastIndarrayReq[stages-1].size(), ABCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
-    MPI_Waitall(ABCastNumarrayReq[stages-1].size(), ABCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
-    MPI_Waitall(BBCastIndarrayReq[stages-1].size(), BBCastIndarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
-    MPI_Waitall(BBCastNumarrayReq[stages-1].size(), BBCastNumarrayReq[stages-1].data(), MPI_STATUSES_IGNORE);
+                // Computation task
+                // Performs the local SpGEMM and then merges the data received in the previous stage
+                // It can safely be assumed that the broadcasts of the previous stage have finished because the tasks are synchronized at the end of every stage
+                #pragma omp task
+                {
+                    // Use one less thread for computation
+                    omp_set_num_threads(T - 1);
+                    double t_comp = omp_get_wtime();
+                    if(i > 0){
+
+                        // MTH: Don't ask the SpGEMM routine to delete the ARecv and BRecv pieces
+                        // because that delete does not succeed due to a C++ issue
+                        // TODO: Figure out why
+                        SpTuples<LIC,NUO> * C_cont = LocalHybridSpGEMM<SR, NUO>
+                                (*(ARecv[i-1]), *(BRecv[i-1]),    // parameters themselves
+                                false,    // 'delete A' condition
+                                false);    // 'delete B' condition
+
+                        // MTH: Explicitly delete the respective ARecv and BRecv pieces
+                        if(i-1 != Bself && (!BRecv[i-1]->isZero())) delete BRecv[i-1];
+                        if(i-1 != Aself && (!ARecv[i-1]->isZero())) delete ARecv[i-1];
+
+                        if(!C_cont->isZero()) tomerge.push_back(C_cont);
+
+                        //SpTuples<LIC,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
+                        //std::vector< SpTuples<LIC,NUO> *>().swap(tomerge);
+                        //tomerge.push_back(C_tuples);
+                    }
+                    t_comp = omp_get_wtime() - t_comp;
+                    //if(myrank == 0) fprintf(stdout, "Comp time for stage %d: %lf\n", i, t_comp);
+                }
+
+                // Wait for the communication and computation tasks to finish
+                #pragma omp taskwait
+                t_stage = MPI_Wtime() - t_stage;
+                //if(myrank == 0) fprintf(stdout, "Total time for stage %d: %lf\n", i, t_stage);
+                #ifdef COMBBLAS_DEBUG
+                std::ostringstream outs;
+                outs << i << "th SUMMA iteration"<< std::endl;
+                SpParHelper::Print(outs.str());
+                #endif
+            }
+
+        }
+    }
+
+    // MTH: Same reason as above
     SpTuples<LIC,NUO> * C_cont = LocalHybridSpGEMM<SR, NUO>
             (*(ARecv[stages-1]), *(BRecv[stages-1]),    // parameters themselves
-            stages-1 != Aself,    // 'delete A' condition
-            stages-1 != Bself);    // 'delete B' condition
+            false,    // 'delete A' condition
+            false);    // 'delete B' condition
+
+    // MTH: Same reason as above
+    if(stages-1 != Bself && (!BRecv[stages-1]->isZero())) delete BRecv[stages-1];
+    if(stages-1 != Aself && (!ARecv[stages-1]->isZero())) delete ARecv[stages-1];
+
     if(!C_cont->isZero()) 
         tomerge.push_back(C_cont);
 
     if(clearA && A.spSeq != NULL)
     {
@@ -1676,16 +1734,13 @@ SpParMat<IU,NUO,UDERO> Mult_AnXBn_Overlap
     SpHelper::deallocate2D(ARecvSizes, UDERA::esscount);
     SpHelper::deallocate2D(BRecvSizes, UDERB::esscount);
 
-    // the last parameter to MergeAll deletes tomerge arrays
+    // the last parameter to the merge function deletes the tomerge arrays
     SpTuples<LIC,NUO> * C_tuples = MultiwayMerge<SR>(tomerge, C_m, C_n,true);
     std::vector< SpTuples<LIC,NUO> *>().swap(tomerge);
 
     UDERO * C = new UDERO(*C_tuples, false);
     delete C_tuples;
 
-    //if(!clearB)
-    //    const_cast< UDERB* >(B.spSeq)->Transpose();    // transpose back to original
-
     return SpParMat<IU,NUO,UDERO> (C, GridC);    // return the result object
 }
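
For context, the core pattern the new Mult_AnXBn_Overlap relies on — one OpenMP task polling MPI_Test so a nonblocking broadcast keeps making progress while a sibling task does the local computation, with a per-stage taskwait joining the two — is illustrated by the minimal, self-contained sketch below. It is not CombBLAS code: the buffer size, stage count, root rank, and the dummy work loop are placeholder assumptions, and it presumes an MPI library initialized with MPI_THREAD_MULTIPLE.

// Hypothetical stand-alone sketch of comm/comp overlap with OpenMP tasks and MPI_Test polling.
// Not CombBLAS code; buffer size, stage count, and the dummy work loop are placeholders.
#include <mpi.h>
#include <omp.h>
#include <unistd.h>   // usleep
#include <cstddef>
#include <cstdio>
#include <vector>

int main(int argc, char* argv[])
{
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);  // tasks may run on different threads
    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    const int stages = 4;
    std::vector<double> buf(1 << 20, static_cast<double>(myrank));
    double acc = 0.0;                 // stands in for the local SpGEMM result

    #pragma omp parallel
    #pragma omp single
    {
        for (int i = 0; i < stages; ++i)
        {
            // Post the nonblocking broadcast for this stage
            MPI_Request req;
            MPI_Ibcast(buf.data(), static_cast<int>(buf.size()), MPI_DOUBLE, 0, MPI_COMM_WORLD, &req);

            bool comm_complete = false;

            // Communication task: poll MPI_Test so the broadcast keeps progressing
            #pragma omp task shared(comm_complete, req)
            {
                int flag = 0;
                while (!comm_complete)
                {
                    MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
                    if (flag) comm_complete = true;
                    else      usleep(100);     // back off so polling does not hog a core
                }
            }

            // Computation task: overlap local work with the broadcast
            #pragma omp task shared(acc)
            {
                for (std::size_t k = 0; k < buf.size(); ++k)
                    acc += 1e-9 * static_cast<double>(k);
            }

            // Join both tasks before starting the next stage
            #pragma omp taskwait
        }
    }

    if (myrank == 0) std::printf("done, acc = %f\n", acc);
    MPI_Finalize();
    return 0;
}

The usleep backoff mirrors the usleep(100) in the patch and keeps the polling task from monopolizing a core, while the taskwait at the end of each stage is the synchronization point that lets the next stage's computation safely consume the data received in the previous one.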