Xmipp v3.23.11-Nereus
xmipp_mpi.cpp
/***************************************************************************
 * Authors: J.M. de la Rosa Trevin (jmdelarosa@cnb.csic.es)
 *
 *
 * Unidad de Bioinformatica of Centro Nacional de Biotecnologia, CSIC
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
 * 02111-1307 USA
 *
 * All comments concerning this program package may be sent to the
 * e-mail address 'xmipp@cnb.csic.es'
 ***************************************************************************/

#include <unistd.h>
#include "xmipp_mpi.h"
#include "core/xmipp_filename.h"
#include "core/xmipp_error.h"
#include "core/xmipp_macros.h"
#include "core/metadata_db.h"

MpiTaskDistributor::MpiTaskDistributor(size_t nTasks, size_t bSize, const std::shared_ptr<MpiNode> &node) :
    ThreadTaskDistributor(nTasks, bSize)
{
    this->node = node;
}

bool MpiTaskDistributor::distribute(size_t &first, size_t &last)
{
    return node->isMaster() ? distributeMaster() : distributeSlaves(first, last);
}

bool MpiTaskDistributor::distributeMaster()
{
    int size = node->size;
    size_t workBuffer[3];
    MPI_Status status;
    int finalizedWorkers = 0;

    while (finalizedWorkers < size - 1)
    {
        //wait for a work request from any worker
        MPI_Recv(nullptr, 0, MPI_INT, MPI_ANY_SOURCE, TAG_WORK_REQUEST, MPI_COMM_WORLD, &status);

        workBuffer[0] = ThreadTaskDistributor::distribute(workBuffer[1], workBuffer[2]) ? 1 : 0;

        if (workBuffer[0] == 0) //no more jobs, count finalized workers
            finalizedWorkers++;
        //send the response (either a task range or a finish message)
        MPI_Send(workBuffer, 3, MPI_LONG_LONG_INT, status.MPI_SOURCE, TAG_WORK_RESPONSE, MPI_COMM_WORLD);
    }
    return false;
}

bool MpiTaskDistributor::distributeSlaves(size_t &first, size_t &last)
{
    // Worker nodes ask the master for a task.
    // Contents of workBuffer:
    // workBuffer[0] = 0 if no more jobs, 1 otherwise
    // workBuffer[1] = first
    // workBuffer[2] = last
    size_t workBuffer[3];
    MPI_Status status;
    //send a work request to the master and wait for its response
    MPI_Send(nullptr, 0, MPI_INT, 0, TAG_WORK_REQUEST, MPI_COMM_WORLD);
    MPI_Recv(workBuffer, 3, MPI_LONG_LONG_INT, 0, TAG_WORK_RESPONSE, MPI_COMM_WORLD, &status);

    first = workBuffer[1];
    last = workBuffer[2];

    return (workBuffer[0] == 1);
}

void MpiTaskDistributor::wait()
{
    node->barrierWait();
}

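/*
 * Usage note (illustrative, not part of the original file): MpiTaskDistributor
 * implements a master/worker protocol on top of ThreadTaskDistributor. Rank 0
 * answers each TAG_WORK_REQUEST with a 3-element buffer (hasWork, first, last)
 * and stops once every other rank has received an empty answer. A worker loop
 * might look like the sketch below; processObject() is a hypothetical callback.
 *
 *     MpiTaskDistributor distributor(nTasks, blockSize, node);
 *     size_t first, last;
 *     while (distributor.getTasks(first, last))   // false when the master has no more work
 *         for (size_t i = first; i <= last; ++i)
 *             processObject(i);
 *     distributor.wait();                         // synchronize all ranks at the end
 */
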
// ================= FILE MUTEX ==========================
MpiFileMutex::MpiFileMutex(const std::shared_ptr<MpiNode>& node)
{
    fileCreator = false;

    if (node == nullptr || node->isMaster())
    {
        fileCreator = true;
        strcpy(lockFilename, "pijol_XXXXXX");
        if ((lockFile = mkstemp(lockFilename)) == -1)
        {
            perror("MpiFileMutex::Error generating tmp lock file");
            exit(1);
        }
        close(lockFile);
    }
    //if running under MPI, broadcast the lock filename from the master to the slaves
    if (node != nullptr)
        MPI_Bcast(lockFilename, L_tmpnam, MPI_CHAR, 0, MPI_COMM_WORLD);

    if ((lockFile = open(lockFilename, O_RDWR)) == -1)
    {
        perror("MpiFileMutex: Error opening lock file");
        exit(1);
    }
}

void MpiFileMutex::lock()
{
    Mutex::lock();
    lseek(lockFile, 0, SEEK_SET);
    if (lockf(lockFile, F_LOCK, 0) == -1)
        REPORT_ERROR(ERR_IO_NOPERM, "Cannot lock file");
}

void MpiFileMutex::unlock()
{
    lseek(lockFile, 0, SEEK_SET);
    if (lockf(lockFile, F_ULOCK, 0) == -1)
        REPORT_ERROR(ERR_IO_NOPERM, "Cannot unlock file");
    Mutex::unlock();
}

MpiFileMutex::~MpiFileMutex()
{
    close(lockFile);
    if (fileCreator && remove(lockFilename) == -1)
    {
        perror("~MpiFileMutex: error deleting lock file");
        exit(1);
    }
}
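
/*
 * Usage note (illustrative, not part of the original file): MpiFileMutex extends
 * a thread Mutex with an advisory file lock (lockf) on a temporary file, so that
 * a critical section is exclusive across MPI processes as well as threads. The
 * master creates the lock file and broadcasts its name; every rank then opens it.
 * A hypothetical critical section could look like:
 *
 *     MpiFileMutex mutex(node);
 *     mutex.lock();
 *     // ... read/modify a file shared by all ranks ...
 *     mutex.unlock();
 */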

//------------ MPI ---------------------------
MpiNode::MpiNode(int &argc, char **& argv)
{
    MPI_Init(&argc, &argv);
    int irank, isize;
    MPI_Comm_rank(MPI_COMM_WORLD, &irank);
    MPI_Comm_size(MPI_COMM_WORLD, &isize);
    rank = irank;
    size = isize;
    //comm = new MPI_Comm;
    //MPI_Comm_dup(MPI_COMM_WORLD, comm);
    active = 1;
    //activeNodes = size;
}

MpiNode::~MpiNode()
{
    //active = 0;
    //updateComm();
    //std::cerr << "Send Finalize to: " << rank << std::endl;
    MPI_Finalize();
}

bool MpiNode::isMaster() const
{
    return rank == 0;
}

void MpiNode::barrierWait()
{
    MPI_Barrier(MPI_COMM_WORLD);
}
#ifdef NEVERDEFINED
void MpiNode::updateComm()
{
    size_t nodes = getActiveNodes();
    if (nodes < activeNodes)
    {
        MPI_Comm *newComm = new MPI_Comm;
        MPI_Comm_split(*comm, (int)active, (int)rank, newComm);
        MPI_Comm_disconnect(comm);
        delete comm;
        comm = newComm;
        activeNodes = nodes;
    }
}

size_t MpiNode::getActiveNodes()
{
    int activeNodes = 0;
    MPI_Allreduce(&active, &activeNodes, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    return activeNodes;
}

#endif

template <typename T>
void MpiNode::gatherMetadatas(T &MD, const FileName &rootname)
{
    if (size == 1)
        return;

    FileName fn;

    if (!isMaster()) //workers just write down their partial results
    {
        fn = formatString("%s_node%d.xmd", rootname.c_str(), rank);
        MD.write(fn);
    }
    barrierWait();
    if (isMaster()) //the master collects and joins the workers' results
    {
        MetaDataDb mdAll(MD), mdSlave;
        for (size_t nodeRank = 1; nodeRank < size; nodeRank++)
        {
            fn = formatString("%s_node%d.xmd", rootname.c_str(), nodeRank);
            mdSlave.read(fn);
            //make sure the file is not empty
            if (!mdSlave.isEmpty())
                mdAll.unionAll(mdSlave);
            //remove the block name and delete the partial file
            fn = fn.removeBlockName();
            remove(fn.c_str());
        }
        //remove the first node's metadata file
        fn = formatString("%s_node%d.xmd", rootname.c_str(), 1);
        fn = fn.removeBlockName();
        remove(fn.c_str());
        MD = T(mdAll);
    }
}

template void MpiNode::gatherMetadatas<MetaDataVec>(MetaDataVec&, const FileName&);
template void MpiNode::gatherMetadatas<MetaDataDb>(MetaDataDb&, const FileName&);
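
/*
 * Usage note (illustrative, not part of the original file): gatherMetadatas()
 * merges per-rank partial metadata into the master's copy. Each worker writes
 * "<rootname>_node<rank>.xmd", all ranks meet at a barrier, and the master reads,
 * unions and deletes the partial files. A typical call after parallel processing
 * would be something like:
 *
 *     node->gatherMetadatas(mdOut, "results");   // "results" is a hypothetical rootname
 *     if (node->isMaster())
 *         mdOut.write("results.xmd");
 */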

/* -------------------- XmippMpiProgram ---------------------- */
void XmippMpiProgram::read(int argc, char **argv)
{
    errorCode = 0; //assume no errors

    if (node == nullptr)
    {
        node = std::shared_ptr<MpiNode>(new MpiNode(argc, argv));
        nProcs = node->size;

        if (!node->isMaster())
            verbose = false;
    }

    XmippProgram::read(argc, (const char **)argv);
}

void XmippMpiProgram::setNode(const std::shared_ptr<MpiNode> &node)
{
    this->node = node;
    verbose = node->isMaster();
}

int XmippMpiProgram::tryRun()
{
    try
    {
        if (doRun)
            this->run();
    }
    catch (XmippError &xe)
    {
        std::cerr << xe.what();
        errorCode = xe.__errno;
        MPI_Abort(MPI_COMM_WORLD, xe.__errno);
    }
    return errorCode;
}
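
/*
 * Note (added comment): tryRun() converts an XmippError thrown by run() into a
 * non-zero error code and calls MPI_Abort, so a failure on any rank terminates
 * the whole MPI job instead of leaving the other ranks blocked in collective calls.
 */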

/* -------------------- MpiMetadataProgram ------------------- */
MpiMetadataProgram::~MpiMetadataProgram()
{
    delete distributor;
}

void MpiMetadataProgram::read(int argc, char **argv)
{
    XmippMpiProgram::read(argc, argv);
    last = 0;
    first = 1;
}

void MpiMetadataProgram::defineParams()
{
    addParamsLine("== MPI ==");
    addParamsLine(" [--mpi_job_size <size=0>] : Number of images sent simultaneously to an MPI node");
}

void MpiMetadataProgram::readParams()
{
    blockSize = getIntParam("--mpi_job_size");
}

void MpiMetadataProgram::createTaskDistributor(MetaData &mdIn,
                                               size_t blockSize)
{
    size_t size = mdIn.size();
    if (blockSize < 1)
        blockSize = XMIPP_MAX(1, size/(node->size * 5));
    else if (blockSize > size)
        blockSize = size;

    mdIn.findObjects(imgsId);
    distributor = new MpiTaskDistributor(size, blockSize, node);
}
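
/*
 * Note (added comment): when --mpi_job_size is 0, the block size defaults to
 * size/(node->size * 5), i.e. roughly five blocks per rank. For example, with
 * 10000 images on 4 ranks the default block size is 10000/(4*5) = 500 images
 * per request, which keeps request/response traffic low while still balancing
 * the load reasonably well.
 */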

//Now use the distributor to fetch the next task (image) to process
bool MpiMetadataProgram::getTaskToProcess(size_t &objId, size_t &objIndex)
{
    bool moreTasks = true;
    if (first > last)
        moreTasks = distributor->getTasks(first, last);
    if (moreTasks)
    {
        objIndex = first;
        objId = imgsId[first++];
        return true;
    }
    return false;
}
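
/*
 * Usage note (illustrative, not part of the original file): a derived program
 * typically drives its processing loop with getTaskToProcess(); processImage()
 * below is a hypothetical per-image routine.
 *
 *     createTaskDistributor(mdIn, blockSize);
 *     size_t objId, objIndex;
 *     while (getTaskToProcess(objId, objIndex))
 *         processImage(objId);
 *     node->barrierWait();
 */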

void xmipp_MPI_Reduce(
    void* send_data,
    void* recv_data,
    size_t count,
    MPI_Datatype datatype,
    MPI_Op op,
    int root,
    MPI_Comm communicator,
    size_t blockSize)
{
    int type_size;
    MPI_Type_size(datatype, &type_size);
    size_t quotient = count/blockSize;
    if (quotient > 0)
    {
        for (size_t c = 0; c < quotient; c++)
            MPI_Reduce((unsigned char*)(send_data)+c*blockSize*size_t(type_size),
                       (unsigned char*)(recv_data)+c*blockSize*size_t(type_size),
                       blockSize, datatype, op, root, communicator);
    }
    size_t remainder = count%blockSize;
    if (remainder > 0)
        MPI_Reduce((unsigned char*)(send_data)+quotient*blockSize*size_t(type_size),
                   (unsigned char*)(recv_data)+quotient*blockSize*size_t(type_size),
                   remainder, datatype, op, root, communicator);
}
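
/*
 * Note (added comment): xmipp_MPI_Reduce splits a reduction of 'count' elements
 * into chunks of at most 'blockSize' elements, since MPI_Reduce takes an int
 * count and a single very large reduction may overflow it or exhaust buffers.
 * An illustrative call reducing a large double array onto rank 0 (variable
 * names are hypothetical) might be:
 *
 *     xmipp_MPI_Reduce(localSums, globalSums, nElements,
 *                      MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD, 1048576);
 */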