Xmipp  v3.23.11-Nereus
mpi_reconstruct_fourier_gpu.cpp
/***************************************************************************
 *
 * Authors:     David Strelak (davidstrelak@gmail.com)
 *
 * Unidad de Bioinformatica of Centro Nacional de Biotecnologia, CSIC
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
 * 02111-1307 USA
 *
 * All comments concerning this program package may be sent to the
 * e-mail address 'xmipp@cnb.csic.es'
 ***************************************************************************/
#include "mpi_reconstruct_fourier_gpu.h"

/* constructor ------------------------------------------------------- */
ProgMPIRecFourierGPU::ProgMPIRecFourierGPU(int argc, char *argv[])
{
    this->read(argc, argv);
}

/* constructor providing an MpiNode
 * useful for calling this program from other programs
 */
ProgMPIRecFourierGPU::ProgMPIRecFourierGPU(const std::shared_ptr<MpiNode> &node)
{
    this->setNode(node);
}
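
// Illustrative use of the MpiNode constructor (a sketch only; the driver
// code below is hypothetical and not part of this file). A node that has
// already been created for one MPI program can be handed to this one:
//
//   auto mpiNode = std::make_shared<MpiNode>(argc, argv);
//   ProgMPIRecFourierGPU prog(mpiNode);
//   prog.read(argc, argv);
//   prog.tryRun();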

/* Special way of reading to sync all nodes */
void ProgMPIRecFourierGPU::read(int argc, char **argv)
{
    XmippMpiProgram::read(argc, argv);
    ProgRecFourierGPU::read(argc, (const char **)argv);
}
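
// Note on the call order above: XmippMpiProgram::read is presumably what
// sets up the MPI side (node creation, rank bookkeeping), and only then
// does ProgRecFourierGPU::read parse the reconstruction parameters, so
// every rank ends up with an identical view of the command line.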

/* Usage ------------------------------------------------------------------- */
void ProgMPIRecFourierGPU::defineParams()
{
    ProgRecFourierGPU::defineParams();
    addParamsLine("  [--mpi_job_size <size=1000>]  : Number of images sent to a node in a single job.");
    addParamsLine("                                : Should be rather big to saturate the GPU on the node and to hide the MPI overhead.");
    addParamsLine("                                : If -1, the maximum possible value is used (all images / number of nodes).");
    addParamsLine("  -gpusPerNode <num>            : Number of GPUs present on each node.");
    addParamsLine("  -threadsPerGPU <num>          : Number of CPU threads per GPU. Should be at least 4.");
}
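
// An illustrative invocation (the binary name and file names are
// hypothetical; -i/-o come from the base-class parameters defined in
// ProgRecFourierGPU::defineParams):
//
//   mpirun -np 5 xmipp_mpi_reconstruct_fourier_gpu -i particles.xmd \
//       -o volume.vol --mpi_job_size 2000 -gpusPerNode 2 -threadsPerGPU 4
//
// Rank 0 only distributes work, so -np 5 yields four GPU workers.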

/* Read parameters --------------------------------------------------------- */
void ProgMPIRecFourierGPU::readParams()
{
    ProgRecFourierGPU::readParams();
    mpi_job_size = getIntParam("--mpi_job_size");
    gpusPerNode = getIntParam("-gpusPerNode");
    threadsPerGPU = getIntParam("-threadsPerGPU");
}

/* PreRun: executed by every node, though each node does only part of the work */
void ProgMPIRecFourierGPU::preRun()
{
    if (nProcs < 2)
        REPORT_ERROR(ERR_ARG_INCORRECT, "This program cannot be executed in a single working node");

    if (node->isMaster())
    {
        show();
        SF.read(fn_in);
        // Send verbose level to node 1
        MPI_Send(&verbose, 1, MPI_INT, 1, TAG_SETVERBOSE, MPI_COMM_WORLD);
    }
    else
    {
        SF.read(fn_in);
        SF.firstRowId();
    }

    // read the projection file and divide it into jobs of mpi_job_size images
    numberOfJobs = (size_t)ceil((double)SF.size() / mpi_job_size);
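
    // Worked example of the split above (numbers are illustrative only):
    // with SF.size() == 25500 images and mpi_job_size == 1000, ceil gives
    // 26 jobs; the first 25 cover 1000 images each and the last one covers
    // the remaining 500.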

    // only one node will write to the console
    if (node->rank == 1)
    {
        // Get verbose status
        MPI_Recv(&verbose, 1, MPI_INT, 0, TAG_SETVERBOSE, MPI_COMM_WORLD, &status);
//#define DEBUG
#ifdef DEBUG
        std::cerr << "SF.ImgNo() mpi_job_size "
                  << SF.ImgNo() << " "
                  << mpi_job_size
                  << std::endl;
        std::cerr << "numberOfJobs: " << numberOfJobs << std::endl << std::endl;
#endif
#undef DEBUG
    }
}
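
/* Overview of the master/worker protocol implemented in run() below
 * (a summary of the code, with the tags used in this file):
 *   worker -> master : TAG_FREEWORKER            "I am idle"
 *   master -> worker : TAG_WORKFORWORKER + job   "process this batch"
 *   master -> worker : TAG_TRANSFER              "no more jobs, reduce"
 * After TAG_TRANSFER every worker joins an MPI_Reduce that sums the
 * partial Fourier volumes and weights onto rank 1, which then finishes
 * the reconstruction.
 */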
/* Run --------------------------------------------------------------------- */
void ProgMPIRecFourierGPU::run()
{
    preRun();
    struct timeval start_time, end_time;
    MPI_Group orig_group, new_group;
    MPI_Comm new_comm;
    long int total_usecs;
    double total_time_processing = 0., total_time_communicating = 0.;
    int *ranks;

    // Count the real workers only: rank 0 is the master and does not process images
    nProcs = nProcs - 1;

    if (numberOfJobs < nProcs)
    {
        if (node->isMaster())
        {
            std::cerr << "\nReducing the number of MPI workers from "
                      << nProcs << " to "
                      << numberOfJobs << std::endl;
        }

        nProcs = numberOfJobs;

        // Unused nodes are removed from the MPI communicator
        node->active = (node->rank <= numberOfJobs);
    }

    // Generate a new group, so that the reduction can be done without the master
    ranks = new int[nProcs];
    for (int i = 0; i < (int)nProcs; i++)
        ranks[i] = i + 1;
    MPI_Comm_group(MPI_COMM_WORLD, &orig_group);
    MPI_Group_incl(orig_group, nProcs, ranks, &new_group);
    MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
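
    // The three calls above carve a worker-only communicator out of
    // MPI_COMM_WORLD: new_comm contains world ranks 1..nProcs, so rank 0
    // of new_comm is world rank 1. A sketch of an equivalent construction
    // (not what this file uses) needs no explicit rank list:
    //
    //   int color = node->isMaster() ? MPI_UNDEFINED : 0;
    //   MPI_Comm_split(MPI_COMM_WORLD, color, node->rank, &new_comm);
    //
    // at the cost of returning MPI_COMM_NULL on the master.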

    if (node->isMaster())
    {
        gettimeofday(&start_time, nullptr);
        std::cerr << "Computing volume" << std::endl;
        if (verbose)
            init_progress_bar(numberOfJobs);

        for (size_t i = 0; i < numberOfJobs; i++)
        {
//#define DEBUG
#ifdef DEBUG
            std::cerr << "master-recv i=" << i << std::endl;
            std::cerr << "numberOfJobs: " << numberOfJobs << std::endl << std::endl;
#endif
#undef DEBUG

            MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
                     MPI_COMM_WORLD, &status);

            if (status.MPI_TAG != TAG_FREEWORKER)
                REPORT_ERROR(ERR_ARG_INCORRECT, "Unexpected TAG, please contact developers");

//#define DEBUG
#ifdef DEBUG
            std::cerr << "master-send i=" << i << std::endl;
#endif
#undef DEBUG
            int jobId = (int)i; // the worker expects a plain int job number
            MPI_Send(&jobId,
                     1,
                     MPI_INT,
                     status.MPI_SOURCE,
                     TAG_WORKFORWORKER,
                     MPI_COMM_WORLD);

            if (verbose)
                progress_bar(i);
        }

        // Wait for all processes to finish processing the current jobs,
        // so that the time statistics are correct
        for (size_t i = 1; i <= nProcs; i++)
        {
            MPI_Recv(nullptr,
                     0,
                     MPI_INT,
                     MPI_ANY_SOURCE,
                     TAG_FREEWORKER,
                     MPI_COMM_WORLD,
                     &status);
        }

        // update progress
        gettimeofday(&end_time, nullptr);
        total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
        total_time_processing += ((double)total_usecs / (double)1000000);

        gettimeofday(&start_time, nullptr);
        // Start collecting results
        for (size_t i = 1; i <= nProcs; i++)
        {
            MPI_Send(nullptr,
                     0,
                     MPI_INT,
                     i,
                     TAG_TRANSFER,
                     MPI_COMM_WORLD);
        }

        gettimeofday(&end_time, nullptr);
        total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
        total_time_communicating += ((double)total_usecs / (double)1000000);

        if (verbose > 0)
        {
            std::cout << "\n\nProcessing time: " << total_time_processing << " secs." << std::endl;
            std::cout << "Transfers time: " << total_time_communicating << " secs." << std::endl;
            std::cout << "Execution completed successfully" << std::endl;
        }
    }
    else if (node->active)
    {
        while (1)
        {
            int jobNumber;
//#define DEBUG
#ifdef DEBUG
            std::cerr << "slave-send TAG_FREEWORKER rank=" << node->rank << std::endl;
#endif
#undef DEBUG
            // I am free
            MPI_Send(nullptr, 0, MPI_INT, 0, TAG_FREEWORKER, MPI_COMM_WORLD);
            MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

            if (status.MPI_TAG == TAG_TRANSFER)
            {
                // Get data from GPU
                getGPUData();
                // remove the complex conjugate half of the intermediate result
                mirrorAndCropTempSpaces();

                // If this tag is not consumed here,
                // the master will not progress any further
                MPI_Recv(nullptr, 0, MPI_INT, 0, TAG_TRANSFER, MPI_COMM_WORLD, &status);
#ifdef DEBUG
                std::cerr << "Wr" << node->rank << " " << "TAG_STOP" << std::endl;
#endif
                for (int z = 0; z <= maxVolumeIndexYZ; z++) {
                    for (int y = 0; y <= maxVolumeIndexYZ; y++) {
                        if (node->rank == 1) {
                            MPI_Reduce(MPI_IN_PLACE, &(tempVolume[z][y][0]),
                                       (maxVolumeIndexX + 1) * 2, MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                            MPI_Reduce(MPI_IN_PLACE, &(tempWeights[z][y][0]),
                                       (maxVolumeIndexX + 1), MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                        } else {
                            MPI_Reduce(&(tempVolume[z][y][0]), &(tempVolume[z][y][0]),
                                       (maxVolumeIndexX + 1) * 2, MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                            MPI_Reduce(&(tempWeights[z][y][0]), &(tempWeights[z][y][0]),
                                       (maxVolumeIndexX + 1), MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                        }
                    }
                }
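
                // Why the two branches above differ: within new_comm the
                // root of the reduction is rank 0, which is world rank 1,
                // and MPI permits MPI_IN_PLACE (accumulating straight into
                // the receive buffer) only at the root. On the other ranks
                // the receive-buffer argument is ignored, so passing the
                // same pointer twice is harmless. Each call reduces one row
                // of the half-volume: (maxVolumeIndexX + 1) complex values
                // sent as pairs of floats (hence the factor of 2), plus the
                // matching row of scalar weights.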

                if (node->rank == 1)
                {
                    gettimeofday(&end_time, nullptr);
                    finishComputations(fn_out);
                    break;
                }
                else
                {
                    // release data and finish the thread
                    break;
                }
            }
            else if (status.MPI_TAG == TAG_WORKFORWORKER)
            {
                // get the job number
                MPI_Recv(&jobNumber, 1, MPI_INT, 0, TAG_WORKFORWORKER, MPI_COMM_WORLD, &status);

                size_t min_i, max_i;
                min_i = jobNumber * mpi_job_size;
                max_i = min_i + mpi_job_size - 1;

                if (max_i >= SF.size())
                    max_i = SF.size() - 1;
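
                // Example of the mapping above (illustrative numbers):
                // with mpi_job_size == 1000, job 25 spans images
                // min_i = 25000 .. max_i = 25999; if the metadata holds
                // only 25500 images, max_i is clamped to 25499.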

                // override the number of threads and the device to be used
                noOfThreads = threadsPerGPU;
                device = node->rank % gpusPerNode;
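
                // GPUs are assigned round-robin by world rank. Under the
                // assumption that consecutive ranks land on the same node,
                // gpusPerNode == 2, for example, maps ranks 1,3,5,... to
                // device 1 and ranks 2,4,6,... to device 0.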

                // do the job
                processImages(min_i, max_i);
            } else {
                std::cerr << "Received unknown TAG, quitting" << std::endl;
                exit(0);
            }
        }
    }
    delete[] ranks;
}