this->read(argc, argv);
addParamsLine("  [--mpi_job_size <size=10>]  : Number of images sent to a CPU in a single job ");
addParamsLine("                              : possible value that may not be the best option");
std::cerr << "SF.ImgNo() mpi_job_size " << SF.ImgNo() << " " << mpi_job_size << std::endl;
std::cerr << "numberOfJobs: " << numberOfJobs << std::endl << std::endl;
struct timeval start_time, end_time;
MPI_Group orig_group, new_group;
long int total_usecs;
double total_time_processing = 0., total_time_weightening = 0., total_time_communicating = 0., total_time;
int *ranks = nullptr;
if (node->isMaster())
{
    std::cerr << "\nReducing the number of MPI workers from " << nProcs << " to " << numberOfJobs << std::endl;
    std::cerr << std::flush;
}
MPI_Comm_group(MPI_COMM_WORLD, &orig_group);
MPI_Group_incl(orig_group, nProcs, ranks, &new_group);
MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
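For reference, a self-contained sketch of this worker-trimming pattern, assuming ranks is filled with the ids of the processes to keep; makeTrimmedComm is illustrative and not part of the program:

#include <mpi.h>
#include <vector>

// Sketch: build a communicator containing only the first nProcs ranks.
MPI_Comm makeTrimmedComm(int nProcs)
{
    std::vector<int> ranks(nProcs);
    for (int r = 0; r < nProcs; ++r)
        ranks[r] = r;                      // keep the lowest nProcs ranks

    MPI_Group orig_group, new_group;
    MPI_Comm  new_comm;
    MPI_Comm_group(MPI_COMM_WORLD, &orig_group);
    MPI_Group_incl(orig_group, nProcs, ranks.data(), &new_group);
    MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
    MPI_Group_free(&new_group);            // groups may be freed once the comm exists
    MPI_Group_free(&orig_group);
    return new_comm;                       // MPI_COMM_NULL on excluded ranks
}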
if (node->isMaster())
    gettimeofday(&start_time, nullptr);
std::cerr << std::endl;
std::cerr << "Computing weights " << iter + 1 << "/" << NiterWeight << std::endl;
std::cerr << "Computing volume" << std::endl;
std::cerr << "master-recv i=" << i << std::endl;
std::cerr << "numberOfJobs: " << numberOfJobs << std::endl << std::endl;
std::cerr << "master-send i=" << i << std::endl;
for (size_t worker = 1; worker <= nProcs; worker++)
for (size_t i = 1; i <= nProcs; i++)
gettimeofday(&end_time, nullptr);
total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
total_time_weightening += ((double)total_usecs / (double)1000000);
gettimeofday(&end_time, nullptr);
total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
total_time_processing += ((double)total_usecs / (double)1000000);
gettimeofday(&start_time, nullptr);
for (size_t i = 1; i <= nProcs; i++)
gettimeofday(&end_time, nullptr);
total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
total_time_communicating += ((double)total_usecs / (double)1000000);
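The same microsecond arithmetic appears three times above; a small helper (hypothetical, not in the source) captures the pattern:

#include <sys/time.h>

// Elapsed wall-clock seconds between two gettimeofday() samples.
static double elapsedSeconds(const struct timeval &start, const struct timeval &end)
{
    long int usecs = (end.tv_sec - start.tv_sec) * 1000000
                   + (end.tv_usec - start.tv_usec);
    return (double)usecs / 1000000.0;
}

// Usage matching the listing:
//   gettimeofday(&start_time, nullptr);
//   ... work ...
//   gettimeofday(&end_time, nullptr);
//   total_time_processing += elapsedSeconds(start_time, end_time);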
std::cout << "\n\nProcessing time: " << total_time_processing << " secs." << std::endl;
std::cout << "Transfers time: " << total_time_communicating << " secs." << std::endl;
std::cout << "Weighting time: " << total_time_weightening << " secs." << std::endl;
std::cout << "Execution completed successfully" << std::endl;
else if (node->active)
std::cerr << "slave-send TAG_FREEWORKER rank=" << node->rank << std::endl;
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (node->rank == 1)
auto *recBuffer = (double *) malloc(sizeof(double) * BUFFSIZE);
pointer = fourierVolume;
currentSource = status.MPI_SOURCE;
pointer = fourierVolume;
MPI_Probe(currentSource, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_DOUBLE, &receivedSize);
for (int i = 0; i < receivedSize; i++)
    pointer[i] += recBuffer[i];
pointer += receivedSize;
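The MPI_Recv that actually fills recBuffer is not shown in this excerpt; under that assumption, the complete probe/size/receive/accumulate step would look roughly like this sketch (names mirror the listing):

// Sketch: probe the next packet from currentSource, size the receive,
// then accumulate it into the running volume at 'pointer'.
MPI_Status status;
int receivedSize;
MPI_Probe(currentSource, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_DOUBLE, &receivedSize);
MPI_Recv(recBuffer, receivedSize, MPI_DOUBLE,
         currentSource, status.MPI_TAG, MPI_COMM_WORLD, &status);
for (int i = 0; i < receivedSize; i++)
    pointer[i] += recBuffer[i];            // element-wise sum into the volume
pointer += receivedSize;                   // advance past the packet just merged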
auxVolume1.write((std::string) fn_fsc + "_1_Weights.vol");
auxFourierVolume1.write((std::string) fn_fsc + "_1_Fourier.vol");
std::cerr << "Wr" << node->rank << " " << "TAG_STOP" << std::endl;
MPI_Allreduce(MPI_IN_PLACE, fourierWeights, weightsSize, MPI_DOUBLE, MPI_SUM, new_comm); // trailing arguments assumed: element count, type, op, reduced communicator
if (node->rank == 1)
auto *recBuffer = (double *) malloc(sizeof(double) * BUFFSIZE);
pointer = fourierVolume;
gettimeofday(&start_time, nullptr);
for (size_t i = 0; i <= (nProcs - 2); i++)
currentSource = status.MPI_SOURCE;
pointer = fourierVolume;
MPI_Probe(currentSource, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_DOUBLE, &receivedSize);
for (int i = 0; i < receivedSize; i++)
    pointer[i] += recBuffer[i];
pointer += receivedSize;
gettimeofday(&end_time, nullptr);
auxVolume2.write((std::string) fn_fsc + "_2_Weights.vol");
auxFourierVolume2.write((std::string) fn_fsc + "_2_Fourier.vol");
gettimeofday(&start_time, nullptr);
gettimeofday(&end_time, nullptr);
total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
total_time = (double)total_usecs / (double)1000000;
std::cout << "SumFile1: " << total_time << " secs." << std::endl;
remove(((std::string) fn_fsc + "_1_Weights.vol").c_str());
remove(((std::string) fn_fsc + "_2_Weights.vol").c_str());
remove(((std::string) fn_fsc + "_1_Fourier.vol").c_str());
remove(((std::string) fn_fsc + "_2_Fourier.vol").c_str());
gettimeofday(&end_time, nullptr);
total_usecs = (end_time.tv_sec - start_time.tv_sec) * 1000000 + (end_time.tv_usec - start_time.tv_usec);
total_time = (double)total_usecs / (double)1000000;
std::cout << "SumFile: " << total_time << " secs." << std::endl;
max_i = min_i + mpi_job_size - 1;
std::cerr << "3) Received unknown TAG, I quit" << std::endl;
if (node->active && !node->isMaster())
pthread_join(*(th_ids + nt), nullptr);
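Presumably this join sits in a loop over all worker threads; a minimal sketch, assuming numThreads threads were created into th_ids:

// Sketch: wait for every image-processing thread to finish before returning.
for (int nt = 0; nt < numThreads; nt++)
    pthread_join(*(th_ids + nt), nullptr);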
double *localPointer = pointer;
auto numChunks = (int)ceil((double)totalSize / (double)buffSize);
for (int i = 0; i < numChunks; i++)
    if (i == (numChunks - 1))
        packetSize = totalSize - i * buffSize;
    else
        packetSize = buffSize;
    if ((err = MPI_Send(localPointer, packetSize,
                        MPI_DOUBLE, dest, 0, MPI_COMM_WORLD)) != MPI_SUCCESS)
    localPointer += packetSize;
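Pieced together, sendDataInChunks amounts to the following sketch; the listing hard-codes MPI_COMM_WORLD in the send rather than using the comm parameter, and the early return on error is an assumption:

#include <cmath>
#include <mpi.h>

// Sketch: stream totalSize doubles to rank 'dest' in packets of at most buffSize.
int sendDataInChunks(double *pointer, int dest, int totalSize, int buffSize, MPI_Comm comm)
{
    double *localPointer = pointer;
    int err = MPI_SUCCESS;
    auto numChunks = (int)ceil((double)totalSize / (double)buffSize);

    for (int i = 0; i < numChunks; i++)
    {
        // The last packet carries the remainder; every other packet is a full buffer.
        int packetSize = (i == numChunks - 1) ? totalSize - i * buffSize : buffSize;

        if ((err = MPI_Send(localPointer, packetSize,
                            MPI_DOUBLE, dest, 0, comm)) != MPI_SUCCESS)
            return err;                 // propagate the MPI error code

        localPointer += packetSize;     // advance to the next packet
    }
    return err;
}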