I am trying to do a simple MPI distribution of for-loop iterations to “Worker” cores. Each worker should do its job (take a vector of doubles of size “nGenes”, typically ~6 elements) and send back the result (a single double). But I am having trouble even with the first step: passing messages from the “Master” core (rank 0) to the “Worker” cores (ranks 1, 2, …, nWorkers). The program gets through the sending part, but it hangs in the receiving part, on the line with MPI_Recv(…). I can't see what might be the problem. Please help.
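For reference, this is the basic point-to-point pattern I am trying to follow: rank 0 sends one vector of nGenes doubles to each worker, and each worker sends back one double. This is only a minimal, self-contained sketch with placeholder values (nGenes, the dummy data, and the tags are made up), not my actual program:

#include <mpi.h>
#include <iostream>
#include <vector>

int main(int argc, char* argv[])
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int nGenes = 6;   // length of one individual (placeholder value)

    if (rank == 0)
    {
        // Master: send one vector of doubles to every worker, then collect one double back.
        std::vector<double> genes(nGenes);
        for (int worker = 1; worker < size; ++worker)
        {
            for (int g = 0; g < nGenes; ++g) genes[g] = worker + g;   // dummy data
            // First argument is the address of the first element (genes.data()),
            // not the address of the vector/pointer variable itself.
            MPI_Send(genes.data(), nGenes, MPI_DOUBLE, worker, 0, MPI_COMM_WORLD);
        }
        for (int worker = 1; worker < size; ++worker)
        {
            double result;
            MPI_Recv(&result, 1, MPI_DOUBLE, worker, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            std::cout << "Result from worker " << worker << ": " << result << std::endl;
        }
    }
    else
    {
        // Worker: receive the vector, do a placeholder "job", send back one double.
        std::vector<double> genes(nGenes);
        MPI_Recv(genes.data(), nGenes, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        double result = 0.0;
        for (int g = 0; g < nGenes; ++g) result += genes[g];
        MPI_Send(&result, 1, MPI_DOUBLE, 0, 1, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}

And here is my full program: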
#include <iostream>
#include <mpi.h>
#include <math.h>
#include <stdlib.h> /* srand, rand */
double fR(double a);
void Calculate_MPI_Division_of_Work_Per_Core_Master0AndSlaves(int Number_of_Tasks, int NumberOfProcessors, int* START_on_core, int* END_on_core);
int main(int argc, char* argv[])
{
    int nIndividuals = 10;
    int nGenes = 6;

    double** DATA;
    DATA = new double* [nIndividuals];
    for (int ii=0; ii<nIndividuals; ii++)
    {
        DATA[ii] = new double [nGenes];
    }
    for (int ii=0; ii<nIndividuals; ii++)
    {
        for (int jj=0; jj<nGenes; jj++)
        {
            DATA[ii][jj] = ii+jj; // simple initialization of the elements.
        }
    }

    int MyRank, NumberOfProcessors;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &MyRank);
    MPI_Comm_size(MPI_COMM_WORLD, &NumberOfProcessors);
    int NumberOfWorkers = NumberOfProcessors - 1;

    int* Iteration_START_on_core;
    int* Iteration_STOP_on_core;
    Iteration_START_on_core = new int [NumberOfWorkers+1];
    Iteration_STOP_on_core = new int [NumberOfWorkers+1];
    Calculate_MPI_Division_of_Work_Per_Core_Master0AndSlaves(nIndividuals, NumberOfProcessors, Iteration_START_on_core, Iteration_STOP_on_core);
    if (MyRank == 0)
    {
        std::cout << " ======================== " << std::endl;
        std::cout << std::endl;
        std::cout << "NumberOfProcessors=" << NumberOfProcessors << std::endl;
        std::cout << "NumberOfWorkers= " << NumberOfWorkers << std::endl;
        std::cout << "NumberOfTasks= " << nIndividuals << std::endl;
        for (int ww=0; ww<=NumberOfWorkers; ww++)
        {
            std::cout << "(Core: " << ww << ") S:" << Iteration_START_on_core[ww] << " E:" << Iteration_STOP_on_core[ww] << " LoadOnCore: ";
            if (ww==0)
            {
                std::cout << 0;
            }
            else
            {
                std::cout << Iteration_STOP_on_core[ww] - Iteration_START_on_core[ww] + 1;
            }
            std::cout << std::endl;
        }
        std::cout << std::endl;
        std::cout << " ======================== " << std::endl;
    } /// End_If(MyRank==0)
    if (MyRank == 0)
    {
        std::cout << "Start Sending...." << std::endl;
        double* sendbuff;
        sendbuff = new double [nGenes];
        for (int cc=1; cc<=NumberOfWorkers; cc++)
        {
            for (int jj=Iteration_START_on_core[cc]; jj<=Iteration_STOP_on_core[cc]; jj++)
            {
                for (int gg=0; gg<nGenes; gg++)
                {
                    sendbuff[gg] = DATA[jj][gg];
                }
                std::cout << std::endl << "SEND to Core " << cc << ": Start=" << Iteration_START_on_core[cc] << ", End=" << Iteration_STOP_on_core[cc] << ". Task#: " << jj << " -- DATA: ";
                MPI_Send(&sendbuff, nGenes, MPI_DOUBLE, cc, 0, MPI_COMM_WORLD);
                for (int mm=0; mm<nGenes; mm++)
                {
                    std::cout << DATA[jj][mm] << " | ";
                }
            }
        }
        std::cout << std::endl;
        delete[] sendbuff;
        std::cout << std::endl << "Finish sending." << std::endl;
    }
    else
    {
        std::cout << std::endl << "...Worker Cores..." << std::endl;
        for (int cc=1; cc<=NumberOfWorkers; cc++)
        {
            if (MyRank == cc)
            {
                MPI_Status status;
                double* receivebuff;
                receivebuff = new double [nGenes];
                //std::cout << "Start Receiving on Core " << cc << ". FROM job: " << Iteration_START_on_core[cc] << " TO job: " << Iteration_STOP_on_core[cc] << "." << std::endl;
                for (int kk=Iteration_START_on_core[cc]; kk<=Iteration_STOP_on_core[cc]; kk++)
                {
                    MPI_Recv(&receivebuff, nGenes, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                    std::cout << std::endl << "RECEIVE on Core: " << cc << ". From Core " << 0 << ": Start=" << Iteration_START_on_core[cc] << ", End=" << Iteration_STOP_on_core[cc] << ". Work on task: " << kk << ".";
                    std::cout << " | ";
                    for (int aa=0; aa<nGenes; aa++)
                    {
                        std::cout << receivebuff[aa] << " | ";
                    }
                    std::cout << std::endl;
                }
                delete[] receivebuff;
                std::cout << std::endl << "Finish receiving on core " << cc << "." << std::endl;
            }
        }
    }
    for (int ii=1; ii<nIndividuals; ii++)
    {
        delete[] DATA[ii];
    }
    delete[] DATA;

    if (MyRank==0) std::cout << std::endl << "Prepare to MPI_Finalize ... " << std::endl;
    MPI_Finalize();
    if (MyRank==0) std::cout << std::endl << "... Completed MPI_Finalize. " << std::endl;
    ///######################################################################################################
    return 0;
} /// END MAIN PROGRAM
///===========================================================================================================================
///
/// Function: MPI Division of Work per Core.
void Calculate_MPI_Division_of_Work_Per_Core_Master0AndSlaves(int Number_of_Tasks, int NumberOfProcessors, int* START_on_core, int* END_on_core)
{
    int NuberOfWorkers = NumberOfProcessors-1;
    int integer_Num_Tasks_Per_Worker = floor(Number_of_Tasks/NuberOfWorkers);
    int reminder_Num_Taska_Per_Worker = Number_of_Tasks - integer_Num_Tasks_Per_Worker*NuberOfWorkers;

    START_on_core[0] = -1;
    END_on_core[0] = -1;

    //std::cout << std::endl << "F: integer_Num_Tasks_Per_Worker = " << integer_Num_Tasks_Per_Worker << std::endl;
    //std::cout << "F: reminder_Num_Taska_Per_Worker = " << reminder_Num_Taska_Per_Worker << std::endl;

    if (reminder_Num_Taska_Per_Worker==0)
    {
        START_on_core[1] = 0;
        END_on_core[1] = START_on_core[1] + integer_Num_Tasks_Per_Worker - 1;
        for (int iProcess=2; iProcess<NumberOfProcessors; iProcess++)
        {
            START_on_core[iProcess] = START_on_core[iProcess-1] + integer_Num_Tasks_Per_Worker;
            END_on_core[iProcess] = END_on_core[iProcess-1] + integer_Num_Tasks_Per_Worker;
        }
    }
    else
    {
        START_on_core[1] = 0;
        END_on_core[1] = START_on_core[1] + integer_Num_Tasks_Per_Worker - 1 + 1;
        for (int iProcess=2; iProcess<reminder_Num_Taska_Per_Worker+1; iProcess++)
        {
            START_on_core[iProcess] = START_on_core[iProcess-1] + integer_Num_Tasks_Per_Worker + 1;
            END_on_core[iProcess] = END_on_core[iProcess-1] + integer_Num_Tasks_Per_Worker + 1;
        }
        for (int iProcess=reminder_Num_Taska_Per_Worker+1; iProcess<NumberOfProcessors; iProcess++)
        {
            START_on_core[iProcess] = END_on_core[iProcess-1] + 1;
            END_on_core[iProcess] = START_on_core[iProcess] + integer_Num_Tasks_Per_Worker - 1;
        }
    }
}
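For clarity, this is the kind of split the function above is meant to produce: each worker gets a contiguous block of task indices, and the first few workers absorb the remainder. A small standalone sketch of the same block partitioning, with example numbers only (nTasks and nWorkers are placeholders, not taken from my run):

#include <iostream>

int main()
{
    const int nTasks = 10;     // example: nIndividuals
    const int nWorkers = 3;    // example: ranks 1..nWorkers do the work
    const int base = nTasks / nWorkers;    // tasks every worker gets
    const int extra = nTasks % nWorkers;   // first 'extra' workers get one more

    int start = 0;
    for (int w = 1; w <= nWorkers; ++w)
    {
        const int load = base + (w <= extra ? 1 : 0);
        const int stop = start + load - 1;
        std::cout << "Core " << w << ": tasks " << start << " to " << stop
                  << " (" << load << " tasks)" << std::endl;
        start = stop + 1;
    }
    return 0;
}

With 10 tasks and 3 workers this gives core 1 the tasks 0 to 3 and cores 2 and 3 three tasks each, which is the split I expect from the function above.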