c++11 最基础最简单的:
1 |
|
test:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28int main()
{
ThreadPool pool(4);
std::vector<std::future<int>>results;
for (size_t i = 0; i < 8; i++)
{
auto tp = [i] {
std::cout << "hello " << i << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i * i;
};
results.emplace_back(
pool.enqueue((tp))
);
}
for (auto&& result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
C++11/14 一般性的线程池
上面我们有很多不足,比如限定了task function的返回值,而且逻辑不够优美。我们接着进行优化!
关于task function的 return type 是未定的问题,c++11 给出了两种方式。一个是decltype(expr),另外一个是std::result_of. 都可以通过尾置返回值类型进行处理。
我们首先使用decltype. 完整代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class ThreadPool
{
public:
ThreadPool();
ThreadPool(int num);
~ThreadPool();
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
// ->std::future<decltype(f(args...))>;
private:
std::vector <std::thread> workers; //thread array
std::queue<std::function<void()>>tasks; //task queue
std::mutex queue_mutex;
std::condition_variable cond;
bool stop;
};
inline ThreadPool::ThreadPool()
{
}
ThreadPool::ThreadPool(int num) :stop(false)
{
for (size_t i = 0; i < num; i++)
{
auto thread = [this] {
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->cond.wait(lock, [this] {
return this->stop || !this->tasks.empty();//stop 或许任务队列不为空时唤醒。
}
);
if (stop && tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop();//有点类似bfs的思路
}
task();
}
};
workers.emplace_back(thread);
}
}
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
cond.notify_all();
for (auto& worker : workers)
{
worker.join();
}
}
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<decltype(f(args...))>
{
using ret_type = std::future<decltype(f(args...))>;
std::function<decltype(f(args...))()> func =
std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
auto task = std::make_shared< std::packaged_task<decltype(f(args...))()> >(
func
);
std::function<void()> warpper_func = [task]() {(*task)(); };
ret_type res = task->get_future();
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(warpper_func);
cond.notify_one();
return res;
}
优化版createthread 单独抽出来组成一个接口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
class ThreadPool
{
public:
ThreadPool();
ThreadPool(int num);
~ThreadPool();
void CreateThread(void);
//template<class F, class...Args>
//auto enqueue(F&& f, Args&&...args)->std::future<typename std::_Forced_result_type<F(Args...)>::type>;
template<class F, class... Args>
auto enqueue(F &&f, Args&&... args)
->std::future<decltype(f(args...))>;
private:
std::vector <std::thread> workers; //thread array
std::queue<std::function<void()>>tasks; //task queue
std::mutex queue_mutex;
std::condition_variable cond;
bool stop;
};
inline ThreadPool::ThreadPool()
{
}
inline void ThreadPool::CreateThread(void)
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->cond.wait(lock, [this] {
return this->stop || !this->tasks.empty();//stop 或许任务队列不为空时唤醒。
}
);
if (stop && tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop();//有点类似bfs的思路
}
task();
}
}
ThreadPool::ThreadPool(int num) :stop(false)
{
for (size_t i = 0; i < num; i++)
{
auto thread = std::bind(&ThreadPool::CreateThread,this);//&,this 不能丢
workers.emplace_back(thread);
//或者直接下面
//workers.emplace_back(std::bind(&ThreadPool::CreateThread, this));
}
}
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
cond.notify_all();
for (auto& worker : workers)
{
worker.join();
}
}
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
->std::future<decltype(f(args...))>
{
using ret_type = std::future< decltype(f(args...))>; //typename 此处加不加均可以的,下面同
std::function< decltype(f(args...))()> func =
std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
auto task = std::make_shared< std::packaged_task< decltype(f(args...))()> >(
func
);
std::function<void()> warpper_func = [task]() {(*task)(); };
ret_type res = task->get_future();
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(warpper_func);
cond.notify_one();
return res;
}
test 代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26int main()
{
ThreadPool pool(4);
std::vector<std::future<int>>results;
for (int i = 0; i < 8; i++)
{
auto tp = [i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i * i;
};
auto ans = pool.enqueue(std::move(tp));
results.emplace_back(std::move(ans));
}
for (auto&& result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
那我们现在采用第二种方式。
需要更改的地方如下:
enqueue 的声明:1
2
3template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
->std::future<typename std::result_of<F(Args...)>::type>;
enqueue 的定义:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
->std::future<typename std::result_of<F(Args...)>::type>
{
using ret_type = std::future<typename std::result_of<F(Args...)>::type>; //typename 此处加不加均可以的,下面同
std::function<typename std::result_of<F(Args...)>::type()> func =
std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
auto task = std::make_shared< std::packaged_task<typename std::result_of<F(Args...)>::type()> >(
func
);
std::function<void()> warpper_func = [task]() {(*task)(); };
ret_type res = task->get_future();
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(warpper_func);
cond.notify_one();
return res;
}