C++11/14、C++17/20 最简单的线程池

c++11 最基础最简单的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#pragma once
#include<vector>
#include<queue>
#include<memory>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<future>
#include<functional>
#include<stdexcept>
#include<type_traits>

// A pool of worker threads that execute queued tasks.
// All member functions are defined out of line, after the class.
class ThreadPool
{
public:
    // Default constructor (defined out of line).
    ThreadPool();
    // Builds a pool from a thread count `num` (defined out of line).
    ThreadPool(int num);
    // Destructor (defined out of line).
    ~ThreadPool();

    // Queues the callable `f` together with its arguments `args`.
    // The result is delivered through the returned std::future<int>.
    template<typename F, typename... Args>
    std::future<int> enqueue(F& f, Args&... args);

private:
    std::vector<std::thread> workers;          // worker threads

    std::queue<std::function<void()>> tasks;   // pending tasks

    std::mutex queue_mutex;
    std::condition_variable cond;
    bool stop;
};

// Default constructor: creates a pool without any worker threads.
// `stop` is initialised explicitly so the flag never holds an indeterminate
// value. No workers are started here, so tasks queued on a
// default-constructed pool are never executed; use ThreadPool(int) to get a
// working pool.
inline ThreadPool::ThreadPool() : stop(false)
{
}

// Starts `num` worker threads. Each worker repeatedly waits for a task,
// removes it from the queue and runs it, until the pool is stopped and the
// queue is empty.
//
// Declared `inline` like the other member definitions: this file is written
// as a header (#pragma once), and a non-inline definition would be emitted in
// every translation unit that includes it.
//
// @param num  number of worker threads; values <= 0 start no workers.
inline ThreadPool::ThreadPool(int num) : stop(false)
{
    if (num > 0)
        workers.reserve(static_cast<std::size_t>(num));

    // Signed counter on purpose: comparing a size_t index against a negative
    // `num` would convert `num` to a huge unsigned value.
    for (int i = 0; i < num; ++i)
    {
        workers.emplace_back([this] {
            for (;;)
            {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queue_mutex);
                    // Sleep until the pool is stopping or a task is available.
                    cond.wait(lock, [this] { return stop || !tasks.empty(); });

                    // Leave only once a stop was requested AND the queue is drained.
                    if (stop && tasks.empty())
                        return;

                    task = std::move(tasks.front());
                    tasks.pop();
                }

                // The lock is released before the task runs.
                task();
            }
        });
    }
}

// Shuts the pool down: raises the stop flag, wakes every worker and then
// waits for all worker threads to finish.
inline ThreadPool::~ThreadPool()
{
    {
        // The flag is written while holding queue_mutex.
        std::lock_guard<std::mutex> guard(queue_mutex);
        stop = true;
    }
    cond.notify_all();

    for (std::vector<std::thread>::iterator it = workers.begin(); it != workers.end(); ++it)
        it->join();
}


//}

// Queues `f(args...)` for execution on a worker thread.
//
// @param f     callable to run; a copy is stored in the task.
// @param args  arguments for `f`; copies are stored in the task.
// @return      future that receives the call's result, converted to int.
template<class F, class... Args>
std::future<int> ThreadPool::enqueue(F& f, Args&... args)
{
    // `f` and `args` are plain lvalue references, not forwarding references.
    // std::forward<F>(f) would cast them to rvalues and let std::bind move out
    // of the caller's objects, so copies are bound instead.
    auto task = std::make_shared<std::packaged_task<int()>>(std::bind(f, args...));

    std::future<int> result = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // std::function needs a copyable target, hence the shared_ptr wrapper
        // around the move-only packaged_task.
        tasks.emplace([task]() { (*task)(); });
    }
    // Notify after unlocking so the woken worker can take the mutex at once.
    cond.notify_one();
    return result;
}

test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Demo: submit eight small jobs to a four-thread pool and print each result.
int main()
{
    ThreadPool pool(4);
    std::vector<std::future<int>> futures;

    for (size_t idx = 0; idx < 8; ++idx)
    {
        auto job = [idx] {
            std::cout << "hello " << idx << std::endl;
            // Optional delay to make the interleaving visible:
            // std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "world " << idx << std::endl;
            return idx * idx;
        };
        // enqueue takes its callable by lvalue reference, so pass the named lambda.
        futures.emplace_back(pool.enqueue(job));
    }

    // get() blocks until the corresponding job has finished.
    for (auto& pending : futures)
        std::cout << pending.get() << ' ';
    std::cout << std::endl;

    return 0;
}

C++11/14 一般性的线程池

上面我们有很多不足,比如限定了task function的返回值,而且逻辑不够优美。我们接着进行优化!

关于 task function 的返回类型(return type)不确定的问题,C++11 给出了两种方式:一个是 decltype(expr),另一个是 std::result_of,二者都可以配合尾置返回类型使用。(注意:std::result_of 在 C++17 中被弃用、在 C++20 中被移除,新代码应改用 std::invoke_result。)

我们首先使用decltype. 完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#pragma once
#include<vector>
#include<queue>
#include<memory>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<future>
#include<functional>
#include<stdexcept>
#include<type_traits>

// A pool of worker threads that execute queued tasks.
// All member functions are defined out of line, after the class.
class ThreadPool
{
public:
    // Default constructor (defined out of line).
    ThreadPool();
    // Builds a pool from a thread count `num` (defined out of line).
    ThreadPool(int num);
    // Destructor (defined out of line).
    ~ThreadPool();

    // Queues `f(args...)` and returns a future for its result.
    // The trailing return type is required: without it (and without the
    // terminating ';') this declaration does not compile, and it must match
    // the out-of-line definition.
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<decltype(f(args...))>;

private:
    std::vector<std::thread> workers;          // worker threads

    std::queue<std::function<void()>> tasks;   // pending tasks

    std::mutex queue_mutex;
    std::condition_variable cond;
    bool stop;
};

// Default constructor: creates a pool without any worker threads.
// `stop` must be initialised here because enqueue() tests it; reading an
// uninitialised bool is undefined behaviour. No workers are started, so
// tasks queued on a default-constructed pool are never executed; use
// ThreadPool(int) to get a working pool.
inline ThreadPool::ThreadPool() : stop(false)
{
}

// Starts `num` worker threads. Every worker loops: wait for work, take one
// task from the queue, run it. A worker exits once the pool has been stopped
// and the queue is empty.
//
// Marked `inline` to match the other member definitions: the file uses
// #pragma once, i.e. it is meant to be included, and a non-inline definition
// would then be emitted once per including translation unit.
//
// @param num  number of worker threads; values <= 0 start no workers.
inline ThreadPool::ThreadPool(int num) : stop(false)
{
    if (num > 0)
        workers.reserve(static_cast<std::size_t>(num));

    // The counter is an int so that a negative `num` is not converted to a
    // very large unsigned value by the comparison.
    for (int started = 0; started < num; ++started)
    {
        workers.emplace_back([this] {
            for (;;)
            {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(queue_mutex);
                    // Wake up when stopping or when the queue is non-empty.
                    cond.wait(lock, [this] { return stop || !tasks.empty(); });

                    // Queued tasks are still drained after a stop request.
                    if (stop && tasks.empty())
                        return;

                    task = std::move(tasks.front());
                    tasks.pop();
                }

                // Executed with the mutex released.
                task();
            }
        });
    }
}

// Stops the pool: sets the stop flag under the queue mutex, wakes all
// workers, then joins every worker thread.
inline ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> guard(queue_mutex);
        stop = true;
    }
    cond.notify_all();

    for (std::thread& t : workers)
        t.join();
}


// Queues `f(args...)` for execution on a worker thread.
//
// @param f     callable to run; forwarded into the stored task.
// @param args  arguments for `f`; forwarded into the stored task.
// @return      future that receives the call's result (or its exception).
// @throws std::runtime_error if the pool has already been stopped.
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<decltype(f(args...))>
{
    using result_type = decltype(f(args...));

    // packaged_task is move-only, while std::function requires a copyable
    // target, so the task is shared through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<result_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<result_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // `stop` is written by the destructor under queue_mutex, so it must
        // be read under the same mutex; an unlocked read is a data race.
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    // Notify after unlocking so the woken worker can take the mutex at once.
    cond.notify_one();
    return res;
}

优化版:把 CreateThread(工作线程的循环逻辑)单独抽出来,作为一个独立的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#pragma once
#include<vector>
#include<queue>
#include<memory>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<future>
#include<functional>
#include<stdexcept>
#include<type_traits>
#include <utility>

// A pool of worker threads that execute queued tasks.
// All member functions are defined out of line, after the class.
class ThreadPool
{
public:
    // Default constructor (defined out of line).
    ThreadPool();
    // Builds a pool from a thread count `num` (defined out of line).
    ThreadPool(int num);
    // Destructor (defined out of line).
    ~ThreadPool();

    // Task-processing loop; the ThreadPool(int) constructor runs it on each
    // worker thread.
    void CreateThread(void);

    // Queues `f(args...)` and returns a future for its result.
    // (An earlier draft spelled the return type with an implementation-internal
    // result-type trait; decltype is used instead.)
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<decltype(f(args...))>;

private:
    std::vector<std::thread> workers;          // worker threads

    std::queue<std::function<void()>> tasks;   // pending tasks

    std::mutex queue_mutex;
    std::condition_variable cond;
    bool stop;
};

// Default constructor: creates a pool without any worker threads.
// `stop` must be initialised here because enqueue() and CreateThread() test
// it; reading an uninitialised bool is undefined behaviour. No workers are
// started, so tasks queued on a default-constructed pool are never executed;
// use ThreadPool(int) to get a working pool.
inline ThreadPool::ThreadPool() : stop(false)
{
}

inline void ThreadPool::CreateThread(void)
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->cond.wait(lock, [this] {
return this->stop || !this->tasks.empty();//stop 或许任务队列不为空时唤醒。
}
);

if (stop && tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop();//有点类似bfs的思路
}

task();
}
}

// Starts `num` worker threads, each running ThreadPool::CreateThread on
// this pool.
//
// Marked `inline` to match the other member definitions: the file uses
// #pragma once, i.e. it is meant to be included, and a non-inline definition
// would then be emitted once per including translation unit.
//
// @param num  number of worker threads; values <= 0 start no workers.
inline ThreadPool::ThreadPool(int num) : stop(false)
{
    if (num > 0)
        workers.reserve(static_cast<std::size_t>(num));

    // The counter is an int so that a negative `num` is not converted to a
    // very large unsigned value by the comparison.
    for (int started = 0; started < num; ++started)
    {
        // std::thread accepts a pointer to member function plus the object
        // pointer directly; both the '&' and `this` are required.
        workers.emplace_back(&ThreadPool::CreateThread, this);
    }
}

// Tears the pool down: flags the stop request while holding the queue mutex,
// wakes all waiting workers and joins each worker thread.
inline ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> guard(queue_mutex);
        stop = true;
    }
    cond.notify_all();

    for (std::size_t k = 0; k < workers.size(); ++k)
        workers[k].join();
}


// Queues `f(args...)` for execution on a worker thread.
//
// @param f     callable to run; forwarded into the stored task.
// @param args  arguments for `f`; forwarded into the stored task.
// @return      future that receives the call's result (or its exception).
// @throws std::runtime_error if the pool has already been stopped.
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    ->std::future<decltype(f(args...))>
{
    using result_type = decltype(f(args...));

    // packaged_task is move-only, while std::function requires a copyable
    // target, so the task is shared through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<result_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<result_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // `stop` is written by the destructor under queue_mutex, so it must
        // be read under the same mutex; an unlocked read is a data race.
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    // Notify after unlocking so the woken worker can take the mutex at once.
    cond.notify_one();
    return res;
}

test 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Demo: submit eight one-second jobs to a four-thread pool and print each
// job's result.
int main()
{
    ThreadPool pool(4);
    std::vector<std::future<int>> futures;

    for (int idx = 0; idx < 8; ++idx)
    {
        auto job = [idx] {
            std::cout << "hello " << idx << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "world " << idx << std::endl;
            return idx * idx;
        };
        // The lambda is handed over as an rvalue; the returned future is
        // moved into the vector.
        futures.emplace_back(pool.enqueue(std::move(job)));
    }

    // get() blocks until the corresponding job has finished.
    for (auto& pending : futures)
        std::cout << pending.get() << ' ';
    std::cout << std::endl;

    return 0;
}

接下来我们采用第二种方式(std::result_of)。

需要更改的地方如下:

enqueue 的声明:

1
2
3
// Queues `f(args...)`; the future's value type is computed with
// std::result_of instead of decltype.
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>;

enqueue 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Queues `f(args...)` for execution on a worker thread. The result type is
// computed with std::result_of (deprecated in C++17 and removed in C++20,
// where std::invoke_result replaces it).
//
// @param f     callable to run; forwarded into the stored task.
// @param args  arguments for `f`; forwarded into the stored task.
// @return      future that receives the call's result (or its exception).
// @throws std::runtime_error if the pool has already been stopped.
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    ->std::future<typename std::result_of<F(Args...)>::type>
{
    using result_type = typename std::result_of<F(Args...)>::type;

    // packaged_task is move-only, while std::function requires a copyable
    // target, so the task is shared through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<result_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    std::future<result_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // `stop` is shared with other threads, so it is read only while
        // holding queue_mutex; an unlocked read would be a data race.
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task]() { (*task)(); });
    }
    // Notify after unlocking so the woken worker can take the mutex at once.
    cond.notify_one();
    return res;
}