Xmipp  v3.23.11-Nereus
mpi_run.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  *
3  * Authors: Roberto Marabini (roberto@cnb.csic.es)
4  *
5  * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20  * 02111-1307 USA
21  *
22  * All comments concerning this program package may be sent to the
23  * e-mail address 'xmipp@cnb.csic.es'
24  ***************************************************************************/
25 
26 #include <fstream>
27 #include "parallel/xmipp_mpi.h"
28 #include "core/xmipp_error.h"
29 #include "core/xmipp_filename.h"
30 
31 #define TAG_WORK 0
32 #define TAG_STOP 1
33 #define TAG_WAIT 2
34 
35 class ProgMPIRun: public XmippProgram
36 {
37 public:
40 
42  MPI_Status status;
43 
44  // Mpi node
46 
47  ProgMPIRun(int argc, char **argv)
48  {
49  node=new MpiNode(argc,argv);
50  if (!node->isMaster())
51  verbose=0;
52  }
53 
54  void readParams()
55  {
56  fn_commands = getParam("-i");
57  if (node->size < 2)
59  "This program cannot be executed in a single working node");
60  }
61 
62  void defineParams()
63  {
64  addUsageLine("Run commands in a text file in a parallel environment");
65  addUsageLine("+You may use the tag MPI_BARRIER to separate execution blocks.");
66  addUsageLine("+You may use the tag MPI_NEWLINE to force a line break (this is useful for programs accepting parameters directly from stdin).");
67  addParamsLine("-i <commandFile> : File with commands in different lines");
68  }
69 
70  void show()
71  {
72  if (!verbose)
73  return;
74  std::cout << "Commands file: " << fn_commands << std::endl;
75  }
76 
77  /* Run --------------------------------------------------------------------- */
78 #define MAX_LINE 2048
79  char szline[MAX_LINE+1];
80  void run()
81  {
82  if (node->rank == 0)
83  {
84  std::ifstream fh_in;
85  fh_in.open(fn_commands.c_str());
86  if (!fh_in)
87  REPORT_ERROR(ERR_IO_NOTOPEN, (std::string)"Cannot open " + fn_commands);
88  std::string line;
89  int number_of_node_waiting = 0; // max is nprocs -1
90  while (!fh_in.eof())
91  {
92  //wait until a server is free
93  MPI_Recv(0, 0, MPI_INT, MPI_ANY_SOURCE, 0,
94  MPI_COMM_WORLD, &status);
95  number_of_node_waiting++;
96  getline(fh_in, line);
97  line=findAndReplace(line,"MPI_NEWLINE","\n");
98  strcpy(szline, line.c_str());
99 
100  std::string::size_type loc = line.find("MPI_BARRIER", 0);
101  if (loc != std::string::npos)
102  {
103  while (number_of_node_waiting < (node->size - 1))
104  {
105  MPI_Recv(0, 0, MPI_INT, MPI_ANY_SOURCE, 0,
106  MPI_COMM_WORLD, &status);
107  number_of_node_waiting++;
108  }
109  while (number_of_node_waiting > 0)
110  {
111  MPI_Send(&szline, 1, MPI_CHAR, number_of_node_waiting,
112  TAG_WAIT, MPI_COMM_WORLD);
113  number_of_node_waiting--;
114  }
115  continue;
116  }
117 
118  //send work
119  MPI_Send(&szline, MAX_LINE, MPI_CHAR, status.MPI_SOURCE,
120  TAG_WORK, MPI_COMM_WORLD);
121  number_of_node_waiting--;
122  }
123 
124  fh_in.close();
125  for (size_t i = 1; i < node->size; i++)
126  MPI_Send(0, 0, MPI_INT, i, TAG_STOP, MPI_COMM_WORLD);
127  }
128  else
129  {
130  while (1)
131  {
132  //I am free
133  MPI_Send(0, 0, MPI_INT, 0, 0, MPI_COMM_WORLD);
134  //get your next task
135  MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
136  if (status.MPI_TAG == TAG_STOP)//I am free
137  break;
138  else if (status.MPI_TAG == TAG_WAIT)//wait
139  {
140  MPI_Recv(&szline, 1, MPI_CHAR, 0, TAG_WAIT, MPI_COMM_WORLD, &status);
141  continue;
142  }
143  else if (status.MPI_TAG == TAG_WORK)//work to do
144  {
145  MPI_Recv(&szline, MAX_LINE, MPI_CHAR, 0, TAG_WORK, MPI_COMM_WORLD, &status);
146  //do the job
147  if(strlen(szline)<1)
148  continue;
149  else
150  {
151  if (system(szline)==-1)
152  REPORT_ERROR(ERR_UNCLASSIFIED,"Cannot open shell");
153  }
154  }
155  else
156  std::cerr << "WRONG TAG RECEIVED" << std::endl;
157 
158  }
159  }
160 
161  }
162 };
size_t size
Definition: xmipp_mpi.h:52
Just to locate unclassified errors.
Definition: xmipp_error.h:192
#define REPORT_ERROR(nerr, ErrormMsg)
Definition: xmipp_error.h:211
FileName fn_commands
Definition: mpi_run.cpp:39
void defineParams()
Definition: mpi_run.cpp:62
#define TAG_STOP
Definition: mpi_run.cpp:32
#define MAX_LINE
Definition: mpi_run.cpp:78
#define i
void readParams()
Definition: mpi_run.cpp:54
int argc
Original command line arguments.
Definition: xmipp_program.h:86
#define TAG_WORK
Definition: mpi_run.cpp:31
const char * getParam(const char *param, int arg=0)
Incorrect argument received.
Definition: xmipp_error.h:113
String findAndReplace(const String &tInput, const String &tFind, const String &tReplace)
void run()
Definition: mpi_run.cpp:80
const char ** argv
Definition: xmipp_program.h:87
char szline[MAX_LINE+1]
Definition: mpi_run.cpp:79
int verbose
Verbosity level.
size_t rank
Definition: xmipp_mpi.h:52
MpiNode * node
Definition: mpi_run.cpp:45
File cannot be open.
Definition: xmipp_error.h:137
ProgMPIRun(int argc, char **argv)
Definition: mpi_run.cpp:47
MPI_Status status
Definition: mpi_run.cpp:42
#define TAG_WAIT
Definition: mpi_run.cpp:33
bool isMaster() const
Definition: xmipp_mpi.cpp:166
void addUsageLine(const char *line, bool verbatim=false)
void show()
Definition: mpi_run.cpp:70
void addParamsLine(const String &line)