c++11 最基础最简单的: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 #pragma once #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> #include <type_traits> class ThreadPool { public : ThreadPool (); ThreadPool (int num); ~ThreadPool (); template <class F, class ... Args> std::future<int > enqueue (F& f, Args&... args) ;private : std::vector <std::thread> workers; std::queue<std::function<void ()>>tasks; std::mutex queue_mutex; std::condition_variable cond; bool stop; }; inline ThreadPool::ThreadPool () {} ThreadPool::ThreadPool (int num) :stop (false ) { for (size_t i = 0 ; i < num; i++) { auto thread = [this ] { for (;;) { std::function<void ()> task; { std::unique_lock<std::mutex> lock (this ->queue_mutex) ; this ->cond.wait (lock, [this ] { return this ->stop || !this ->tasks.empty (); } ); if (stop && tasks.empty ()) return ; task = std::move (tasks.front ()); tasks.pop (); } task (); } }; workers.emplace_back (thread); } } inline ThreadPool::~ThreadPool (){ { std::unique_lock<std::mutex> lock (queue_mutex) ; stop = true ; } cond.notify_all (); for (auto & worker : workers) { worker.join (); } } template <class F, class ... Args>std::future<int > ThreadPool::enqueue (F& f, Args&... args) { std::function<decltype (f(args...))()> func = std::bind (std::forward<F>(f), std::forward<Args>(args)...); auto task = std::make_shared< std::packaged_task<int ()> >( func ); std::function<void ()> warpper_func = [task](){(*task)();}; std::unique_lock<std::mutex> lock (queue_mutex) ; tasks.emplace (warpper_func); cond.notify_one (); return task->get_future (); }
test:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 int main () { ThreadPool pool (4 ) ; std::vector<std::future<int >>results; for (size_t i = 0 ; i < 8 ; i++) { auto tp = [i] { std::cout << "hello " << i << std::endl; std::cout << "world " << i << std::endl; return i * i; }; results.emplace_back ( pool.enqueue ((tp)) ); } for (auto && result : results) std::cout << result.get () << ' ' ; std::cout << std::endl; return 0 ; }
C++11/14 一般性的线程池 上面我们有很多不足,比如限定了task function的返回值,而且逻辑不够优美。我们接着进行优化!
关于task function的 return type 是未定的问题,c++11 给出了两种方式。一个是decltype(expr),另外一个是std::result_of. 都可以通过尾置返回值类型进行处理。
我们首先使用decltype. 完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 #pragma once #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> #include <type_traits> class ThreadPool { public : ThreadPool (); ThreadPool (int num); ~ThreadPool (); template <class F, class ... Args> auto enqueue (F&& f, Args&&... args) private : std::vector <std::thread> workers; std::queue<std::function<void ()>>tasks; std::mutex queue_mutex; std::condition_variable cond; bool stop; }; inline ThreadPool::ThreadPool () {} ThreadPool::ThreadPool (int num) :stop (false ) { for (size_t i = 0 ; i < num; i++) { auto thread = [this ] { for (;;) { std::function<void ()> task; { std::unique_lock<std::mutex> lock (this ->queue_mutex) ; this ->cond.wait (lock, [this ] { return this ->stop || !this ->tasks.empty (); } ); if (stop && tasks.empty ()) return ; task = std::move (tasks.front ()); tasks.pop (); } task (); } }; workers.emplace_back (thread); } } inline ThreadPool::~ThreadPool (){ { std::unique_lock<std::mutex> lock (queue_mutex) ; stop = true ; } cond.notify_all (); for (auto & worker : workers) { worker.join (); } } template <class F, class ... Args>auto ThreadPool::enqueue (F&& f, Args&&... args) -> std::future<decltype (f(args...)) > { using ret_type = std::future<decltype (f (args...))>; std::function<decltype (f(args...))()> func = std::bind (std::forward<F>(f), std::forward<Args>(args)...); auto task = std::make_shared< std::packaged_task<decltype (f (args...))()> >( func ); std::function<void ()> warpper_func = [task]() {(*task)(); }; ret_type res = task->get_future (); if (stop) throw std::runtime_error ("enqueue on stopped ThreadPool" ); std::unique_lock<std::mutex> lock (queue_mutex) ; tasks.emplace (warpper_func); cond.notify_one (); return res; }
优化版createthread 单独抽出来组成一个接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 #pragma once #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> #include <type_traits> #include <utility> class ThreadPool { public : ThreadPool (); ThreadPool (int num); ~ThreadPool (); void CreateThread (void ) ; template <class F, class ... Args> auto enqueue (F &&f, Args&&... args) ->std::future<decltype (f(args...)) > ;private : std::vector <std::thread> workers; std::queue<std::function<void ()>>tasks; std::mutex queue_mutex; std::condition_variable cond; bool stop; }; inline ThreadPool::ThreadPool () {} inline void ThreadPool::CreateThread (void ) { for (;;) { std::function<void ()> task; { std::unique_lock<std::mutex> lock (this ->queue_mutex) ; this ->cond.wait (lock, [this ] { return this ->stop || !this ->tasks.empty (); } ); if (stop && tasks.empty ()) return ; task = std::move (tasks.front ()); tasks.pop (); } task (); } } ThreadPool::ThreadPool (int num) :stop (false ) { for (size_t i = 0 ; i < num; i++) { auto thread = std::bind (&ThreadPool::CreateThread,this ); workers.emplace_back (thread); } } inline ThreadPool::~ThreadPool (){ { std::unique_lock<std::mutex> lock (queue_mutex) ; stop = true ; } cond.notify_all (); for (auto & worker : workers) { worker.join (); } } template <class F, class ... Args>auto ThreadPool::enqueue (F&& f, Args&&... args) ->std::future<decltype (f(args...)) > { using ret_type = std::future< decltype (f (args...))>; std::function< decltype (f(args...))()> func = std::bind (std::forward<F>(f), std::forward<Args>(args)...); auto task = std::make_shared< std::packaged_task< decltype (f (args...))()> >( func ); std::function<void ()> warpper_func = [task]() {(*task)(); }; ret_type res = task->get_future (); if (stop) throw std::runtime_error ("enqueue on stopped ThreadPool" ); std::unique_lock<std::mutex> lock (queue_mutex) ; tasks.emplace (warpper_func); cond.notify_one (); return res; }
test 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 int main () { ThreadPool pool (4 ) ; std::vector<std::future<int >>results; for (int i = 0 ; i < 8 ; i++) { auto tp = [i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); std::cout << "world " << i << std::endl; return i * i; }; auto ans = pool.enqueue (std::move (tp)); results.emplace_back (std::move (ans)); } for (auto && result : results) std::cout << result.get () << ' ' ; std::cout << std::endl; return 0 ; }
那我们现在采用第二种方式。
需要更改的地方如下:
enqueue 的声明:
1 2 3 template <class F, class ... Args> auto enqueue (F&& f, Args&&... args) ->std::future<typename std::result_of<F (Args...) >::type> ;
enqueue 的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 template <class F, class ... Args>auto ThreadPool::enqueue (F&& f, Args&&... args) ->std::future<typename std::result_of<F (Args...) >::type> { using ret_type = std::future<typename std::result_of<F (Args...)>::type>; std::function<typename std::result_of<F(Args...)>::type ()> func = std::bind (std::forward<F>(f), std::forward<Args>(args)...); auto task = std::make_shared< std::packaged_task<typename std::result_of<F (Args...)>::type ()> >( func ); std::function<void ()> warpper_func = [task]() {(*task)(); }; ret_type res = task->get_future (); if (stop) throw std::runtime_error ("enqueue on stopped ThreadPool" ); std::unique_lock<std::mutex> lock (queue_mutex) ; tasks.emplace (warpper_func); cond.notify_one (); return res; }