Xmipp  v3.23.11-Nereus
Public Member Functions | List of all members
ctpl::thread_pool Class Reference

#include <ctpl.h>

Public Member Functions

 thread_pool ()
 
 thread_pool (int nThreads, int queueSize=_ctplThreadPoolLength_)
 
 ~thread_pool ()
 
int size ()
 
int n_idle ()
 
std::thread & get_thread (int i)
 
void resize (int nThreads)
 
void clear_queue ()
 
std::function< void(int)> pop ()
 
void stop (bool isWait=false)
 
template<typename F , typename... Rest>
auto push (F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
 
template<typename F >
auto push (F &&f) -> std::future< decltype(f(0))>
 
 thread_pool ()
 
 thread_pool (int nThreads)
 
 ~thread_pool ()
 
int size ()
 
int n_idle ()
 
std::thread & get_thread (int i)
 
void resize (int nThreads)
 
void clear_queue ()
 
std::function< void(int)> pop ()
 
void stop (bool isWait=false)
 
template<typename F , typename... Rest>
auto push (F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
 
template<typename F >
auto push (F &&f) -> std::future< decltype(f(0))>
 

Detailed Description

Definition at line 48 of file ctpl.h.

Constructor & Destructor Documentation

◆ thread_pool() [1/4]

ctpl::thread_pool::thread_pool ( )
inline

Definition at line 52 of file ctpl.h.

52 : q(_ctplThreadPoolLength_) { this->init(); }
#define _ctplThreadPoolLength_
Definition: ctpl.h:36

◆ thread_pool() [2/4]

ctpl::thread_pool::thread_pool ( int  nThreads,
int  queueSize = _ctplThreadPoolLength_ 
)
inline

Definition at line 53 of file ctpl.h.

53 : q(queueSize) { this->init(); this->resize(nThreads); }
void resize(int nThreads)
Definition: ctpl.h:70

◆ ~thread_pool() [1/2]

ctpl::thread_pool::~thread_pool ( )
inline

Definition at line 56 of file ctpl.h.

56  {
57  this->stop(true);
58  }
void stop(bool isWait=false)
Definition: ctpl.h:121

◆ thread_pool() [3/4]

ctpl::thread_pool::thread_pool ( )
inline

Definition at line 75 of file ctpl_stl.h.

75 { this->init(); }

◆ thread_pool() [4/4]

ctpl::thread_pool::thread_pool ( int  nThreads)
inline

Definition at line 76 of file ctpl_stl.h.

76 { this->init(); this->resize(nThreads); }
void resize(int nThreads)
Definition: ctpl.h:70

◆ ~thread_pool() [2/2]

ctpl::thread_pool::~thread_pool ( )
inline

Definition at line 79 of file ctpl_stl.h.

79  {
80  this->stop(true);
81  }
void stop(bool isWait=false)
Definition: ctpl.h:121

Member Function Documentation

◆ clear_queue() [1/2]

void ctpl::thread_pool::clear_queue ( )
inline

Definition at line 99 of file ctpl.h.

99  {
100  std::function<void(int id)> * _f;
101  while (this->q.pop(_f))
102  delete _f; // empty the queue
103  }

◆ clear_queue() [2/2]

void ctpl::thread_pool::clear_queue ( )
inline

Definition at line 122 of file ctpl_stl.h.

122  {
123  std::function<void(int id)> * _f;
124  while (this->q.pop(_f))
125  delete _f; // empty the queue
126  }

◆ get_thread() [1/2]

std::thread& ctpl::thread_pool::get_thread ( int  i)
inline

Definition at line 65 of file ctpl.h.

65 { return *this->threads[i]; }
#define i

◆ get_thread() [2/2]

std::thread& ctpl::thread_pool::get_thread ( int  i)
inline

Definition at line 88 of file ctpl_stl.h.

88 { return *this->threads[i]; }
#define i

◆ n_idle() [1/2]

int ctpl::thread_pool::n_idle ( )
inline

Definition at line 64 of file ctpl.h.

64 { return this->nWaiting; }

◆ n_idle() [2/2]

int ctpl::thread_pool::n_idle ( )
inline

Definition at line 87 of file ctpl_stl.h.

87 { return this->nWaiting; }

◆ pop() [1/2]

std::function<void(int)> ctpl::thread_pool::pop ( )
inline

Definition at line 106 of file ctpl.h.

106  {
107  std::function<void(int id)> * _f = nullptr;
108  this->q.pop(_f);
109  std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
110 
111  std::function<void(int)> f;
112  if (_f)
113  f = *_f;
114  return f;
115  }
double * f

◆ pop() [2/2]

std::function<void(int)> ctpl::thread_pool::pop ( )
inline

Definition at line 129 of file ctpl_stl.h.

129  {
130  std::function<void(int id)> * _f = nullptr;
131  this->q.pop(_f);
132  std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
133  std::function<void(int)> f;
134  if (_f)
135  f = *_f;
136  return f;
137  }
double * f

◆ push() [1/4]

template<typename F , typename... Rest>
auto ctpl::thread_pool::push ( F &&  f,
Rest &&...  rest 
) -> std::future<decltype(f(0, rest...))>
inline

Definition at line 152 of file ctpl.h.

152  {
153  auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
154  std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
155  );
156 
157  auto _f = new std::function<void(int id)>([pck](int id) {
158  (*pck)(id);
159  });
160  this->q.push(_f);
161 
162  std::unique_lock<std::mutex> lock(this->mutex);
163  this->cv.notify_one();
164 
165  return pck->get_future();
166  }
double * f

◆ push() [2/4]

template<typename F >
auto ctpl::thread_pool::push ( F &&  f) -> std::future<decltype(f(0))>
inline

Definition at line 171 of file ctpl.h.

171  {
172  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
173 
174  auto _f = new std::function<void(int id)>([pck](int id) {
175  (*pck)(id);
176  });
177  this->q.push(_f);
178 
179  std::unique_lock<std::mutex> lock(this->mutex);
180  this->cv.notify_one();
181 
182  return pck->get_future();
183  }
double * f

◆ push() [3/4]

template<typename F , typename... Rest>
auto ctpl::thread_pool::push ( F &&  f,
Rest &&...  rest 
) -> std::future<decltype(f(0, rest...))>
inline

Definition at line 173 of file ctpl_stl.h.

173  {
174  auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
175  std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
176  );
177  auto _f = new std::function<void(int id)>([pck](int id) {
178  (*pck)(id);
179  });
180  this->q.push(_f);
181  std::unique_lock<std::mutex> lock(this->mutex);
182  this->cv.notify_one();
183  return pck->get_future();
184  }
double * f

◆ push() [4/4]

template<typename F >
auto ctpl::thread_pool::push ( F &&  f) -> std::future<decltype(f(0))>
inline

Definition at line 189 of file ctpl_stl.h.

189  {
190  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
191  auto _f = new std::function<void(int id)>([pck](int id) {
192  (*pck)(id);
193  });
194  this->q.push(_f);
195  std::unique_lock<std::mutex> lock(this->mutex);
196  this->cv.notify_one();
197  return pck->get_future();
198  }
double * f

◆ resize() [1/2]

void ctpl::thread_pool::resize ( int  nThreads)
inline

Definition at line 70 of file ctpl.h.

70  {
71  if (!this->isStop && !this->isDone) {
72  int oldNThreads = static_cast<int>(this->threads.size());
73  if (oldNThreads <= nThreads) { // if the number of threads is increased
74  this->threads.resize(nThreads);
75  this->flags.resize(nThreads);
76 
77  for (int i = oldNThreads; i < nThreads; ++i) {
78  this->flags[i] = std::make_shared<std::atomic<bool>>(false);
79  this->set_thread(i);
80  }
81  }
82  else { // the number of threads is decreased
83  for (int i = oldNThreads - 1; i >= nThreads; --i) {
84  *this->flags[i] = true; // this thread will finish
85  this->threads[i]->detach();
86  }
87  {
88  // stop the detached threads that were waiting
89  std::unique_lock<std::mutex> lock(this->mutex);
90  this->cv.notify_all();
91  }
92  this->threads.resize(nThreads); // safe to delete because the threads are detached
93  this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
94  }
95  }
96  }
#define i

◆ resize() [2/2]

void ctpl::thread_pool::resize ( int  nThreads)
inline

Definition at line 93 of file ctpl_stl.h.

93  {
94  if (!this->isStop && !this->isDone) {
95  int oldNThreads = static_cast<int>(this->threads.size());
96  if (oldNThreads <= nThreads) { // if the number of threads is increased
97  this->threads.resize(nThreads);
98  this->flags.resize(nThreads);
99 
100  for (int i = oldNThreads; i < nThreads; ++i) {
101  this->flags[i] = std::make_shared<std::atomic<bool>>(false);
102  this->set_thread(i);
103  }
104  }
105  else { // the number of threads is decreased
106  for (int i = oldNThreads - 1; i >= nThreads; --i) {
107  *this->flags[i] = true; // this thread will finish
108  this->threads[i]->detach();
109  }
110  {
111  // stop the detached threads that were waiting
112  std::unique_lock<std::mutex> lock(this->mutex);
113  this->cv.notify_all();
114  }
115  this->threads.resize(nThreads); // safe to delete because the threads are detached
116  this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
117  }
118  }
119  }
#define i

◆ size() [1/2]

int ctpl::thread_pool::size ( )
inline

Definition at line 61 of file ctpl.h.

61 { return static_cast<int>(this->threads.size()); }

◆ size() [2/2]

int ctpl::thread_pool::size ( )
inline

Definition at line 84 of file ctpl_stl.h.

84 { return static_cast<int>(this->threads.size()); }

◆ stop() [1/2]

void ctpl::thread_pool::stop ( bool  isWait = false)
inline

Definition at line 121 of file ctpl.h.

121  {
122  if (!isWait) {
123  if (this->isStop)
124  return;
125  this->isStop = true;
126  for (int i = 0, n = this->size(); i < n; ++i) {
127  *this->flags[i] = true; // command the threads to stop
128  }
129  this->clear_queue(); // empty the queue
130  }
131  else {
132  if (this->isDone || this->isStop)
133  return;
134  this->isDone = true; // give the waiting threads a command to finish
135  }
136  {
137  std::unique_lock<std::mutex> lock(this->mutex);
138  this->cv.notify_all(); // stop all waiting threads
139  }
140  for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
141  if (this->threads[i]->joinable())
142  this->threads[i]->join();
143  }
144  // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
145  // therefore delete them here
146  this->clear_queue();
147  this->threads.clear();
148  this->flags.clear();
149  }
void clear_queue()
Definition: ctpl.h:99
#define i
int size()
Definition: ctpl.h:61
int * n

◆ stop() [2/2]

void ctpl::thread_pool::stop ( bool  isWait = false)
inline

Definition at line 142 of file ctpl_stl.h.

142  {
143  if (!isWait) {
144  if (this->isStop)
145  return;
146  this->isStop = true;
147  for (int i = 0, n = this->size(); i < n; ++i) {
148  *this->flags[i] = true; // command the threads to stop
149  }
150  this->clear_queue(); // empty the queue
151  }
152  else {
153  if (this->isDone || this->isStop)
154  return;
155  this->isDone = true; // give the waiting threads a command to finish
156  }
157  {
158  std::unique_lock<std::mutex> lock(this->mutex);
159  this->cv.notify_all(); // stop all waiting threads
160  }
161  for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
162  if (this->threads[i]->joinable())
163  this->threads[i]->join();
164  }
165  // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
166  // therefore delete them here
167  this->clear_queue();
168  this->threads.clear();
169  this->flags.clear();
170  }
void clear_queue()
Definition: ctpl.h:99
#define i
int size()
Definition: ctpl.h:61
int * n

The documentation for this class was generated from the following files: