Xmipp  v3.23.11-Nereus
xmipp_threads.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  *
3  * Authors: J.M. De la Rosa Trevin (jmdelarosa@cnb.csic.es)
4  *
5  * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20  * 02111-1307 USA
21  *
22  * All comments concerning this program package may be sent to the
23  * e-mail address 'xmipp@cnb.csic.es'
24  ***************************************************************************/
25 
26 #include <stdio.h>
27 #include <iostream>
28 
29 #include "xmipp_threads.h"
30 #include "xmipp_error.h"
31 #include "xmipp_log.h"
32 
33 
34 // ================= MUTEX ==========================
35 
37 {
38  pthread_mutex_init(&mutex, NULL);
39 }
40 
42 {
43  pthread_mutex_destroy(&mutex);
44 }
45 
47 {
48  pthread_mutex_lock(&mutex);
49 }
50 
52 {
53  pthread_mutex_unlock(&mutex);
54 }
55 
56 // ================= CONDITION ==========================
57 
59 {
60  mutex = new Mutex();
61  pthread_cond_init(&cond, NULL);
62 }
63 
65 {
66  delete mutex;
67  pthread_cond_destroy(&cond);
68 }
69 
71 {
72  mutex->lock();
73 }
74 
76 {
77  mutex->unlock();
78 }
79 
81 {
82  pthread_cond_wait(&cond, &(mutex->mutex));
83 }
84 
86 {
87  pthread_cond_signal(&cond);
88 }
89 
91 {
92  pthread_cond_broadcast(&cond);
93 }
94 
95 // ================= BARRIER ==========================
96 
97 Barrier::Barrier(int numberOfThreads)
98 {
99  needed = numberOfThreads;
100  called = 0;
101  condition = new Condition();
102 }
103 
105 {
106  delete condition;
107 }
108 
110 {
111  condition->lock();
112  ++called;
113  if (called == needed)
114  {
115  called = 0;
116  condition->broadcast();
117  }
118  else
119  condition->wait();
120  condition->unlock();
121 }
122 
123 
124 // ================= THREAD =======================
125 
127 {
128 }
129 
131 {
132  pthread_join(thId, NULL);
133 }
134 
136 {
137  int result = pthread_create(&thId, NULL, _singleThreadMain, (void*)this);
138 
139  if (result != 0)
140  {
141  std::cerr << "Thread: can't start thread." << std::endl;
142  exit(1);
143  }
144 }
145 
146 void * _singleThreadMain(void * data){
147  Thread * thread = (Thread*) data;
148  thread->run();
149  return NULL;
150 }
151 
152 // ================= THREAD MANAGER =======================
153 
155 {
156  thread_id = -1;
157  threads = -1;
158  manager = NULL;
159  data = NULL;
160  workClass = NULL;
161 }
162 
163 ThreadArgument::ThreadArgument(int id, ThreadManager * manager, void * data)
164 {
165  this->thread_id = id;
166  this->threads = manager->threads;
167  this->manager = manager;
168  this->data = data;
169  this->workClass = NULL;
170 }
171 
172 
173 void * _threadMain(void * data)
174 {
175  ThreadArgument * thArg = (ThreadArgument*) data;
176  ThreadManager * thMgr = thArg->manager;
177 
178  while (true)
179  {
180  //Wait for start working or leave
181  thMgr->wait();
182  //After awaked check what to do
183  if (thMgr->workFunction != NULL)
184  {
185  try
186  {
187  thMgr->workFunction(*thArg);
188  thMgr->wait(); //wait for finish together
189  }
190  catch (XmippError &xe)
191  {
192  std::cerr << xe.what() << std::endl
193  << "In thread " << thArg->thread_id << std::endl;
194 // pthread_exit(NULL);
195  exit(xe.__errno);
196  }
197  }
198  else //exit thread
199  {
200  pthread_exit(NULL);
201  }
202  }
203 }
204 
205 ThreadManager::ThreadManager(int numberOfThreads, void * workClass)
206 {
207  threads = numberOfThreads;
208  barrier = new Barrier(threads + 1);
209  workFunction = NULL;
210  ids = new pthread_t[threads];
211  arguments = new ThreadArgument[threads];
212  started = false;
213  this->workClass = workClass;
214 }
215 
216 void ThreadManager::setData(void * data, int idxThread)
217 {
218  if (idxThread == -1)
219  for (int i = 0; i < threads; ++i)
220  arguments[i].data = data;
221  else
222  arguments[idxThread].data = data;
223 }
224 
225 void ThreadManager::createThreads()
226 {
227  //Create threads
228  int result;
229 
230  for (int i = 0; i < threads; ++i)
231  {
232  arguments[i].thread_id = i;
233  arguments[i].threads = threads;
234  arguments[i].manager = this;
235  arguments[i].workClass = workClass;
236 
237  result = pthread_create(ids + i, NULL, _threadMain, (void*) (arguments + i));
238 
239  if (result != 0)
240  {
241  std::cerr << "ThreadManager: can't create threads." << std::endl;
242  exit(1);
243  }
244  }
245  started = true;
246 }
247 
249 {
250  //Destroy the threads
251  workFunction = NULL;
252  if (started)
253  {
254  wait();
255  for (int i = 0; i < threads; ++i)
256  pthread_join(ids[i], NULL);
257  }
258 
259  delete barrier;
260  delete[] ids;
261  delete[] arguments;
262 }
263 
264 void ThreadManager::run(ThreadFunction function, void * data)
265 {
266  runAsync(function, data);
267  //Wait on barrier to wait for threads finish
268  wait();
269 }
270 
271 void ThreadManager::runAsync(ThreadFunction function, void * data)
272 {
273  if (data != NULL)
274  setData(data);
275  workFunction = function;
276  if (!started)
277  createThreads();
278  //Wait on barrier to threads starts working
279  wait();
280 }
281 
283 {
284  barrier->wait();
285 }
286 
287 // =================== TASK_DISTRIBUTOR ============================
288 
290 {
291  if (!(nTasks && bSize && bSize <= nTasks))
292  REPORT_ERROR(ERR_ARG_INCORRECT, "nTasks and bSize should be > 0, also bSize <= nTasks");
293 
294  numberOfTasks = nTasks;
295  blockSize = bSize;
296  assignedTasks = 0;
297 }
298 
300 {
301  lock();
302  assignedTasks = 0;
303  unlock();
304 }
305 
307 {
308  lock();
309  blockSize = bSize;
310  unlock();
311 }
312 
314 {
315  return blockSize;
316 }
317 
318 bool ParallelTaskDistributor::getTasks(size_t &first, size_t &last)
319 {
320  lock();
321  bool result = distribute(first, last);
322  unlock();
323  return result;
324 }
325 
327 {
328  if (tasks < 0 || tasks >= numberOfTasks)
329  return false;
330  lock();
331  assignedTasks = tasks;
332  unlock();
333  return true;
334 }
335 
337 {
338  mutex.lock();
339 }
340 
342 {
343  mutex.unlock();
344 }
345 
346 bool ThreadTaskDistributor::distribute(size_t &first, size_t &last)
347 {
348  bool result = true;
349  first = last = 0;
350  if (assignedTasks >= numberOfTasks)
351  {
352  result = false;
353  }
354  else
355  {
356  first = assignedTasks;
357  assignedTasks
358  = (assignedTasks + blockSize < numberOfTasks) ? (assignedTasks
359  + blockSize) : numberOfTasks;
360  last = assignedTasks - 1;
361  }
362  return result;
363 }
364 
365 // =================== OLD THREADS IMPLEMENTATION ============================
367 {
368  barrier->needed = needed;
369  barrier->called = 0;
370  pthread_mutex_init(&barrier->mutex, NULL);
371  pthread_cond_init(&barrier->cond, NULL);
372  return 0;
373 }
374 
376 {
377  pthread_mutex_destroy(&barrier->mutex);
378  pthread_cond_destroy(&barrier->cond);
379  return 0;
380 }
381 
383 {
384  pthread_mutex_lock(&barrier->mutex);
385  barrier->called++;
386  if (barrier->called == barrier->needed)
387  {
388  barrier->called = 0;
389  pthread_cond_broadcast(&barrier->cond);
390  }
391  else
392  {
393  pthread_cond_wait(&barrier->cond,&barrier->mutex);
394  }
395  pthread_mutex_unlock(&barrier->mutex);
396  return 0;
397 }
398 
399 
400 
401 
402 
void run(ThreadFunction function, void *data=NULL)
int barrier_init(barrier_t *barrier, int needed)
void * _threadMain(void *data)
virtual void run()=0
friend class Condition
Definition: xmipp_threads.h:80
virtual bool distribute(size_t &first, size_t &last)
bool setAssignedTasks(size_t tasks)
#define REPORT_ERROR(nerr, ErrormMsg)
Definition: xmipp_error.h:211
void signal()
ThreadManager * thMgr
bool getTasks(size_t &first, size_t &last)
int threads
number of working threads.
int called
How many threads already arrived.
void start()
void setData(void *data, int nThread=-1)
void unlock()
int barrier_destroy(barrier_t *barrier)
#define i
void setBlockSize(size_t bSize)
void * _singleThreadMain(void *data)
virtual ~Mutex()
glob_log first
Barrier * barrier
void(* ThreadFunction)(ThreadArgument &arg)
Definition: xmipp_threads.h:38
void wait()
Incorrect argument received.
Definition: xmipp_error.h:113
virtual ~Thread()
int needed
How many threads should be awaited.
void broadcast()
pthread_cond_t cond
Condition on which the threads are waiting.
ThreadManager(int numberOfThreads, void *workClass=NULL)
pthread_mutex_t mutex
Mutex to update this structure.
ParallelTaskDistributor(size_t nTasks, size_t bSize)
int thread_id
The thread id.
virtual void unlock()
Barrier(int numberOfThreads)
void runAsync(ThreadFunction function, void *data=NULL)
ErrorType __errno
Definition: xmipp_error.h:227
int barrier_wait(barrier_t *barrier)
virtual void lock()