41 return node->isMaster() ? distributeMaster() : distributeSlaves(first, last);
44 bool MpiTaskDistributor::distributeMaster()
46 int size =
node->size;
49 int finalizedWorkers = 0;
51 while (finalizedWorkers < size - 1)
54 MPI_Recv(
nullptr, 0, MPI_INT, MPI_ANY_SOURCE,
TAG_WORK_REQUEST, MPI_COMM_WORLD, &status);
58 if (workBuffer[0] == 0)
61 MPI_Send(workBuffer, 3, MPI_LONG_LONG_INT, status.MPI_SOURCE,
TAG_WORK_RESPONSE, MPI_COMM_WORLD);
66 bool MpiTaskDistributor::distributeSlaves(
size_t &
first,
size_t &last)
77 MPI_Recv(workBuffer, 3, MPI_LONG_LONG_INT, 0,
TAG_WORK_RESPONSE, MPI_COMM_WORLD, &status);
79 first = workBuffer[1];
82 return (workBuffer[0] == 1);
95 if (node ==
nullptr || node->isMaster())
98 strcpy(lockFilename,
"pijol_XXXXXX");
99 if ((lockFile = mkstemp(lockFilename)) == -1)
101 perror(
"MpiFileMutex::Error generating tmp lock file");
108 MPI_Bcast(lockFilename, L_tmpnam, MPI_CHAR, 0, MPI_COMM_WORLD);
110 if ((lockFile = open(lockFilename, O_RDWR)) == -1)
112 perror(
"MpiFileMutex: Error opening lock file");
120 lseek(lockFile, 0, SEEK_SET);
121 if (lockf(lockFile, F_LOCK, 0)==-1)
127 lseek(lockFile, 0, SEEK_SET);
128 if (lockf(lockFile, F_ULOCK, 0)==-1)
136 if (fileCreator &&
remove(lockFilename) == -1)
138 perror(
"~MpiFileMutex: error deleting lock file");
146 MPI_Init(&argc, &argv);
148 MPI_Comm_rank(MPI_COMM_WORLD, &irank);
149 MPI_Comm_size(MPI_COMM_WORLD, &isize);
173 MPI_Barrier(MPI_COMM_WORLD);
176 void MpiNode::updateComm()
178 size_t nodes = getActiveNodes();
179 if (nodes < activeNodes)
181 MPI_Comm *newComm =
new MPI_Comm;
182 MPI_Comm_split(*comm, (
int)active, (
int)rank, newComm);
183 MPI_Comm_disconnect(comm);
193 MPI_Allreduce(&active, &activeNodes, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
199 template <
typename T>
209 fn =
formatString(
"%s_node%d.xmd", rootname.c_str(), rank);
217 for (
size_t nodeRank = 1; nodeRank < size; nodeRank++)
219 fn =
formatString(
"%s_node%d.xmd", rootname.c_str(), nodeRank);
223 mdAll.unionAll(mdSlave);
229 fn =
formatString(
"%s_node%d.xmd", rootname.c_str(), 1);
246 node = std::shared_ptr<MpiNode>(
new MpiNode(argc, argv));
249 if (!
node->isMaster())
259 verbose = node->isMaster();
271 std::cerr << xe.what();
273 MPI_Abort(MPI_COMM_WORLD, xe.
__errno);
293 addParamsLine(
"== MPI ==");
294 addParamsLine(
" [--mpi_job_size <size=0>] : Number of images sent simultaneously to a mpi node");
299 blockSize = getIntParam(
"--mpi_job_size");
305 size_t size = mdIn.
size();
308 else if (blockSize > size)
318 bool moreTasks =
true;
324 objId = imgsId[
first++];
335 MPI_Datatype datatype,
338 MPI_Comm communicator,
342 MPI_Type_size(datatype,&type_size);
346 for (
size_t c=0;
c<quotient;
c++)
347 MPI_Reduce((
unsigned char*)(send_data)+
c*blockSize*
size_t(type_size),
348 (
unsigned char*)(recv_data)+
c*blockSize*
size_t(type_size),
349 blockSize,datatype,op,root,communicator);
353 MPI_Reduce((
unsigned char*)(send_data)+quotient*blockSize*
size_t(type_size),
354 (
unsigned char*)(recv_data)+quotient*blockSize*
size_t(type_size),
355 remainder,datatype,op,root,communicator);
void xmipp_MPI_Reduce(void *send_data, void *recv_data, size_t count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm communicator, size_t blockSize)
virtual void read(int argc, const char **argv, bool reportErrors=true)
virtual bool distribute(size_t &first, size_t &last)
#define REPORT_ERROR(nerr, ErrormMsg)
virtual bool distribute(size_t &first, size_t &last)
bool getTasks(size_t &first, size_t &last)
ParallelTaskDistributor * distributor
MpiNode(int &argc, char **&argv)
MpiTaskDistributor(size_t nTasks, size_t bSize, const std::shared_ptr< MpiNode > &node)
FileName removeBlockName() const
std::shared_ptr< MpiNode > node
void setNode(const std::shared_ptr< MpiNode > &node)
String formatString(const char *format,...)
#define TAG_WORK_RESPONSE
MpiFileMutex(const std::shared_ptr< MpiNode > &node)
Insufficient permissions to perform operation.
void gatherMetadatas(T &MD, const FileName &rootName)
void read(int argc, char **argv)