Xmipp  v3.23.11-Nereus
mpi_angular_project_library.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  *
3  * Authors: Roberto Marabini (roberto@cnb.csic.es)
4  *
5  * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20  * 02111-1307 USA
21  *
22  * All comments concerning this program package may be sent to the
23  * e-mail address 'xmipp@cnb.csic.es'
24  ***************************************************************************/
25 
26 //#include "mpi_run.h"
27 
28 #include <mpi.h>
29 #include <cstring>
30 #include <cstdlib>
31 #include <iostream>
32 #include <sstream>
33 #include <fstream>
34 #include <iomanip>
35 #include <random>
37 
39 
40 constexpr int TAG_WORKFORWORKER = 0;
41 constexpr int TAG_STOP = 1;
42 constexpr int TAG_WAIT = 2;
43 constexpr int TAG_FREEWORKER = 3;
44 
45 #define DEBUG
47 {
48 public:
49  //int rank, size, num_img_tot;
50 
51 
53  int nProcs;
54 
57 
60 
62  int rank;
63 
65  MPI_Status status;
66 
67  /* constructor ------------------------------------------------------- */
69  {
70  //parent class constructor will be called by deault without parameters
71  MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
72  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
73  //Blocks until all process have reached this routine.
74  //very likelly this is
75  MPI_Barrier(MPI_COMM_WORLD);
76  }
77 
78 
79  /* Read parameters --------------------------------------------------------- */
80  void readParams()
81  {
82  if (nProcs < 2)
83  error_exit("This program cannot be executed in a single working node");
85  mpi_job_size = getIntParam("--mpi_job_size");
86  }
87 
88  /* Usage ------------------------------------------------------------------- */
89  void defineParams()
90  {
92  addParamsLine(" [--mpi_job_size <size=10>]: Number of images sent to a cpu in a single job");
93  addParamsLine(" : 10 may be a good value");
94  addParamsLine(" : if -1 the computer will put the maximum");
95  addParamsLine(" : posible value that may not be the best option");
96  }
97 
98 
99  /* Show -------------------------------------------------------------------- */
100  void show()
101  {
103  std::cerr << " Size of mpi jobs " << mpi_job_size <<std::endl;
104  }
105 
106  /* Pre Run PreRun for all nodes but not for all works */
107  void preRun()
108  {
109  //do not use rank 0 here, since rank 1 will manipulate this file later
110  if (rank == 1)
111  {
112  unlink(output_file.c_str());
113  }
114 
115 
116  //#define DEBUGTIME
117 #ifdef DEBUGTIME
118 #include <ctime>
119 
120  time_t start,end;
121  double time_dif;
122  time (&start);
123  time (&end);
124  time_dif = difftime (end,start);
125  start=end;
126  std::cerr<<" starting prerun rank= "<<rank<<std::endl;
127 #endif
128 
129  int my_seed;
130  if (rank == 0)
131  {
132  show();
133  //random numbers must be the same in all nodes
135  {
136  srand(getpid());
137  my_seed = rand();
138  }
139  }
140 #ifdef DEBUGTIME
141  time (&end);
142  time_dif = difftime (end,start);
143  start=end;
144  std::cerr<<" set rand seed rank= "<<rank<<std::endl;
145 #endif
146  //Bcast must be seem by all processors
148  {
149  MPI_Bcast (&my_seed, 1, MPI_INT, 0, MPI_COMM_WORLD);
151 #ifdef DEBUGTIME
152 
153  time (&end);
154  time_dif = difftime (end,start);
155  start=end;
156  std::cerr<<" after perturb rank= "<<rank<<std::endl;
157 #endif
158 
159  }
160  //all ranks
162  //symmetry for sampling may be different from neighbourhs
164  REPORT_ERROR(ERR_NUMERICAL, (std::string)"angular_project_library::run Invalid symmetry" + fn_sym);//set sampling must go before set noise
165  if(angular_distance_bool!=0)
167 
168 #ifdef DEBUGTIME
169 
170  time (&end);
171  time_dif = difftime (end,start);
172  start=end;
173  std::cerr<<" setsampling rank= "<<rank<<std::endl;
174 #endif
175  //true -> half_sphere
178  //store symmetry matrices, this is faster than computing them each time
180 
181 #ifdef DEBUGTIME
182 
183  time (&end);
184  time_dif = difftime (end,start);
185  start=end;
186  std::cerr<<" compute sampling points rank= "<<rank<<std::endl;
187 #endif
188 
189  // We first sample The whole sphere
190  // Then we remove point redundant due to sampling symmetry
191  // use old symmetry, this is geometric does not use L_R
193 
194  //=========================
195  //======================
196  //recompute symmetry with neigh symmetry
198  REPORT_ERROR(ERR_NUMERICAL, (std::string)"angular_project_library::run Invalid neig symmetry" + fn_sym_neigh);
201  //precompute product between symmetry matrices and experimental data
202  if (FnexperimentalImages.size() > 0)
204 #ifdef DEBUGTIME
205 
206  time (&end);
207  time_dif = difftime (end,start);
208  start=end;
209  std::cerr<<" remove redundant rank= "<<rank<<std::endl;
210 #endif
211 
212  if (FnexperimentalImages.size() > 0 &&
214  {
216 #ifdef DEBUGTIME
217 
218  time (&end);
219  time_dif = difftime (end,start);
220  start=end;
221  std::cerr<<" remove points far away rank= "<<rank<<std::endl;
222 #endif
223 
224  }
225 
226  /* save files */
227  if (rank == 0)
228  {
230  {
231  //find sampling point closer to experimental point (only 0) and bool
232  //and save docfile with this information
234  }
235  //mysampling.createSymFile(symmetry, sym_order);
237  }
238 
239  if (rank != 0)
240  {
242  inputVol().setXmippOrigin();
243  Xdim = XSIZE(inputVol());
244  Ydim = YSIZE(inputVol());
245  }
246  if (rank == 0)
247  {
249  {
252  }
253  }
254  //release some memory
256 
257  if (mpi_job_size != -1)
258  {
259  numberOfJobs = (int)ceil((double)(mysampling.no_redundant_sampling_points_angles.size())/mpi_job_size);
260  }
261  else
262  {
263  numberOfJobs=nProcs-1;//one node is the master
264  mpi_job_size=(int)ceil((double)mysampling.no_redundant_sampling_points_angles.size()/numberOfJobs);
265  }
266 
267  verbose=false;
268  #define DEBUG
269 #ifdef DEBUG
270 
271  if (rank == 1)
272  {
273  std::cerr << "numberOfJobs: " << numberOfJobs << std::endl
274  << "number of projections to be created: " << mysampling.no_redundant_sampling_points_angles.size()
275  <<std::endl;
276  }
277 #endif
278  #undef DEBUG
279 
280  //create blanck outfile so main header does not need to be re-writen
281  //rank 1 has already xdim and ydim.
282  if (rank == 1)
283  {
284  int numberProjections = std::ceil(360.0 / psi_sampling);
285  numberProjections *= mysampling.no_redundant_sampling_points_angles.size();
286  std::cerr << "creating Blank file: "
287  << output_file << " for "
288  << numberProjections << " projections of size: " << Xdim << " " << Ydim
289  << std::endl;
290  createEmptyFile(output_file,Xdim,Ydim,1,numberProjections,true,WRITE_REPLACE);
291  }
292 
293  MPI_Barrier(MPI_COMM_WORLD);
294  }
295  /* Run --------------------------------------------------------------------- */
296  void process()
297  {
298  if (rank == 0)
299  {
300  int stopTagsSent =0;
301  int c = XMIPP_MAX(1, numberOfJobs / 60);
302  init_progress_bar(numberOfJobs);
303  for (int i=0;i<numberOfJobs;)
304  {
305  //collect data if available
306  //be aware that mpi_Probe will block the program until a message is received
307  MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
308  // worker is free
309  if (status.MPI_TAG == TAG_FREEWORKER)
310  {
311  MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
312  MPI_COMM_WORLD, &status);
313  //#define DEBUG
314 #ifdef DEBUG
315 
316  std::cerr << "Mr_f received TAG_FREEWORKER from worker " << status.MPI_SOURCE << std::endl;
317 #endif
318  //send work
319  MPI_Send(&i,
320  1,
321  MPI_INT,
322  status.MPI_SOURCE,
324  MPI_COMM_WORLD);
325  i++; //increase job number
326  //#define DEBUG
327 #ifdef DEBUG
328 
329  std::cerr << "Ms_f sent TAG_WORKFORWORKER to worker " << status.MPI_SOURCE << std::endl;
330  std::cerr << "Sent jobNo " << i << std::endl;
331 #endif
332  //#undef DEBUG
333  }
334  else
335  {
336  std::cerr << "M_f Received unknown TAG" << std::endl;
337  exit(0);
338  }
339 
340  if (i % c == 0)
341  progress_bar(i);
342  }
343  progress_bar(numberOfJobs);
344 
345  //send TAG_STOP
346  while (stopTagsSent < (nProcs-1))
347  {
348  MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
349  MPI_COMM_WORLD, &status);
350 #ifdef DEBUG
351 
352  std::cerr << "Mr received TAG_FREEWORKER from worker " << status.MPI_SOURCE << std::endl;
353  std::cerr << "Ms sent TAG_STOP to worker" << status.MPI_SOURCE << std::endl;
354 #endif
355  #undef DEBUG
356 
357  MPI_Send(nullptr, 0, MPI_INT, status.MPI_SOURCE, TAG_STOP, MPI_COMM_WORLD);
358  stopTagsSent++;
359  }
360  //only rank 0 create sel file
361  if(rank==0)
362  {
363 
364  MetaDataVec mySFin;
365  mySFin.read(output_file_root + "_angles.doc");
366 #define ANGLESDOC
367 #ifdef ANGLESDOC
368 // std::cerr << "DEBUG_ROB, output_file_root + angle.doc:"
369 // << output_file_root + "_angles.doc" << std::endl;
370 #endif
371  MetaDataVec mySF;
372  FileName fn_temp;
373 
374  size_t myCounter = 0;
375  size_t id;
376  int ref;
377 
378  for (size_t objId : mySFin.ids())
379  {
380  // FIXME: read whole row at once
381  double x,y,z, rot, tilt, psi;
382  mySFin.getValue(MDL_ANGLE_ROT,rot,objId);
383  mySFin.getValue(MDL_ANGLE_TILT,tilt,objId);
384  mySFin.getValue(MDL_ANGLE_PSI,psi,objId);
385  mySFin.getValue(MDL_X,x,objId);
386  mySFin.getValue(MDL_Y,y,objId);
387  mySFin.getValue(MDL_Z,z,objId);
388  mySFin.getValue(MDL_REF,ref,objId);
389 
390  for (int i = 0; i < std::ceil(360.0 / psi_sampling); ++i) {
391  //FIXME, do I have order?
392  fn_temp.compose(++myCounter,output_file);
393  id = mySF.addObject();
394  mySF.setValue(MDL_IMAGE,fn_temp, id);
395  mySF.setValue(MDL_ENABLED,1, id);
396 
397  mySF.setValue(MDL_ANGLE_ROT,rot, id);
398  mySF.setValue(MDL_ANGLE_TILT,tilt, id);
399  mySF.setValue(MDL_ANGLE_PSI,psi + i * psi_sampling, id);
400  mySF.setValue(MDL_X,x, id);
401  mySF.setValue(MDL_Y,y, id);
402  mySF.setValue(MDL_Z,z, id);
403  mySF.setValue(MDL_SCALE,1.0,id);
404  mySF.setValue(MDL_REF,ref,id);
405  }
406  }
407  fn_temp=output_file_root+".doc";
408  mySF.setComment("x,y,z refer to the coordinates of the unitary vector at direction given by the euler angles");
409  mySF.write(fn_temp);
410 #ifndef ANGLESDOC
411  unlink((output_file_root+"_angles.doc").c_str());
412 #endif
413 #undef ANGLESDOC
414  }
415  }
416  else
417  {
418  // Select only relevant part of selfile for this rank
419  // job number
420  // job size
421  // aux variable
422  while (1)
423  {
424  int jobNumber;
425  //I am free
426  MPI_Send(nullptr, 0, MPI_INT, 0, TAG_FREEWORKER, MPI_COMM_WORLD);
427  //#define DEBUG
428 #ifdef DEBUG
429 
430  std::cerr << "W" << rank << " " << "sent TAG_FREEWORKER to master " << std::endl;
431 #endif
432 #undef DEBUG
433  //get yor next task
434  MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
435 #ifdef DEBUG
436 
437  std::cerr << "W" << rank << " " << "probe MPI_ANY_TAG " << std::endl;
438 #endif
439 
440  if (status.MPI_TAG == TAG_STOP)//no more jobs exit
441  {
442  //If I do not read this tag
443  //master will no further process
444  //a posibility is a non-blocking send
445  MPI_Recv(nullptr, 0, MPI_INT, 0, TAG_STOP,
446  MPI_COMM_WORLD, &status);
447 #ifdef DEBUG
448 
449  std::cerr << "Wr" << rank << " " << "TAG_STOP" << std::endl;
450 #endif
451 
452  break;
453  }
454  if (status.MPI_TAG == TAG_WORKFORWORKER)
455  //there is still some work to be done
456  {
457  //get the jobs number
458  MPI_Recv(&jobNumber, 1, MPI_INT, 0, TAG_WORKFORWORKER, MPI_COMM_WORLD, &status);
459 #ifdef DEBUG
460 
461  std::cerr << "Wr" << rank << " " << "TAG_WORKFORWORKER" << std::endl;
462  std::cerr << "jobNumber " << jobNumber << std::endl;
463 #endif
464 #undef DEBUG
465  // Process all images
466  project_angle_vector(jobNumber*mpi_job_size,
467  std::min((size_t)((jobNumber+1)* mpi_job_size -1),
469  verbose);
470  //get yor next task
471  }
472  else
473  {
474  std::cerr << "3) Received unknown TAG I quit" << std::endl;
475  exit(0);
476  }
477  }
478  }
479  }
480 
482  {
483  if (rank==0 && fn_groups!="")
485  }
486 
487  /* a short function to print a message and exit */
488  void error_exit(const char * msg)
489  {
490  fprintf(stderr, "%s", msg);
491  MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
492  }
493 
494  void run()
495  {
496  try
497  {
498  preRun();
499  process();
501  }
502  catch (XmippError &XE)
503  {
504  std::cerr << "Error!" <<std::endl;
505  std::cerr << XE.what();
506  MPI_Finalize();
507  exit(1);
508  }
509 
510  if (rank == 0)
511  std::cout << "Done!" <<std::endl;
512  }
513 
514 };
void init_progress_bar(long total)
Rotation angle of an image (double,degrees)
#define YSIZE(v)
void setSampling(double sampling)
Definition: sampling.cpp:121
void min(Image< double > &op1, const Image< double > &op2)
void removeRedundantPoints(const int symmetry, int sym_order)
Definition: sampling.cpp:691
#define XMIPP_MAX(x, y)
Definition: xmipp_macros.h:193
void setNoise(double deviation, int my_seed=-1)
Definition: sampling.cpp:134
void read(const FileName &inFile, const std::vector< MDLabel > *desiredLabels=nullptr, bool decomposeStack=true) override
#define REPORT_ERROR(nerr, ErrormMsg)
Definition: xmipp_error.h:211
doublereal * c
void createAsymUnitFile(const FileName &docfilename)
Definition: sampling.cpp:1440
Z component (double)
void setNeighborhoodRadius(double neighborhood)
Definition: sampling.cpp:140
Tilting angle of an image (double,degrees)
static double * y
void findClosestSamplingPoint(const MetaData &DFi, const FileName &output_file_root)
Definition: sampling.cpp:1991
int readSymmetryFile(FileName fn_sym, double accuracy=SYM_ACCURACY)
Definition: symmetries.cpp:33
bool isSymmetryGroup(FileName fn_sym, int &pgGroup, int &pgOrder)
Definition: symmetries.cpp:601
constexpr int TAG_WORKFORWORKER
FileName fn_sym
symmetry file for sampling
void computeNeighbors(bool only_winner=false)
Definition: sampling.cpp:1715
void compose(const String &str, const size_t no, const String &ext="")
Special label to be used when gathering MDs in MpiMetadataPrograms.
void write(const FileName &outFile, WriteModeMetaData mode=MD_OVERWRITE) const
void saveSamplingFile(const FileName &fn_base, bool write_vectors=true, bool write_sampling_sphere=false)
Definition: sampling.cpp:1495
virtual IdIteratorProxy< false > ids()
doublereal * x
#define i
Is this image enabled? (int [-1 or 1])
virtual void setComment(const String &newComment="No comment")
constexpr int TAG_STOP
FileName fn_sym_neigh
symmetry file for heighbors computation
std::vector< Matrix1D< double > > exp_data_projection_direction_by_L_R
Definition: sampling.h:108
void computeSamplingPoints(bool only_half_sphere=true, double max_tilt=180, double min_tilt=0)
Definition: sampling.cpp:155
bool setValue(const MDObject &mdValueIn, size_t id)
size_t addObject() override
void project_angle_vector(int my_init, int my_end, bool verbose=true)
#define XSIZE(v)
void progress_bar(long rlen)
constexpr int TAG_WAIT
Error related to numerical calculation.
Definition: xmipp_error.h:179
double z
scaling factor for an image or volume (double)
void createEmptyFile(const FileName &filename, int xdim, int ydim, int Zdim, size_t select_img, bool isStack, int mode, int _swapWrite, const MDRowVec *md)
int verbose
Verbosity level.
void fillLRRepository(void)
Definition: sampling.cpp:2216
bool getValue(MDObject &mdValueOut, size_t id) const override
Class to which the image belongs (int)
std::vector< Matrix1D< double > > no_redundant_sampling_points_angles
Definition: sampling.h:121
Y component (double)
double psi(const double x)
X component (double)
fprintf(glob_prnt.io, "\)
int read(const FileName &name, DataMode datamode=DATA, size_t select_img=ALL_IMAGES, bool mapData=false, int mode=WRITE_READONLY)
void fillExpDataProjectionDirectionByLR(const MetaData &DFi)
Definition: sampling.cpp:2256
void removePointsFarAwayFromExperimentalData()
Definition: sampling.cpp:1928
int getIntParam(const char *param, int arg=0)
Name of an image (std::string)
SymList SL
Definition: sampling.h:138
void addParamsLine(const String &line)
constexpr int TAG_FREEWORKER