21 #ifndef __ctpl_thread_pool_H__ 22 #define __ctpl_thread_pool_H__ 32 #include <boost/lockfree/queue.hpp> 35 #ifndef _ctplThreadPoolLength_ 36 #define _ctplThreadPoolLength_ 100 61 int size() {
return static_cast<int>(this->threads.size()); }
64 int n_idle() {
return this->nWaiting; }
71 if (!this->isStop && !this->isDone) {
72 int oldNThreads =
static_cast<int>(this->threads.size());
73 if (oldNThreads <= nThreads) {
74 this->threads.resize(nThreads);
75 this->flags.resize(nThreads);
77 for (
int i = oldNThreads;
i < nThreads; ++
i) {
78 this->flags[
i] = std::make_shared<std::atomic<bool>>(
false);
83 for (
int i = oldNThreads - 1;
i >= nThreads; --
i) {
84 *this->flags[
i] =
true;
85 this->threads[
i]->detach();
89 std::unique_lock<std::mutex> lock(this->mutex);
90 this->cv.notify_all();
92 this->threads.resize(nThreads);
93 this->flags.resize(nThreads);
100 std::function<void(int id)> * _f;
101 while (this->q.pop(_f))
106 std::function<void(int)>
pop() {
107 std::function<void(int id)> * _f =
nullptr;
109 std::unique_ptr<std::function<void(int id)>> func(_f);
111 std::function<void(int)>
f;
121 void stop(
bool isWait =
false) {
126 for (
int i = 0,
n = this->
size();
i <
n; ++
i) {
127 *this->flags[
i] =
true;
132 if (this->isDone || this->isStop)
137 std::unique_lock<std::mutex> lock(this->mutex);
138 this->cv.notify_all();
140 for (
int i = 0; i < static_cast<int>(this->threads.size()); ++
i) {
141 if (this->threads[
i]->joinable())
142 this->threads[
i]->join();
147 this->threads.clear();
151 template<
typename F,
typename... Rest>
152 auto push(F &&
f, Rest&&... rest) ->std::future<decltype(
f(0, rest...))> {
153 auto pck = std::make_shared<std::packaged_task<decltype(
f(0, rest...))(int)>>(
154 std::bind(std::forward<F>(
f), std::placeholders::_1, std::forward<Rest>(rest)...)
157 auto _f =
new std::function<void(int id)>([pck](
int id) {
162 std::unique_lock<std::mutex> lock(this->mutex);
163 this->cv.notify_one();
165 return pck->get_future();
171 auto push(F &&
f) ->std::future<decltype(
f(0))> {
172 auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(
f));
174 auto _f =
new std::function<void(int id)>([pck](
int id) {
179 std::unique_lock<std::mutex> lock(this->mutex);
180 this->cv.notify_one();
182 return pck->get_future();
194 void set_thread(
int i) {
195 std::shared_ptr<std::atomic<bool>>
flag(this->flags[i]);
196 auto f = [
this,
i, flag]() {
197 std::atomic<bool> & _flag = *
flag;
198 std::function<void(int id)> * _f;
199 bool isPop = this->q.pop(_f);
202 std::unique_ptr<std::function<void(int id)>> func(_f);
208 isPop = this->q.pop(_f);
212 std::unique_lock<std::mutex> lock(this->mutex);
214 this->cv.wait(lock, [
this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f);
return isPop || this->isDone || _flag; });
221 this->threads[
i].reset(
new std::thread(
f));
224 void init() { this->nWaiting = 0; this->isStop =
false; this->isDone =
false; }
226 std::vector<std::unique_ptr<std::thread>> threads;
227 std::vector<std::shared_ptr<std::atomic<bool>>> flags;
228 mutable boost::lockfree::queue<std::function<void(int id)> *> q;
229 std::atomic<bool> isDone;
230 std::atomic<bool> isStop;
231 std::atomic<int> nWaiting;
234 std::condition_variable cv;
239 #endif // __ctpl_thread_pool_H__
#define _ctplThreadPoolLength_
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
void resize(int nThreads)
thread_pool(int nThreads, int queueSize=_ctplThreadPoolLength_)
std::function< void(int)> pop()
std::thread & get_thread(int i)
void stop(bool isWait=false)
auto push(F &&f) -> std::future< decltype(f(0))>