Xmipp  v3.23.11-Nereus
Public Member Functions | Public Attributes | List of all members
ProgMPIRecFourier Class Reference

#include <mpi_reconstruct_fourier.h>

Inheritance diagram for ProgMPIRecFourier:
Inheritance graph
[legend]
Collaboration diagram for ProgMPIRecFourier:
Collaboration graph
[legend]

Public Member Functions

 ProgMPIRecFourier ()
 
 ProgMPIRecFourier (int argc, char *argv[])
 
 ProgMPIRecFourier (const std::shared_ptr< MpiNode > &node)
 
void read (int argc, char **argv)
 
void readParams ()
 
void defineParams ()
 
void preRun ()
 
void run ()
 
int sendDataInChunks (double *pointer, int dest, int totalSize, int buffSize, MPI_Comm comm)
 
- Public Member Functions inherited from ProgRecFourier
void readParams ()
 Read arguments from command line. More...
 
void defineParams ()
 Read arguments from command line. More...
 
void show ()
 
void run ()
 
void produceSideinfo ()
 Produce side info: fill arrays with relevant transformation matrices. More...
 
void finishComputations (const FileName &out_name)
 
void processImages (int firstImageIndex, int lastImageIndex, bool saveFSC=false, bool reprocessFlag=false)
 Process one image. More...
 
void correctWeight ()
 Method for the correction of the fourier coefficients. More...
 
void forceWeightSymmetry (MultidimArray< double > &FourierWeights)
 Force the weights to be symmetrized. More...
 
virtual void setIO (const FileName &fn_in, const FileName &fn_out)
 Functions of common reconstruction interface. More...
 
- Public Member Functions inherited from ProgReconsBase
virtual ~ProgReconsBase ()
 
- Public Member Functions inherited from XmippProgram
const char * getParam (const char *param, int arg=0)
 
const char * getParam (const char *param, const char *subparam, int arg=0)
 
int getIntParam (const char *param, int arg=0)
 
int getIntParam (const char *param, const char *subparam, int arg=0)
 
double getDoubleParam (const char *param, int arg=0)
 
double getDoubleParam (const char *param, const char *subparam, int arg=0)
 
float getFloatParam (const char *param, int arg=0)
 
float getFloatParam (const char *param, const char *subparam, int arg=0)
 
void getListParam (const char *param, StringVector &list)
 
int getCountParam (const char *param)
 
bool checkParam (const char *param)
 
bool existsParam (const char *param)
 
void addParamsLine (const String &line)
 
void addParamsLine (const char *line)
 
ParamDefgetParamDef (const char *param) const
 
virtual void quit (int exit_code=0) const
 
void initProgress (size_t total, size_t stepBin=60)
 
void setProgress (size_t value=0)
 
void endProgress ()
 
void processDefaultComment (const char *param, const char *left)
 
void setDefaultComment (const char *param, const char *comment)
 
virtual void initComments ()
 
void setProgramName (const char *name)
 
void addUsageLine (const char *line, bool verbatim=false)
 
void clearUsage ()
 
void addExampleLine (const char *example, bool verbatim=true)
 
void addSeeAlsoLine (const char *seeAlso)
 
void addKeywords (const char *keywords)
 
const char * name () const
 
virtual void usage (int verb=0) const
 
virtual void usage (const String &param, int verb=2)
 
int version () const
 
virtual void show () const
 
virtual void read (int argc, const char **argv, bool reportErrors=true)
 
virtual void read (int argc, char **argv, bool reportErrors=true)
 
void read (const String &argumentsLine)
 
 XmippProgram ()
 
 XmippProgram (int argc, const char **argv)
 
virtual ~XmippProgram ()
 
- Public Member Functions inherited from XmippMpiProgram
void read (int argc, char **argv)
 
virtual int tryRun ()
 

Public Attributes

long int sizeout
 
int mpi_job_size
 
- Public Attributes inherited from ProgRecFourier
FileName fn_out
 
FileName fn_sym
 
FileName fn_sel
 
FileName fn_doc
 
FileName fn_fsc
 
MetaDataVec SF
 
bool do_weights
 
bool useCTF
 
bool phaseFlipped
 
double minCTF
 
double Ts
 
double padding_factor_proj
 
double padding_factor_vol
 
double sampling_rate
 
double maxResolution
 Max resolution in Angstroms. More...
 
int NiterWeight
 Number of iterations for the weight. More...
 
int numThreads
 Number of threads to use in parallel to process a single image. More...
 
pthread_t * th_ids
 IDs for the threads. More...
 
ImageThreadParamsth_args
 Contains parameters passed to each thread. More...
 
int threadOpCode
 Tells the threads what to do next. More...
 
size_t rowsProcessed
 Number of rows already processed on an image. More...
 
pthread_mutex_t workLoadMutex
 Controls mutual exclusion on critical zones of code. More...
 
barrier_t barrier
 To create a barrier synchronization for threads. More...
 
int * statusArray
 A status array for each row in an image (processing, processed,etc..) More...
 
int thrWidth
 How many image rows are processed at a time by a single thread. More...
 
int imgSize
 
int volPadSizeX
 
int volPadSizeY
 
int volPadSizeZ
 
Matrix1D< double > Fourier_blob_table
 
Matrix1D< double > blobTableSqrt
 
Matrix1D< double > fourierBlobTableSqrt
 
double iDeltaFourier
 
double iDeltaSqrt
 
double maxResolution2
 
struct blobtype blob
 
std::vector< Matrix2D< double > > R_repository
 
FourierTransformer transformerVol
 
FourierTransformer transformerImg
 
MultidimArray< std::complex< double > > VoutFourier
 
MultidimArray< std::complex< double > > VoutFourierTmp
 
MultidimArray< double > FourierWeights
 
MultidimArray< double > paddedImg
 
Image< double > Vout
 
- Public Attributes inherited from XmippProgram
bool doRun
 
bool runWithoutArgs
 
int verbose
 Verbosity level. More...
 
int debug
 

Additional Inherited Members

- Static Public Member Functions inherited from ProgRecFourier
static void * processImageThread (void *threadArgs)
 Defines what a thread should do. More...
 
- Protected Member Functions inherited from XmippProgram
void defineCommons ()
 
- Protected Member Functions inherited from XmippMpiProgram
void setNode (const std::shared_ptr< MpiNode > &node)
 
- Protected Attributes inherited from XmippProgram
int errorCode
 
ProgramDefprogDef
 Program definition and arguments parser. More...
 
std::map< String, CommentListdefaultComments
 
int argc
 Original command line arguments. More...
 
const char ** argv
 
- Protected Attributes inherited from XmippMpiProgram
std::shared_ptr< MpiNodenode
 
size_t nProcs
 
size_t numberOfJobs
 
MPI_Status status
 

Detailed Description

Definition at line 59 of file mpi_reconstruct_fourier.h.

Constructor & Destructor Documentation

◆ ProgMPIRecFourier() [1/3]

ProgMPIRecFourier::ProgMPIRecFourier ( )
inline

Empty constructor

Definition at line 70 of file mpi_reconstruct_fourier.h.

71  {}

◆ ProgMPIRecFourier() [2/3]

ProgMPIRecFourier::ProgMPIRecFourier ( int  argc,
char *  argv[] 
)

Empty constructor

Definition at line 36 of file mpi_reconstruct_fourier.cpp.

37 {
38  this->read(argc, argv);
39 }
void read(int argc, char **argv)
int argc
Original command line arguments.
Definition: xmipp_program.h:86
const char ** argv
Definition: xmipp_program.h:87

◆ ProgMPIRecFourier() [3/3]

ProgMPIRecFourier::ProgMPIRecFourier ( const std::shared_ptr< MpiNode > &  node)

Definition at line 44 of file mpi_reconstruct_fourier.cpp.

45 {
46  this->setNode(node);
47 }
std::shared_ptr< MpiNode > node
Definition: xmipp_mpi.h:164
void setNode(const std::shared_ptr< MpiNode > &node)
Definition: xmipp_mpi.cpp:256

Member Function Documentation

◆ defineParams()

void ProgMPIRecFourier::defineParams ( )
virtual

destructor

Reimplemented from XmippProgram.

Definition at line 57 of file mpi_reconstruct_fourier.cpp.

58 {
60  addParamsLine(" [--mpi_job_size <size=10>] : Number of images sent to a cpu in a single job ");
61  addParamsLine(" : 10 may be a good value");
62  addParamsLine(" : if -1 the computer will put the maximum");
63  addParamsLine(" : posible value that may not be the best option");
64 }
void defineParams()
Read arguments from command line.
void addParamsLine(const String &line)

◆ preRun()

void ProgMPIRecFourier::preRun ( )

Definition at line 74 of file mpi_reconstruct_fourier.cpp.

75 {
76  if (nProcs < 2)
77  REPORT_ERROR(ERR_ARG_INCORRECT,"This program cannot be executed in a single working node");
78 
79  if (node->isMaster())
80  {
81  show();
82  SF.read(fn_sel);
83 
84  //Send verbose level to node 1
85  MPI_Send(&verbose, 1, MPI_INT, 1, TAG_SETVERBOSE, MPI_COMM_WORLD);
86  }
87  else
88  {
90  SF.firstRowId();
91  }
92 
93  //leer sel file / dividir por mpi_job_size
94  numberOfJobs=(size_t)ceil((double)SF.size()/mpi_job_size);
95 
96  //only one node will write in the console
97  if (node->rank == 1 )
98  {
99  // Get verbose status
100  MPI_Recv(&verbose, 1, MPI_INT, 0, TAG_SETVERBOSE, MPI_COMM_WORLD, &status);
101 
102  //use threads for volume inverse fourier transform, plan is created in setReal()
103  //only rank=1 makes inverse Fourier trnasform
105 
106  //#define DEBUG
107 #ifdef DEBUG
108 
109  std::cerr << "SF.ImgNo() mpi_job_size "
110  << SF.ImgNo() << " "
111  << mpi_job_size
112  << std::endl;
113  std::cerr << "numberOfJobs: " << numberOfJobs << std::endl <<std::endl;
114 #endif
115 #undef DEBUGDOUBLE
116 
117  }
118 }
void read(const FileName &inFile, const std::vector< MDLabel > *desiredLabels=nullptr, bool decomposeStack=true) override
void setThreadsNumber(int tNumber)
Definition: xmipp_fftw.h:119
#define REPORT_ERROR(nerr, ErrormMsg)
Definition: xmipp_error.h:211
int numThreads
Number of threads to use in parallel to process a single image.
size_t numberOfJobs
Definition: xmipp_mpi.h:168
size_t size() const override
Incorrect argument received.
Definition: xmipp_error.h:113
size_t firstRowId() const override
std::shared_ptr< MpiNode > node
Definition: xmipp_mpi.h:164
void produceSideinfo()
Produce side info: fill arrays with relevant transformation matrices.
int verbose
Verbosity level.
MPI_Status status
Definition: xmipp_mpi.h:170
FourierTransformer transformerVol
constexpr int TAG_SETVERBOSE

◆ read()

void ProgMPIRecFourier::read ( int  argc,
char **  argv 
)

Definition at line 50 of file mpi_reconstruct_fourier.cpp.

51 {
53  ProgRecFourier::read(argc, (const char **)argv);
54 }
virtual void read(int argc, const char **argv, bool reportErrors=true)
int argc
Original command line arguments.
Definition: xmipp_program.h:86
const char ** argv
Definition: xmipp_program.h:87
void read(int argc, char **argv)
Definition: xmipp_mpi.cpp:240

◆ readParams()

void ProgMPIRecFourier::readParams ( )
virtual

Function in which each program will read parameters that it need. If some error occurs the usage will be printed out.

Reimplemented from XmippProgram.

Definition at line 67 of file mpi_reconstruct_fourier.cpp.

68 {
70  mpi_job_size=getIntParam("--mpi_job_size");
71 }
void readParams()
Read arguments from command line.
int getIntParam(const char *param, int arg=0)

◆ run()

void ProgMPIRecFourier::run ( )
virtual

This function will be start running the program. it also should be implemented by derived classes.

Reimplemented from XmippProgram.

Definition at line 120 of file mpi_reconstruct_fourier.cpp.

121 {
122  preRun();
123  struct timeval start_time, end_time;
124  MPI_Group orig_group, new_group;
125  MPI_Comm new_comm;
126  long int total_usecs;
127  double total_time_processing=0., total_time_weightening=0., total_time_communicating=0., total_time;
128  int iter=0;
129  int * ranks = nullptr;
130 
131  // Real workers, rank=0 is the master, does not work
132  nProcs = nProcs - 1;
133 
134  if( numberOfJobs < nProcs )
135  {
136  if( node->isMaster() )
137  {
138  std::cerr << "\nReducing the number of MPI workers from " <<
139  nProcs << " to " <<
140  numberOfJobs << std::endl;
141 
142  std::cerr << std::flush;
143  }
144 
146 
147  // Unused nodes are removed from the MPI communicator
148  node->active = (node->rank <= numberOfJobs);
150  }
151 
152  // Generate a new group to do all reduce without the master
153  ranks = new int [nProcs];
154  for (int i=0;i<(int)nProcs;i++)
155  ranks[i]=i+1;
156  MPI_Comm_group(MPI_COMM_WORLD, &orig_group);
157  MPI_Group_incl(orig_group, nProcs, ranks, &new_group);
158  MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
159 
160  do
161  {
162  if (node->isMaster())
163  {
164  gettimeofday(&start_time,nullptr);
165 
166  std::cerr<<std::endl;
167  if (iter != NiterWeight)
168  std::cerr<<"Computing weights "<<iter+1<<"/"<<NiterWeight<<std::endl;
169  else
170  std::cerr<<"Computing volume"<<std::endl;
171 
172  if ( verbose )
174 
175  size_t FSC=numberOfJobs/2;
176  for (size_t i=0;i<numberOfJobs;i++)
177  {
178 
179  //#define DEBUG
180 #ifdef DEBUG
181  std::cerr << "master-recv i=" << i << std::endl;
182  std::cerr << "numberOfJobs: " << numberOfJobs << std::endl <<std::endl;
183 #endif
184 #undef DEBUG
185 
186  MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
187  MPI_COMM_WORLD, &status);
188 
189  if ( status.MPI_TAG != TAG_FREEWORKER )
190  REPORT_ERROR(ERR_ARG_INCORRECT,"Unexpected TAG, please contact developers");
191 
192  //#define DEBUG
193 #ifdef DEBUG
194 
195  std::cerr << "master-send i=" << i << std::endl;
196 #endif
197 #undef DEBUG
198 
199  MPI_Send(&i,
200  1,
201  MPI_INT,
202  status.MPI_SOURCE,
204  MPI_COMM_WORLD);
205 
206  if (iter == NiterWeight)
207  {
208  if( i == FSC && fn_fsc != "")
209  {
210  // sending every worker COLLECT_FOR_FSC
211  for ( size_t worker = 1 ; worker <= nProcs ; worker ++ )
212  {
213  MPI_Recv(nullptr,
214  0,
215  MPI_INT,
216  MPI_ANY_SOURCE,
218  MPI_COMM_WORLD,
219  &status);
220 
221  MPI_Send( nullptr,
222  0,
223  MPI_INT,
224  status.MPI_SOURCE,
226  MPI_COMM_WORLD);
227  }
228  }
229  }
230 
231  if (verbose)
232  progress_bar(i);
233  }
234 
235 
236  // Wait for all processes to finish processing current jobs
237  // so time statistics are correct
238  for ( size_t i = 1 ; i <= nProcs ; i ++ )
239  {
240  MPI_Recv(nullptr,
241  0,
242  MPI_INT,
243  MPI_ANY_SOURCE,
245  MPI_COMM_WORLD,
246  &status);
247  }
248 
249  if (iter != NiterWeight)
250  {
251  gettimeofday(&end_time,nullptr);
252 
253  total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
254  total_time_weightening += ((double)total_usecs/(double)1000000);
255  }
256  else
257  {
258  gettimeofday(&end_time,nullptr);
259 
260  total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
261  total_time_processing += ((double)total_usecs/(double)1000000);
262  }
263 
264  gettimeofday(&start_time,nullptr);
265  // Start collecting results
266  for ( size_t i = 1 ; i <= nProcs ; i ++ )
267  {
268  MPI_Send(nullptr,
269  0,
270  MPI_INT,
271  i,
272  TAG_TRANSFER,
273  MPI_COMM_WORLD );
274  }
275  gettimeofday(&end_time,nullptr);
276  total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
277  total_time_communicating += ((double)total_usecs/(double)1000000);
278 
279  if (iter == NiterWeight)
280  if (verbose > 0)
281  {
282  std::cout << "\n\nProcessing time: " << total_time_processing << " secs." << std::endl;
283  std::cout << "Transfers time: " << total_time_communicating << " secs." << std::endl;
284  std::cout << "Weighting time: " << total_time_weightening << " secs." << std::endl;
285  std::cout << "Execution completed successfully"<< std::endl;
286  }
287  }
288  else if( node->active )
289  {
290  // Select only relevant part of selfile for this rank
291  // job number
292  // job size
293  // aux variable
294  auto * fourierVolume = (double *)VoutFourier.data;
295  double * fourierWeights = FourierWeights.data;
296 
298 
299  //First
301  pthread_mutex_init( &workLoadMutex, nullptr );
302  statusArray = nullptr;
303  th_ids = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
304  th_args = (ImageThreadParams *)malloc(numThreads * sizeof(ImageThreadParams));
305 
306  for ( int nt = 0 ; nt < numThreads ; nt++ )
307  {
308  th_args[nt].parent=this;
309  th_args[nt].myThreadID = nt;
310  th_args[nt].selFile = new MetaDataVec(SF);
311  pthread_create((th_ids+nt),nullptr,processImageThread,(void*)(th_args+nt));
312  }
313 
314  while (1)
315  {
316  int jobNumber;
317 
318  //#define DEBUG
319 #ifdef DEBUG
320 
321  std::cerr << "slave-send TAG_FREEWORKER rank=" << node->rank << std::endl;
322 #endif
323  #undef DEBUG
324  //I am free
325  MPI_Send(nullptr, 0, MPI_INT, 0, TAG_FREEWORKER, MPI_COMM_WORLD);
326  MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
327 
328  if (status.MPI_TAG == TAG_COLLECT_FOR_FSC)
329  {
330  //If I do not read this tag
331  //master will no further process
332  //a posibility is a non-blocking send
333  MPI_Recv(nullptr, 0, MPI_INT, 0, TAG_COLLECT_FOR_FSC, MPI_COMM_WORLD, &status);
334 
335  if( node->rank == 1 )
336  {
337  // Reserve memory for the receive buffer
338  auto * recBuffer = (double *) malloc (sizeof(double)*BUFFSIZE);
339  int receivedSize;
340  double * pointer;
341  pointer = fourierVolume;
342  int currentSource;
343 
344  if ( nProcs > 2 )
345  {
346  // Receive from other workers
347  for ( size_t i = 2 ; i <= nProcs ; i++)
348  {
349  MPI_Recv(nullptr,0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
350  MPI_COMM_WORLD, &status);
351 
352  currentSource = status.MPI_SOURCE;
353 
354  pointer = fourierVolume;
355 
356  while (1)
357  {
358  MPI_Probe( currentSource, MPI_ANY_TAG, MPI_COMM_WORLD, &status );
359 
360  if ( status.MPI_TAG == TAG_FREEWORKER )
361  {
362  MPI_Recv(nullptr,0, MPI_INT, currentSource, TAG_FREEWORKER, MPI_COMM_WORLD, &status );
363  break;
364  }
365 
366  MPI_Recv( recBuffer,
367  BUFFSIZE,
368  MPI_DOUBLE,
369  currentSource,
370  MPI_ANY_TAG,
371  MPI_COMM_WORLD,
372  &status );
373 
374  MPI_Get_count( &status, MPI_DOUBLE, &receivedSize );
375 
376  for ( int i = 0 ; i < receivedSize ; i ++ )
377  {
378  pointer[i] += recBuffer[i];
379  }
380 
381  pointer += receivedSize;
382  }
383  }
384  }
385  free( recBuffer );
386 
387  Image<double> auxVolume1;
388  auxVolume1().alias( FourierWeights );
389  auxVolume1.write((std::string)fn_fsc + "_1_Weights.vol");
390 
391  Image< std::complex<double> > auxFourierVolume1;
392  auxFourierVolume1().alias( VoutFourier );
393  auxFourierVolume1.write((std::string) fn_fsc + "_1_Fourier.vol");
394 
395 
396  // Normalize global volume and store data
397  finishComputations(FileName((std::string) fn_fsc + "_split_1.vol"));
398 
399  Vout().initZeros(volPadSizeZ, volPadSizeY, volPadSizeX);
400 
402  Vout().clear();
406  }
407  else
408  {
409 
410  MPI_Send( nullptr,0,MPI_INT,1,TAG_FREEWORKER, MPI_COMM_WORLD );
411 
412  sendDataInChunks( fourierVolume, 1, 2*sizeout, BUFFSIZE, MPI_COMM_WORLD );
413 
414  MPI_Send( nullptr,0,MPI_INT,1,TAG_FREEWORKER, MPI_COMM_WORLD );
415 
416  Vout().initZeros(volPadSizeZ, volPadSizeY, volPadSizeX);
418  Vout().clear();
422  }
423  }
424  else if (status.MPI_TAG == TAG_TRANSFER)
425  {
426  //If I do not read this tag
427  //master will no further process
428  MPI_Recv(nullptr, 0, MPI_INT, 0, TAG_TRANSFER, MPI_COMM_WORLD, &status);
429 #ifdef DEBUG
430 
431  std::cerr << "Wr" << node->rank << " " << "TAG_STOP" << std::endl;
432 #endif
433 
434  MPI_Allreduce(MPI_IN_PLACE, fourierWeights,
435  sizeout, MPI_DOUBLE,
436  MPI_SUM, new_comm);
437  /*if (iter != NiterWeight)
438  {
439  MPI_Allreduce(MPI_IN_PLACE, fourierWeights,
440  sizeout, MPI_DOUBLE,
441  MPI_SUM, new_comm);
442  forceWeightSymmetry(FourierWeights);
443  FOR_ALL_DIRECT_ELEMENTS_IN_ARRAY3D(VoutFourier)
444  {
445  double *ptrOut=(double *)&(DIRECT_A3D_ELEM(VoutFourier, k,i,j));
446  ptrOut[0] /= A3D_ELEM(FourierWeights,k,i,j);
447  }
448  if (iter == NiterWeight-1)
449  {
450  FOR_ALL_DIRECT_ELEMENTS_IN_ARRAY3D(VoutFourier)
451  {
452  double *ptrOut=(double *)&(DIRECT_A3D_ELEM(VoutFourier, k,i,j));
453  A3D_ELEM(FourierWeights,k,i,j) = ptrOut[0];
454  ptrOut[0] = 0;
455  }
456  }
457  break;
458  }*/
459 
460  if ( node->rank == 1 )
461  {
462  // Reserve memory for the receive buffer
463  auto * recBuffer = (double *) malloc (sizeof(double)*BUFFSIZE);
464  int receivedSize;
465  double * pointer;
466  pointer = fourierVolume;
467  int currentSource;
468 
469  gettimeofday(&start_time,nullptr);
470 
471  if ( nProcs > 1 )
472  {
473  // Receive from other workers
474 
475  for (size_t i = 0 ; i <= (nProcs-2) ; i++)
476  {
477  MPI_Recv(nullptr,0, MPI_INT, MPI_ANY_SOURCE, TAG_FREEWORKER,
478  MPI_COMM_WORLD, &status);
479 
480  currentSource = status.MPI_SOURCE;
481 
482  pointer = fourierVolume;
483 
484  while (1)
485  {
486  MPI_Probe( currentSource, MPI_ANY_TAG, MPI_COMM_WORLD, &status );
487 
488  if ( status.MPI_TAG == TAG_FREEWORKER )
489  {
490  MPI_Recv(nullptr,0, MPI_INT, currentSource, TAG_FREEWORKER, MPI_COMM_WORLD, &status );
491 
492  break;
493  }
494 
495  MPI_Recv( recBuffer,
496  BUFFSIZE,
497  MPI_DOUBLE,
498  currentSource,
499  MPI_ANY_TAG,
500  MPI_COMM_WORLD,
501  &status );
502 
503  MPI_Get_count( &status, MPI_DOUBLE, &receivedSize );
504 
505  for ( int i = 0 ; i < receivedSize ; i ++ )
506  {
507  pointer[i] += recBuffer[i];
508  }
509 
510  pointer += receivedSize;
511  }
512  }
513  }
514 
515  free( recBuffer );
516  if (iter==0)
517  {
520  {
521  auto *ptrOut=(double *)&(DIRECT_A3D_ELEM(VoutFourier, k,i,j));
522  ptrOut[0] = 1;
523  }
524  }
527  {
528  auto *ptrOut=(double *)&(DIRECT_A3D_ELEM(VoutFourier, k,i,j));
529  if (fabs(A3D_ELEM(FourierWeights,k,i,j))>1e-3)
530  ptrOut[0]/=A3D_ELEM(FourierWeights,k,i,j);
531  }
534  gettimeofday(&end_time,nullptr);
535 
536  if( fn_fsc != "")
537  {
538 
539  Image<double> auxVolume2;
540  auxVolume2().alias( FourierWeights );
541  auxVolume2.write((std::string)fn_fsc + "_2_Weights.vol");
542 
543  Image< std::complex<double> > auxFourierVolume2;
544  auxFourierVolume2().alias( VoutFourier );
545  auxFourierVolume2.write((std::string) fn_fsc + "_2_Fourier.vol");
546 
547 
548  // Normalize global volume and store data
549  finishComputations(FileName((std::string) fn_fsc + "_split_2.vol"));
550 
551  Vout().initZeros(volPadSizeZ, volPadSizeY, volPadSizeX);
553  Vout().clear();
557 
558  //int x,y,z;
559 
560  //FourierWeights.getDimension(y,x,z);
561  gettimeofday(&start_time,nullptr);
562 
563  auxVolume2.sumWithFile((std::string) fn_fsc + "_1_Weights.vol");
564 
565  gettimeofday(&end_time,nullptr);
566  total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
567  total_time=(double)total_usecs/(double)1000000;
568  if (verbose > 0)
569  std::cout << "SumFile1: " << total_time << " secs." << std::endl;
570 
571  auxVolume2.sumWithFile((std::string) fn_fsc + "_2_Weights.vol");
572 
573  //VoutFourier.getDimension(y,x,z);
574  auxFourierVolume2.sumWithFile((std::string) fn_fsc + "_1_Fourier.vol");
575  auxFourierVolume2.sumWithFile((std::string) fn_fsc + "_2_Fourier.vol");
576 
577  //remove temporary files
578  remove(((std::string) fn_fsc + "_1_Weights.vol").c_str());
579  remove(((std::string) fn_fsc + "_2_Weights.vol").c_str());
580  remove(((std::string) fn_fsc + "_1_Fourier.vol").c_str());
581  remove(((std::string) fn_fsc + "_2_Fourier.vol").c_str());
582  gettimeofday(&end_time,nullptr);
583  total_usecs = (end_time.tv_sec-start_time.tv_sec) * 1000000 + (end_time.tv_usec-start_time.tv_usec);
584  total_time=(double)total_usecs/(double)1000000;
585  if (verbose > 0)
586  std::cout << "SumFile: " << total_time << " secs." << std::endl;
587 
588  /*Save SUM
589  //this is an image but not an xmipp image
590  auxFourierVolume.write((std::string)fn_fsc + "_all_Fourier.vol",
591  false,VDOUBLE);
592  auxVolume.write((std::string)fn_fsc + "_all_Weights.vol",
593  false,VDOUBLE);
594  */
595  }
596  if (NiterWeight==0 || iter == NiterWeight-1)
597  {
599  {
600  // Put back the weights to FourierWeights from temporary variable VoutFourier
601  auto *ptrOut=(double *)&(DIRECT_A3D_ELEM(VoutFourier, k,i,j));
602  A3D_ELEM(FourierWeights,k,i,j) = ptrOut[0];
603  }
605  // Normalize global volume and store data
607  }
608  break;
609  }
610  else
611  {
612  MPI_Send( nullptr,0,MPI_INT,1,TAG_FREEWORKER, MPI_COMM_WORLD );
613 
614  sendDataInChunks( fourierVolume, 1, 2 * sizeout, BUFFSIZE, MPI_COMM_WORLD);
615 
616  MPI_Send( nullptr,0,MPI_INT,1,TAG_FREEWORKER, MPI_COMM_WORLD );
617 
618  break;
619  }
620  }
621  else if (status.MPI_TAG == TAG_WORKFORWORKER)
622  {
623  //get the job number
624  MPI_Recv(&jobNumber, 1, MPI_INT, 0, TAG_WORKFORWORKER, MPI_COMM_WORLD, &status);
625  //LABEL
626  //(if jobNumber == -1) break;
628 
629  size_t min_i, max_i;
630 
631  min_i = jobNumber*mpi_job_size;
632  max_i = min_i + mpi_job_size - 1;
633 
634  if ( max_i >= SF.size())
635  max_i = SF.size()-1;
636  if (iter == 0)
637  processImages( min_i, max_i, false, false);
638  else
639  processImages( min_i, max_i, false, true);
640  }
641  else
642  {
643  std::cerr << "3) Received unknown TAG I quit" << std::endl;
644  exit(0);
645  }
646  }
647  }
648 
649  // Kill threads used on workers
650  if ( node->active && !node->isMaster() )
651  {
653  barrier_wait( &barrier );
654 
655  for ( int nt=0; nt<numThreads; nt++)
656  {
657  pthread_join(*(th_ids+nt),nullptr);
658  }
660  }
661  iter++;
662  }
663  while(iter<NiterWeight);
664  delete[] ranks;
665 }
int barrier_init(barrier_t *barrier, int needed)
void init_progress_bar(long total)
int NiterWeight
Number of iterations for the weight.
constexpr int TAG_TRANSFER
#define REPORT_ERROR(nerr, ErrormMsg)
Definition: xmipp_error.h:211
#define MULTIDIM_SIZE(v)
int numThreads
Number of threads to use in parallel to process a single image.
constexpr int PROCESS_IMAGE
barrier_t barrier
To create a barrier synchronization for threads.
void write(const FileName &name="", size_t select_img=ALL_IMAGES, bool isStack=false, int mode=WRITE_OVERWRITE, CastWriteMode castMode=CW_CAST, int _swapWrite=0)
constexpr int TAG_COLLECT_FOR_FSC
#define FOR_ALL_DIRECT_ELEMENTS_IN_ARRAY3D(V)
glob_prnt iter
void finishComputations(const FileName &out_name)
size_t numberOfJobs
Definition: xmipp_mpi.h:168
int * statusArray
A status array for each row in an image (processing, processed,etc..)
int barrier_destroy(barrier_t *barrier)
size_t size() const override
#define i
ql0001_ & k(htemp+1),(cvec+1),(atemp+1),(bj+1),(bl+1),(bu+1),(x+1),(clamda+1), &iout, infoqp, &zero,(w+1), &lenw,(iw+1), &leniw, &glob_grd.epsmac
constexpr int BUFFSIZE
#define A3D_ELEM(V, k, i, j)
ImageThreadParams * th_args
Contains parameters passed to each thread.
MultidimArray< std::complex< double > > VoutFourier
pthread_mutex_t workLoadMutex
Controls mutual exclusion on critical zones of code.
void processImages(int firstImageIndex, int lastImageIndex, bool saveFSC=false, bool reprocessFlag=false)
Process one image.
void setReal(MultidimArray< double > &img)
Definition: xmipp_fftw.cpp:129
Incorrect argument received.
Definition: xmipp_error.h:113
void progress_bar(long rlen)
std::shared_ptr< MpiNode > node
Definition: xmipp_mpi.h:164
#define TAG_WORKFORWORKER
free((char *) ob)
static void * processImageThread(void *threadArgs)
Defines what a thread should do.
int verbose
Verbosity level.
Image< double > Vout
#define DIRECT_A3D_ELEM(v, k, i, j)
pthread_t * th_ids
IDs for the threads.
#define j
ProgRecFourier * parent
MultidimArray< double > FourierWeights
MultidimArray< std::complex< double > > VoutFourierTmp
#define TAG_FREEWORKER
MPI_Status status
Definition: xmipp_mpi.h:170
void getFourierAlias(T &V)
Definition: xmipp_fftw.h:207
FourierTransformer transformerVol
int threadOpCode
Tells the threads what to do next.
constexpr int EXIT_THREAD
void initZeros(const MultidimArray< T1 > &op)
int sendDataInChunks(double *pointer, int dest, int totalSize, int buffSize, MPI_Comm comm)
void sumWithFile(const FileName &fn)
Definition: xmipp_image.h:1105
int barrier_wait(barrier_t *barrier)
void clear()
Definition: xmipp_image.h:144
void forceWeightSymmetry(MultidimArray< double > &FourierWeights)
Force the weights to be symmetrized.

◆ sendDataInChunks()

int ProgMPIRecFourier::sendDataInChunks ( double *  pointer,
int  dest,
int  totalSize,
int  buffSize,
MPI_Comm  comm 
)

Definition at line 667 of file mpi_reconstruct_fourier.cpp.

668 {
669  double * localPointer = pointer;
670 
671  auto numChunks =(int)ceil((double)totalSize/(double)buffSize);
672  int packetSize;
673  int err=0;
674 
675  for ( int i = 0 ; i < numChunks ; i ++ )
676  {
677  if ( i == (numChunks-1))
678  packetSize = totalSize-i*buffSize;
679  else
680  packetSize = buffSize;
681 
682  if ( (err = MPI_Send( localPointer, packetSize,
683  MPI_DOUBLE, dest, 0, MPI_COMM_WORLD ))
684  != MPI_SUCCESS )
685  {
686  break;
687  }
688 
689  localPointer += packetSize;
690  }
691 
692  return err;
693 }
#define i

Member Data Documentation

◆ mpi_job_size

int ProgMPIRecFourier::mpi_job_size

Dvide the job in this number block with this number of images

Definition at line 67 of file mpi_reconstruct_fourier.h.

◆ sizeout

long int ProgMPIRecFourier::sizeout

Fourier transform size for volumes

Definition at line 64 of file mpi_reconstruct_fourier.h.


The documentation for this class was generated from the following files: