Xmipp  v3.23.11-Nereus
xmipp_mpi.h
Go to the documentation of this file.
1  /*
2  * Authors: J.M. de la Rosa Trevin (jmdelarosa@cnb.csic.es)
3  *
4  *
5  * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20  * 02111-1307 USA
21  *
22  * All comments concerning this program package may be sent to the
23  * e-mail address 'xmipp@cnb.csic.es'
24  ***************************************************************************/
25 
26 #ifndef XMIPP_MPI_H_
27 #define XMIPP_MPI_H_
28 
29 #include <mpi.h>
30 #include "core/xmipp_threads.h"
31 #include "core/xmipp_program.h"
32 #include "core/metadata_vec.h"
33 #include "core/metadata_db.h"
34 
35 class FileName;
36 
37 #define XMIPP_MPI_SIZE_T MPI_UNSIGNED_LONG
38 
47 class MpiNode
48 {
49 public:
50 
51  //MPI_Comm *comm;
52  size_t rank, size, active;//, activeNodes;
53  MpiNode(int &argc, char **& argv);
54  ~MpiNode();
55  MpiNode(const MpiNode &)=delete;
56  MpiNode & operator =(const MpiNode &)=delete;
57 
59  bool isMaster() const;
60 
62  void barrierWait();
63 
65  template <typename T> // T = MetaData*
66  void gatherMetadatas(T &MD, const FileName &rootName);
67 
69 // void updateComm();
70 
71 protected:
73  size_t getActiveNodes();
74 
75 };
76 
77 //mpi macros
78 #define TAG_WORK 0
79 #define TAG_STOP 1
80 #define TAG_WAIT 2
81 
82 #define TAG_WORK_REQUEST 100
83 #define TAG_WORK_RESPONSE 101
84 
91 {
92 protected:
93  std::shared_ptr<MpiNode> node;
94 
95  virtual bool distribute(size_t &first, size_t &last);
96 
97 public:
98  MpiTaskDistributor(size_t nTasks, size_t bSize, const std::shared_ptr<MpiNode> &node);
102  void wait();
103 
104 private:
109  bool distributeMaster();
111  bool distributeSlaves(size_t &first, size_t &last);
112 }
113 ;//end of class MpiTaskDistributor
114 
118 class MpiFileMutex: public Mutex
119 {
120 protected:
121  std::shared_ptr<MpiNode> node;
122  int lockFile;
124 public:
126  MpiFileMutex(const std::shared_ptr<MpiNode> &node);
127 
129  ~MpiFileMutex();
130 
136  void lock();
137 
142  void unlock();
143 
144  char lockFilename[L_tmpnam];
145 }
146 ;//end of class MpiFileMutex
147 
160 class XmippMpiProgram: public virtual XmippProgram
161 {
162 protected:
164  std::shared_ptr<MpiNode> node;
166  size_t nProcs;
168  size_t numberOfJobs;
170  MPI_Status status;
171 
173  void setNode(const std::shared_ptr<MpiNode> & node);
174 
175 public:
177  void read(int argc, char **argv);
181  virtual int tryRun();
182 };
183 
185 {
186 protected:
189  std::vector<size_t> imgsId;
191  size_t first, last;
192 
193 public:
196  MpiMetadataProgram(const MpiMetadataProgram &)=delete;
197  MpiMetadataProgram(const MpiMetadataProgram &&)=delete;
198 
203 
205  void read(int argc, char **argv);
206 
207  void defineParams();
208  void readParams();
210  void createTaskDistributor(MetaData &mdIn, size_t blockSize = 0);
212  virtual void preProcess() { /* nothing to do */ };
214  virtual void finishProcessing() { /* nothing to do */ };
216  bool getTaskToProcess(size_t &objId, size_t &objIndex);
217 };
218 
221 template <typename BASE_CLASS>
222 class BasicMpiMetadataProgram : public BASE_CLASS, public MpiMetadataProgram {
223 protected:
224  void defineParams() override {
225  BASE_CLASS::defineParams();
227  }
228 
229  void readParams() override {
231  BASE_CLASS::readParams();
232  }
233 
234  void preProcess() override {
235  BASE_CLASS::preProcess();
236  MetaData &mdIn = *this->getInputMd();
237  mdIn.addLabel(MDL_GATHER_ID);
238  mdIn.fillLinear(MDL_GATHER_ID, 1, 1);
239  createTaskDistributor(mdIn, blockSize);
240  }
241 
243  if (node->rank == 1) {
244  verbose = 1;
245  BASE_CLASS::startProcessing();
246  }
247  node->barrierWait();
248  }
249 
250  void showProgress() {
251  if (node->rank == 1) {
252  BASE_CLASS::time_bar_done = first + 1;
253  BASE_CLASS::showProgress();
254  }
255  }
256 
257  bool getImageToProcess(size_t &objId, size_t &objIndex) override {
258  return getTaskToProcess(objId, objIndex);
259  }
260 
261  void finishProcessing() override {
262  node->gatherMetadatas(this->getOutputMd(), BASE_CLASS::fn_out);
263  MetaDataVec MDaux;
264  MDaux.sort(this->getOutputMd(), MDL_GATHER_ID);
265  MDaux.removeLabel(MDL_GATHER_ID);
266  this->getOutputMd() = MDaux;
267  if (node->isMaster())
268  BASE_CLASS::finishProcessing();
269  }
270 
271  void wait() { distributor->wait(); }
272 };
273 
277 void xmipp_MPI_Reduce(
278  void* send_data,
279  void* recv_data,
280  size_t count,
281  MPI_Datatype datatype,
282  MPI_Op op,
283  int root,
284  MPI_Comm communicator,
285  size_t blockSize=1048576);
286 
288 #endif /* XMIPP_MPI_H_ */
size_t size
Definition: xmipp_mpi.h:52
void xmipp_MPI_Reduce(void *send_data, void *recv_data, size_t count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm communicator, size_t blockSize=1048576)
Definition: xmipp_mpi.cpp:331
void sort(const MetaDataVec &MDin, const MDLabel sortLabel, bool asc=true, int limit=-1, int offset=0)
bool removeLabel(const MDLabel label) override
size_t getActiveNodes()
void finishProcessing() override
Definition: xmipp_mpi.h:261
virtual void preProcess()
Definition: xmipp_mpi.h:212
ParallelTaskDistributor * distributor
void defineParams() override
Definition: xmipp_mpi.h:224
bool getImageToProcess(size_t &objId, size_t &objIndex) override
Definition: xmipp_mpi.h:257
size_t numberOfJobs
Definition: xmipp_mpi.h:168
void barrierWait()
Definition: xmipp_mpi.cpp:171
void preProcess() override
Definition: xmipp_mpi.h:234
glob_log first
MpiNode * node
std::shared_ptr< MpiNode > node
Definition: xmipp_mpi.h:164
MpiNode(int &argc, char **&argv)
Definition: xmipp_mpi.cpp:144
MpiNode & operator=(const MpiNode &)=delete
size_t active
Definition: xmipp_mpi.h:52
std::shared_ptr< MpiNode > node
Definition: xmipp_mpi.h:121
virtual void finishProcessing()
Definition: xmipp_mpi.h:214
size_t rank
Definition: xmipp_mpi.h:52
MPI_Status status
Definition: xmipp_mpi.h:170
std::shared_ptr< MpiNode > node
Definition: xmipp_mpi.h:93
void readParams() override
Definition: xmipp_mpi.h:229
virtual bool addLabel(const MDLabel label, int pos=-1)=0
virtual void fillLinear(MDLabel label, double initial, double step)=0
std::vector< size_t > imgsId
Definition: xmipp_mpi.h:189
bool isMaster() const
Definition: xmipp_mpi.cpp:166
bool fileCreator
Definition: xmipp_mpi.h:123
file read(std::istream &is)
Definition: pdb2cif.cpp:6200
void gatherMetadatas(T &MD, const FileName &rootName)
Definition: xmipp_mpi.cpp:200