Xmipp v3.23.11-Nereus
mpi_reconstruct_fourier_accel.cpp
/***************************************************************************
 *
 * Authors:     Jose Roman Bilbao (jrbcast@ace.ual.es)
 *              Roberto Marabini (roberto@cnb.csic.es)
 *              Vahid Abrishami (vabrishamoi@cnb.csic.es)
 *              David Strelak (davidstrelak@gmail.com)
 *
 * Unidad de Bioinformatica of Centro Nacional de Biotecnologia, CSIC
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
 * 02111-1307 USA
 *
 * All comments concerning this program package may be sent to the
 * e-mail address 'xmipp@cnb.csic.es'
 ***************************************************************************/
#include "mpi_reconstruct_fourier_accel.h"

/* constructor ------------------------------------------------------- */
ProgMPIRecFourierAccel::ProgMPIRecFourierAccel(int argc, char *argv[])
{
    this->read(argc, argv);
}

/* constructor providing an MpiNode
 * this is useful for calling this program from other programs
 */
ProgMPIRecFourierAccel::ProgMPIRecFourierAccel(const std::shared_ptr<MpiNode> &node)
{
    this->setNode(node);
}

/* Special way of reading to sync all nodes */
void ProgMPIRecFourierAccel::read(int argc, char **argv)
{
    XmippMpiProgram::read(argc, argv);
    ProgRecFourierAccel::read(argc, (const char **)argv);
}

/* Usage ------------------------------------------------------------------- */
void ProgMPIRecFourierAccel::defineParams()
{
    ProgRecFourierAccel::defineParams();
    addParamsLine("  [--mpi_job_size <size=100>]  : Number of images sent to a CPU in a single job ");
    addParamsLine("                               : 100 may be a good value");
    addParamsLine("                               : if -1, the program will use the maximum");
    addParamsLine("                               : possible value, which may not be the best option");
}
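
// Example invocation (a sketch, not a reference: the exact binary name and the input/output
// flags below are assumptions that depend on the local Xmipp installation):
//
//   mpirun -np 8 xmipp_mpi_reconstruct_fourier_accel -i particles.xmd -o volume.vol --mpi_job_size 100
//
// Rank 0 only schedules work, so launching 8 MPI processes yields 7 actual workers.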

/* Read parameters --------------------------------------------------------- */
void ProgMPIRecFourierAccel::readParams()
{
    ProgRecFourierAccel::readParams();
    mpi_job_size = getIntParam("--mpi_job_size");
}

/* PreRun for all nodes, though not all of them do the same work */
void ProgMPIRecFourierAccel::preRun()
{
    if (nProcs < 2)
        REPORT_ERROR(ERR_ARG_INCORRECT,"This program cannot be executed in a single working node");

    if (node->isMaster())
    {
        show();
        SF.read(fn_in);
        // Send verbose level to node 1
        MPI_Send(&verbose, 1, MPI_INT, 1, TAG_SETVERBOSE, MPI_COMM_WORLD);
    }
    else
    {
        produceSideinfo();
        SF.firstRowId();
    }

    // number of jobs = number of input images divided by mpi_job_size (rounded up)
    numberOfJobs = (size_t)ceil((double)SF.size()/mpi_job_size);
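    // e.g. 1000 images in SF with --mpi_job_size 100 gives ceil(1000/100) = 10 jobs;
    // each job is a contiguous block of mpi_job_size image indices handed to one worker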

    // only one node will write to the console
    if (node->rank == 1)
    {
        // Get verbose status
        MPI_Recv(&verbose, 1, MPI_INT, 0, TAG_SETVERBOSE, MPI_COMM_WORLD, &status);
        //#define DEBUG
#ifdef DEBUG

        std::cerr << "SF.ImgNo() mpi_job_size "
                  << SF.ImgNo() << " "
                  << mpi_job_size
                  << std::endl;
        std::cerr << "numberOfJobs: " << numberOfJobs << std::endl << std::endl;
#endif
#undef DEBUG

    }
}
/* Run --------------------------------------------------------------------- */
void ProgMPIRecFourierAccel::run()
{
    preRun();
    struct timeval start_time, end_time;
    MPI_Group orig_group, new_group;
    MPI_Comm new_comm;
    long int total_usecs;
    double total_time_processing=0., total_time_communicating=0.;
    int * ranks;

    // Count only real workers: rank 0 is the master and does not process images
    nProcs = nProcs - 1;

    if( numberOfJobs < nProcs )
    {
        if( node->isMaster() )
        {
            std::cerr << "\nReducing the number of MPI workers from " <<
                         nProcs << " to " <<
                         numberOfJobs << std::endl;
        }

        nProcs = numberOfJobs;

        // Unused nodes are removed from the MPI communicator
        node->active = (node->rank <= numberOfJobs);
    }

    // Generate a new group/communicator so the final reduction can run without the master
    ranks = new int[nProcs];
    for (int i = 0; i < (int)nProcs; i++)
        ranks[i] = i + 1;
    MPI_Comm_group(MPI_COMM_WORLD, &orig_group);
    MPI_Group_incl(orig_group, nProcs, ranks, &new_group);
    MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
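    // In new_comm the worker with global rank i becomes rank i-1, so global rank 1 maps to
    // rank 0 of new_comm and acts as the root of the MPI_Reduce calls issued by the workers.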

    if (node->isMaster())
    {
        gettimeofday(&start_time, nullptr);
        std::cerr << "Computing volume" << std::endl;
        if ( verbose )
            init_progress_bar(numberOfJobs);

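        // Dynamic scheduling: each worker announces itself with TAG_FREEWORKER and the master
        // answers with the next job index under TAG_WORKFORWORKER, so jobs always go to
        // whichever worker becomes idle first instead of following a fixed assignment.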
        for (size_t i = 0; i < numberOfJobs; i++)
        {
            // #define DEBUG
#ifdef DEBUG
            std::cerr << "master-recv i=" << i << std::endl;
            std::cerr << "numberOfJobs: " << numberOfJobs << std::endl << std::endl;
#endif
#undef DEBUG

            MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
                     MPI_COMM_WORLD, &status);

            if ( status.MPI_TAG != TAG_FREEWORKER )
                REPORT_ERROR(ERR_ARG_INCORRECT,"Unexpected TAG, please contact developers");

            // #define DEBUG
#ifdef DEBUG
            std::cerr << "master-send i=" << i << std::endl;
#endif
#undef DEBUG
            MPI_Send(&i,
                     1,
                     MPI_INT,
                     status.MPI_SOURCE,
                     TAG_WORKFORWORKER,
                     MPI_COMM_WORLD);

            if (verbose)
                progress_bar(i);
        }
182 
183  // Wait for all processes to finish processing current jobs
184  // so time statistics are correct
185  for ( size_t i = 1 ; i <= nProcs ; i ++ )
186  {
187  MPI_Recv(nullptr,
188  0,
189  MPI_INT,
190  MPI_ANY_SOURCE,
192  MPI_COMM_WORLD,
193  &status);
194  }
195 
196  // update progress
197  gettimeofday(&end_time,nullptr);
198  total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
199  total_time_processing += ((double)total_usecs/(double)1000000);
200 
        gettimeofday(&start_time,nullptr);
        // Start collecting results
        for ( size_t i = 1 ; i <= nProcs ; i++ )
        {
            MPI_Send(nullptr,
                     0,
                     MPI_INT,
                     i,
                     TAG_TRANSFER,
                     MPI_COMM_WORLD );
        }

        gettimeofday(&end_time,nullptr);
        total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
        total_time_communicating += ((double)total_usecs/(double)1000000);

        if (verbose > 0)
        {
            std::cout << "\n\nProcessing time: " << total_time_processing << " secs." << std::endl;
            std::cout << "Transfers time: " << total_time_communicating << " secs." << std::endl;
            std::cout << "Execution completed successfully" << std::endl;
        }
    }
    else if( node->active ) {
        while (1)
        {
            int jobNumber;
            //#define DEBUG
#ifdef DEBUG
            std::cerr << "slave-send TAG_FREEWORKER rank=" << node->rank << std::endl;
#endif
#undef DEBUG
            // I am free
            MPI_Send(nullptr, 0, MPI_INT, 0, TAG_FREEWORKER, MPI_COMM_WORLD);
            MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

            if (status.MPI_TAG == TAG_TRANSFER)
            {
                // reduce the data

                // If this message is not consumed,
                // the master cannot make any further progress
                MPI_Recv(nullptr, 0, MPI_INT, 0, TAG_TRANSFER, MPI_COMM_WORLD, &status);
#ifdef DEBUG
                std::cerr << "Wr" << node->rank << " " << "TAG_STOP" << std::endl;
#endif
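                // The partial Fourier volume and its weights are summed row by row across
                // the workers: the root of new_comm (global rank 1) reduces with MPI_IN_PLACE,
                // so the accumulated result ends up directly in its own tempVolume/tempWeights.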
                for (int z = 0; z <= maxVolumeIndexYZ; z++) {
                    for (int y = 0; y <= maxVolumeIndexYZ; y++) {
                        if (node->rank == 1) {
                            MPI_Reduce(MPI_IN_PLACE, &(tempVolume[z][y][0]),
                                       (maxVolumeIndexX+1)*2, MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                            MPI_Reduce(MPI_IN_PLACE, &(tempWeights[z][y][0]),
                                       (maxVolumeIndexX+1), MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                        } else {
                            MPI_Reduce(&(tempVolume[z][y][0]), &(tempVolume[z][y][0]),
                                       (maxVolumeIndexX+1)*2, MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                            MPI_Reduce(&(tempWeights[z][y][0]), &(tempWeights[z][y][0]),
                                       (maxVolumeIndexX+1), MPI_FLOAT,
                                       MPI_SUM, 0, new_comm);
                        }
                    }
                }

                if ( node->rank == 1 )
                {
                    gettimeofday(&end_time,nullptr);
                    finishComputations(fn_out);
                    break;
                }
                else
                {
                    // release data and finish the thread
                    break;
                }
            }
            else if (status.MPI_TAG == TAG_WORKFORWORKER)
            {
                // get the job number
                MPI_Recv(&jobNumber, 1, MPI_INT, 0, TAG_WORKFORWORKER, MPI_COMM_WORLD, &status);

                size_t min_i, max_i;
                min_i = jobNumber*mpi_job_size;
                max_i = min_i + mpi_job_size - 1;

                if ( max_i >= SF.size())
                    max_i = SF.size()-1;
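                // e.g. jobNumber 3 with mpi_job_size 100 selects images 300..399; the last
                // job is simply clamped to the end of the input metadata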
                // do the job
                processImages( min_i, max_i);
            } else {
                std::cerr << "Received unknown TAG, I quit" << std::endl;
                exit(0);
            }
        }
    }

    // Kill threads used on workers
    if ( node->active && !node->isMaster() )
    {
    }
    delete[] ranks;
}
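
// Minimal driver sketch (an assumption for illustration only; the real Xmipp executable ships
// its own main). An MPI program of this family is typically launched by constructing the
// program object, which parses the arguments on every node, and then running it:
//
//   int main(int argc, char *argv[])
//   {
//       ProgMPIRecFourierAccel program(argc, argv);  // the constructor calls read() on all nodes
//       program.tryRun();                            // tryRun() wraps run() with Xmipp error handling
//       return 0;
//   }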